feat: OpenLineage integration (#15317)

* 🎉 Init OpenLineage connector

Co-authored-by: dechoma <dominik.choma@gmail.com>

* MLH - make linter happy

* review fixes

* 🐛 Fix path for ol event in tests

* 🐛 Fix path for ol event in tests

* Update ingestion/setup.py

Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>

* Update ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>

* Update ingestion/src/metadata/ingestion/source/pipeline/openlineage/models.py

Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>

* review fixes 2

* linter

* review

* review

* make linter happy

* fix test_yield_pipeline_lineage_details test

* make linter happy

* fix tests

* fix tests 2

---------

Co-authored-by: dechoma <dominik.choma@gmail.com>
Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
mgorsk1 2024-03-12 08:39:25 +01:00 committed by GitHub
parent 36327c2ee9
commit 98850ab5cc
23 changed files with 2071 additions and 9 deletions

View File

@ -234,6 +234,7 @@ plugins: Dict[str, Set[str]] = {
"mysql": {VERSIONS["pymysql"]},
"nifi": {}, # uses requests
"okta": {"okta~=2.3"},
"openlineage": {*COMMONS["kafka"]},
"oracle": {"cx_Oracle>=8.3.0,<9", "oracledb~=1.2"},
"pgspider": {"psycopg2-binary", "sqlalchemy-pgspider"},
"pinotdb": {"pinotdb~=0.3"},

View File

@ -0,0 +1,88 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Source connection handler
"""
from typing import Optional
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import TopicPartition
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import (
OpenLineageConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import (
SecurityProtocol as KafkaSecProtocol,
)
from metadata.ingestion.connections.test_connections import (
SourceConnectionException,
test_connection_steps,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
def get_connection(connection: OpenLineageConnection) -> KafkaConsumer:
"""
Create connection
"""
try:
config = {
"bootstrap.servers": connection.brokersUrl,
"group.id": connection.consumerGroupName,
"auto.offset.reset": connection.consumerOffsets.value,
}
if connection.securityProtocol.value == KafkaSecProtocol.SSL.value:
config.update(
{
"security.protocol": connection.securityProtocol.value,
"ssl.ca.location": connection.SSLCALocation,
"ssl.certificate.location": connection.SSLCertificateLocation,
"ssl.key.location": connection.SSLKeyLocation,
}
)
kafka_consumer = KafkaConsumer(config)
kafka_consumer.subscribe([connection.topicName])
return kafka_consumer
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
def test_connection(
metadata: OpenMetadata,
client: KafkaConsumer,
service_connection: OpenLineageConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
def custom_executor():
_ = client.get_watermark_offsets(
TopicPartition(service_connection.topicName, 0)
)
test_fn = {"GetWatermarkOffsets": custom_executor}
test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
)

View File

@ -0,0 +1,520 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenLineage source to extract metadata from Kafka events
"""
import json
import traceback
from collections import defaultdict
from itertools import groupby, product
from typing import Any, Dict, Iterable, List, Optional, Tuple
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import (
OpenLineageConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
Source,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.openlineage.models import (
EventType,
LineageEdge,
LineageNode,
OpenLineageEvent,
TableDetails,
TableFQN,
)
from metadata.ingestion.source.pipeline.openlineage.utils import (
FQNNotFoundException,
message_to_open_lineage_event,
)
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class OpenlineageSource(PipelineServiceSource):
"""
Implements the necessary methods of PipelineServiceSource to register OpenLineage pipelines and their
metadata in OpenMetadata.
Works under the assumption that OpenLineage integrations produce events to a Kafka topic, which is the source
of events for this connector.
Only 'COMPLETE' OpenLineage events are taken into account by this connector.
Configuring OpenLineage integrations: https://openlineage.io/docs/integrations/about
"""
@classmethod
def create(cls, config_dict, metadata: OpenMetadata):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: OpenLineageConnection = config.serviceConnection.__root__.config
if not isinstance(connection, OpenLineageConnection):
raise InvalidSourceException(
f"Expected OpenLineageConnection, but got {connection}"
)
return cls(config, metadata)
def prepare(self):
"""Nothing to prepare"""
def close(self) -> None:
self.metadata.close()
@classmethod
def _get_table_details(cls, data: Dict) -> TableDetails:
"""
extracts table entity schema and name from input/output entry collected from Open Lineage.
:param data: single entry from inputs/outputs objects
:return: TableDetails object with schema and name
"""
symlinks = data.get("facets", {}).get("symlinks", {}).get("identifiers", [])
# for some OL events the name can be extracted from the dataset facet, but symlinks are preferred,
# so - if present - we use them instead
if len(symlinks) > 0:
try:
# @todo verify if table can have multiple identifiers pointing at it
name = symlinks[0]["name"]
except (KeyError, IndexError):
raise ValueError(
"input table name cannot be retrieved from symlinks.identifiers facet."
)
else:
try:
name = data["name"]
except KeyError:
raise ValueError(
"input table name cannot be retrieved from name attribute."
)
name_parts = name.split(".")
if len(name_parts) < 2:
raise ValueError(
f"input table name should be of 'schema.table' format! Received: {name}"
)
# we take last two elements to explicitly collect schema and table names
# in BigQuery OpenLineage events name_parts is a list of 3 elements, the first one being the GCP Project ID
# however, the concept of a GCP Project ID is not represented in OpenMetadata, hence we skip this part
return TableDetails(name=name_parts[-1], schema=name_parts[-2])
def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
try:
return self._get_table_fqn_from_om(table_details)
except FQNNotFoundException:
try:
schema_fqn = self._get_schema_fqn_from_om(table_details.schema)
return f"{schema_fqn}.{table_details.name}"
except FQNNotFoundException:
return None
def _get_table_fqn_from_om(self, table_details: TableDetails) -> Optional[str]:
"""
Based on partial schema and table names look for matching table object in open metadata.
:param table_details: TableDetails object with partial schema and table names
:return: fully qualified name of a Table in Open Metadata
"""
result = None
services = self.source_config.dbServiceNames
for db_service in services:
result = fqn.build(
metadata=self.metadata,
entity_type=Table,
service_name=db_service,
database_name=None,
schema_name=table_details.schema,
table_name=table_details.name,
)
if not result:
raise FQNNotFoundException(
f"Table FQN not found for table: {table_details} within services: {services}"
)
return result
def _get_schema_fqn_from_om(self, schema: str) -> Optional[str]:
"""
Based on partial schema name look for any matching DatabaseSchema object in open metadata.
:param schema: schema name
:return: fully qualified name of a DatabaseSchema in Open Metadata
"""
result = None
services = self.source_config.dbServiceNames
for db_service in services:
result = fqn.build(
metadata=self.metadata,
entity_type=DatabaseSchema,
service_name=db_service,
database_name=None,
schema_name=schema,
skip_es_search=False,
)
if result:
return result
if not result:
raise FQNNotFoundException(
f"Schema FQN not found within services: {services}"
)
return result
@classmethod
def _render_pipeline_name(cls, pipeline_details: OpenLineageEvent) -> str:
"""
Renders the pipeline name from the parent facet of the run facet. We expect every OL event to contain a parent
run facet, so we can always create pipeline entities and link them to lineage events.
:param pipeline_details: OpenLineageEvent containing the run facet
:return: pipeline name (not fully qualified name)
"""
run_facet = pipeline_details.run_facet
namespace = run_facet["facets"]["parent"]["job"]["namespace"]
name = run_facet["facets"]["parent"]["job"]["name"]
return f"{namespace}-{name}"
@classmethod
def _filter_event_by_type(
cls, event: OpenLineageEvent, event_type: EventType
) -> Optional[Dict]:
"""
returns event if it's of particular event_type.
for example - for lineage events we will be only looking for EventType.COMPLETE event type.
:param event: Open Lineage raw event.
:param event_type: type of event we are looking for.
:return: OpenLineage event if it matches event_type, otherwise an empty dict
"""
return event if event.event_type == event_type else {}
@classmethod
def _get_om_table_columns(cls, table_input: Dict) -> Optional[List]:
"""
:param table_input: single entry from the inputs/outputs facet
:return: list of Column objects built from the schema facet, or None if it is missing
"""
try:
fields = table_input["facets"]["schema"]["fields"]
# @todo check if this way of passing type is ok
columns = [
Column(name=f.get("name"), dataType=f.get("type").upper())
for f in fields
]
return columns
except KeyError:
return None
def get_create_table_request(self, table: Dict) -> Optional[Either]:
"""
If a table referenced in an OpenLineage event doesn't already exist in OpenMetadata, register the appropriate entity.
This matters especially for the output facet of an OpenLineage event - since database service ingestion is a scheduled
process, we might receive an OpenLineage event about the creation of a table that has not yet been ingested by the
database service ingestion process. To avoid missing such lineage scenarios, we create the table entity beforehand.
:param table: single object from inputs/outputs facet
:return: request to create the entity (if needed)
"""
om_table_fqn = None
try:
table_details = OpenlineageSource._get_table_details(table)
except ValueError as e:
return Either(
left=StackTraceError(
name="",
error=f"Failed to get partial table name: {e}",
stackTrace=traceback.format_exc(),
)
)
try:
om_table_fqn = self._get_table_fqn_from_om(table_details)
# if fqn found then it means table is already registered and we don't need to render create table request
return None
except FQNNotFoundException:
pass
# If OM Table FQN was not found based on OL Partial Name - we need to register it.
if not om_table_fqn:
try:
om_schema_fqn = self._get_schema_fqn_from_om(table_details.schema)
except FQNNotFoundException as e:
return Either(
left=StackTraceError(
name="",
error=f"Failed to get fully qualified schema name: {e}",
stackTrace=traceback.format_exc(),
)
)
# After finding schema fqn (based on partial schema name) we know where we can create table
# and we move forward with creating request.
if om_schema_fqn:
columns = OpenlineageSource._get_om_table_columns(table) or []
request = CreateTableRequest(
name=table_details.name,
columns=columns,
databaseSchema=om_schema_fqn,
)
return Either(right=request)
return None
@classmethod
def _get_ol_table_name(cls, table: Dict) -> str:
return "/".join(table.get(f) for f in ["namespace", "name"]).replace("//", "/")
def _build_ol_name_to_fqn_map(self, tables: List):
result = {}
for table in tables:
table_fqn = self._get_table_fqn(OpenlineageSource._get_table_details(table))
if table_fqn:
result[OpenlineageSource._get_ol_table_name(table)] = table_fqn
return result
@classmethod
def _create_output_lineage_dict(
cls, lineage_info: List[Tuple[str, str, str, str]]
) -> Dict[str, Dict[str, List[ColumnLineage]]]:
result = defaultdict(lambda: defaultdict(list))
for (output_table, input_table, output_column), group in groupby(
lineage_info, lambda x: x[:3]
):
input_columns = [input_col for _, _, _, input_col in group]
result[output_table][input_table] += [
ColumnLineage(toColumn=output_column, fromColumns=input_columns)
]
return result
def _get_column_lineage(
self, inputs: List, outputs: List
) -> Dict[str, Dict[str, List[ColumnLineage]]]:
_result: List = []
ol_name_to_fqn_map = self._build_ol_name_to_fqn_map(inputs + outputs)
for table in outputs:
output_table_fqn = self._get_table_fqn(
OpenlineageSource._get_table_details(table)
)
for field_name, field_spec in (
table.get("facets", {})
.get("columnLineage", {})
.get("fields", {})
.items()
):
for input_field in field_spec.get("inputFields", []):
input_table_ol_name = OpenlineageSource._get_ol_table_name(
input_field
)
_result.append( # output table, input table, output column, input column
(
output_table_fqn,
ol_name_to_fqn_map.get(input_table_ol_name),
f"{output_table_fqn}.{field_name}",
f'{ol_name_to_fqn_map.get(input_table_ol_name)}.{input_field.get("field")}',
)
)
return OpenlineageSource._create_output_lineage_dict(_result)
def yield_pipeline(
self, pipeline_details: OpenLineageEvent
) -> Iterable[Either[CreatePipelineRequest]]:
pipeline_name = self.get_pipeline_name(pipeline_details)
try:
description = f"""```json
{json.dumps(pipeline_details.run_facet, indent=4).strip()}```"""
request = CreatePipelineRequest(
name=pipeline_name,
service=self.context.pipeline_service,
description=description,
)
yield Either(right=request)
self.register_record(pipeline_request=request)
except ValueError:
yield Either(
left=StackTraceError(
name=pipeline_name,
message="Failed to collect metadata required for pipeline creation.",
),
stackTrace=traceback.format_exc(),
)
def yield_pipeline_lineage_details(
self, pipeline_details: OpenLineageEvent
) -> Iterable[Either[AddLineageRequest]]:
inputs, outputs = pipeline_details.inputs, pipeline_details.outputs
input_edges: List[LineageNode] = []
output_edges: List[LineageNode] = []
for spec in [(inputs, input_edges), (outputs, output_edges)]:
tables, tables_list = spec
for table in tables:
create_table_request = self.get_create_table_request(table)
if create_table_request:
yield create_table_request
table_fqn = self._get_table_fqn(
OpenlineageSource._get_table_details(table)
)
if table_fqn:
tables_list.append(
LineageNode(
fqn=TableFQN(value=table_fqn),
uuid=self.metadata.get_by_name(Table, table_fqn).id,
)
)
edges = [
LineageEdge(from_node=n[0], to_node=n[1])
for n in product(input_edges, output_edges)
]
column_lineage = self._get_column_lineage(inputs, outputs)
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.pipeline_service,
pipeline_name=self.context.pipeline,
)
pipeline_entity = self.metadata.get_by_name(entity=Pipeline, fqn=pipeline_fqn)
for edge in edges:
yield Either(
right=AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=edge.from_node.uuid, type=edge.from_node.node_type
),
toEntity=EntityReference(
id=edge.to_node.uuid, type=edge.to_node.node_type
),
lineageDetails=LineageDetails(
pipeline=EntityReference(
id=pipeline_entity.id.__root__,
type="pipeline",
),
description=f"Lineage extracted from OpenLineage job: {pipeline_details.job['name']}",
source=Source.OpenLineage,
columnsLineage=column_lineage.get(
edge.to_node.fqn.value, {}
).get(edge.from_node.fqn.value, []),
),
),
)
)
def get_pipelines_list(self) -> Optional[List[Any]]:
"""Get List of all pipelines"""
try:
consumer = self.client
session_active = True
empty_msg_cnt = 0
pool_timeout = self.service_connection.poolTimeout
while session_active:
message = consumer.poll(timeout=pool_timeout)
if message is None:
logger.debug("no new messages")
empty_msg_cnt += 1
if (
empty_msg_cnt * pool_timeout
> self.service_connection.sessionTimeout
):
# There are no new messages and the session timeout has passed
session_active = False
else:
logger.debug(f"new message {message.value()}")
empty_msg_cnt = 0
try:
_result = message_to_open_lineage_event(
json.loads(message.value())
)
result = self._filter_event_by_type(_result, EventType.COMPLETE)
if result:
yield result
except Exception as e:
logger.debug(e)
except Exception as e:
traceback.print_exc()
raise InvalidSourceException(f"Failed to read from Kafka: {str(e)}")
finally:
# Close down consumer to commit final offsets.
# @todo address this
consumer.close()
def get_pipeline_name(self, pipeline_details: OpenLineageEvent) -> str:
return OpenlineageSource._render_pipeline_name(pipeline_details)
def yield_pipeline_status(
self, pipeline_details: OpenLineageEvent
) -> Iterable[Either[OMetaPipelineStatus]]:
pass
def mark_pipelines_as_deleted(self):
"""
OpenLineage pipelines come from streaming data, so subsequent executions of the ingestion process
could cause deletion of a pipeline. Because of this, we turn off pipeline deletion by overriding this method
and leaving it blank. Setting 'Mark Deleted Pipelines' in the ingestion process will have no effect!
"""

View File

@ -0,0 +1,88 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Openlineage Source Model module
"""
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List
@dataclass
class OpenLineageEvent:
"""
An object containing data extracted from raw OpenLineage event. Used as a basis for all abstract methods of
OpenlineageSource connector.
"""
run_facet: Dict
job: Dict
event_type: str
inputs: List[Any]
outputs: List[Any]
@dataclass
class TableFQN:
"""
Fully Qualified Name of a Table.
"""
value: str
@dataclass
class ColumnFQN:
"""
Fully Qualified Name of a Column.
"""
value: str
@dataclass
class LineageNode:
"""
A node being a part of Lineage information.
"""
uuid: str
fqn: TableFQN
node_type: str = "table"
@dataclass
class LineageEdge:
"""
An object describing connection of two nodes in the Lineage information.
"""
from_node: LineageNode
to_node: LineageNode
@dataclass
class TableDetails:
"""
Minimal table information.
"""
schema: str
name: str
class EventType(str, Enum):
"""
List of used OpenLineage event types.
"""
COMPLETE = "COMPLETE"

View File

@ -0,0 +1,59 @@
"""
Utils used by OpenlineageSource connector.
"""
from functools import reduce
from typing import Dict
from metadata.ingestion.source.pipeline.openlineage.models import OpenLineageEvent
def message_to_open_lineage_event(incoming_event: Dict) -> OpenLineageEvent:
"""
Takes a raw OpenLineage event and parses it into an OpenLineageEvent.
We check whether the received event (from Kafka) adheres to the expected form and contains all the fields required
for successful processing by the OpenMetadata OpenLineage connector.
:param incoming_event: raw event received from kafka topic by OpenlineageSource
:return: OpenLineageEvent
"""
fields_to_verify = [
"run.facets.parent.job.name",
"run.facets.parent.job.namespace",
"inputs",
"outputs",
"eventType",
"job.name",
"job.namespace",
]
for field in fields_to_verify:
try:
reduce(lambda x, y: x[y], field.split("."), incoming_event)
except KeyError:
raise ValueError("Event malformed!")
run_facet = incoming_event["run"]
inputs = incoming_event["inputs"]
outputs = incoming_event["outputs"]
event_type = incoming_event["eventType"]
job = incoming_event["job"]
result = OpenLineageEvent(
run_facet=run_facet,
event_type=event_type,
job=job,
inputs=inputs,
outputs=outputs,
)
return result
class FQNNotFoundException(Exception):
"""
Error raised when searching for an entity (Table, DatabaseSchema) yields no match in OM.
"""
pass

View File

@ -192,17 +192,34 @@ def _(
@fqn_build_registry.add(DatabaseSchema)
def _(
_: Optional[OpenMetadata], # ES Search not enabled for Schemas
metadata: Optional[OpenMetadata], # ES Search not enabled for Schemas
*,
service_name: str,
database_name: str,
database_name: Optional[str],
schema_name: str,
) -> str:
if not service_name or not database_name or not schema_name:
raise FQNBuildingException(
f"Args should be informed, but got service=`{service_name}`, db=`{database_name}`, schema=`{schema_name}`"
skip_es_search: bool = True,
fetch_multiple_entities: bool = False,
) -> Union[Optional[str], Optional[List[str]]]:
entity: Optional[Union[DatabaseSchema, List[DatabaseSchema]]] = None
if not skip_es_search:
entity = search_database_schema_from_es(
metadata=metadata,
database_name=database_name,
schema_name=schema_name,
fetch_multiple_entities=fetch_multiple_entities,
service_name=service_name,
)
return _build(service_name, database_name, schema_name)
if not entity and database_name:
fqn = _build(service_name, database_name, schema_name)
return [fqn] if fetch_multiple_entities else fqn
if entity and fetch_multiple_entities:
return [str(table.fullyQualifiedName.__root__) for table in entity]
if entity:
return str(entity.fullyQualifiedName.__root__)
return None
@fqn_build_registry.add(Database)
@ -574,6 +591,43 @@ def build_es_fqn_search_string(
return fqn_search_string
def search_database_schema_from_es(
metadata: OpenMetadata,
database_name: str,
schema_name: str,
service_name: str,
fetch_multiple_entities: bool = False,
fields: Optional[str] = None,
):
"""
Find database schema entity in elasticsearch index.
:param metadata: OM Client
:param database_name: name of database in which we are searching for database schema
:param schema_name: name of schema we are searching for
:param service_name: name of service in which we are searching for database schema
:param fetch_multiple_entities: should single match be returned or all matches
:param fields: additional fields to return
:return: entity / entities matching search criteria
"""
if not schema_name:
raise FQNBuildingException(
f"Schema Name should be informed, but got schema_name=`{schema_name}`"
)
fqn_search_string = _build(service_name or "*", database_name or "*", schema_name)
es_result = metadata.es_search_from_fqn(
entity_type=DatabaseSchema,
fqn_search_string=fqn_search_string,
fields=fields,
)
return get_entity_from_es_result(
entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
)
def search_table_from_es(
metadata: OpenMetadata,
database_name: str,

View File

@ -0,0 +1,330 @@
{
"eventTime": "2023-12-16T14:22:11.949Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "COMPLETE",
"run": {
"runId": "59fc8906-4a4a-45ab-9a54-9cc2d399e10e",
"facets": {
"parent": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json#/$defs/ParentRunFacet",
"run": {
"runId": "daf8bcc1-cc3c-41bb-9251-334cacf698fa"
},
"job": {
"namespace": "TESTSchedulerID",
"name": "TESTParentJobName4"
}
},
"spark.logicalPlan": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"plan": [
{
"class": "org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand",
"num-children": 1,
"table": {
"product-class": "org.apache.spark.sql.catalyst.catalog.CatalogTable",
"identifier": {
"product-class": "org.apache.spark.sql.catalyst.TableIdentifier",
"table": "dst_table_test_from_src_table_df",
"database": "test_db"
},
"tableType": {
"product-class": "org.apache.spark.sql.catalyst.catalog.CatalogTableType",
"name": "MANAGED"
},
"storage": {
"product-class": "org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat",
"compressed": false,
"properties": null
},
"schema": {
"type": "struct",
"fields": []
},
"provider": "parquet",
"partitionColumnNames": [],
"owner": "",
"createTime": 1702736481797,
"lastAccessTime": -1,
"createVersion": "",
"properties": null,
"unsupportedFeatures": [],
"tracksPartitionsInCatalog": false,
"schemaPreservesCase": true,
"ignoredProperties": null
},
"mode": null,
"query": 0,
"outputColumnNames": "[id, randomid, zip]"
},
{
"class": "org.apache.spark.sql.execution.datasources.LogicalRelation",
"num-children": 0,
"relation": null,
"output": [
[
{
"class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
"num-children": 0,
"name": "id",
"dataType": "long",
"nullable": true,
"metadata": {},
"exprId": {
"product-class": "org.apache.spark.sql.catalyst.expressions.ExprId",
"id": 9,
"jvmId": "78343417-b80d-45a2-b489-e7d8920e61dd"
},
"qualifier": []
}
],
[
{
"class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
"num-children": 0,
"name": "randomid",
"dataType": "string",
"nullable": true,
"metadata": {},
"exprId": {
"product-class": "org.apache.spark.sql.catalyst.expressions.ExprId",
"id": 10,
"jvmId": "78343417-b80d-45a2-b489-e7d8920e61dd"
},
"qualifier": []
}
],
[
{
"class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
"num-children": 0,
"name": "zip",
"dataType": "string",
"nullable": true,
"metadata": {},
"exprId": {
"product-class": "org.apache.spark.sql.catalyst.expressions.ExprId",
"id": 11,
"jvmId": "78343417-b80d-45a2-b489-e7d8920e61dd"
},
"qualifier": []
}
]
],
"catalogTable": {
"product-class": "org.apache.spark.sql.catalyst.catalog.CatalogTable",
"identifier": {
"product-class": "org.apache.spark.sql.catalyst.TableIdentifier",
"table": "src_table_test",
"database": "test_db"
},
"tableType": {
"product-class": "org.apache.spark.sql.catalyst.catalog.CatalogTableType",
"name": "MANAGED"
},
"storage": {
"product-class": "org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat",
"locationUri": null,
"inputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"outputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
"serde": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
"compressed": false,
"properties": null
},
"schema": {
"type": "struct",
"fields": [
{
"name": "id",
"type": "long",
"nullable": true,
"metadata": {}
},
{
"name": "randomid",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "zip",
"type": "string",
"nullable": true,
"metadata": {
"__CHAR_VARCHAR_TYPE_STRING": "varchar(10)"
}
}
]
},
"provider": "parquet",
"partitionColumnNames": [],
"owner": "test_user@ad.test.net",
"createTime": 1682523315000,
"lastAccessTime": 0,
"createVersion": "3.3.1",
"properties": null,
"stats": null,
"unsupportedFeatures": [],
"tracksPartitionsInCatalog": false,
"schemaPreservesCase": true,
"ignoredProperties": null
},
"isStreaming": false
}
]
},
"spark_version": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"spark-version": "3.3.1",
"openlineage-spark-version": "1.5.0"
},
"processing_engine": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-0/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet",
"version": "3.3.1",
"name": "spark",
"openlineageAdapterVersion": "1.5.0"
},
"environment-properties": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"environment-properties": {}
}
}
},
"job": {
"namespace": "TESTSchedulerID",
"name": "test_user_spark.execute_create_data_source_table_as_select_command.dst_table_test_from_src_table_df",
"facets": {}
},
"inputs": [
{
"namespace": "s3a://test_db-db",
"name": "src_table_test",
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet",
"name": "s3a://test_db-db",
"uri": "s3a://test_db-db"
},
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "randomid",
"type": "string"
},
{
"name": "zip",
"type": "string"
}
]
},
"symlinks": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet",
"identifiers": [
{
"namespace": "hive://hive-metastore.hive.svc.cluster.local:9083",
"name": "shopify.raw_product_catalog",
"type": "TABLE"
}
]
}
},
"inputFacets": {}
}
],
"outputs": [
{
"namespace": "s3a://test_db-db",
"name": "dst_table_test_from_src_table_df",
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet",
"name": "s3a://test_db-db",
"uri": "s3a://test_db-db"
},
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "randomid",
"type": "string"
},
{
"name": "zip",
"type": "string"
}
]
},
"columnLineage": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet",
"fields": {
"id": {
"inputFields": [
{
"namespace": "s3a://test_db-db",
"name": "/src_table_test",
"field": "comments"
}
]
},
"randomid": {
"inputFields": [
{
"namespace": "s3a://test_db-db",
"name": "/src_table_test",
"field": "products"
}
]
},
"zip": {
"inputFields": [
{
"namespace": "s3a://test_db-db",
"name": "/src_table_test",
"field": "platform"
}
]
}
}
},
"symlinks": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet",
"identifiers": [
{
"namespace": "hive://hive-metastore.hive.svc.cluster.local:9083",
"name": "shopify.fact_order_new5",
"type": "TABLE"
}
]
},
"lifecycleStateChange": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet",
"lifecycleStateChange": "CREATE"
}
},
"outputFacets": {}
}
]
}

View File

@ -0,0 +1,533 @@
import copy
import json
import unittest
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch
from uuid import UUID
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import (
ConsumerOffsets,
SecurityProtocol,
)
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityLineage import ColumnLineage
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.pipeline.openlineage.metadata import OpenlineageSource
from metadata.ingestion.source.pipeline.openlineage.models import OpenLineageEvent
from metadata.ingestion.source.pipeline.openlineage.utils import (
message_to_open_lineage_event,
)
# Global constants
MOCK_OL_CONFIG = {
"source": {
"type": "openlineage",
"serviceName": "openlineage_source",
"serviceConnection": {
"config": {
"type": "OpenLineage",
"brokersUrl": "testbroker:9092",
"topicName": "test-topic",
"consumerGroupName": "test-consumergroup",
"consumerOffsets": ConsumerOffsets.earliest,
"securityProtocol": SecurityProtocol.PLAINTEXT,
"SSLCertificateLocation": "",
"SSLKeyLocation": "",
"SSLCALocation": "",
"poolTimeout": 0.3,
"sessionTimeout": 1,
}
},
"sourceConfig": {"config": {"type": "PipelineMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
MOCK_SPLINE_UI_URL = "http://localhost:9090"
PIPELINE_ID = "3f784e72-5bf7-5704-8828-ae8464fe915b:lhq160w0"
MOCK_PIPELINE_URL = f"{MOCK_SPLINE_UI_URL}/app/events/overview/{PIPELINE_ID}"
MOCK_PIPELINE_SERVICE = PipelineService(
id="85811038-099a-11ed-861d-0242ac120002",
name="openlineage_source",
fullyQualifiedName=FullyQualifiedEntityName(__root__="openlineage_source"),
connection=PipelineConnection(),
serviceType=PipelineServiceType.Airflow,
)
MOCK_PIPELINE = Pipeline(
id="2aaa012e-099a-11ed-861d-0242ac120002",
name=PIPELINE_ID,
fullyQualifiedName=f"openlineage_source.{PIPELINE_ID}",
displayName="MSSQL <> Postgres",
sourceUrl=MOCK_PIPELINE_URL,
tasks=[
Task(
name=PIPELINE_ID,
displayName="jdbc postgres ssl app",
sourceUrl=MOCK_PIPELINE_URL,
)
],
service=EntityReference(
id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
),
)
VALID_EVENT = {
"run": {
"facets": {
"parent": {"job": {"name": "test-job", "namespace": "test-namespace"}}
}
},
"inputs": [],
"outputs": [],
"eventType": "START",
"job": {"name": "test-job", "namespace": "test-namespace"},
}
MISSING_RUN_FACETS_PARENT_JOB_NAME_EVENT = copy.deepcopy(VALID_EVENT)
del MISSING_RUN_FACETS_PARENT_JOB_NAME_EVENT["run"]["facets"]["parent"]["job"]["name"]
MALFORMED_NESTED_STRUCTURE_EVENT = copy.deepcopy(VALID_EVENT)
MALFORMED_NESTED_STRUCTURE_EVENT["run"]["facets"]["parent"]["job"] = "Not a dict"
with open(
f"{Path(__file__).parent}/../../resources/datasets/openlineage_event.json"
) as ol_file:
FULL_OL_KAFKA_EVENT = json.load(ol_file)
EXPECTED_OL_EVENT = OpenLineageEvent(
run_facet=FULL_OL_KAFKA_EVENT["run"],
job=FULL_OL_KAFKA_EVENT["job"],
event_type=FULL_OL_KAFKA_EVENT["eventType"],
inputs=FULL_OL_KAFKA_EVENT["inputs"],
outputs=FULL_OL_KAFKA_EVENT["outputs"],
)
class OpenLineageUnitTest(unittest.TestCase):
@patch(
"metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
)
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
config = OpenMetadataWorkflowConfig.parse_obj(MOCK_OL_CONFIG)
self.open_lineage_source = OpenlineageSource.create(
MOCK_OL_CONFIG["source"],
config.workflowConfig.openMetadataServerConfig,
)
self.open_lineage_source.context.__dict__[
"pipeline"
] = MOCK_PIPELINE.name.__root__
self.open_lineage_source.context.__dict__[
"pipeline_service"
] = MOCK_PIPELINE_SERVICE.name.__root__
self.open_lineage_source.source_config.dbServiceNames = ["skun"]
@patch(
"metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
)
@patch("confluent_kafka.Consumer")
def setUp(self, mock_consumer, mock_test_connection):
mock_test_connection.return_value = False
self.mock_consumer = mock_consumer
def setup_mock_consumer_with_kafka_event(self, event):
mock_msg = MagicMock()
mock_msg.error.return_value = None
mock_msg.value.return_value = json.dumps(event).encode()
self.mock_consumer.poll.side_effect = [
mock_msg,
None,
None,
None,
None,
None,
None,
]
self.open_lineage_source.client = self.mock_consumer
def test_message_to_ol_event_valid_event(self):
"""Test conversion with a valid event."""
result = message_to_open_lineage_event(VALID_EVENT)
self.assertIsInstance(result, OpenLineageEvent)
def test_message_to_ol_event_missing_run_facets_parent_job_name(self):
"""Test conversion with missing 'run.facets.parent.job.name' field."""
with self.assertRaises(ValueError):
message_to_open_lineage_event(MISSING_RUN_FACETS_PARENT_JOB_NAME_EVENT)
def test_message_to_ol_event_malformed_nested_structure(self):
"""Test conversion with a malformed nested structure."""
with self.assertRaises(TypeError):
message_to_open_lineage_event(MALFORMED_NESTED_STRUCTURE_EVENT)
def test_poll_message_receives_message(self):
"""Test if poll_message receives a kafka message."""
self.setup_mock_consumer_with_kafka_event(VALID_EVENT)
result = self.open_lineage_source.client.poll(timeout=1)
self.assertIsNotNone(result)
self.assertEqual(json.loads(result.value().decode()), VALID_EVENT)
def read_openlineage_event_from_kafka(self, kafka_event):
self.setup_mock_consumer_with_kafka_event(kafka_event)
result_generator = self.open_lineage_source.get_pipelines_list()
results = []
try:
while True:
results.append(next(result_generator))
except StopIteration:
pass
return results[0]
def test_create_output_lineage_dict_empty_input(self):
"""Test with an empty input list."""
result = self.open_lineage_source._create_output_lineage_dict([])
self.assertEqual(result, {})
def test_create_output_lineage_dict_single_lineage_entry(self):
"""Test with a single lineage entry."""
lineage_info = [
("output_table", "input_table", "output_column", "input_column")
]
result = self.open_lineage_source._create_output_lineage_dict(lineage_info)
expected = {
"output_table": {
"input_table": [
ColumnLineage(
toColumn="output_column", fromColumns=["input_column"]
)
]
}
}
self.assertEqual(result, expected)
def test_create_output_lineage_dict_multiple_entries_different_outputs(self):
"""Test with multiple entries having different output tables."""
lineage_info = [
("output_table1", "input_table", "output_column1", "input_column"),
("output_table2", "input_table", "output_column2", "input_column"),
]
result = self.open_lineage_source._create_output_lineage_dict(lineage_info)
expected = {
"output_table1": {
"input_table": [
ColumnLineage(
toColumn="output_column1", fromColumns=["input_column"]
)
]
},
"output_table2": {
"input_table": [
ColumnLineage(
toColumn="output_column2", fromColumns=["input_column"]
)
]
},
}
self.assertEqual(result, expected)
def test_create_output_lineage_dict_multiple_entries_same_output(self):
"""Test with multiple entries sharing the same output table."""
lineage_info = [
("output_table", "input_table1", "output_column", "input_column1"),
("output_table", "input_table2", "output_column", "input_column2"),
]
result = self.open_lineage_source._create_output_lineage_dict(lineage_info)
expected = {
"output_table": {
"input_table1": [
ColumnLineage(
toColumn="output_column", fromColumns=["input_column1"]
)
],
"input_table2": [
ColumnLineage(
toColumn="output_column", fromColumns=["input_column2"]
)
],
}
}
self.assertEqual(result, expected)
def test_get_column_lineage_empty_inputs_outputs(self):
"""Test with empty input and output lists."""
inputs = []
outputs = []
result = self.open_lineage_source._get_column_lineage(inputs, outputs)
self.assertEqual(result, {})
@patch(
"metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn"
)
def test_build_ol_name_to_fqn_map_with_valid_data(self, mock_get_table_fqn):
# Mock _get_table_fqn to return a constructed FQN based on the provided table details
mock_get_table_fqn.side_effect = (
lambda table_details: f"database.schema.{table_details.name}"
)
tables = [
{"name": "schema.table1", "facets": {}, "namespace": "ns://"},
{"name": "schema.table2", "facets": {}, "namespace": "ns://"},
]
expected_map = {
"ns://schema.table1": "database.schema.table1",
"ns://schema.table2": "database.schema.table2",
}
result = self.open_lineage_source._build_ol_name_to_fqn_map(tables)
self.assertEqual(result, expected_map)
self.assertEqual(mock_get_table_fqn.call_count, 2)
@patch(
"metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn"
)
def test_build_ol_name_to_fqn_map_with_missing_fqn(self, mock_get_table_fqn):
# Mock _get_table_fqn to return None for missing FQN
mock_get_table_fqn.return_value = None
tables = [{"name": "schema.table1", "facets": {}, "namespace": "ns://"}]
expected_map = {} # Expect an empty map since FQN is missing
result = self.open_lineage_source._build_ol_name_to_fqn_map(tables)
self.assertEqual(result, expected_map)
@patch(
"metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn"
)
def test_build_ol_name_to_fqn_map_with_empty_tables(self, mock_get_table_fqn):
# No need to set up the mock specifically since it won't be called with empty input
tables = [] # No tables provided
expected_map = {} # Expect an empty map
result = self.open_lineage_source._build_ol_name_to_fqn_map(tables)
self.assertEqual(result, expected_map)
mock_get_table_fqn.assert_not_called()
@patch(
"metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn"
)
@patch(
"metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._build_ol_name_to_fqn_map"
)
def test_get_column_lineage_valid_inputs_outputs(
self, mock_build_map, mock_get_table_fqn
):
"""Test with valid input and output lists."""
# Setup
mock_get_table_fqn.side_effect = (
lambda table_details: f"database.schema.{table_details.name}"
)
mock_build_map.return_value = {
"s3a:/project-db/src_test1": "database.schema.input_table_1",
"s3a:/project-db/src_test2": "database.schema.input_table_2",
}
inputs = [
{"name": "schema.input_table1", "facets": {}, "namespace": "hive://"},
{"name": "schema.input_table2", "facets": {}, "namespace": "hive://"},
]
outputs = [
{
"name": "schema.output_table",
"facets": {
"columnLineage": {
"fields": {
"output_column1": {
"inputFields": [
{
"field": "input_column1",
"namespace": "s3a://project-db",
"name": "/src_test1",
}
]
},
"output_column2": {
"inputFields": [
{
"field": "input_column2",
"namespace": "s3a://project-db",
"name": "/src_test2",
}
]
},
}
}
},
}
]
result = self.open_lineage_source._get_column_lineage(inputs, outputs)
expected = {
"database.schema.output_table": {
"database.schema.input_table_1": [
ColumnLineage(
toColumn="database.schema.output_table.output_column1",
fromColumns=["database.schema.input_table_1.input_column1"],
)
],
"database.schema.input_table_2": [
ColumnLineage(
toColumn="database.schema.output_table.output_column2",
fromColumns=["database.schema.input_table_2.input_column2"],
)
],
}
}
self.assertEqual(result, expected)
def test_get_column_lineage__invalid_inputs_outputs_structure(self):
"""Test with invalid input and output structure."""
inputs = [{"invalid": "data"}]
outputs = [{"invalid": "data"}]
with self.assertRaises(ValueError):
self.open_lineage_source._get_column_lineage(inputs, outputs)
def test_get_table_details_with_symlinks(self):
"""Test with valid data where symlinks are present."""
data = {
"facets": {"symlinks": {"identifiers": [{"name": "project.schema.table"}]}}
}
result = self.open_lineage_source._get_table_details(data)
self.assertEqual(result.name, "table")
self.assertEqual(result.schema, "schema")
def test_get_table_details_without_symlinks(self):
"""Test with valid data but without symlinks."""
data = {"name": "schema.table"}
result = self.open_lineage_source._get_table_details(data)
self.assertEqual(result.name, "table")
self.assertEqual(result.schema, "schema")
def test_get_table_details_invalid_data_missing_symlinks_and_name(self):
"""Test with invalid data missing both symlinks and name."""
data = {}
with self.assertRaises(ValueError):
self.open_lineage_source._get_table_details(data)
def test_get_table_details_invalid_symlinks_structure(self):
"""Test with invalid symlinks structure."""
data = {"facets": {"symlinks": {"identifiers": [{}]}}}
with self.assertRaises(ValueError):
self.open_lineage_source._get_table_details(data)
def test_get_table_details_invalid_name_structure(self):
"""Test with invalid name structure."""
data = {"name": "invalidname"}
with self.assertRaises(ValueError):
self.open_lineage_source._get_table_details(data)
def test_get_pipelines_list(self):
"""Test get_pipelines_list method"""
ol_event = self.read_openlineage_event_from_kafka(FULL_OL_KAFKA_EVENT)
self.assertIsInstance(ol_event, OpenLineageEvent)
self.assertEqual(ol_event, EXPECTED_OL_EVENT)
@patch(
"metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om"
)
def test_yield_pipeline_lineage_details(self, mock_get_entity):
def t_fqn_build_side_effect(
table_details,
):
return f"testService.shopify.{table_details.name}"
def mock_get_uuid_by_name(entity, fqn):
if fqn == "testService.shopify.raw_product_catalog":
# source of table lineage
return Mock(id="69fc8906-4a4a-45ab-9a54-9cc2d399e10e")
elif fqn == "testService.shopify.fact_order_new5":
# dst of table lineage
return Mock(id="59fc8906-4a4a-45ab-9a54-9cc2d399e10e")
else:
# pipeline
z = Mock()
z.id.__root__ = "79fc8906-4a4a-45ab-9a54-9cc2d399e10e"
return z
def extract_lineage_details(pip_results):
table_lineage = []
col_lineage = []
for r in pip_results:
table_lineage.append(
(
r.right.edge.fromEntity.id.__root__,
r.right.edge.toEntity.id.__root__,
)
)
for col in r.right.edge.lineageDetails.columnsLineage:
col_lineage.append(
(col.fromColumns[0].__root__, col.toColumn.__root__)
)
return table_lineage, col_lineage
# Set up the side effect for the mock entity FQN builder
mock_get_entity.side_effect = t_fqn_build_side_effect
ol_event = self.read_openlineage_event_from_kafka(FULL_OL_KAFKA_EVENT)
with patch.object(
OpenMetadataConnection,
"get_by_name",
create=True,
side_effect=mock_get_uuid_by_name,
):
pip_results = self.open_lineage_source.yield_pipeline_lineage_details(
ol_event
)
table_lineage, col_lineage = extract_lineage_details(pip_results)
expected_table_lineage = [
(
UUID("69fc8906-4a4a-45ab-9a54-9cc2d399e10e"),
UUID("59fc8906-4a4a-45ab-9a54-9cc2d399e10e"),
)
]
expected_col_lineage = [
(
"testService.shopify.raw_product_catalog.comments",
"testService.shopify.fact_order_new5.id",
),
(
"testService.shopify.raw_product_catalog.products",
"testService.shopify.fact_order_new5.randomid",
),
(
"testService.shopify.raw_product_catalog.platform",
"testService.shopify.fact_order_new5.zip",
),
]
self.assertEqual(col_lineage, expected_col_lineage)
self.assertEqual(table_lineage, expected_table_lineage)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,151 @@
---
title: OpenLineage
slug: /connectors/pipeline/openlineage
---
# OpenLineage
In this section, we provide guides and references to use the OpenLineage connector.
## What is OpenLineage?
According to the [documentation](https://openlineage.io/docs/):
```
OpenLineage is an open framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata.
```
OpenLineage, apart from being a specification, is also a set of integrations collecting lineage from various systems such as Apache Airflow and Spark.
## OpenMetadata OpenLineage connector
The OpenMetadata OpenLineage connector consumes OpenLineage events from a Kafka broker and translates them into OpenMetadata lineage information.
{% image
src="/images/v1.3/connectors/pipeline/openlineage/connector-flow.svg"
alt="OpenLineage Connector" /%}
### Airflow OpenLineage events
Configure your Airflow instance:
1. Install the appropriate provider in Airflow: [apache-airflow-providers-openlineage](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/index.html)
2. Configure the OpenLineage Provider in Airflow - [documentation](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html#using-openlineage-integration)
   1. remember to use the `kafka` transport mode, as this connector assumes OL events are collected from a Kafka topic (a minimal sketch follows this list)
   2. a detailed list of configuration options for OpenLineage can be found [here](https://openlineage.io/docs/client/python/#configuration)
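As a minimal sketch of the Kafka transport configuration for the Airflow provider: the `AIRFLOW__OPENLINEAGE__TRANSPORT` environment variable and the `topic`/`config` keys below are assumptions based on the OpenLineage client documentation, so verify them against your provider and client versions before use.
```python
# Illustrative only - option and key names are assumptions, check the
# apache-airflow-providers-openlineage and openlineage-python docs for your versions.
import json
import os

os.environ["AIRFLOW__OPENLINEAGE__TRANSPORT"] = json.dumps(
    {
        "type": "kafka",  # must be the kafka transport for this connector to pick up events
        "topic": "openlineage-events",  # same topic as `topicName` in the OpenMetadata connection
        "config": {"bootstrap.servers": "broker1:9092,broker2:9092"},
    }
)
```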
### Spark OpenLineage events
Configure the Spark session to produce OpenLineage events compatible with the OpenLineage connector available in OpenMetadata.
# @todo complete kafka config
```python
from pyspark.sql import SparkSession
from uuid import uuid4
spark = SparkSession.builder\
.config('spark.openlineage.namespace', 'mynamespace')\
.config('spark.openlineage.parentJobName', 'hello-world')\
.config('spark.openlineage.parentRunId', str(uuid4()))\
.config('spark.jars.packages', 'io.openlineage:openlineage-spark:1.7.0')\
.config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')\
.config('spark.openlineage.transport.type', 'kafka')\
.getOrCreate()
```
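The `@todo` above is left as-is from the PR; as a rough illustration of the missing Kafka settings, the transport also needs a topic and broker properties. The option names below (`topicName`, `properties.bootstrap.servers`) are assumptions based on the OpenLineage Spark integration and may differ between versions.
```python
# Illustrative continuation of the builder above - option names are assumptions,
# verify them against the OpenLineage Spark integration docs for your version.
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .config('spark.openlineage.transport.type', 'kafka')\
    .config('spark.openlineage.transport.topicName', 'openlineage-events')\
    .config('spark.openlineage.transport.properties.bootstrap.servers', 'broker1:9092,broker2:9092')\
    .getOrCreate()
```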
## Requirements
{% note %}
We support OpenLineage events produced by OpenLineage 1.7.0 and later.
{% /note %}
## Metadata Ingestion
#### Connection Details
##### Providing connection details via UI
{% partial
file="/v1.3/connectors/metadata-ingestion-ui.md"
variables={
connector: "Openlineage",
selectServicePath: "/images/v1.3/connectors/openlineage/select-service.png",
addNewServicePath: "/images/v1.3/connectors/openlineage/add-new-service.png",
serviceConnectionPath: "/images/v1.3/connectors/openlineage/service-connection.png",
}
/%}
{% stepsContainer %}
{% extraContent parentTagNme="stepsContainer" %}
##### Providing connection details programmatically via API
###### 1. Preparing the Client
```python
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
server_config = OpenMetadataConnection(
hostPort="http://localhost:8585/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="<token>"
),
)
metadata = OpenMetadata(server_config)
assert metadata.health_check() # Will fail if we cannot reach the server
```
###### 2. Creating the OpenLineage Pipeline service
```python
from metadata.generated.schema.api.services.createPipelineService import CreatePipelineServiceRequest
from metadata.generated.schema.entity.services.pipelineService import (
PipelineServiceType,
PipelineConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import (
OpenLineageConnection,
SecurityProtocol as KafkaSecurityProtocol,
ConsumerOffsets
)
openlineage_service_request = CreatePipelineServiceRequest(
name='openlineage-service',
displayName='OpenLineage Service',
serviceType=PipelineServiceType.OpenLineage,
connection=PipelineConnection(
config=OpenLineageConnection(
brokersUrl='broker1:9092,broker2:9092',
topicName='openlineage-events',
consumerGroupName='openmetadata-consumer',
consumerOffsets=ConsumerOffsets.earliest,
poolTimeout=3.0,
sessionTimeout=60,
securityProtocol=KafkaSecurityProtocol.SSL,
# the SSL config below is optional and used only when securityProtocol=KafkaSecurityProtocol.SSL
SSLCertificateLocation='/path/to/kafka/certs/Certificate.pem',
SSLKeyLocation='/path/to/kafka/certs/Key.pem',
SSLCALocation='/path/to/kafka/certs/RootCA.pem',
)
),
)
metadata.create_or_update(openlineage_service_request)
```
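Once the service exists, the ingestion run itself can also be triggered programmatically. The sketch below mirrors the workflow configuration used in this PR's unit tests; the `MetadataWorkflow` import path and methods are assumptions to verify against your openmetadata-ingestion version.
```python
# A minimal sketch of running the OpenLineage ingestion programmatically.
# The import path and workflow methods are assumptions - verify them against
# your openmetadata-ingestion version before relying on them.
from metadata.workflow.metadata import MetadataWorkflow

config = {
    "source": {
        "type": "openlineage",
        "serviceName": "openlineage-service",
        "serviceConnection": {
            "config": {
                "type": "OpenLineage",
                "brokersUrl": "broker1:9092,broker2:9092",
                "topicName": "openlineage-events",
                "consumerGroupName": "openmetadata-consumer",
                "consumerOffsets": "earliest",
                "poolTimeout": 3.0,
                "sessionTimeout": 60,
                "securityProtocol": "PLAINTEXT",
            }
        },
        "sourceConfig": {"config": {"type": "PipelineMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "<token>"},
        }
    },
}

workflow = MetadataWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()  # fail loudly if the run reported errors
workflow.stop()
```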
{% /extraContent %}
{% partial file="/v1.3/connectors/test-connection.md" /%}
{% partial file="/v1.3/connectors/pipeline/configure-ingestion.md" /%}
{% partial file="/v1.3/connectors/ingestion-schedule-and-deploy.md" /%}
{% /stepsContainer %}
{% partial file="/v1.3/connectors/troubleshooting.md" /%}

Binary files not shown (3 new images added: 37 KiB, 64 KiB, 40 KiB).

View File

@ -0,0 +1,14 @@
{
"name": "OpenLineage",
"displayName": "Airflow Test Connection",
"description": "This Test Connection validates the access against the message broker.",
"steps": [
{
"name": "GetWatermarkOffsets",
"description": "Check if the message broker is reachable to fetch the topic details.",
"errorMessage": "Failed to connect to message broker",
"shortCircuit": true,
"mandatory": true
}
]
}

View File

@ -0,0 +1,99 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/openLineageConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "OpenLineageConnection",
"description": "OpenLineage Connection Config",
"type": "object",
"javaType": "org.openmetadata.schema.services.connections.pipeline.OpenLineageConnection",
"definitions": {
"OpenLineageType": {
"description": "Service type.",
"type": "string",
"enum": ["OpenLineage"],
"default": "OpenLineage"
}
},
"properties": {
"type": {
"description": "Service Type",
"$ref": "#/definitions/OpenLineageType",
"default": "OpenLineage"
},
"brokersUrl": {
"title": "Kafka brokers list",
"description": "service type of the messaging source",
"type": "string"
},
"topicName": {
"title": "Topic Name",
"description": "topic from where Open lineage events will be pulled ",
"type": "string"
},
"consumerGroupName": {
"title": "Consumer Group",
"description": "consumer group name ",
"type": "string"
},
"consumerOffsets": {
"title": "Initial consumer offsets",
"description": "initial Kafka consumer offset",
"default": "earliest",
"type": "string",
"enum": ["earliest", "latest"],
"javaEnums": [
{
"name": "earliest"
},
{
"name": "latest"
}
]
},
"poolTimeout": {
"title": "Single pool call timeout",
"description": "max allowed wait time",
"type": "number",
"default": 1.0
},
"sessionTimeout": {
"title": "Broker inactive session timeout",
"description": "max allowed inactivity time",
"type": "integer",
"default": 30
},
"securityProtocol": {
"title": "Kafka security protocol",
"description": "Kafka security protocol config",
"default": "PLAINTEXT",
"type": "string",
"enum": ["PLAINTEXT", "SSL"],
"javaEnums": [
{
"name": "PLAINTEXT"
},
{
"name": "SSL"
}
]
},
"SSLCertificateLocation": {
"title": "SSL Certificate location",
"description": "Kafka SSL certificate location",
"type": "string"
},
"SSLKeyLocation": {
"title": "SSL Key location",
"description": "Kafka SSL key location",
"type": "string"
},
"SSLCALocation": {
"title": "SSL CA location",
"description": "Kafka SSL ca location",
"type": "string"
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false
}
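
To make the expected payload shape concrete, here is a small hypothetical instance validated against this schema with the `jsonschema` package (the file path and field values below are illustrative only):

```python
import json

from jsonschema import validate

with open("openLineageConnection.json") as schema_file:  # placeholder path
    schema = json.load(schema_file)

sample_connection = {
    "type": "OpenLineage",
    "brokersUrl": "broker1:9092,broker2:9092",
    "topicName": "openlineage-events",
    "consumerGroupName": "openmetadata-consumer",
    "consumerOffsets": "earliest",
    "poolTimeout": 1.0,
    "sessionTimeout": 30,
    "securityProtocol": "PLAINTEXT",
}

# Raises jsonschema.exceptions.ValidationError if the instance violates the schema
# (e.g. an unknown property, since additionalProperties is false).
validate(instance=sample_connection, schema=schema)
```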

View File

@ -27,7 +27,8 @@
"CustomPipeline",
"DatabricksPipeline",
"Spline",
"Spark"
"Spark",
"OpenLineage"
],
"javaEnums": [
{
@ -62,6 +63,9 @@
},
{
"name": "Spark"
},
{
"name": "OpenLineage"
}
]
},
@ -108,6 +112,9 @@
},
{
"$ref": "./connections/pipeline/sparkConnection.json"
},
{
"$ref": "./connections/pipeline/openLineageConnection.json"
}
]
}

View File

@ -52,7 +52,7 @@
"source": {
"description": "Lineage type describes how a lineage was created.",
"type": "string",
"enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage"],
"enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage"],
"default": "Manual"
}
}

View File

@ -0,0 +1,87 @@
# OpenLineage
In this section, we provide guides and references to use the OpenLineage connector. You can view the full documentation [here](https://docs.open-metadata.org/connectors/pipeline/openlineage).
## Requirements
We ingest OpenLineage metadata by reading OpenLineage events from a Kafka topic.
## Connection Details
$$section
### Kafka brokers list $(id="brokersUrl")
OpenMetadata connects to Kafka brokers to read OpenLineage events.
This should be specified as a comma-separated list of brokers in the format `broker:port`. E.g., `kafkabroker1:9092,kafkabroker2:9092`.
$$
$$section
### Kafka Topic name $(id="topicName")
OpenMetadata reads OpenLineage events from a specific Kafka topic.
This should be specified as a topic name string. E.g., `openlineage-events`.
$$
$$section
### Kafka consumer group name $(id="consumerGroupName")
Name of the Kafka consumer group that will be used by the OpenLineage Kafka consumer.
This should be specified as a consumer group name string. E.g., `openmetadata-openlineage-consumer`.
$$
$$section
### Kafka initial consumer offsets $(id="consumerOffsets")
When a new Kafka consumer group is created, an initial offset is required.
This should be specified as `earliest` or `latest`.
$$
$$section
### Kafka single pool timeout $(id="poolTimeout")
This setting indicates how long the connector should wait for new messages in a single poll call.
This should be specified as a number of seconds, represented as a decimal number. E.g., `1.0`.
$$
$$section
### Kafka session timeout $(id="sessionTimeout")
This setting indicates how long the connector should wait for new messages in a Kafka session.
Once the session timeout is reached, the connector assumes there are no new messages to process
and ends the ingestion run successfully (see the sketch below).
This should be specified as a number of seconds, represented as an integer. E.g., `30`.
$$
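To illustrate how `poolTimeout` and `sessionTimeout` work together, here is a simplified sketch (not the actual connector implementation; broker, group, and topic names are placeholders): the consumer keeps polling in `poolTimeout`-second calls and ends ingestion once no message has arrived for `sessionTimeout` seconds.

```python
import time

from confluent_kafka import Consumer

POOL_TIMEOUT = 1.0    # seconds to wait in a single poll call
SESSION_TIMEOUT = 30  # seconds of inactivity before ingestion ends

consumer = Consumer(
    {
        "bootstrap.servers": "broker1:9092",   # placeholder
        "group.id": "openmetadata-consumer",   # placeholder
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["openlineage-events"])     # placeholder topic

last_message_at = time.time()
while time.time() - last_message_at < SESSION_TIMEOUT:
    message = consumer.poll(POOL_TIMEOUT)
    if message is None or message.error():
        continue                               # nothing new in this poll call
    last_message_at = time.time()
    # ... hand the OpenLineage event over for lineage processing ...
consumer.close()
```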
$$section
### Kafka securityProtocol $(id="securityProtocol")
Kafka Security protocol config.
This should be specified as `PLAINTEXT` or `SSL`.
$$
$$section
### Kafka SSL certificate location $(id="SSLCertificateLocation")
When the Kafka security protocol is set to `SSL`, a path to the SSL certificate is needed.
The certificate has to be in PEM format.
This should be specified as a path to the PEM file. E.g., `/path/to/kafka/certificate.pem`.
$$
$$section
### Kafka SSL key location $(id="SSLKeyLocation")
When the Kafka security protocol is set to `SSL`, a path to the SSL key is needed.
The key has to be in PEM format.
This should be specified as a path to the PEM file. E.g., `/path/to/kafka/key.pem`.
$$
$$section
### Kafka SSL CA location $(id="SSLCALocation")
When the Kafka security protocol is set to `SSL`, a path to the SSL CA is needed.
The CA has to be in PEM format.
This should be specified as a path to the PEM file. E.g., `/path/to/kafka/CA.pem`.
$$

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg width="100%" height="100%" viewBox="0 0 120 120" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" xml:space="preserve" xmlns:serif="http://www.serif.com/" style="fill-rule:evenodd;clip-rule:evenodd;stroke-linejoin:round;stroke-miterlimit:2;">
<g transform="matrix(1,0,0,1,-409.803,0)">
<g id="openlineage-icon-color" transform="matrix(0.995272,0,0,1.38053,409.803,0)">
<rect x="0" y="0" width="120.57" height="86.923" style="fill:none;"/>
<g transform="matrix(1.00475,0,0,0.724359,-0.469881,11.2728)">
<g transform="matrix(1,0,0,1,39.5361,51.8347)">
<path d="M0,-16.664C-1.133,-18.991 -2.901,-20.95 -5.1,-22.314C-7.347,-23.674 -9.934,-24.368 -12.56,-24.314C-17.892,-24.446 -22.808,-21.446 -25.13,-16.644C-26.394,-14.08 -27.035,-11.253 -27,-8.394C-27.049,-5.535 -26.422,-2.705 -25.17,-0.134C-22.855,4.67 -17.941,7.674 -12.61,7.546C-9.985,7.592 -7.4,6.899 -5.15,5.546C-2.931,4.177 -1.146,2.208 0,-0.134C1.261,-2.702 1.892,-5.534 1.84,-8.394C1.893,-11.258 1.263,-14.093 0,-16.664M2.9,1.436C-0.053,7.261 -6.081,10.879 -12.61,10.746C-19.138,10.887 -25.17,7.276 -28.13,1.456C-29.681,-1.586 -30.463,-4.96 -30.41,-8.374C-30.468,-11.804 -29.686,-15.196 -28.13,-18.254C-26.692,-21.064 -24.5,-23.419 -21.8,-25.054C-19.023,-26.706 -15.841,-27.551 -12.61,-27.494C-9.379,-27.55 -6.197,-26.705 -3.42,-25.054C-0.723,-23.418 1.465,-21.063 2.9,-18.254C4.461,-15.194 5.247,-11.798 5.19,-8.364C5.236,-4.96 4.449,-1.596 2.9,1.436" style="fill:rgb(116,164,188);fill-rule:nonzero;"/>
</g>
<g transform="matrix(1,0,0,1,83.4978,24.8649)">
<path d="M0,37.192C-0.034,37.193 -0.068,37.193 -0.102,37.192L-19.102,37.192C-19.952,37.22 -20.664,36.554 -20.692,35.704C-20.693,35.67 -20.693,35.636 -20.692,35.602L-20.692,1.602C-20.703,1.171 -20.528,0.755 -20.212,0.462C-19.875,0.145 -19.424,-0.021 -18.962,0.002C-18.528,-0.008 -18.107,0.154 -17.792,0.452C-17.475,0.745 -17.3,1.161 -17.312,1.592L-17.312,34.062L-0.132,34.062C0.299,34.04 0.72,34.191 1.038,34.482C1.341,34.774 1.504,35.182 1.488,35.602C1.516,36.452 0.85,37.164 0,37.192M2.949,-15.389L-29.341,-15.389L-29.341,52.581L4.929,52.581L27.949,18.591L2.949,-15.389Z" style="fill:rgb(116,164,188);fill-rule:nonzero;"/>
</g>
</g>
</g>
</g>
</svg>


View File

@ -83,4 +83,5 @@ export const LINEAGE_SOURCE: { [key in Source]: string } = {
[Source.QueryLineage]: 'Query Lineage',
[Source.SparkLineage]: 'Spark Lineage',
[Source.ViewLineage]: 'View Lineage',
[Source.OpenLineage]: 'OpenLineage',
};

View File

@ -54,6 +54,7 @@ import msAzure from '../assets/img/service-icon-ms-azure.png';
import mssql from '../assets/img/service-icon-mssql.png';
import mstr from '../assets/img/service-icon-mstr.png';
import nifi from '../assets/img/service-icon-nifi.png';
import openlineage from '../assets/img/service-icon-openlineage.svg';
import oracle from '../assets/img/service-icon-oracle.png';
import pinot from '../assets/img/service-icon-pinot.png';
import postgres from '../assets/img/service-icon-post.png';
@ -168,6 +169,7 @@ export const FIVETRAN = fivetran;
export const AMUNDSEN = amundsen;
export const ATLAS = atlas;
export const SAS = sas;
export const OPENLINEAGE = openlineage;
export const LOGO = logo;
export const AIRFLOW = airflow;
@ -386,6 +388,7 @@ export const BETA_SERVICES = [
DatabaseServiceType.Doris,
PipelineServiceType.Spline,
PipelineServiceType.Spark,
PipelineServiceType.OpenLineage,
DashboardServiceType.QlikSense,
DatabaseServiceType.Couchbase,
DatabaseServiceType.Greenplum,

View File

@ -23,6 +23,7 @@ import domoPipelineConnection from '../jsons/connectionSchemas/connections/pipel
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
import openLineageConnection from '../jsons/connectionSchemas/connections/pipeline/openLineageConnection.json';
import splineConnection from '../jsons/connectionSchemas/connections/pipeline/splineConnection.json';
export const getPipelineConfig = (type: PipelineServiceType) => {
@ -80,6 +81,11 @@ export const getPipelineConfig = (type: PipelineServiceType) => {
break;
}
case PipelineServiceType.OpenLineage: {
schema = openLineageConnection;
break;
}
default:
break;

View File

@ -59,6 +59,7 @@ import {
MSSQL,
MYSQL,
NIFI,
OPENLINEAGE,
OPEN_SEARCH,
ORACLE,
PINOT,
@ -335,6 +336,9 @@ class ServiceUtilClassBase {
case PipelineServiceType.DatabricksPipeline:
return DATABRICK;
case PipelineServiceType.OpenLineage:
return OPENLINEAGE;
case MlModelServiceType.Mlflow:
return MLFLOW;