diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index c4cf92666ba..92813459af5 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -11,6 +11,7 @@ """ Databricks Unity Catalog Lineage Source Module """ +import traceback from typing import Iterable, Optional from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -117,6 +118,54 @@ class UnitycatalogLineageSource(Source): ) return None + def _handle_upstream_table( + self, + table_streams: LineageTableStreams, + table: Table, + databricks_table_fqn: str, + ) -> Iterable[Either[AddLineageRequest]]: + for upstream_table in table_streams.upstream_tables: + try: + if not upstream_table.name: + continue + from_entity_fqn = fqn.build( + metadata=self.metadata, + entity_type=Table, + database_name=upstream_table.catalog_name, + schema_name=upstream_table.schema_name, + table_name=upstream_table.name, + service_name=self.config.serviceName, + ) + + from_entity = self.metadata.get_by_name( + entity=Table, fqn=from_entity_fqn + ) + if from_entity: + lineage_details = self._get_lineage_details( + from_table=from_entity, + to_table=table, + databricks_table_fqn=databricks_table_fqn, + ) + yield Either( + left=None, + right=AddLineageRequest( + edge=EntitiesEdge( + toEntity=EntityReference(id=table.id, type="table"), + fromEntity=EntityReference( + id=from_entity.id, type="table" + ), + lineageDetails=lineage_details, + ) + ), + ) + except Exception: + logger.debug( + "Error while processing lineage for " + f"{upstream_table.catalog_name}.{upstream_table.schema_name}.{upstream_table.name}" + f" -> {databricks_table_fqn}" + ) + logger.debug(traceback.format_exc()) + def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: """ Based on the query logs, prepare the lineage @@ -133,37 +182,9 @@ class UnitycatalogLineageSource(Source): table_streams: LineageTableStreams = self.client.get_table_lineage( databricks_table_fqn ) - for upstream_table in table_streams.upstream_tables: - from_entity_fqn = fqn.build( - metadata=self.metadata, - entity_type=Table, - database_name=upstream_table.catalog_name, - schema_name=upstream_table.schema_name, - table_name=upstream_table.name, - service_name=self.config.serviceName, - ) - - from_entity = self.metadata.get_by_name( - entity=Table, fqn=from_entity_fqn - ) - if from_entity: - lineage_details = self._get_lineage_details( - from_table=from_entity, - to_table=table, - databricks_table_fqn=databricks_table_fqn, - ) - yield Either( - left=None, - right=AddLineageRequest( - edge=EntitiesEdge( - toEntity=EntityReference(id=table.id, type="table"), - fromEntity=EntityReference( - id=from_entity.id, type="table" - ), - lineageDetails=lineage_details, - ) - ), - ) + yield from self._handle_upstream_table( + table_streams, table, databricks_table_fqn + ) def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection)