From d717f8cc17d949620935a89d5fa94b141502ab07 Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Tue, 21 Jun 2022 20:52:37 +0530 Subject: [PATCH] Added pipeline metadata to the edge lines (#5552) Added pipeline metadata to the edge lines (#5552) --- .../catalog/jdbi3/LineageRepository.java | 24 +++++----- .../json/schema/type/entityLineage.json | 45 +++++++++---------- .../lineage/utils.py | 42 ++++++++--------- .../ingestion/source/pipeline/airbyte.py | 24 +++++----- .../ingestion/source/pipeline/airflow.py | 38 +++++++--------- 5 files changed, 82 insertions(+), 91 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java index c7de6f83419..68660f83228 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java @@ -13,8 +13,6 @@ package org.openmetadata.catalog.jdbi3; -import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -75,7 +73,7 @@ public class LineageRepository { private String validateLineageDetails(EntityReference from, EntityReference to, LineageDetails details) throws IOException { - if (details == null || listOrEmpty(details.getColumnsLineage()).isEmpty()) { + if (details == null) { return null; } @@ -86,17 +84,19 @@ public class LineageRepository { Table fromTable = dao.tableDAO().findEntityById(from.getId()); Table toTable = dao.tableDAO().findEntityById(to.getId()); - for (ColumnLineage columnLineage : columnsLineage) { - for (String fromColumn : columnLineage.getFromColumns()) { - // From column belongs to the fromNode - if (fromColumn.startsWith(fromTable.getFullyQualifiedName())) { - TableRepository.validateColumnFQN(fromTable, fromColumn); - } else { - Table otherTable = dao.tableDAO().findEntityByName(FullyQualifiedName.getTableFQN(fromColumn)); - TableRepository.validateColumnFQN(otherTable, fromColumn); + if (columnsLineage != null) { + for (ColumnLineage columnLineage : columnsLineage) { + for (String fromColumn : columnLineage.getFromColumns()) { + // From column belongs to the fromNode + if (fromColumn.startsWith(fromTable.getFullyQualifiedName())) { + TableRepository.validateColumnFQN(fromTable, fromColumn); + } else { + Table otherTable = dao.tableDAO().findEntityByName(FullyQualifiedName.getTableFQN(fromColumn)); + TableRepository.validateColumnFQN(otherTable, fromColumn); + } } + TableRepository.validateColumnFQN(toTable, columnLineage.getToColumn()); } - TableRepository.validateColumnFQN(toTable, columnLineage.getToColumn()); } return JsonUtils.pojoToJson(details); } diff --git a/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json b/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json index 11d0bb7554e..192c9d402e1 100644 --- a/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json +++ b/catalog-rest-service/src/main/resources/json/schema/type/entityLineage.json @@ -7,46 +7,45 @@ "javaType": "org.openmetadata.catalog.type.EntityLineage", "definitions": { "columnLineage": { - "type" : "object", + "type": "object", "properties": { - "fromColumns" : { + "fromColumns": { "description": "One or more source columns identified by fully qualified column name used by transformation function to create destination column.", - "type" : "array", - "items" : { - "$ref" : "../type/basic.json#/definitions/fullyQualifiedEntityName" + "type": "array", + "items": { + "$ref": "../type/basic.json#/definitions/fullyQualifiedEntityName" } }, - "toColumn" : { + "toColumn": { "description": "Destination column identified by fully qualified column name created by the transformation of source columns.", - "$ref" : "../type/basic.json#/definitions/fullyQualifiedEntityName" + "$ref": "../type/basic.json#/definitions/fullyQualifiedEntityName" }, - "function" : { + "function": { "description": "Transformation function applied to source columns to create destination column. That is `function(fromColumns) -> toColumn`.", - "$ref" : "../type/basic.json#/definitions/sqlFunction" + "$ref": "../type/basic.json#/definitions/sqlFunction" } } }, - "lineageDetails" : { - "description" : "Lineage details including sqlQuery + pipeline + columnLineage.", - "type" : "object", + "lineageDetails": { + "description": "Lineage details including sqlQuery + pipeline + columnLineage.", + "type": "object", "properties": { - "sqlQuery" : { + "sqlQuery": { "description": "SQL used for transformation.", - "$ref" : "../type/basic.json#/definitions/sqlQuery" + "$ref": "../type/basic.json#/definitions/sqlQuery" }, - "columnsLineage" : { - "description" : "Lineage information of how upstream columns were combined to get downstream column.", - "type" : "array", - "items" : { - "$ref" : "#/definitions/columnLineage" + "columnsLineage": { + "description": "Lineage information of how upstream columns were combined to get downstream column.", + "type": "array", + "items": { + "$ref": "#/definitions/columnLineage" } }, - "pipeline" : { + "pipeline": { "description": "Pipeline where the sqlQuery is periodically run.", - "$ref" : "../type/entityReference.json" + "$ref": "../type/entityReference.json" } - }, - "required": ["sqlQuery", "columnsLineage"] + } }, "edge": { "description": "Edge in the lineage graph from one entity to another by entity IDs.", diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py index b968ec70e83..58c6ff04417 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py @@ -43,7 +43,7 @@ from metadata.generated.schema.entity.services.pipelineService import ( PipelineService, PipelineServiceType, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.helpers import datetime_to_ts @@ -348,31 +348,27 @@ def parse_lineage( airflow_service_entity=airflow_service_entity, metadata=metadata, ) + lineage_details = LineageDetails( + pipeline=EntityReference(id=pipeline.id, type="pipeline") + ) operator.log.info("Parsing Lineage") - for table in inlets if inlets else []: - table_entity = metadata.get_by_name(entity=Table, fqn=table) - operator.log.debug(f"from entity {table_entity}") - lineage = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference(id=table_entity.id, type="table"), - toEntity=EntityReference(id=pipeline.id, type="pipeline"), + for from_table in inlets if inlets else []: + from_entity = metadata.get_by_name(entity=Table, fqn=from_table) + operator.log.debug(f"from entity {from_entity}") + for to_table in outlets if outlets else []: + to_entity = metadata.get_by_name(entity=Table, fqn=to_table) + operator.log.debug(f"To entity {to_entity}") + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=from_entity.id, type="table"), + toEntity=EntityReference(id=to_entity.id, type="table"), + ) ) - ) - operator.log.debug(f"From lineage {lineage}") - metadata.add_lineage(lineage) - - for table in outlets if outlets else []: - table_entity = metadata.get_by_name(entity=Table, fqn=table) - operator.log.debug(f"To entity {table_entity}") - lineage = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference(id=pipeline.id, type="pipeline"), - toEntity=EntityReference(id=table_entity.id, type="table"), - ) - ) - operator.log.debug(f"To lineage {lineage}") - metadata.add_lineage(lineage) + if lineage_details: + lineage.edge.lineageDetails = lineage_details + operator.log.debug(f"Lineage {lineage}") + metadata.add_lineage(lineage) return pipeline diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py index 82ef93534ad..f9c34aeb347 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py @@ -39,7 +39,7 @@ from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus @@ -194,6 +194,10 @@ class AirbyteSource(Source[CreatePipelineRequest]): if not source_service or not destination_service: return + lineage_details = LineageDetails( + pipeline=EntityReference(id=pipeline_entity.id, type="pipeline") + ) + for task in connection.get("syncCatalog", {}).get("streams") or []: stream = task.get("stream") from_fqn = fqn.build( @@ -214,23 +218,21 @@ class AirbyteSource(Source[CreatePipelineRequest]): service_name=destination_connection.get("name"), ) - if not from_fqn and not to_fqn: - continue - from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn) to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn) - yield AddLineageRequest( + + if not from_entity or not to_entity: + continue + + lineage = AddLineageRequest( edge=EntitiesEdge( fromEntity=EntityReference(id=from_entity.id, type="table"), - toEntity=EntityReference(id=pipeline_entity.id, type="pipeline"), - ) - ) - yield AddLineageRequest( - edge=EntitiesEdge( toEntity=EntityReference(id=to_entity.id, type="table"), - fromEntity=EntityReference(id=pipeline_entity.id, type="pipeline"), ) ) + if lineage_details: + lineage.edge.lineageDetails = lineage_details + yield lineage def next_record(self) -> Iterable[Entity]: """ diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py index 741102b2cb8..fe413a91748 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py @@ -47,7 +47,7 @@ from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus @@ -287,29 +287,23 @@ class AirflowSource(Source[CreatePipelineRequest]): :return: Lineage from inlets and outlets """ dag: SerializedDAG = serialized_dag.dag - + lineage_details = LineageDetails( + pipeline=EntityReference(id=pipeline_entity.id, type="pipeline") + ) for task in dag.tasks: - for table_fqn in self.get_inlets(task) or []: - table_entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn) - yield AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference(id=table_entity.id, type="table"), - toEntity=EntityReference( - id=pipeline_entity.id, type="pipeline" - ), + for from_fqn in self.get_inlets(task) or []: + from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn) + for to_fqn in self.get_outlets(task) or []: + to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn) + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=from_entity.id, type="table"), + toEntity=EntityReference(id=to_entity.id, type="table"), + ) ) - ) - - for table_fqn in self.get_outlets(task) or []: - table_entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn) - yield AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=pipeline_entity.id, type="pipeline" - ), - toEntity=EntityReference(id=table_entity.id, type="table"), - ) - ) + if lineage_details: + lineage.edge.lineageDetails = lineage_details + yield lineage def next_record(self) -> Iterable[Entity]: """