Fix #16573: get table owners for databaricks & unitycatalog tables (#17282)

This commit is contained in:
harshsoni2024 2024-08-10 10:45:22 +05:30 committed by GitHub
parent 98229591b5
commit 1b04f1fb37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 82 additions and 0 deletions

View File

@ -15,6 +15,8 @@ import traceback
from copy import deepcopy from copy import deepcopy
from typing import Iterable, Optional, Tuple, Union from typing import Iterable, Optional, Tuple, Union
from pydantic import EmailStr
from pydantic_core import PydanticCustomError
from pyhive.sqlalchemy_hive import _type_map from pyhive.sqlalchemy_hive import _type_map
from sqlalchemy import types, util from sqlalchemy import types, util
from sqlalchemy.engine import reflection from sqlalchemy.engine import reflection
@ -35,6 +37,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.api.models import Either from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
@ -661,3 +664,41 @@ class DatabricksSource(ExternalTableLineageMixin, CommonDbSourceService, MultiDB
f"Table description error for table [{schema_name}.{table_name}]: {exc}" f"Table description error for table [{schema_name}.{table_name}]: {exc}"
) )
return description return description
def _filter_owner_name(self, owner_name: str) -> str:
"""remove unnecessary keyword from name"""
pattern = r"\(Unknown\)"
filtered_name = re.sub(pattern, "", owner_name).strip()
return filtered_name
def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]:
"""
Method to process the table owners
"""
try:
query = DATABRICKS_GET_TABLE_COMMENTS.format(
schema_name=self.context.get().database_schema,
table_name=table_name,
)
result = self.connection.engine.execute(query)
owner = None
for row in result:
row_dict = dict(row)
if row_dict.get("col_name") == "Owner":
owner = row_dict.get("data_type")
break
if not owner:
return
owner = self._filter_owner_name(owner)
owner_ref = None
try:
owner_email = EmailStr._validate(owner)
owner_ref = self.metadata.get_reference_by_email(email=owner_email)
except PydanticCustomError:
owner_ref = self.metadata.get_reference_by_name(name=owner)
return owner_ref
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing owner for table {table_name}: {exc}")
return

View File

@ -14,6 +14,8 @@ Client to interact with databricks apis
import json import json
import traceback import traceback
from requests import HTTPError
from metadata.ingestion.source.database.databricks.client import ( from metadata.ingestion.source.database.databricks.client import (
API_TIMEOUT, API_TIMEOUT,
DatabricksClient, DatabricksClient,
@ -27,6 +29,7 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get" TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get"
COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get" COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get"
TABLES_PATH = "/unity-catalog/tables"
class UnityCatalogClient(DatabricksClient): class UnityCatalogClient(DatabricksClient):
@ -85,3 +88,21 @@ class UnityCatalogClient(DatabricksClient):
logger.error(exc) logger.error(exc)
return LineageColumnStreams() return LineageColumnStreams()
def get_owner_info(self, full_table_name: str) -> str:
"""
get owner info from tables API
"""
try:
response = self.client.get(
f"{self.base_url}{TABLES_PATH}/{full_table_name}",
headers=self.headers,
timeout=API_TIMEOUT,
)
if response.status_code != 200:
raise HTTPError(response.text)
return response.json().get("owner")
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)
return

View File

@ -50,6 +50,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.api.models import Either from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
@ -61,6 +62,7 @@ from metadata.ingestion.source.database.external_table_lineage_mixin import (
) )
from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient
from metadata.ingestion.source.database.unitycatalog.connection import get_connection from metadata.ingestion.source.database.unitycatalog.connection import get_connection
from metadata.ingestion.source.database.unitycatalog.models import ( from metadata.ingestion.source.database.unitycatalog.models import (
ColumnJson, ColumnJson,
@ -99,6 +101,7 @@ class UnitycatalogSource(
) )
self.external_location_map = {} self.external_location_map = {}
self.client = get_connection(self.service_connection) self.client = get_connection(self.service_connection)
self.api_client = UnityCatalogClient(self.service_connection)
self.connection_obj = self.client self.connection_obj = self.client
self.table_constraints = [] self.table_constraints = []
self.context.storage_location = None self.context.storage_location = None
@ -330,6 +333,7 @@ class UnitycatalogSource(
schema_name=schema_name, schema_name=schema_name,
) )
), ),
owners=self.get_owner_ref(table_name),
) )
yield Either(right=table_request) yield Either(right=table_request)
@ -537,3 +541,19 @@ class UnitycatalogSource(
def close(self): def close(self):
"""Nothing to close""" """Nothing to close"""
def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]:
"""
Method to process the table owners
"""
try:
full_table_name = f"{self.context.get().database}.{self.context.get().database_schema}.{table_name}"
owner = self.api_client.get_owner_info(full_table_name)
if not owner:
return
owner_ref = self.metadata.get_reference_by_email(email=owner)
return owner_ref
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing owner for table {table_name}: {exc}")
return