Added pipeline metadata to the edge lines (#5552)

Onkar Ravgan 2022-06-21 20:52:37 +05:30 committed by GitHub
parent a14de6eee1
commit d717f8cc17
5 changed files with 82 additions and 91 deletions


@@ -13,8 +13,6 @@
 package org.openmetadata.catalog.jdbi3;
 
-import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -75,7 +73,7 @@ public class LineageRepository {
   private String validateLineageDetails(EntityReference from, EntityReference to, LineageDetails details)
       throws IOException {
-    if (details == null || listOrEmpty(details.getColumnsLineage()).isEmpty()) {
+    if (details == null) {
      return null;
    }
@@ -86,6 +84,7 @@ public class LineageRepository {
    Table fromTable = dao.tableDAO().findEntityById(from.getId());
    Table toTable = dao.tableDAO().findEntityById(to.getId());
+    if (columnsLineage != null) {
      for (ColumnLineage columnLineage : columnsLineage) {
        for (String fromColumn : columnLineage.getFromColumns()) {
          // From column belongs to the fromNode
@@ -98,6 +97,7 @@ public class LineageRepository {
        }
        TableRepository.validateColumnFQN(toTable, columnLineage.getToColumn());
      }
+    }
    return JsonUtils.pojoToJson(details);
  }
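With the relaxed server-side check above, lineage details that carry only a pipeline reference (and no columnsLineage) are now persisted instead of being dropped. A minimal sketch of such a request from the Python ingestion SDK, mirroring the pattern used in the changes below; the entity UUIDs are hypothetical placeholders and the AddLineageRequest import path is assumed from the generated-schema layout:

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference

# Hypothetical IDs; in practice they come from metadata.get_by_name(...) lookups.
FROM_TABLE_ID = "11111111-1111-1111-1111-111111111111"
TO_TABLE_ID = "22222222-2222-2222-2222-222222222222"
PIPELINE_ID = "33333333-3333-3333-3333-333333333333"

lineage = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=FROM_TABLE_ID, type="table"),
        toEntity=EntityReference(id=TO_TABLE_ID, type="table"),
    )
)
# Pipeline metadata rides on the edge itself; columnsLineage can be omitted entirely.
lineage.edge.lineageDetails = LineageDetails(
    pipeline=EntityReference(id=PIPELINE_ID, type="pipeline")
)
# metadata.add_lineage(lineage)  # `metadata` would be an OpenMetadata client instance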


@@ -7,46 +7,45 @@
   "javaType": "org.openmetadata.catalog.type.EntityLineage",
   "definitions": {
     "columnLineage": {
-      "type" : "object",
+      "type": "object",
       "properties": {
-        "fromColumns" : {
+        "fromColumns": {
           "description": "One or more source columns identified by fully qualified column name used by transformation function to create destination column.",
-          "type" : "array",
-          "items" : {
-            "$ref" : "../type/basic.json#/definitions/fullyQualifiedEntityName"
+          "type": "array",
+          "items": {
+            "$ref": "../type/basic.json#/definitions/fullyQualifiedEntityName"
           }
         },
-        "toColumn" : {
+        "toColumn": {
           "description": "Destination column identified by fully qualified column name created by the transformation of source columns.",
-          "$ref" : "../type/basic.json#/definitions/fullyQualifiedEntityName"
+          "$ref": "../type/basic.json#/definitions/fullyQualifiedEntityName"
         },
-        "function" : {
+        "function": {
           "description": "Transformation function applied to source columns to create destination column. That is `function(fromColumns) -> toColumn`.",
-          "$ref" : "../type/basic.json#/definitions/sqlFunction"
+          "$ref": "../type/basic.json#/definitions/sqlFunction"
         }
       }
     },
-    "lineageDetails" : {
-      "description" : "Lineage details including sqlQuery + pipeline + columnLineage.",
-      "type" : "object",
+    "lineageDetails": {
+      "description": "Lineage details including sqlQuery + pipeline + columnLineage.",
+      "type": "object",
       "properties": {
-        "sqlQuery" : {
+        "sqlQuery": {
           "description": "SQL used for transformation.",
-          "$ref" : "../type/basic.json#/definitions/sqlQuery"
+          "$ref": "../type/basic.json#/definitions/sqlQuery"
         },
-        "columnsLineage" : {
-          "description" : "Lineage information of how upstream columns were combined to get downstream column.",
-          "type" : "array",
-          "items" : {
-            "$ref" : "#/definitions/columnLineage"
+        "columnsLineage": {
+          "description": "Lineage information of how upstream columns were combined to get downstream column.",
+          "type": "array",
+          "items": {
+            "$ref": "#/definitions/columnLineage"
           }
         },
-        "pipeline" : {
+        "pipeline": {
           "description": "Pipeline where the sqlQuery is periodically run.",
-          "$ref" : "../type/entityReference.json"
+          "$ref": "../type/entityReference.json"
         }
-      },
-      "required": ["sqlQuery", "columnsLineage"]
+      }
     },
     "edge": {
       "description": "Edge in the lineage graph from one entity to another by entity IDs.",


@@ -43,7 +43,7 @@ from metadata.generated.schema.entity.services.pipelineService import (
     PipelineService,
     PipelineServiceType,
 )
-from metadata.generated.schema.type.entityLineage import EntitiesEdge
+from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils.helpers import datetime_to_ts
@@ -348,30 +348,26 @@ def parse_lineage(
         airflow_service_entity=airflow_service_entity,
         metadata=metadata,
     )
+    lineage_details = LineageDetails(
+        pipeline=EntityReference(id=pipeline.id, type="pipeline")
+    )
     operator.log.info("Parsing Lineage")
-    for table in inlets if inlets else []:
-        table_entity = metadata.get_by_name(entity=Table, fqn=table)
-        operator.log.debug(f"from entity {table_entity}")
-        lineage = AddLineageRequest(
-            edge=EntitiesEdge(
-                fromEntity=EntityReference(id=table_entity.id, type="table"),
-                toEntity=EntityReference(id=pipeline.id, type="pipeline"),
-            )
-        )
-        operator.log.debug(f"From lineage {lineage}")
-        metadata.add_lineage(lineage)
-    for table in outlets if outlets else []:
-        table_entity = metadata.get_by_name(entity=Table, fqn=table)
-        operator.log.debug(f"To entity {table_entity}")
-        lineage = AddLineageRequest(
-            edge=EntitiesEdge(
-                fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
-                toEntity=EntityReference(id=table_entity.id, type="table"),
-            )
-        )
-        operator.log.debug(f"To lineage {lineage}")
-        metadata.add_lineage(lineage)
+    for from_table in inlets if inlets else []:
+        from_entity = metadata.get_by_name(entity=Table, fqn=from_table)
+        operator.log.debug(f"from entity {from_entity}")
+        for to_table in outlets if outlets else []:
+            to_entity = metadata.get_by_name(entity=Table, fqn=to_table)
+            operator.log.debug(f"To entity {to_entity}")
+            lineage = AddLineageRequest(
+                edge=EntitiesEdge(
+                    fromEntity=EntityReference(id=from_entity.id, type="table"),
+                    toEntity=EntityReference(id=to_entity.id, type="table"),
+                )
+            )
+            if lineage_details:
+                lineage.edge.lineageDetails = lineage_details
+            operator.log.debug(f"Lineage {lineage}")
+            metadata.add_lineage(lineage)
 
     return pipeline
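Net effect of the change to parse_lineage: instead of one table-to-pipeline edge per inlet and one pipeline-to-table edge per outlet, every (inlet, outlet) pair now gets a direct table-to-table edge, with the pipeline attached through edge.lineageDetails. A small sketch of that fan-out with hypothetical table FQNs:

# Hypothetical inlets/outlets of an Airflow task, just to show the edge fan-out.
inlets = ["demo_svc.shop.public.raw_orders", "demo_svc.shop.public.raw_customers"]
outlets = ["demo_svc.shop.public.orders_enriched"]

# Old behaviour: 2 edges into the pipeline node and 1 edge out of it.
# New behaviour: len(inlets) * len(outlets) table-to-table edges,
# each carrying lineageDetails.pipeline.
for from_table in inlets:
    for to_table in outlets:
        print(f"{from_table} -> {to_table} (lineageDetails.pipeline = <pipeline ref>)")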


@@ -39,7 +39,7 @@ from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
-from metadata.generated.schema.type.entityLineage import EntitiesEdge
+from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.common import Entity
 from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
@@ -194,6 +194,10 @@ class AirbyteSource(Source[CreatePipelineRequest]):
         if not source_service or not destination_service:
             return
 
+        lineage_details = LineageDetails(
+            pipeline=EntityReference(id=pipeline_entity.id, type="pipeline")
+        )
+
         for task in connection.get("syncCatalog", {}).get("streams") or []:
             stream = task.get("stream")
             from_fqn = fqn.build(
@@ -214,23 +218,21 @@ class AirbyteSource(Source[CreatePipelineRequest]):
                 service_name=destination_connection.get("name"),
             )
+            if not from_fqn and not to_fqn:
+                continue
             from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
             to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
-            yield AddLineageRequest(
+            if not from_entity or not to_entity:
+                continue
+            lineage = AddLineageRequest(
                 edge=EntitiesEdge(
                     fromEntity=EntityReference(id=from_entity.id, type="table"),
-                    toEntity=EntityReference(id=pipeline_entity.id, type="pipeline"),
-                )
-            )
-            yield AddLineageRequest(
-                edge=EntitiesEdge(
                     toEntity=EntityReference(id=to_entity.id, type="table"),
-                    fromEntity=EntityReference(id=pipeline_entity.id, type="pipeline"),
                 )
             )
+            if lineage_details:
+                lineage.edge.lineageDetails = lineage_details
+            yield lineage
 
     def next_record(self) -> Iterable[Entity]:
         """


@@ -47,7 +47,7 @@ from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
-from metadata.generated.schema.type.entityLineage import EntitiesEdge
+from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.common import Entity
 from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
@@ -287,29 +287,23 @@ class AirflowSource(Source[CreatePipelineRequest]):
         :return: Lineage from inlets and outlets
         """
         dag: SerializedDAG = serialized_dag.dag
+        lineage_details = LineageDetails(
+            pipeline=EntityReference(id=pipeline_entity.id, type="pipeline")
+        )
         for task in dag.tasks:
-            for table_fqn in self.get_inlets(task) or []:
-                table_entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn)
-                yield AddLineageRequest(
-                    edge=EntitiesEdge(
-                        fromEntity=EntityReference(id=table_entity.id, type="table"),
-                        toEntity=EntityReference(
-                            id=pipeline_entity.id, type="pipeline"
-                        ),
-                    )
-                )
-            for table_fqn in self.get_outlets(task) or []:
-                table_entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn)
-                yield AddLineageRequest(
-                    edge=EntitiesEdge(
-                        fromEntity=EntityReference(
-                            id=pipeline_entity.id, type="pipeline"
-                        ),
-                        toEntity=EntityReference(id=table_entity.id, type="table"),
-                    )
-                )
+            for from_fqn in self.get_inlets(task) or []:
+                from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
+                for to_fqn in self.get_outlets(task) or []:
+                    to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
+                    lineage = AddLineageRequest(
+                        edge=EntitiesEdge(
+                            fromEntity=EntityReference(id=from_entity.id, type="table"),
+                            toEntity=EntityReference(id=to_entity.id, type="table"),
+                        )
+                    )
+                    if lineage_details:
+                        lineage.edge.lineageDetails = lineage_details
+                    yield lineage
 
     def next_record(self) -> Iterable[Entity]:
         """