fixed lineage error (#16790)

This commit is contained in:
Imri Paran 2024-06-25 14:36:13 +02:00 committed by GitHub
parent 4395455d18
commit 46560e0b47
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -251,35 +251,56 @@ def _build_table_lineage(
query: str,
column_lineage_map: dict,
lineage_source: LineageSource = LineageSource.QueryLineage,
) -> Iterable[Either[AddLineageRequest]]:
) -> Either[AddLineageRequest]:
"""
Prepare the lineage request generator
Args:
from_entity (Table): entity link comes from
to_entity (Table): entity to link to
from_table_raw_name (str): table entity raw name we link from
to_table_raw_name (str): table entity raw name we link to
query (str): query
column_lineage_map (dict): map of the column lineage
lineage_source (LineageSource): lineage source
Returns:
Either[AddLineageRequest] with the lineage request or an error
"""
col_lineage = get_column_lineage(
to_entity=to_entity,
to_table_raw_name=str(to_table_raw_name),
from_entity=from_entity,
from_table_raw_name=str(from_table_raw_name),
column_lineage_map=column_lineage_map,
)
lineage_details = LineageDetails(sqlQuery=query, source=lineage_source)
if col_lineage:
lineage_details.columnsLineage = col_lineage
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.root,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.root,
type="table",
),
try:
col_lineage = get_column_lineage(
to_entity=to_entity,
to_table_raw_name=str(to_table_raw_name),
from_entity=from_entity,
from_table_raw_name=str(from_table_raw_name),
column_lineage_map=column_lineage_map,
)
lineage_details = LineageDetails(sqlQuery=query, source=lineage_source)
if col_lineage:
lineage_details.columnsLineage = col_lineage
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.root,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.root,
type="table",
),
)
)
if lineage_details:
lineage.edge.lineageDetails = lineage_details
return Either(right=lineage)
except Exception as e:
return Either(
left=StackTraceError(
name="Lineage",
error=f"Error creating lineage for tables [{from_table_raw_name}] and [{to_table_raw_name}]: {e}",
stackTrace=traceback.format_exc(),
)
)
)
if lineage_details:
lineage.edge.lineageDetails = lineage_details
yield Either(right=lineage)
# pylint: disable=too-many-arguments,too-many-locals
@ -325,15 +346,16 @@ def _create_lineage_by_table_name(
for from_entity, to_entity in itertools.product(
from_table_entities, to_table_entities
):
yield from _build_table_lineage(
to_entity=to_entity,
from_entity=from_entity,
to_table_raw_name=to_table,
from_table_raw_name=from_table,
query=query,
column_lineage_map=column_lineage_map,
lineage_source=lineage_source,
)
if to_entity and from_entity:
yield _build_table_lineage(
to_entity=to_entity,
from_entity=from_entity,
to_table_raw_name=to_table,
from_table_raw_name=from_table,
query=query,
column_lineage_map=column_lineage_map,
lineage_source=lineage_source,
)
except Exception as exc:
yield Either(