MINOR: Improve UC lineage exception handling (#23081)

(cherry picked from commit 7a6d5cd2fb483705b6e583f032fb24050196aab7)
This commit is contained in:
Mayur Singal 2025-08-25 15:45:51 +05:30 committed by OpenMetadata Release Bot
parent 7d2fe2ddf7
commit c8864a95ab
2 changed files with 44 additions and 24 deletions

View File

@ -63,9 +63,16 @@ def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
"""
if not table_entity:
return None
for tbl_column in table_entity.columns:
if column.lower() == tbl_column.name.root.lower():
return tbl_column.fullyQualifiedName.root
for tbl_column in table_entity.columns or []:
try:
if column.lower() == tbl_column.name.root.lower():
return tbl_column.fullyQualifiedName.root
except Exception as e:
logger.debug(traceback.format_exc())
logger.debug(
f"Error getting column FQN for column [{column}] in"
f"table [{table_entity.fullyQualifiedName}] for column [{tbl_column}]: {e}"
)
return None

View File

@ -93,29 +93,36 @@ class UnitycatalogLineageSource(Source):
def _get_lineage_details(
self, from_table: Table, to_table: Table, databricks_table_fqn: str
) -> Optional[LineageDetails]:
col_lineage = []
for column in to_table.columns:
column_streams = self.client.get_column_lineage(
databricks_table_fqn, column_name=column.name.root
)
from_columns = []
for col in column_streams.upstream_cols:
col_fqn = get_column_fqn(from_table, col.name)
if col_fqn:
from_columns.append(col_fqn)
if from_columns:
col_lineage.append(
ColumnLineage(
fromColumns=from_columns,
toColumn=column.fullyQualifiedName.root,
)
try:
col_lineage = []
for column in to_table.columns:
column_streams = self.client.get_column_lineage(
databricks_table_fqn, column_name=column.name.root
)
if col_lineage:
return LineageDetails(
columnsLineage=col_lineage, source=LineageSource.QueryLineage
from_columns = []
for col in column_streams.upstream_cols:
col_fqn = get_column_fqn(from_table, col.name)
if col_fqn:
from_columns.append(col_fqn)
if from_columns:
col_lineage.append(
ColumnLineage(
fromColumns=from_columns,
toColumn=column.fullyQualifiedName.root,
)
)
if col_lineage:
return LineageDetails(
columnsLineage=col_lineage, source=LineageSource.QueryLineage
)
return None
except Exception as exc:
logger.debug(
f"Error computing column lineage for {to_table.fullyQualifiedName.root} - {exc}"
)
return None
logger.debug(traceback.format_exc())
return None
def _handle_upstream_table(
self,
@ -157,6 +164,12 @@ class UnitycatalogLineageSource(Source):
)
),
)
else:
logger.debug(
f"Unable to find upstream entity for "
f"{upstream_table.catalog_name}.{upstream_table.schema_name}.{upstream_table.name}"
f" -> {databricks_table_fqn}"
)
except Exception:
logger.debug(
"Error while processing lineage for "