MINOR: Fix unity catalog lineage - handle errors (#19791)

* MINOR: Fix unity catalog lineage - hanle errors

* lint
This commit is contained in:
Mayur Singal 2025-02-14 10:45:31 +05:30 committed by GitHub
parent 584816fc33
commit c8e24e5f3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -11,6 +11,7 @@
"""
Databricks Unity Catalog Lineage Source Module
"""
import traceback
from typing import Iterable, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -117,6 +118,54 @@ class UnitycatalogLineageSource(Source):
)
return None
def _handle_upstream_table(
self,
table_streams: LineageTableStreams,
table: Table,
databricks_table_fqn: str,
) -> Iterable[Either[AddLineageRequest]]:
for upstream_table in table_streams.upstream_tables:
try:
if not upstream_table.name:
continue
from_entity_fqn = fqn.build(
metadata=self.metadata,
entity_type=Table,
database_name=upstream_table.catalog_name,
schema_name=upstream_table.schema_name,
table_name=upstream_table.name,
service_name=self.config.serviceName,
)
from_entity = self.metadata.get_by_name(
entity=Table, fqn=from_entity_fqn
)
if from_entity:
lineage_details = self._get_lineage_details(
from_table=from_entity,
to_table=table,
databricks_table_fqn=databricks_table_fqn,
)
yield Either(
left=None,
right=AddLineageRequest(
edge=EntitiesEdge(
toEntity=EntityReference(id=table.id, type="table"),
fromEntity=EntityReference(
id=from_entity.id, type="table"
),
lineageDetails=lineage_details,
)
),
)
except Exception:
logger.debug(
"Error while processing lineage for "
f"{upstream_table.catalog_name}.{upstream_table.schema_name}.{upstream_table.name}"
f" -> {databricks_table_fqn}"
)
logger.debug(traceback.format_exc())
def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]:
"""
Based on the query logs, prepare the lineage
@ -133,37 +182,9 @@ class UnitycatalogLineageSource(Source):
table_streams: LineageTableStreams = self.client.get_table_lineage(
databricks_table_fqn
)
for upstream_table in table_streams.upstream_tables:
from_entity_fqn = fqn.build(
metadata=self.metadata,
entity_type=Table,
database_name=upstream_table.catalog_name,
schema_name=upstream_table.schema_name,
table_name=upstream_table.name,
service_name=self.config.serviceName,
)
from_entity = self.metadata.get_by_name(
entity=Table, fqn=from_entity_fqn
)
if from_entity:
lineage_details = self._get_lineage_details(
from_table=from_entity,
to_table=table,
databricks_table_fqn=databricks_table_fqn,
)
yield Either(
left=None,
right=AddLineageRequest(
edge=EntitiesEdge(
toEntity=EntityReference(id=table.id, type="table"),
fromEntity=EntityReference(
id=from_entity.id, type="table"
),
lineageDetails=lineage_details,
)
),
)
yield from self._handle_upstream_table(
table_streams, table, databricks_table_fqn
)
def test_connection(self) -> None:
test_connection_fn = get_test_connection_fn(self.service_connection)