From 46560e0b47a2ab866a2613a936b5f833c71ff38e Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Tue, 25 Jun 2024 14:36:13 +0200 Subject: [PATCH] fixed lineage error (#16790) --- .../metadata/ingestion/lineage/sql_lineage.py | 90 ++++++++++++------- 1 file changed, 56 insertions(+), 34 deletions(-) diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index bc7d13c3eee..db2011f3370 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -251,35 +251,56 @@ def _build_table_lineage( query: str, column_lineage_map: dict, lineage_source: LineageSource = LineageSource.QueryLineage, -) -> Iterable[Either[AddLineageRequest]]: +) -> Either[AddLineageRequest]: """ Prepare the lineage request generator + + Args: + from_entity (Table): entity link comes from + to_entity (Table): entity to link to + from_table_raw_name (str): table entity raw name we link from + to_table_raw_name (str): table entity raw name we link to + query (str): query + column_lineage_map (dict): map of the column lineage + lineage_source (LineageSource): lineage source + + Returns: + Either[AddLineageRequest] with the lineage request or an error """ - col_lineage = get_column_lineage( - to_entity=to_entity, - to_table_raw_name=str(to_table_raw_name), - from_entity=from_entity, - from_table_raw_name=str(from_table_raw_name), - column_lineage_map=column_lineage_map, - ) - lineage_details = LineageDetails(sqlQuery=query, source=lineage_source) - if col_lineage: - lineage_details.columnsLineage = col_lineage - lineage = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=from_entity.id.root, - type="table", - ), - toEntity=EntityReference( - id=to_entity.id.root, - type="table", - ), + try: + col_lineage = get_column_lineage( + to_entity=to_entity, + to_table_raw_name=str(to_table_raw_name), + from_entity=from_entity, + from_table_raw_name=str(from_table_raw_name), + column_lineage_map=column_lineage_map, + ) + lineage_details = LineageDetails(sqlQuery=query, source=lineage_source) + if col_lineage: + lineage_details.columnsLineage = col_lineage + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id.root, + type="table", + ), + toEntity=EntityReference( + id=to_entity.id.root, + type="table", + ), + ) + ) + if lineage_details: + lineage.edge.lineageDetails = lineage_details + return Either(right=lineage) + except Exception as e: + return Either( + left=StackTraceError( + name="Lineage", + error=f"Error creating lineage for tables [{from_table_raw_name}] and [{to_table_raw_name}]: {e}", + stackTrace=traceback.format_exc(), + ) ) - ) - if lineage_details: - lineage.edge.lineageDetails = lineage_details - yield Either(right=lineage) # pylint: disable=too-many-arguments,too-many-locals @@ -325,15 +346,16 @@ def _create_lineage_by_table_name( for from_entity, to_entity in itertools.product( from_table_entities, to_table_entities ): - yield from _build_table_lineage( - to_entity=to_entity, - from_entity=from_entity, - to_table_raw_name=to_table, - from_table_raw_name=from_table, - query=query, - column_lineage_map=column_lineage_map, - lineage_source=lineage_source, - ) + if to_entity and from_entity: + yield _build_table_lineage( + to_entity=to_entity, + from_entity=from_entity, + to_table_raw_name=to_table, + from_table_raw_name=from_table, + query=query, + column_lineage_map=column_lineage_map, + lineage_source=lineage_source, + ) except Exception as exc: yield Either(