From c8864a95abd7601ea1d33eb4807c6cbb021965a4 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Mon, 25 Aug 2025 15:45:51 +0530 Subject: [PATCH] MINOR: Improve UC lineage exception handling (#23081) (cherry picked from commit 7a6d5cd2fb483705b6e583f032fb24050196aab7) --- .../metadata/ingestion/lineage/sql_lineage.py | 13 ++++- .../source/database/unitycatalog/lineage.py | 55 ++++++++++++------- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index c9975873a26..d01be900662 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -63,9 +63,16 @@ def get_column_fqn(table_entity: Table, column: str) -> Optional[str]: """ if not table_entity: return None - for tbl_column in table_entity.columns: - if column.lower() == tbl_column.name.root.lower(): - return tbl_column.fullyQualifiedName.root + for tbl_column in table_entity.columns or []: + try: + if column.lower() == tbl_column.name.root.lower(): + return tbl_column.fullyQualifiedName.root + except Exception as e: + logger.debug(traceback.format_exc()) + logger.debug( + f"Error getting column FQN for column [{column}] in" + f"table [{table_entity.fullyQualifiedName}] for column [{tbl_column}]: {e}" + ) return None diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index 1a2a8c15b81..36fca426a84 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -93,29 +93,36 @@ class UnitycatalogLineageSource(Source): def _get_lineage_details( self, from_table: Table, to_table: Table, databricks_table_fqn: str ) -> Optional[LineageDetails]: - col_lineage = [] - for column in to_table.columns: - column_streams = self.client.get_column_lineage( - databricks_table_fqn, column_name=column.name.root - ) - from_columns = [] - for col in column_streams.upstream_cols: - col_fqn = get_column_fqn(from_table, col.name) - if col_fqn: - from_columns.append(col_fqn) - - if from_columns: - col_lineage.append( - ColumnLineage( - fromColumns=from_columns, - toColumn=column.fullyQualifiedName.root, - ) + try: + col_lineage = [] + for column in to_table.columns: + column_streams = self.client.get_column_lineage( + databricks_table_fqn, column_name=column.name.root ) - if col_lineage: - return LineageDetails( - columnsLineage=col_lineage, source=LineageSource.QueryLineage + from_columns = [] + for col in column_streams.upstream_cols: + col_fqn = get_column_fqn(from_table, col.name) + if col_fqn: + from_columns.append(col_fqn) + + if from_columns: + col_lineage.append( + ColumnLineage( + fromColumns=from_columns, + toColumn=column.fullyQualifiedName.root, + ) + ) + if col_lineage: + return LineageDetails( + columnsLineage=col_lineage, source=LineageSource.QueryLineage + ) + return None + except Exception as exc: + logger.debug( + f"Error computing column lineage for {to_table.fullyQualifiedName.root} - {exc}" ) - return None + logger.debug(traceback.format_exc()) + return None def _handle_upstream_table( self, @@ -157,6 +164,12 @@ class UnitycatalogLineageSource(Source): ) ), ) + else: + logger.debug( + f"Unable to find upstream entity for " + f"{upstream_table.catalog_name}.{upstream_table.schema_name}.{upstream_table.name}" + f" -> {databricks_table_fqn}" + ) except Exception: logger.debug( "Error while processing lineage for "