Minor: Improve UC owner ingestion (#21741)

* Minor: Improve UC owner ingestion

* lint
This commit is contained in:
Mayur Singal 2025-06-13 03:18:29 +05:30 committed by GitHub
parent 6fd5778219
commit d20d278c4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -340,7 +340,7 @@ class UnitycatalogSource(
schema_name=schema_name,
)
),
owners=self.get_owner_ref(table_name),
owners=self.get_owner_ref(table.owner),
)
yield Either(right=table_request)
@ -403,6 +403,7 @@ class UnitycatalogSource(
)
if referred_table_fqn:
for parent_column in column.parent_columns:
# pylint: disable=protected-access
col_fqn = fqn._build(referred_table_fqn, parent_column, quote=False)
if col_fqn:
referred_column_fqns.append(FullyQualifiedEntityName(col_fqn))
@ -419,6 +420,7 @@ class UnitycatalogSource(
return table_constraints
# pylint: disable=arguments-differ
def update_table_constraints(
self, table_constraints, foreign_columns, columns
) -> List[TableConstraint]:
@ -524,18 +526,25 @@ class UnitycatalogSource(
def close(self):
"""Nothing to close"""
def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]:
# pylint: disable=arguments-renamed
def get_owner_ref(
self, table_owner: Optional[str]
) -> Optional[EntityReferenceList]:
"""
Method to process the table owners
"""
if self.source_config.includeOwners is False:
return None
try:
full_table_name = f"{self.context.get().database}.{self.context.get().database_schema}.{table_name}"
owner = self.api_client.get_owner_info(full_table_name)
if not owner:
return
owner_ref = self.metadata.get_reference_by_email(email=owner)
if not table_owner or not isinstance(table_owner, str):
return None
owner_ref = self.metadata.get_reference_by_email(email=table_owner)
if owner_ref:
return owner_ref
table_name = table_owner.split("@")[0]
owner_ref = self.metadata.get_reference_by_name(name=table_name)
return owner_ref
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing owner for table {table_name}: {exc}")
return
logger.warning(f"Error processing owner {table_owner}: {exc}")
return None