diff --git a/ingestion/setup.py b/ingestion/setup.py index 887ee1a1508..91ea70c8b7b 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -234,6 +234,7 @@ plugins: Dict[str, Set[str]] = { "mysql": {VERSIONS["pymysql"]}, "nifi": {}, # uses requests "okta": {"okta~=2.3"}, + "openlineage": {*COMMONS["kafka"]}, "oracle": {"cx_Oracle>=8.3.0,<9", "oracledb~=1.2"}, "pgspider": {"psycopg2-binary", "sqlalchemy-pgspider"}, "pinotdb": {"pinotdb~=0.3"}, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/__init__.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/connection.py new file mode 100644 index 00000000000..fb95b913be8 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/connection.py @@ -0,0 +1,88 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Source connection handler +""" +from typing import Optional + +from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import TopicPartition + +from metadata.generated.schema.entity.automations.workflow import ( + Workflow as AutomationWorkflow, +) +from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import ( + OpenLineageConnection, +) +from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import ( + SecurityProtocol as KafkaSecProtocol, +) +from metadata.ingestion.connections.test_connections import ( + SourceConnectionException, + test_connection_steps, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata + + +def get_connection(connection: OpenLineageConnection) -> KafkaConsumer: + """ + Create connection + """ + try: + config = { + "bootstrap.servers": connection.brokersUrl, + "group.id": connection.consumerGroupName, + "auto.offset.reset": connection.consumerOffsets.value, + } + if connection.securityProtocol.value == KafkaSecProtocol.SSL.value: + config.update( + { + "security.protocol": connection.securityProtocol.value, + "ssl.ca.location": connection.SSLCALocation, + "ssl.certificate.location": connection.SSLCertificateLocation, + "ssl.key.location": connection.SSLKeyLocation, + } + ) + + kafka_consumer = KafkaConsumer(config) + kafka_consumer.subscribe([connection.topicName]) + + return kafka_consumer + except Exception as exc: + msg = f"Unknown error connecting with {connection}: {exc}." + raise SourceConnectionException(msg) + + +def test_connection( + metadata: OpenMetadata, + client: KafkaConsumer, + service_connection: OpenLineageConnection, + automation_workflow: Optional[AutomationWorkflow] = None, +) -> None: + """ + Test connection. 
This can be executed either as part + of a metadata workflow or during an Automation Workflow + """ + + def custom_executor(): + _ = client.get_watermark_offsets( + TopicPartition(service_connection.topicName, 0) + ) + + test_fn = {"GetWatermarkOffsets": custom_executor} + + test_connection_steps( + metadata=metadata, + test_fn=test_fn, + service_type=service_connection.type.value, + automation_workflow=automation_workflow, + ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py new file mode 100644 index 00000000000..c2acf8be8ac --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -0,0 +1,520 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenLineage source to extract metadata from Kafka events +""" +import json +import traceback +from collections import defaultdict +from itertools import groupby, product +from typing import Any, Dict, Iterable, List, Optional, Tuple + +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.data.table import Column, Table +from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import ( + OpenLineageConnection, +) +from metadata.generated.schema.entity.services.ingestionPipelines.status import ( + StackTraceError, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.type.entityLineage import ( + ColumnLineage, + EntitiesEdge, + LineageDetails, + Source, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.models import Either +from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.pipeline.openlineage.models import ( + EventType, + LineageEdge, + LineageNode, + OpenLineageEvent, + TableDetails, + TableFQN, +) +from metadata.ingestion.source.pipeline.openlineage.utils import ( + FQNNotFoundException, + message_to_open_lineage_event, +) +from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource +from metadata.utils import fqn +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class OpenlineageSource(PipelineServiceSource): + """ + Implements the necessary methods of PipelineServiceSource to facilitate registering OpenLineage pipelines with + metadata into Open Metadata. 
+ + Works under the assumption that OpenLineage integrations produce events to Kafka topic, which is a source of events + for this connector. + + Only 'SUCCESS' OpenLineage events are taken into account in this connector. + + Configuring OpenLineage integrations: https://openlineage.io/docs/integrations/about + """ + + @classmethod + def create(cls, config_dict, metadata: OpenMetadata): + """Create class instance""" + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: OpenLineageConnection = config.serviceConnection.__root__.config + if not isinstance(connection, OpenLineageConnection): + raise InvalidSourceException( + f"Expected OpenLineageConnection, but got {connection}" + ) + return cls(config, metadata) + + def prepare(self): + """Nothing to prepare""" + + def close(self) -> None: + self.metadata.close() + + @classmethod + def _get_table_details(cls, data: Dict) -> TableDetails: + """ + extracts table entity schema and name from input/output entry collected from Open Lineage. + + :param data: single entry from inputs/outputs objects + :return: TableDetails object with schema and name + """ + symlinks = data.get("facets", {}).get("symlinks", {}).get("identifiers", []) + + # for some OL events name can be extracted from dataset facet but symlinks is preferred so - if present - we + # use it instead + if len(symlinks) > 0: + try: + # @todo verify if table can have multiple identifiers pointing at it + name = symlinks[0]["name"] + except (KeyError, IndexError): + raise ValueError( + "input table name cannot be retrieved from symlinks.identifiers facet." + ) + else: + try: + name = data["name"] + except KeyError: + raise ValueError( + "input table name cannot be retrieved from name attribute." + ) + + name_parts = name.split(".") + + if len(name_parts) < 2: + raise ValueError( + f"input table name should be of 'schema.table' format! Received: {name}" + ) + + # we take last two elements to explicitly collect schema and table names + # in BigQuery Open Lineage events name_parts would be list of 3 elements as first one is GCP Project ID + # however, concept of GCP Project ID is not represented in Open Metadata and hence - we need to skip this part + return TableDetails(name=name_parts[-1], schema=name_parts[-2]) + + def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]: + try: + return self._get_table_fqn_from_om(table_details) + except FQNNotFoundException: + try: + schema_fqn = self._get_schema_fqn_from_om(table_details.schema) + + return f"{schema_fqn}.{table_details.name}" + except FQNNotFoundException: + return None + + def _get_table_fqn_from_om(self, table_details: TableDetails) -> Optional[str]: + """ + Based on partial schema and table names look for matching table object in open metadata. + :param schema: schema name + :param table: table name + :return: fully qualified name of a Table in Open Metadata + """ + result = None + services = self.source_config.dbServiceNames + for db_service in services: + result = fqn.build( + metadata=self.metadata, + entity_type=Table, + service_name=db_service, + database_name=None, + schema_name=table_details.schema, + table_name=table_details.name, + ) + if not result: + raise FQNNotFoundException( + f"Table FQN not found for table: {table_details} within services: {services}" + ) + return result + + def _get_schema_fqn_from_om(self, schema: str) -> Optional[str]: + """ + Based on partial schema name look for any matching DatabaseSchema object in open metadata. 
+ + :param schema: schema name + :return: fully qualified name of a DatabaseSchema in Open Metadata + """ + result = None + services = self.source_config.dbServiceNames + + for db_service in services: + result = fqn.build( + metadata=self.metadata, + entity_type=DatabaseSchema, + service_name=db_service, + database_name=None, + schema_name=schema, + skip_es_search=False, + ) + + if result: + return result + + if not result: + raise FQNNotFoundException( + f"Schema FQN not found within services: {services}" + ) + + return result + + @classmethod + def _render_pipeline_name(cls, pipeline_details: OpenLineageEvent) -> str: + """ + Renders pipeline name from parent facet of run facet. It is our expectation that every OL event contains parent + run facet so we can always create pipeline entities and link them to lineage events. + + :param run_facet: Open Lineage run facet + :return: pipeline name (not fully qualified name) + """ + run_facet = pipeline_details.run_facet + + namespace = run_facet["facets"]["parent"]["job"]["namespace"] + name = run_facet["facets"]["parent"]["job"]["name"] + + return f"{namespace}-{name}" + + @classmethod + def _filter_event_by_type( + cls, event: OpenLineageEvent, event_type: EventType + ) -> Optional[Dict]: + """ + returns event if it's of particular event_type. + for example - for lineage events we will be only looking for EventType.COMPLETE event type. + + :param event: Open Lineage raw event. + :param event_type: type of event we are looking for. + :return: Open Lineage event if matches event_type, otherwise None + """ + return event if event.event_type == event_type else {} + + @classmethod + def _get_om_table_columns(cls, table_input: Dict) -> Optional[List]: + """ + + :param table_input: + :return: + """ + try: + fields = table_input["facets"]["schema"]["fields"] + + # @todo check if this way of passing type is ok + columns = [ + Column(name=f.get("name"), dataType=f.get("type").upper()) + for f in fields + ] + return columns + except KeyError: + return None + + def get_create_table_request(self, table: Dict) -> Optional[Either]: + """ + If certain table from Open Lineage events doesn't already exist in Open Metadata, register appropriate entity. + This makes sense especially for output facet of OpenLineage event - as database service ingestion is a scheduled + process we might fall into situation where we received Open Lineage event about creation of a table that is yet + to be ingested by database service ingestion process. To avoid missing on such lineage scenarios, we will create + table entity beforehand. + + :param table: single object from inputs/outputs facet + :return: request to create the entity (if needed) + """ + om_table_fqn = None + + try: + table_details = OpenlineageSource._get_table_details(table) + except ValueError as e: + return Either( + left=StackTraceError( + name="", + error=f"Failed to get partial table name: {e}", + stackTrace=traceback.format_exc(), + ) + ) + try: + om_table_fqn = self._get_table_fqn_from_om(table_details) + + # if fqn found then it means table is already registered and we don't need to render create table request + return None + except FQNNotFoundException: + pass + + # If OM Table FQN was not found based on OL Partial Name - we need to register it. 
+ if not om_table_fqn: + try: + om_schema_fqn = self._get_schema_fqn_from_om(table_details.schema) + except FQNNotFoundException as e: + return Either( + left=StackTraceError( + name="", + error=f"Failed to get fully qualified schema name: {e}", + stackTrace=traceback.format_exc(), + ) + ) + + # After finding schema fqn (based on partial schema name) we know where we can create table + # and we move forward with creating request. + if om_schema_fqn: + columns = OpenlineageSource._get_om_table_columns(table) or [] + + request = CreateTableRequest( + name=table_details.name, + columns=columns, + databaseSchema=om_schema_fqn, + ) + + return Either(right=request) + + return None + + @classmethod + def _get_ol_table_name(cls, table: Dict) -> str: + return "/".join(table.get(f) for f in ["namespace", "name"]).replace("//", "/") + + def _build_ol_name_to_fqn_map(self, tables: List): + result = {} + + for table in tables: + table_fqn = self._get_table_fqn(OpenlineageSource._get_table_details(table)) + + if table_fqn: + result[OpenlineageSource._get_ol_table_name(table)] = table_fqn + + return result + + @classmethod + def _create_output_lineage_dict( + cls, lineage_info: List[Tuple[str, str, str, str]] + ) -> Dict[str, Dict[str, List[ColumnLineage]]]: + result = defaultdict(lambda: defaultdict(list)) + for (output_table, input_table, output_column), group in groupby( + lineage_info, lambda x: x[:3] + ): + input_columns = [input_col for _, _, _, input_col in group] + + result[output_table][input_table] += [ + ColumnLineage(toColumn=output_column, fromColumns=input_columns) + ] + + return result + + def _get_column_lineage( + self, inputs: List, outputs: List + ) -> Dict[str, Dict[str, List[ColumnLineage]]]: + _result: List = [] + + ol_name_to_fqn_map = self._build_ol_name_to_fqn_map(inputs + outputs) + + for table in outputs: + output_table_fqn = self._get_table_fqn( + OpenlineageSource._get_table_details(table) + ) + for field_name, field_spec in ( + table.get("facets", {}) + .get("columnLineage", {}) + .get("fields", {}) + .items() + ): + for input_field in field_spec.get("inputFields", []): + input_table_ol_name = OpenlineageSource._get_ol_table_name( + input_field + ) + + _result.append( # output table, input table, output column, input column + ( + output_table_fqn, + ol_name_to_fqn_map.get(input_table_ol_name), + f"{output_table_fqn}.{field_name}", + f'{ol_name_to_fqn_map.get(input_table_ol_name)}.{input_field.get("field")}', + ) + ) + + return OpenlineageSource._create_output_lineage_dict(_result) + + def yield_pipeline( + self, pipeline_details: OpenLineageEvent + ) -> Iterable[Either[CreatePipelineRequest]]: + pipeline_name = self.get_pipeline_name(pipeline_details) + try: + description = f"""```json + {json.dumps(pipeline_details.run_facet, indent=4).strip()}```""" + request = CreatePipelineRequest( + name=pipeline_name, + service=self.context.pipeline_service, + description=description, + ) + + yield Either(right=request) + self.register_record(pipeline_request=request) + except ValueError: + yield Either( + left=StackTraceError( + name=pipeline_name, + message="Failed to collect metadata required for pipeline creation.", + ), + stackTrace=traceback.format_exc(), + ) + + def yield_pipeline_lineage_details( + self, pipeline_details: OpenLineageEvent + ) -> Iterable[Either[AddLineageRequest]]: + inputs, outputs = pipeline_details.inputs, pipeline_details.outputs + + input_edges: List[LineageNode] = [] + output_edges: List[LineageNode] = [] + + for spec in [(inputs, input_edges), (outputs, 
output_edges)]: + tables, tables_list = spec + + for table in tables: + create_table_request = self.get_create_table_request(table) + + if create_table_request: + yield create_table_request + + table_fqn = self._get_table_fqn( + OpenlineageSource._get_table_details(table) + ) + + if table_fqn: + tables_list.append( + LineageNode( + fqn=TableFQN(value=table_fqn), + uuid=self.metadata.get_by_name(Table, table_fqn).id, + ) + ) + + edges = [ + LineageEdge(from_node=n[0], to_node=n[1]) + for n in product(input_edges, output_edges) + ] + + column_lineage = self._get_column_lineage(inputs, outputs) + + pipeline_fqn = fqn.build( + metadata=self.metadata, + entity_type=Pipeline, + service_name=self.context.pipeline_service, + pipeline_name=self.context.pipeline, + ) + + pipeline_entity = self.metadata.get_by_name(entity=Pipeline, fqn=pipeline_fqn) + for edge in edges: + yield Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=edge.from_node.uuid, type=edge.from_node.node_type + ), + toEntity=EntityReference( + id=edge.to_node.uuid, type=edge.to_node.node_type + ), + lineageDetails=LineageDetails( + pipeline=EntityReference( + id=pipeline_entity.id.__root__, + type="pipeline", + ), + description=f"Lineage extracted from OpenLineage job: {pipeline_details.job['name']}", + source=Source.OpenLineage, + columnsLineage=column_lineage.get( + edge.to_node.fqn.value, {} + ).get(edge.from_node.fqn.value, []), + ), + ), + ) + ) + + def get_pipelines_list(self) -> Optional[List[Any]]: + """Get List of all pipelines""" + try: + consumer = self.client + session_active = True + empty_msg_cnt = 0 + pool_timeout = self.service_connection.poolTimeout + while session_active: + message = consumer.poll(timeout=pool_timeout) + if message is None: + logger.debug("no new messages") + empty_msg_cnt += 1 + if ( + empty_msg_cnt * pool_timeout + > self.service_connection.sessionTimeout + ): + # There is no new messages, timeout is passed + session_active = False + else: + logger.debug(f"new message {message.value()}") + empty_msg_cnt = 0 + try: + _result = message_to_open_lineage_event( + json.loads(message.value()) + ) + result = self._filter_event_by_type(_result, EventType.COMPLETE) + if result: + yield result + except Exception as e: + logger.debug(e) + + except Exception as e: + traceback.print_exc() + + raise InvalidSourceException(f"Failed to read from Kafka: {str(e)}") + + finally: + # Close down consumer to commit final offsets. + # @todo address this + consumer.close() + + def get_pipeline_name(self, pipeline_details: OpenLineageEvent) -> str: + return OpenlineageSource._render_pipeline_name(pipeline_details) + + def yield_pipeline_status( + self, pipeline_details: OpenLineageEvent + ) -> Iterable[Either[OMetaPipelineStatus]]: + pass + + def mark_pipelines_as_deleted(self): + """ + OpenLineage pipelines are coming from streaming data and hence subsequent executions of ingestion processes + can cause deletion of a pipeline. Because of this we turn off pipeline deletion by overwriting this method + and leaving it blank. Setting 'Mark Deleted Pipelines' in ingestion process will have no effect! 
+ """ diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/models.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/models.py new file mode 100644 index 00000000000..8a05dc3f568 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/models.py @@ -0,0 +1,88 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Openlineage Source Model module +""" + +from dataclasses import dataclass +from enum import Enum +from typing import Any, Dict, List + + +@dataclass +class OpenLineageEvent: + """ + An object containing data extracted from raw OpenLineage event. Used as a basis for all abstract methods of + OpenlineageSource connector. + """ + + run_facet: Dict + job: Dict + event_type: str + inputs: List[Any] + outputs: List[Any] + + +@dataclass +class TableFQN: + """ + Fully Qualified Name of a Table. + """ + + value: str + + +@dataclass +class ColumnFQN: + """ + Fully Qualified Name of a Column. + """ + + value: str + + +@dataclass +class LineageNode: + """ + A node being a part of Lineage information. + """ + + uuid: str + fqn: TableFQN + node_type: str = "table" + + +@dataclass +class LineageEdge: + """ + An object describing connection of two nodes in the Lineage information. + """ + + from_node: LineageNode + to_node: LineageNode + + +@dataclass +class TableDetails: + """ + Minimal table information. + """ + + schema: str + name: str + + +class EventType(str, Enum): + """ + List of used OpenLineage event types. + """ + + COMPLETE = "COMPLETE" diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/utils.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/utils.py new file mode 100644 index 00000000000..4be9b16594b --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/utils.py @@ -0,0 +1,59 @@ +""" +Utils used by OpenlineageSource connector. +""" + +from functools import reduce +from typing import Dict + +from metadata.ingestion.source.pipeline.openlineage.models import OpenLineageEvent + + +def message_to_open_lineage_event(incoming_event: Dict) -> OpenLineageEvent: + """ + Method that takes raw Open Lineage event and parses is to shape into OpenLineageEvent. + + We check whether received event (from Kafka) adheres to expected form and contains all the fields that are required + for successful processing by OpenMetadata OpenLineage connector. 
+ + :param incoming_event: raw event received from kafka topic by OpenlineageSource + :return: OpenLineageEvent + """ + fields_to_verify = [ + "run.facets.parent.job.name", + "run.facets.parent.job.namespace", + "inputs", + "outputs", + "eventType", + "job.name", + "job.namespace", + ] + + for field in fields_to_verify: + try: + reduce(lambda x, y: x[y], field.split("."), incoming_event) + except KeyError: + raise ValueError("Event malformed!") + + run_facet = incoming_event["run"] + inputs = incoming_event["inputs"] + outputs = incoming_event["outputs"] + event_type = incoming_event["eventType"] + job = incoming_event["job"] + + result = OpenLineageEvent( + run_facet=run_facet, + event_type=event_type, + job=job, + inputs=inputs, + outputs=outputs, + ) + + return result + + +class FQNNotFoundException(Exception): + """ + Error raised when, while searching for an entity (Table, DatabaseSchema) there is no match in OM. + """ + + pass diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 0140cd2a4d1..d0684ab3304 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -192,17 +192,34 @@ def _( @fqn_build_registry.add(DatabaseSchema) def _( - _: Optional[OpenMetadata], # ES Search not enabled for Schemas + metadata: Optional[OpenMetadata], # ES Search not enabled for Schemas *, service_name: str, - database_name: str, + database_name: Optional[str], schema_name: str, -) -> str: - if not service_name or not database_name or not schema_name: - raise FQNBuildingException( - f"Args should be informed, but got service=`{service_name}`, db=`{database_name}`, schema=`{schema_name}`" + skip_es_search: bool = True, + fetch_multiple_entities: bool = False, +) -> Union[Optional[str], Optional[List[str]]]: + entity: Optional[Union[DatabaseSchema, List[DatabaseSchema]]] = None + + if not skip_es_search: + entity = search_database_schema_from_es( + metadata=metadata, + database_name=database_name, + schema_name=schema_name, + fetch_multiple_entities=fetch_multiple_entities, + service_name=service_name, ) - return _build(service_name, database_name, schema_name) + + if not entity and database_name: + fqn = _build(service_name, database_name, schema_name) + return [fqn] if fetch_multiple_entities else fqn + if entity and fetch_multiple_entities: + return [str(table.fullyQualifiedName.__root__) for table in entity] + if entity: + return str(entity.fullyQualifiedName.__root__) + + return None @fqn_build_registry.add(Database) @@ -574,6 +591,43 @@ def build_es_fqn_search_string( return fqn_search_string +def search_database_schema_from_es( + metadata: OpenMetadata, + database_name: str, + schema_name: str, + service_name: str, + fetch_multiple_entities: bool = False, + fields: Optional[str] = None, +): + """ + Find database schema entity in elasticsearch index. 
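+
+    The search string is built from the given parts, substituting '*' for any that are missing; e.g.
+    service_name='mysql_prod', database_name=None, schema_name='shopify' searches for
+    'mysql_prod.*.shopify' (illustrative values).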
+ + :param metadata: OM Client + :param database_name: name of database in which we are searching for database schema + :param schema_name: name of schema we are searching for + :param service_name: name of service in which we are searching for database schema + :param fetch_multiple_entities: should single match be returned or all matches + :param fields: additional fields to return + :return: entity / entities matching search criteria + """ + if not schema_name: + raise FQNBuildingException( + f"Schema Name should be informed, but got schema_name=`{schema_name}`" + ) + + fqn_search_string = _build(service_name or "*", database_name or "*", schema_name) + + es_result = metadata.es_search_from_fqn( + entity_type=DatabaseSchema, + fqn_search_string=fqn_search_string, + fields=fields, + ) + + return get_entity_from_es_result( + entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities + ) + + def search_table_from_es( metadata: OpenMetadata, database_name: str, diff --git a/ingestion/tests/unit/resources/datasets/openlineage_event.json b/ingestion/tests/unit/resources/datasets/openlineage_event.json new file mode 100644 index 00000000000..a04bd7502ff --- /dev/null +++ b/ingestion/tests/unit/resources/datasets/openlineage_event.json @@ -0,0 +1,330 @@ + { + "eventTime": "2023-12-16T14:22:11.949Z", + "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", + "eventType": "COMPLETE", + "run": { + "runId": "59fc8906-4a4a-45ab-9a54-9cc2d399e10e", + "facets": { + "parent": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json#/$defs/ParentRunFacet", + "run": { + "runId": "daf8bcc1-cc3c-41bb-9251-334cacf698fa" + }, + "job": { + "namespace": "TESTSchedulerID", + "name": "TESTParentJobName4" + } + }, + "spark.logicalPlan": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "plan": [ + { + "class": "org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand", + "num-children": 1, + "table": { + "product-class": "org.apache.spark.sql.catalyst.catalog.CatalogTable", + "identifier": { + "product-class": "org.apache.spark.sql.catalyst.TableIdentifier", + "table": "dst_table_test_from_src_table_df", + "database": "test_db" + }, + "tableType": { + "product-class": "org.apache.spark.sql.catalyst.catalog.CatalogTableType", + "name": "MANAGED" + }, + "storage": { + "product-class": "org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat", + "compressed": false, + "properties": null + }, + "schema": { + "type": "struct", + "fields": [] + }, + "provider": "parquet", + "partitionColumnNames": [], + "owner": "", + "createTime": 1702736481797, + "lastAccessTime": -1, + "createVersion": "", + "properties": null, + "unsupportedFeatures": [], + "tracksPartitionsInCatalog": false, + "schemaPreservesCase": true, + "ignoredProperties": null + }, + "mode": null, + "query": 0, + "outputColumnNames": "[id, randomid, zip]" + }, + { + "class": "org.apache.spark.sql.execution.datasources.LogicalRelation", + "num-children": 0, + "relation": null, + "output": [ + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "id", + "dataType": "long", + "nullable": true, + "metadata": {}, + 
"exprId": { + "product-class": "org.apache.spark.sql.catalyst.expressions.ExprId", + "id": 9, + "jvmId": "78343417-b80d-45a2-b489-e7d8920e61dd" + }, + "qualifier": [] + } + ], + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "randomid", + "dataType": "string", + "nullable": true, + "metadata": {}, + "exprId": { + "product-class": "org.apache.spark.sql.catalyst.expressions.ExprId", + "id": 10, + "jvmId": "78343417-b80d-45a2-b489-e7d8920e61dd" + }, + "qualifier": [] + } + ], + [ + { + "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference", + "num-children": 0, + "name": "zip", + "dataType": "string", + "nullable": true, + "metadata": {}, + "exprId": { + "product-class": "org.apache.spark.sql.catalyst.expressions.ExprId", + "id": 11, + "jvmId": "78343417-b80d-45a2-b489-e7d8920e61dd" + }, + "qualifier": [] + } + ] + ], + "catalogTable": { + "product-class": "org.apache.spark.sql.catalyst.catalog.CatalogTable", + "identifier": { + "product-class": "org.apache.spark.sql.catalyst.TableIdentifier", + "table": "src_table_test", + "database": "test_db" + }, + "tableType": { + "product-class": "org.apache.spark.sql.catalyst.catalog.CatalogTableType", + "name": "MANAGED" + }, + "storage": { + "product-class": "org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat", + "locationUri": null, + "inputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + "outputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", + "serde": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + "compressed": false, + "properties": null + }, + "schema": { + "type": "struct", + "fields": [ + { + "name": "id", + "type": "long", + "nullable": true, + "metadata": {} + }, + { + "name": "randomid", + "type": "string", + "nullable": true, + "metadata": {} + }, + { + "name": "zip", + "type": "string", + "nullable": true, + "metadata": { + "__CHAR_VARCHAR_TYPE_STRING": "varchar(10)" + } + } + ] + }, + "provider": "parquet", + "partitionColumnNames": [], + "owner": "test_user@ad.test.net", + "createTime": 1682523315000, + "lastAccessTime": 0, + "createVersion": "3.3.1", + "properties": null, + "stats": null, + "unsupportedFeatures": [], + "tracksPartitionsInCatalog": false, + "schemaPreservesCase": true, + "ignoredProperties": null + }, + "isStreaming": false + } + ] + }, + "spark_version": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "spark-version": "3.3.1", + "openlineage-spark-version": "1.5.0" + }, + "processing_engine": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-0/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet", + "version": "3.3.1", + "name": "spark", + "openlineageAdapterVersion": "1.5.0" + }, + "environment-properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "environment-properties": {} + } + } + }, + "job": { + "namespace": "TESTSchedulerID", + "name": "test_user_spark.execute_create_data_source_table_as_select_command.dst_table_test_from_src_table_df", + "facets": {} + }, + "inputs": [ + { + "namespace": "s3a://test_db-db", + "name": "src_table_test", + "facets": { + "dataSource": { + "_producer": 
"https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "s3a://test_db-db", + "uri": "s3a://test_db-db" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "id", + "type": "long" + }, + { + "name": "randomid", + "type": "string" + }, + { + "name": "zip", + "type": "string" + } + ] + }, + "symlinks": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet", + "identifiers": [ + { + "namespace": "hive://hive-metastore.hive.svc.cluster.local:9083", + "name": "shopify.raw_product_catalog", + "type": "TABLE" + } + ] + } + }, + "inputFacets": {} + } + ], + "outputs": [ + { + "namespace": "s3a://test_db-db", + "name": "dst_table_test_from_src_table_df", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "s3a://test_db-db", + "uri": "s3a://test_db-db" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "id", + "type": "long" + }, + { + "name": "randomid", + "type": "string" + }, + { + "name": "zip", + "type": "string" + } + ] + }, + "columnLineage": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet", + "fields": { + "id": { + "inputFields": [ + { + "namespace": "s3a://test_db-db", + "name": "/src_table_test", + "field": "comments" + } + ] + }, + "randomid": { + "inputFields": [ + { + "namespace": "s3a://test_db-db", + "name": "/src_table_test", + "field": "products" + } + ] + }, + "zip": { + "inputFields": [ + { + "namespace": "s3a://test_db-db", + "name": "/src_table_test", + "field": "platform" + } + ] + } + } + }, + "symlinks": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet", + "identifiers": [ + { + "namespace": "hive://hive-metastore.hive.svc.cluster.local:9083", + "name": "shopify.fact_order_new5", + "type": "TABLE" + } + ] + }, + "lifecycleStateChange": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.5.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet", + "lifecycleStateChange": "CREATE" + } + }, + "outputFacets": {} + } + ] + } \ No newline at end of file diff --git a/ingestion/tests/unit/topology/pipeline/test_openlineage.py b/ingestion/tests/unit/topology/pipeline/test_openlineage.py new file mode 100644 index 00000000000..f1c74668bae --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_openlineage.py @@ -0,0 +1,533 @@ +import copy +import json +import unittest +from 
pathlib import Path +from unittest.mock import MagicMock, Mock, patch +from uuid import UUID + +from metadata.generated.schema.entity.data.pipeline import Pipeline, Task +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import ( + ConsumerOffsets, + SecurityProtocol, +) +from metadata.generated.schema.entity.services.pipelineService import ( + PipelineConnection, + PipelineService, + PipelineServiceType, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.basic import FullyQualifiedEntityName +from metadata.generated.schema.type.entityLineage import ColumnLineage +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.source.pipeline.openlineage.metadata import OpenlineageSource +from metadata.ingestion.source.pipeline.openlineage.models import OpenLineageEvent +from metadata.ingestion.source.pipeline.openlineage.utils import ( + message_to_open_lineage_event, +) + +# Global constants +MOCK_OL_CONFIG = { + "source": { + "type": "openlineage", + "serviceName": "openlineage_source", + "serviceConnection": { + "config": { + "type": "OpenLineage", + "brokersUrl": "testbroker:9092", + "topicName": "test-topic", + "consumerGroupName": "test-consumergroup", + "consumerOffsets": ConsumerOffsets.earliest, + "securityProtocol": SecurityProtocol.PLAINTEXT, + "SSLCertificateLocation": "", + "SSLKeyLocation": "", + "SSLCALocation": "", + "poolTimeout": 0.3, + "sessionTimeout": 1, + } + }, + "sourceConfig": {"config": {"type": "PipelineMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} +MOCK_SPLINE_UI_URL = "http://localhost:9090" +PIPELINE_ID = "3f784e72-5bf7-5704-8828-ae8464fe915b:lhq160w0" +MOCK_PIPELINE_URL = f"{MOCK_SPLINE_UI_URL}/app/events/overview/{PIPELINE_ID}" +MOCK_PIPELINE_SERVICE = PipelineService( + id="85811038-099a-11ed-861d-0242ac120002", + name="openlineage_source", + fullyQualifiedName=FullyQualifiedEntityName(__root__="openlineage_source"), + connection=PipelineConnection(), + serviceType=PipelineServiceType.Airflow, +) + +MOCK_PIPELINE = Pipeline( + id="2aaa012e-099a-11ed-861d-0242ac120002", + name=PIPELINE_ID, + fullyQualifiedName=f"openlineage_source.{PIPELINE_ID}", + displayName="MSSQL <> Postgres", + sourceUrl=MOCK_PIPELINE_URL, + tasks=[ + Task( + name=PIPELINE_ID, + displayName="jdbc postgres ssl app", + sourceUrl=MOCK_PIPELINE_URL, + ) + ], + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService" + ), +) + +VALID_EVENT = { + "run": { + "facets": { + "parent": {"job": 
{"name": "test-job", "namespace": "test-namespace"}} + } + }, + "inputs": [], + "outputs": [], + "eventType": "START", + "job": {"name": "test-job", "namespace": "test-namespace"}, +} + +MISSING_RUN_FACETS_PARENT_JOB_NAME_EVENT = copy.deepcopy(VALID_EVENT) +del MISSING_RUN_FACETS_PARENT_JOB_NAME_EVENT["run"]["facets"]["parent"]["job"]["name"] + +MALFORMED_NESTED_STRUCTURE_EVENT = copy.deepcopy(VALID_EVENT) +MALFORMED_NESTED_STRUCTURE_EVENT["run"]["facets"]["parent"]["job"] = "Not a dict" + +with open( + f"{Path(__file__).parent}/../../resources/datasets/openlineage_event.json" +) as ol_file: + FULL_OL_KAFKA_EVENT = json.load(ol_file) + +EXPECTED_OL_EVENT = OpenLineageEvent( + run_facet=FULL_OL_KAFKA_EVENT["run"], + job=FULL_OL_KAFKA_EVENT["job"], + event_type=FULL_OL_KAFKA_EVENT["eventType"], + inputs=FULL_OL_KAFKA_EVENT["inputs"], + outputs=FULL_OL_KAFKA_EVENT["outputs"], +) + + +class OpenLineageUnitTest(unittest.TestCase): + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection" + ) + def __init__(self, methodName, test_connection) -> None: + super().__init__(methodName) + test_connection.return_value = False + config = OpenMetadataWorkflowConfig.parse_obj(MOCK_OL_CONFIG) + self.open_lineage_source = OpenlineageSource.create( + MOCK_OL_CONFIG["source"], + config.workflowConfig.openMetadataServerConfig, + ) + self.open_lineage_source.context.__dict__[ + "pipeline" + ] = MOCK_PIPELINE.name.__root__ + self.open_lineage_source.context.__dict__[ + "pipeline_service" + ] = MOCK_PIPELINE_SERVICE.name.__root__ + self.open_lineage_source.source_config.dbServiceNames = ["skun"] + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection" + ) + @patch("confluent_kafka.Consumer") + def setUp(self, mock_consumer, mock_test_connection): + mock_test_connection.return_value = False + self.mock_consumer = mock_consumer + + def setup_mock_consumer_with_kafka_event(self, event): + mock_msg = MagicMock() + mock_msg.error.return_value = None + mock_msg.value.return_value = json.dumps(event).encode() + self.mock_consumer.poll.side_effect = [ + mock_msg, + None, + None, + None, + None, + None, + None, + ] + self.open_lineage_source.client = self.mock_consumer + + def test_message_to_ol_event_valid_event(self): + """Test conversion with a valid event.""" + result = message_to_open_lineage_event(VALID_EVENT) + self.assertIsInstance(result, OpenLineageEvent) + + def test_message_to_ol_event_missing_run_facets_parent_job_name(self): + """Test conversion with missing 'run.facets.parent.job.name' field.""" + with self.assertRaises(ValueError): + message_to_open_lineage_event(MISSING_RUN_FACETS_PARENT_JOB_NAME_EVENT) + + def test_message_to_ol_event_malformed_nested_structure(self): + """Test conversion with a malformed nested structure.""" + with self.assertRaises(TypeError): + message_to_open_lineage_event(MALFORMED_NESTED_STRUCTURE_EVENT) + + def test_poll_message_receives_message(self): + """Test if poll_message receives a kafka message.""" + self.setup_mock_consumer_with_kafka_event(VALID_EVENT) + result = self.open_lineage_source.client.poll(timeout=1) + self.assertIsNotNone(result) + self.assertEqual(json.loads(result.value().decode()), VALID_EVENT) + + def read_openlineage_event_from_kafka(self, kafka_event): + self.setup_mock_consumer_with_kafka_event(kafka_event) + result_generator = self.open_lineage_source.get_pipelines_list() + results = [] + try: + while True: + results.append(next(result_generator)) + except 
StopIteration: + pass + return results[0] + + def test_create_output_lineage_dict_empty_input(self): + """Test with an empty input list.""" + result = self.open_lineage_source._create_output_lineage_dict([]) + self.assertEqual(result, {}) + + def test_create_output_lineage_dict_single_lineage_entry(self): + """Test with a single lineage entry.""" + lineage_info = [ + ("output_table", "input_table", "output_column", "input_column") + ] + result = self.open_lineage_source._create_output_lineage_dict(lineage_info) + expected = { + "output_table": { + "input_table": [ + ColumnLineage( + toColumn="output_column", fromColumns=["input_column"] + ) + ] + } + } + self.assertEqual(result, expected) + + def test_create_output_lineage_dict_multiple_entries_different_outputs(self): + """Test with multiple entries having different output tables.""" + lineage_info = [ + ("output_table1", "input_table", "output_column1", "input_column"), + ("output_table2", "input_table", "output_column2", "input_column"), + ] + result = self.open_lineage_source._create_output_lineage_dict(lineage_info) + expected = { + "output_table1": { + "input_table": [ + ColumnLineage( + toColumn="output_column1", fromColumns=["input_column"] + ) + ] + }, + "output_table2": { + "input_table": [ + ColumnLineage( + toColumn="output_column2", fromColumns=["input_column"] + ) + ] + }, + } + self.assertEqual(result, expected) + + def test_create_output_lineage_dict_multiple_entries_same_output(self): + """Test with multiple entries sharing the same output table.""" + lineage_info = [ + ("output_table", "input_table1", "output_column", "input_column1"), + ("output_table", "input_table2", "output_column", "input_column2"), + ] + result = self.open_lineage_source._create_output_lineage_dict(lineage_info) + expected = { + "output_table": { + "input_table1": [ + ColumnLineage( + toColumn="output_column", fromColumns=["input_column1"] + ) + ], + "input_table2": [ + ColumnLineage( + toColumn="output_column", fromColumns=["input_column2"] + ) + ], + } + } + self.assertEqual(result, expected) + + def test_get_column_lineage_empty_inputs_outputs(self): + """Test with empty input and output lists.""" + inputs = [] + outputs = [] + result = self.open_lineage_source._get_column_lineage(inputs, outputs) + self.assertEqual(result, {}) + + @patch( + "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn" + ) + def test_build_ol_name_to_fqn_map_with_valid_data(self, mock_get_table_fqn): + # Mock _get_table_fqn to return a constructed FQN based on the provided table details + mock_get_table_fqn.side_effect = ( + lambda table_details: f"database.schema.{table_details.name}" + ) + + tables = [ + {"name": "schema.table1", "facets": {}, "namespace": "ns://"}, + {"name": "schema.table2", "facets": {}, "namespace": "ns://"}, + ] + + expected_map = { + "ns://schema.table1": "database.schema.table1", + "ns://schema.table2": "database.schema.table2", + } + + result = self.open_lineage_source._build_ol_name_to_fqn_map(tables) + + self.assertEqual(result, expected_map) + self.assertEqual(mock_get_table_fqn.call_count, 2) + + @patch( + "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn" + ) + def test_build_ol_name_to_fqn_map_with_missing_fqn(self, mock_get_table_fqn): + # Mock _get_table_fqn to return None for missing FQN + mock_get_table_fqn.return_value = None + + tables = [{"name": "schema.table1", "facets": {}, "namespace": "ns://"}] + + expected_map = {} # Expect an empty map since FQN is 
missing + + result = self.open_lineage_source._build_ol_name_to_fqn_map(tables) + + self.assertEqual(result, expected_map) + + @patch( + "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn" + ) + def test_build_ol_name_to_fqn_map_with_empty_tables(self, mock_get_table_fqn): + # No need to set up the mock specifically since it won't be called with empty input + + tables = [] # No tables provided + + expected_map = {} # Expect an empty map + + result = self.open_lineage_source._build_ol_name_to_fqn_map(tables) + + self.assertEqual(result, expected_map) + mock_get_table_fqn.assert_not_called() + + @patch( + "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn" + ) + @patch( + "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._build_ol_name_to_fqn_map" + ) + def test_get_column_lineage_valid_inputs_outputs( + self, mock_build_map, mock_get_table_fqn + ): + """Test with valid input and output lists.""" + # Setup + mock_get_table_fqn.side_effect = ( + lambda table_details: f"database.schema.{table_details.name}" + ) + mock_build_map.return_value = { + "s3a:/project-db/src_test1": "database.schema.input_table_1", + "s3a:/project-db/src_test2": "database.schema.input_table_2", + } + + inputs = [ + {"name": "schema.input_table1", "facets": {}, "namespace": "hive://"}, + {"name": "schema.input_table2", "facets": {}, "namespace": "hive://"}, + ] + outputs = [ + { + "name": "schema.output_table", + "facets": { + "columnLineage": { + "fields": { + "output_column1": { + "inputFields": [ + { + "field": "input_column1", + "namespace": "s3a://project-db", + "name": "/src_test1", + } + ] + }, + "output_column2": { + "inputFields": [ + { + "field": "input_column2", + "namespace": "s3a://project-db", + "name": "/src_test2", + } + ] + }, + } + } + }, + } + ] + result = self.open_lineage_source._get_column_lineage(inputs, outputs) + + expected = { + "database.schema.output_table": { + "database.schema.input_table_1": [ + ColumnLineage( + toColumn="database.schema.output_table.output_column1", + fromColumns=["database.schema.input_table_1.input_column1"], + ) + ], + "database.schema.input_table_2": [ + ColumnLineage( + toColumn="database.schema.output_table.output_column2", + fromColumns=["database.schema.input_table_2.input_column2"], + ) + ], + } + } + self.assertEqual(result, expected) + + def test_get_column_lineage__invalid_inputs_outputs_structure(self): + """Test with invalid input and output structure.""" + inputs = [{"invalid": "data"}] + outputs = [{"invalid": "data"}] + with self.assertRaises(ValueError): + self.open_lineage_source._get_column_lineage(inputs, outputs) + + def test_get_table_details_with_symlinks(self): + """Test with valid data where symlinks are present.""" + data = { + "facets": {"symlinks": {"identifiers": [{"name": "project.schema.table"}]}} + } + result = self.open_lineage_source._get_table_details(data) + self.assertEqual(result.name, "table") + self.assertEqual(result.schema, "schema") + + def test_get_table_details_without_symlinks(self): + """Test with valid data but without symlinks.""" + data = {"name": "schema.table"} + result = self.open_lineage_source._get_table_details(data) + self.assertEqual(result.name, "table") + self.assertEqual(result.schema, "schema") + + def test_get_table_details_invalid_data_missing_symlinks_and_name(self): + """Test with invalid data missing both symlinks and name.""" + data = {} + with self.assertRaises(ValueError): + 
self.open_lineage_source._get_table_details(data) + + def test_get_table_details_invalid_symlinks_structure(self): + """Test with invalid symlinks structure.""" + data = {"facets": {"symlinks": {"identifiers": [{}]}}} + with self.assertRaises(ValueError): + self.open_lineage_source._get_table_details(data) + + def test_get_table_details_invalid_name_structure(self): + """Test with invalid name structure.""" + data = {"name": "invalidname"} + with self.assertRaises(ValueError): + self.open_lineage_source._get_table_details(data) + + def test_get_pipelines_list(self): + """Test get_pipelines_list method""" + ol_event = self.read_openlineage_event_from_kafka(FULL_OL_KAFKA_EVENT) + self.assertIsInstance(ol_event, OpenLineageEvent) + self.assertEqual(ol_event, EXPECTED_OL_EVENT) + + @patch( + "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om" + ) + def test_yield_pipeline_lineage_details(self, mock_get_entity): + def t_fqn_build_side_effect( + table_details, + ): + return f"testService.shopify.{table_details.name}" + + def mock_get_uuid_by_name(entity, fqn): + if fqn == "testService.shopify.raw_product_catalog": + # source of table lineage + return Mock(id="69fc8906-4a4a-45ab-9a54-9cc2d399e10e") + elif fqn == "testService.shopify.fact_order_new5": + # dst of table lineage + return Mock(id="59fc8906-4a4a-45ab-9a54-9cc2d399e10e") + else: + # pipeline + z = Mock() + z.id.__root__ = "79fc8906-4a4a-45ab-9a54-9cc2d399e10e" + return z + + def extract_lineage_details(pip_results): + table_lineage = [] + col_lineage = [] + for r in pip_results: + table_lineage.append( + ( + r.right.edge.fromEntity.id.__root__, + r.right.edge.toEntity.id.__root__, + ) + ) + for col in r.right.edge.lineageDetails.columnsLineage: + col_lineage.append( + (col.fromColumns[0].__root__, col.toColumn.__root__) + ) + return table_lineage, col_lineage + + # Set up the side effect for the mock entity FQN builder + mock_get_entity.side_effect = t_fqn_build_side_effect + + ol_event = self.read_openlineage_event_from_kafka(FULL_OL_KAFKA_EVENT) + + with patch.object( + OpenMetadataConnection, + "get_by_name", + create=True, + side_effect=mock_get_uuid_by_name, + ): + pip_results = self.open_lineage_source.yield_pipeline_lineage_details( + ol_event + ) + table_lineage, col_lineage = extract_lineage_details(pip_results) + + expected_table_lineage = [ + ( + UUID("69fc8906-4a4a-45ab-9a54-9cc2d399e10e"), + UUID("59fc8906-4a4a-45ab-9a54-9cc2d399e10e"), + ) + ] + expected_col_lineage = [ + ( + "testService.shopify.raw_product_catalog.comments", + "testService.shopify.fact_order_new5.id", + ), + ( + "testService.shopify.raw_product_catalog.products", + "testService.shopify.fact_order_new5.randomid", + ), + ( + "testService.shopify.raw_product_catalog.platform", + "testService.shopify.fact_order_new5.zip", + ), + ] + + self.assertEqual(col_lineage, expected_col_lineage) + self.assertEqual(table_lineage, expected_table_lineage) + + +if __name__ == "__main__": + unittest.main() diff --git a/openmetadata-docs/content/v1.3.x/connectors/pipeline/openlineage/index.md b/openmetadata-docs/content/v1.3.x/connectors/pipeline/openlineage/index.md new file mode 100644 index 00000000000..e113020e5bd --- /dev/null +++ b/openmetadata-docs/content/v1.3.x/connectors/pipeline/openlineage/index.md @@ -0,0 +1,151 @@ +--- +title: OpenLineage +slug: /connectors/pipeline/openlineage +--- + +# OpenLineage + +In this section, we provide guides and references to use the OpenLineage connector. + +## What is OpenLineage? 
+ +According to [documentation](https://openlineage.io/docs/): + +``` +OpenLineage is an open framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata. +``` + +OpenLineage, apart from being a specification, is also a set of integrations collecting lineage from various systems such as Apache Airflow and Spark. + + +## OpenMetadata Openlineage connector + +OpenMetadata OpenLineage connector consumes open lineage events from kafka broker and translates it to OpenMetadata Lineage information. + +{% image + src="/images/v1.3/connectors/pipeline/openlineage/connector-flow.svg" + alt="OpenLineage Connector" /%} + + +### Airflow OpenLineage events + +Configure your Airflow instance + +1. Install appropriate provider in Airflow: [apache-airflow-providers-openlineage](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/index.html) +2. Configure OpenLineage Provider in Airflow - [documentation](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html#using-openlineage-integration) + 1. remember to use `kafka` transport mode as this connector works under assumption OL events are collected from kafka topic + 2. detailed list of configuration options for OpenLineage can be found [here](https://openlineage.io/docs/client/python/#configuration) + +### Spark OpenLineage events + +Configure Spark Session to produce OpenLineage events compatible with OpenLineage connector available in OpenMetadata. + +# @todo complete kafka config +```shell +from pyspark.sql import SparkSession +from uuid import uuid4 + +spark = SparkSession.builder\ +.config('spark.openlineage.namespace', 'mynamespace')\ +.config('spark.openlineage.parentJobName', 'hello-world')\ +.config('spark.openlineage.parentRunId', str(uuid4()))\ +.config('spark.jars.packages', 'io.openlineage:openlineage-spark:1.7.0')\ +.config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')\ +.config('spark.openlineage.transport.type', 'kafka')\ +.getOrCreate() +``` + +## Requirements + +{% note %} +We support OpenLineage events created by OpenLineage versions starting from OpenLineage 1.7.0 +{% /note %} + +## Metadata Ingestion + +#### Connection Details + +##### Providing connection details via UI + +{% partial + file="/v1.3/connectors/metadata-ingestion-ui.md" + variables={ + connector: "Openlineage", + selectServicePath: "/images/v1.3/connectors/openlineage/select-service.png", + addNewServicePath: "/images/v1.3/connectors/openlineage/add-new-service.png", + serviceConnectionPath: "/images/v1.3/connectors/openlineage/service-connection.png", +} +/%} + +{% stepsContainer %} +{% extraContent parentTagNme="stepsContainer" %} + +##### Providing connection details programmatically via API +###### 1. Preparing the Client + +```python +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( + OpenMetadataJWTClientConfig, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata + +server_config = OpenMetadataConnection( + hostPort="http://localhost:8585/api", + authProvider="openmetadata", + securityConfig=OpenMetadataJWTClientConfig( + jwtToken="" + ), +) +metadata = OpenMetadata(server_config) + +assert metadata.health_check() # Will fail if we cannot reach the server +``` +###### 2. 
+
+## Requirements
+
+{% note %}
+We support OpenLineage events created by OpenLineage version 1.7.0 and above.
+{% /note %}
+
+## Metadata Ingestion
+
+#### Connection Details
+
+##### Providing connection details via UI
+
+{% partial
+  file="/v1.3/connectors/metadata-ingestion-ui.md"
+  variables={
+    connector: "OpenLineage",
+    selectServicePath: "/images/v1.3/connectors/openlineage/select-service.png",
+    addNewServicePath: "/images/v1.3/connectors/openlineage/add-new-service.png",
+    serviceConnectionPath: "/images/v1.3/connectors/openlineage/service-connection.png",
+}
+/%}
+
+{% stepsContainer %}
+{% extraContent parentTagNme="stepsContainer" %}
+
+##### Providing connection details programmatically via API
+###### 1. Preparing the Client
+
+```python
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
+    OpenMetadataJWTClientConfig,
+)
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+
+server_config = OpenMetadataConnection(
+    hostPort="http://localhost:8585/api",
+    authProvider="openmetadata",
+    securityConfig=OpenMetadataJWTClientConfig(
+        jwtToken=""
+    ),
+)
+metadata = OpenMetadata(server_config)
+
+assert metadata.health_check()  # Will fail if we cannot reach the server
+```
+###### 2. Creating the OpenLineage Pipeline service
+```python
+from metadata.generated.schema.api.services.createPipelineService import CreatePipelineServiceRequest
+from metadata.generated.schema.entity.services.pipelineService import (
+    PipelineServiceType,
+    PipelineConnection,
+)
+from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import (
+    OpenLineageConnection,
+    SecurityProtocol as KafkaSecurityProtocol,
+    ConsumerOffsets,
+)
+
+
+openlineage_service_request = CreatePipelineServiceRequest(
+    name='openlineage-service',
+    displayName='OpenLineage Service',
+    serviceType=PipelineServiceType.OpenLineage,
+    connection=PipelineConnection(
+        config=OpenLineageConnection(
+            brokersUrl='broker1:9092,broker2:9092',
+            topicName='openlineage-events',
+            consumerGroupName='openmetadata-consumer',
+            consumerOffsets=ConsumerOffsets.earliest,
+            poolTimeout=3.0,
+            sessionTimeout=60,
+            securityProtocol=KafkaSecurityProtocol.SSL,
+            # the SSL config below is optional and only used when securityProtocol=KafkaSecurityProtocol.SSL
+            SSLCertificateLocation='/path/to/kafka/certs/Certificate.pem',
+            SSLKeyLocation='/path/to/kafka/certs/Key.pem',
+            SSLCALocation='/path/to/kafka/certs/RootCA.pem',
+        )
+    ),
+)
+
+metadata.create_or_update(openlineage_service_request)
+```
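+
+Optionally, you can confirm that the service was registered by reading it back. This is an illustrative sketch rather than a required step; it uses the Python SDK's `get_by_name` call and assumes the `metadata` client and the service name from the snippets above.
+
+```python
+from metadata.generated.schema.entity.services.pipelineService import PipelineService
+
+# Fetch the service back to confirm it was created; the name matches the request above
+service = metadata.get_by_name(entity=PipelineService, fqn="openlineage-service")
+assert service is not None
+```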
+{% /extraContent %}
+{% partial file="/v1.3/connectors/test-connection.md" /%}
+{% partial file="/v1.3/connectors/pipeline/configure-ingestion.md" /%}
+{% partial file="/v1.3/connectors/ingestion-schedule-and-deploy.md" /%}
+{% /stepsContainer %}
+{% partial file="/v1.3/connectors/troubleshooting.md" /%}
\ No newline at end of file
diff --git a/openmetadata-docs/images/v1.3/connectors/openmetadata/add-new-service.png b/openmetadata-docs/images/v1.3/connectors/openmetadata/add-new-service.png
new file mode 100644
index 00000000000..aa2b482e2c3
Binary files /dev/null and b/openmetadata-docs/images/v1.3/connectors/openmetadata/add-new-service.png differ
diff --git a/openmetadata-docs/images/v1.3/connectors/openmetadata/select-service.png b/openmetadata-docs/images/v1.3/connectors/openmetadata/select-service.png
new file mode 100644
index 00000000000..a80a0b58bf2
Binary files /dev/null and b/openmetadata-docs/images/v1.3/connectors/openmetadata/select-service.png differ
diff --git a/openmetadata-docs/images/v1.3/connectors/openmetadata/service-connection.png b/openmetadata-docs/images/v1.3/connectors/openmetadata/service-connection.png
new file mode 100644
index 00000000000..64d25c67bf3
Binary files /dev/null and b/openmetadata-docs/images/v1.3/connectors/openmetadata/service-connection.png differ
diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/pipeline/openlineage.json b/openmetadata-service/src/main/resources/json/data/testConnections/pipeline/openlineage.json
new file mode 100644
index 00000000000..49dc6bc21cd
--- /dev/null
+++ b/openmetadata-service/src/main/resources/json/data/testConnections/pipeline/openlineage.json
@@ -0,0 +1,14 @@
+{
+  "name": "OpenLineage",
+  "displayName": "OpenLineage Test Connection",
+  "description": "This Test Connection validates the access against the message broker.",
+  "steps": [
+    {
+      "name": "GetWatermarkOffsets",
+      "description": "Check if the message broker is reachable to fetch the topic details.",
+      "errorMessage": "Failed to connect to message broker",
+      "shortCircuit": true,
+      "mandatory": true
+    }
+  ]
+}
\ No newline at end of file
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/openLineageConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/openLineageConnection.json
new file mode 100644
index 00000000000..c0197bf9520
--- /dev/null
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/openLineageConnection.json
@@ -0,0 +1,99 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/openLineageConnection.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "OpenLineageConnection",
+  "description": "OpenLineage Connection Config",
+  "type": "object",
+  "javaType": "org.openmetadata.schema.services.connections.pipeline.OpenLineageConnection",
+  "definitions": {
+    "OpenLineageType": {
+      "description": "Service type.",
+      "type": "string",
+      "enum": ["OpenLineage"],
+      "default": "OpenLineage"
+    }
+  },
+  "properties": {
+    "type": {
+      "description": "Service Type",
+      "$ref": "#/definitions/OpenLineageType",
+      "default": "OpenLineage"
+    },
+    "brokersUrl": {
+      "title": "Kafka brokers list",
+      "description": "Comma-separated list of Kafka broker addresses to read OpenLineage events from.",
+      "type": "string"
+    },
+    "topicName": {
+      "title": "Topic Name",
+      "description": "Kafka topic from which OpenLineage events will be pulled.",
+      "type": "string"
+    },
+    "consumerGroupName": {
+      "title": "Consumer Group",
+      "description": "Kafka consumer group name used by the OpenLineage consumer.",
+      "type": "string"
+    },
+    "consumerOffsets": {
+      "title": "Initial consumer offsets",
+      "description": "Initial Kafka consumer offset (earliest or latest).",
+      "default": "earliest",
+      "type": "string",
+      "enum": ["earliest", "latest"],
+      "javaEnums": [
+        {
+          "name": "earliest"
+        },
+        {
+          "name": "latest"
+        }
+      ]
+    },
+    "poolTimeout": {
+      "title": "Single pool call timeout",
+      "description": "Maximum time, in seconds, to wait for new messages in a single poll call.",
+      "type": "number",
+      "default": 1.0
+    },
+    "sessionTimeout": {
+      "title": "Broker inactive session timeout",
+      "description": "Maximum inactivity time, in seconds, before the consumer session ends.",
+      "type": "integer",
+      "default": 30
+    },
+    "securityProtocol": {
+      "title": "Kafka security protocol",
+      "description": "Kafka security protocol config",
+      "default": "PLAINTEXT",
+      "type": "string",
+      "enum": ["PLAINTEXT", "SSL"],
+      "javaEnums": [
+        {
+          "name": "PLAINTEXT"
+        },
+        {
+          "name": "SSL"
+        }
+      ]
+    },
+    "SSLCertificateLocation": {
+      "title": "SSL Certificate location",
+      "description": "Kafka SSL certificate location",
+      "type": "string"
+    },
+    "SSLKeyLocation": {
+      "title": "SSL Key location",
+      "description": "Kafka SSL key location",
+      "type": "string"
+    },
+    "SSLCALocation": {
+      "title": "SSL CA location",
+      "description": "Kafka SSL CA location",
+      "type": "string"
+    },
+    "supportsMetadataExtraction": {
+      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
+    }
+  },
+  "additionalProperties": false
+}
"./connections/pipeline/sparkConnection.json" + }, + { + "$ref": "./connections/pipeline/openLineageConnection.json" } ] } diff --git a/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json b/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json index 2e87189842f..3c3f7d3c481 100644 --- a/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json +++ b/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json @@ -52,7 +52,7 @@ "source": { "description": "Lineage type describes how a lineage was created.", "type": "string", - "enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage"], + "enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage"], "default": "Manual" } } diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/OpenLineage.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/OpenLineage.md new file mode 100644 index 00000000000..8e10e30fe5d --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/OpenLineage.md @@ -0,0 +1,87 @@ +# OpenLineage + +In this section, we provide guides and references to use the OpenLineage connector. You can view the full documentation [here](https://docs.open-metadata.org/connectors/pipeline/openlineage). + +## Requirements + +We ingest OpenLineage metadata by reading OpenLineage events from kafka topic + +## Connection Details + +$$section +### Kafka brokers list $(id="brokersUrl") + +OpenMetadata for reaching OpenLineage events connects to kafka brokers. + +This should be specified as a broker:port list separated by commas in the format `broker:port`. E.g., `kafkabroker1:9092,kafkabroker2:9092`. + + +$$ + +$$section +### Kafka Topic name $(id="topicName") + +OpenMetadata is reading OpenLineage events from certain kafka topic + +This should be specified as topic name string . E.g., `openlineage-events`. +$$ + +$$section +### Kafka consumer group name $(id="consumerGroupName") + +Name of consumer kafka consumer group that will be used by OpenLineage kafka consumer + +This should be specified as consumer group name string . E.g., `openmetadata-openlineage-consumer`. +$$ + +$$section +### Kafka initial consumer offsets $(id="consumerOffsets") +When new kafka consumer group is created an initial offset information is required. + +This should be specified as `earliest` or `latest` . +$$ +$$section +### Kafka single pool timeout $(id="poolTimeout") +This setting indicates how long connector should wait for new messages in single pool cal. + +This should be specified as number of seconds represented as decimal number . E.g., `1.0` +$$ + +$$section +### Kafka session timeout $(id="sessionTimeout") +This setting indicates how long connector should wait for new messages in kafka session. +After kafka session timeout is reached connector assumes that there is no new messages to be processed +and successfully ends an ingestion process . + +This should be specified as number of seconds represented as integer number . E.g., `30` . +$$ +$$section +### Kafka securityProtocol $(id="securityProtocol") +Kafka Security protocol config. + +This should be specified as `PLAINTEXT` or `SSL` . +$$ + +$$section +### Kafka SSL certificate location $(id="SSLCertificateLocation") +When Kafka security protocol is set to `SSL` then path to SSL certificate is needed. 
diff --git a/openmetadata-ui/src/main/resources/ui/src/assets/img/service-icon-openlineage.svg b/openmetadata-ui/src/main/resources/ui/src/assets/img/service-icon-openlineage.svg
new file mode 100644
index 00000000000..7528225206b
--- /dev/null
+++ b/openmetadata-ui/src/main/resources/ui/src/assets/img/service-icon-openlineage.svg
@@ -0,0 +1,17 @@
+[17 lines of SVG markup for the OpenLineage service icon]
diff --git a/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts b/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts
index 5d42310c619..6d673ec2003 100644
--- a/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts
@@ -83,4 +83,5 @@ export const LINEAGE_SOURCE: { [key in Source]: string } = {
   [Source.QueryLineage]: 'Query Lineage',
   [Source.SparkLineage]: 'Spark Lineage',
   [Source.ViewLineage]: 'View Lineage',
+  [Source.OpenLineage]: 'OpenLineage',
 };
diff --git a/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts b/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts
index 2babc958d2f..a6b500e6364 100644
--- a/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts
@@ -54,6 +54,7 @@ import msAzure from '../assets/img/service-icon-ms-azure.png';
 import mssql from '../assets/img/service-icon-mssql.png';
 import mstr from '../assets/img/service-icon-mstr.png';
 import nifi from '../assets/img/service-icon-nifi.png';
+import openlineage from '../assets/img/service-icon-openlineage.svg';
 import oracle from '../assets/img/service-icon-oracle.png';
 import pinot from '../assets/img/service-icon-pinot.png';
 import postgres from '../assets/img/service-icon-post.png';
@@ -168,6 +169,7 @@ export const FIVETRAN = fivetran;
 export const AMUNDSEN = amundsen;
 export const ATLAS = atlas;
 export const SAS = sas;
+export const OPENLINEAGE = openlineage;
 export const LOGO = logo;
 
 export const AIRFLOW = airflow;
@@ -386,6 +388,7 @@ export const BETA_SERVICES = [
   DatabaseServiceType.Doris,
   PipelineServiceType.Spline,
   PipelineServiceType.Spark,
+  PipelineServiceType.OpenLineage,
   DashboardServiceType.QlikSense,
   DatabaseServiceType.Couchbase,
   DatabaseServiceType.Greenplum,
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts
index a54d0e0f940..7c32d177512 100644
--- a/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts
@@ -23,6 +23,7 @@ import domoPipelineConnection from '../jsons/connectionSchemas/connections/pipel
 import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
 import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
 import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
+import openLineageConnection from '../jsons/connectionSchemas/connections/pipeline/openLineageConnection.json';
 import splineConnection from '../jsons/connectionSchemas/connections/pipeline/splineConnection.json';
 
 export const getPipelineConfig = (type: PipelineServiceType) => {
@@ -80,6 +81,11 @@ export const getPipelineConfig = (type: PipelineServiceType) => {
 
       break;
     }
+    case PipelineServiceType.OpenLineage: {
+      schema = openLineageConnection;
+
+      break;
+    }
     default:
       break;
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts b/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts
index a0c3cd00420..3e4aaccf0b1 100644
--- a/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts
@@ -59,6 +59,7 @@ import {
   MSSQL,
   MYSQL,
   NIFI,
+  OPENLINEAGE,
   OPEN_SEARCH,
   ORACLE,
   PINOT,
@@ -335,6 +336,9 @@ class ServiceUtilClassBase {
       case PipelineServiceType.DatabricksPipeline:
         return DATABRICK;
 
+      case PipelineServiceType.OpenLineage:
+        return OPENLINEAGE;
+
       case MlModelServiceType.Mlflow:
         return MLFLOW;