Refactor: Unity Catalog (#21801)

This commit is contained in:
Keshav Mohta 2025-06-20 16:04:34 +05:30 committed by GitHub
parent 2f7c6ef05e
commit 73ea60b898
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 238 additions and 67 deletions

View File

@ -36,6 +36,12 @@ from metadata.ingestion.connections.test_connections import test_connection_step
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient
from metadata.ingestion.source.database.unitycatalog.models import DatabricksTable from metadata.ingestion.source.database.unitycatalog.models import DatabricksTable
from metadata.ingestion.source.database.unitycatalog.queries import (
UNITY_CATALOG_GET_ALL_SCHEMA_TAGS,
UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS,
UNITY_CATALOG_GET_ALL_TABLE_TAGS,
UNITY_CATALOG_GET_CATALOGS_TAGS,
)
from metadata.utils.constants import THREE_MIN from metadata.utils.constants import THREE_MIN
from metadata.utils.db_utils import get_host_from_host_port from metadata.utils.db_utils import get_host_from_host_port
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
@ -111,6 +117,32 @@ def test_connection(
table_obj.name = table.name table_obj.name = table.name
break break
def get_tags(
service_connection: UnityCatalogConnection, table_obj: DatabricksTable
):
engine = get_sqlalchemy_connection(service_connection)
with engine.connect() as connection:
connection.execute(
UNITY_CATALOG_GET_CATALOGS_TAGS.format(
database=table_obj.catalog_name
).replace(";", " limit 1;")
)
connection.execute(
UNITY_CATALOG_GET_ALL_SCHEMA_TAGS.format(
database=table_obj.catalog_name
).replace(";", " limit 1;")
)
connection.execute(
UNITY_CATALOG_GET_ALL_TABLE_TAGS.format(
database=table_obj.catalog_name, schema=table_obj.schema_name
).replace(";", " limit 1;")
)
connection.execute(
UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS.format(
database=table_obj.catalog_name, schema=table_obj.schema_name
).replace(";", " limit 1;")
)
test_fn = { test_fn = {
"CheckAccess": connection.catalogs.list, "CheckAccess": connection.catalogs.list,
"GetDatabases": partial(get_catalogs, connection, table_obj), "GetDatabases": partial(get_catalogs, connection, table_obj),
@ -118,6 +150,7 @@ def test_connection(
"GetTables": partial(get_tables, connection, table_obj), "GetTables": partial(get_tables, connection, table_obj),
"GetViews": partial(get_tables, connection, table_obj), "GetViews": partial(get_tables, connection, table_obj),
"GetQueries": client.test_query_api_access, "GetQueries": client.test_query_api_access,
"GetTags": partial(get_tags, service_connection, table_obj),
} }
return test_connection_steps( return test_connection_steps(

View File

@ -65,20 +65,33 @@ from metadata.ingestion.source.database.external_table_lineage_mixin import (
from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient
from metadata.ingestion.source.database.unitycatalog.connection import get_connection from metadata.ingestion.source.database.unitycatalog.connection import (
get_connection,
get_sqlalchemy_connection,
)
from metadata.ingestion.source.database.unitycatalog.models import ( from metadata.ingestion.source.database.unitycatalog.models import (
ColumnJson, ColumnJson,
ElementType, ElementType,
ForeignConstrains, ForeignConstrains,
Type, Type,
) )
from metadata.ingestion.source.database.unitycatalog.queries import (
UNITY_CATALOG_GET_ALL_SCHEMA_TAGS,
UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS,
UNITY_CATALOG_GET_ALL_TABLE_TAGS,
UNITY_CATALOG_GET_CATALOGS_TAGS,
)
from metadata.utils import fqn from metadata.utils import fqn
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.helpers import retry_with_docker_host from metadata.utils.helpers import retry_with_docker_host
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification
logger = ingestion_logger() logger = ingestion_logger()
UNITY_CATALOG_TAG = "UNITY CATALOG TAG"
UNITY_CATALOG_TAG_CLASSIFICATION = "UNITY CATALOG TAG CLASSIFICATION"
class UnitycatalogSource( class UnitycatalogSource(
ExternalTableLineageMixin, DatabaseServiceSource, MultiDBSource ExternalTableLineageMixin, DatabaseServiceSource, MultiDBSource
@ -179,10 +192,14 @@ class UnitycatalogSource(
From topology. From topology.
Prepare a database request and pass it to the sink Prepare a database request and pass it to the sink
""" """
catalog = self.client.catalogs.get(database_name)
yield Either( yield Either(
right=CreateDatabaseRequest( right=CreateDatabaseRequest(
name=database_name, name=database_name,
service=self.context.get().database_service, service=self.context.get().database_service,
owners=self.get_owner_ref(catalog.owner),
description=catalog.comment,
tags=self.get_database_tag_labels(database_name),
) )
) )
@ -227,6 +244,9 @@ class UnitycatalogSource(
From topology. From topology.
Prepare a database schema request and pass it to the sink Prepare a database schema request and pass it to the sink
""" """
schema = self.client.schemas.get(
full_name=f"{self.context.get().database}.{schema_name}"
)
yield Either( yield Either(
right=CreateDatabaseSchemaRequest( right=CreateDatabaseSchemaRequest(
name=EntityName(schema_name), name=EntityName(schema_name),
@ -238,6 +258,9 @@ class UnitycatalogSource(
database_name=self.context.get().database, database_name=self.context.get().database,
) )
), ),
description=schema.comment,
owners=self.get_owner_ref(schema.owner),
tags=self.get_schema_tag_labels(schema_name),
) )
) )
@ -315,7 +338,7 @@ class UnitycatalogSource(
(db_name, schema_name, table_name) (db_name, schema_name, table_name)
] = table.storage_location ] = table.storage_location
try: try:
columns = list(self.get_columns(table.columns)) columns = list(self.get_columns(table_name, table.columns))
( (
primary_constraints, primary_constraints,
foreign_constraints, foreign_constraints,
@ -341,6 +364,7 @@ class UnitycatalogSource(
) )
), ),
owners=self.get_owner_ref(table.owner), owners=self.get_owner_ref(table.owner),
tags=self.get_tag_labels(table_name),
) )
yield Either(right=table_request) yield Either(right=table_request)
@ -477,11 +501,12 @@ class UnitycatalogSource(
f"Unable to add description to complex datatypes for column [{column.name}]: {exc}" f"Unable to add description to complex datatypes for column [{column.name}]: {exc}"
) )
def get_columns(self, column_data: List[ColumnInfo]) -> Iterable[Column]: def get_columns(
self, table_name: str, column_data: List[ColumnInfo]
) -> Iterable[Column]:
""" """
process table regular columns info process table regular columns info
""" """
for column in column_data: for column in column_data:
parsed_string = {} parsed_string = {}
if column.type_text: if column.type_text:
@ -500,6 +525,9 @@ class UnitycatalogSource(
parsed_string["dataLength"] = parsed_string.get("dataLength", 1) parsed_string["dataLength"] = parsed_string.get("dataLength", 1)
if column.comment: if column.comment:
parsed_string["description"] = Markdown(column.comment) parsed_string["description"] = Markdown(column.comment)
parsed_string["tags"] = self.get_column_tag_labels(
table_name=table_name, column={"name": column.name}
)
parsed_column = Column(**parsed_string) parsed_column = Column(**parsed_string)
self.add_complex_datatype_descriptions( self.add_complex_datatype_descriptions(
column=parsed_column, column=parsed_column,
@ -507,10 +535,97 @@ class UnitycatalogSource(
) )
yield parsed_column yield parsed_column
def yield_database_tag(
self, database_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
"""Get Unity Catalog database/catalog tags using SQL query"""
query_tag_fqn_builder_mapping = (
(
UNITY_CATALOG_GET_CATALOGS_TAGS.format(database=database_name),
lambda tag: [self.context.get().database_service, database_name],
),
(
UNITY_CATALOG_GET_ALL_SCHEMA_TAGS.format(database=database_name),
lambda tag: [
self.context.get().database_service,
database_name,
tag.schema_name,
],
),
)
try:
with get_sqlalchemy_connection(
self.service_connection
).connect() as connection:
for query, tag_fqn_builder in query_tag_fqn_builder_mapping:
for tag in connection.execute(query):
yield from get_ometa_tag_and_classification(
tag_fqn=FullyQualifiedEntityName(
fqn._build(*tag_fqn_builder(tag))
), # pylint: disable=protected-access
tags=[tag.tag_value],
classification_name=tag.tag_name,
tag_description=UNITY_CATALOG_TAG,
classification_description=UNITY_CATALOG_TAG_CLASSIFICATION,
metadata=self.metadata,
system_tags=True,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error getting tags for catalog/schema {database_name}: {exc}"
)
def yield_tag( def yield_tag(
self, schema_name: str self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]: ) -> Iterable[Either[OMetaTagAndClassification]]:
"""No tags being processed""" """Get Unity Catalog schema tags using SQL query"""
database = self.context.get().database
query_tag_fqn_builder_mapping = (
(
UNITY_CATALOG_GET_ALL_TABLE_TAGS.format(
database=database, schema=schema_name
),
lambda tag: [
self.context.get().database_service,
database,
schema_name,
tag.table_name,
],
),
(
UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS.format(
database=database, schema=schema_name
),
lambda tag: [
self.context.get().database_service,
database,
schema_name,
tag.table_name,
tag.column_name,
],
),
)
try:
with get_sqlalchemy_connection(
self.service_connection
).connect() as connection:
for query, tag_fqn_builder in query_tag_fqn_builder_mapping:
for tag in connection.execute(query):
yield from get_ometa_tag_and_classification(
tag_fqn=FullyQualifiedEntityName(
fqn._build(*tag_fqn_builder(tag))
), # pylint: disable=protected-access
tags=[tag.tag_value],
classification_name=tag.tag_name,
tag_description=UNITY_CATALOG_TAG,
classification_description=UNITY_CATALOG_TAG_CLASSIFICATION,
metadata=self.metadata,
system_tags=True,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error getting tags for schema {schema_name}: {exc}")
def get_stored_procedures(self) -> Iterable[Any]: def get_stored_procedures(self) -> Iterable[Any]:
"""Not implemented""" """Not implemented"""
@ -527,24 +642,22 @@ class UnitycatalogSource(
"""Nothing to close""" """Nothing to close"""
# pylint: disable=arguments-renamed # pylint: disable=arguments-renamed
def get_owner_ref( def get_owner_ref(self, owner: Optional[str]) -> Optional[EntityReferenceList]:
self, table_owner: Optional[str]
) -> Optional[EntityReferenceList]:
""" """
Method to process the table owners Method to process the table owners
""" """
if self.source_config.includeOwners is False: if self.source_config.includeOwners is False:
return None return None
try: try:
if not table_owner or not isinstance(table_owner, str): if not owner or not isinstance(owner, str):
return None return None
owner_ref = self.metadata.get_reference_by_email(email=table_owner) owner_ref = self.metadata.get_reference_by_email(email=owner)
if owner_ref: if owner_ref:
return owner_ref return owner_ref
table_name = table_owner.split("@")[0] owner_name = owner.split("@")[0]
owner_ref = self.metadata.get_reference_by_name(name=table_name) owner_ref = self.metadata.get_reference_by_name(name=owner_name)
return owner_ref return owner_ref
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.warning(f"Error processing owner {table_owner}: {exc}") logger.warning(f"Error processing owner {owner}: {exc}")
return None return None

View File

@ -0,0 +1,19 @@
"""
SQL queries for Unity Catalog
"""
UNITY_CATALOG_GET_CATALOGS_TAGS = """
SELECT * FROM `{database}`.information_schema.catalog_tags;
"""
UNITY_CATALOG_GET_ALL_SCHEMA_TAGS = """
SELECT * FROM `{database}`.information_schema.schema_tags;
"""
UNITY_CATALOG_GET_ALL_TABLE_TAGS = """
SELECT * FROM `{database}`.information_schema.table_tags WHERE schema_name = '{schema}';
"""
UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS = """
SELECT * FROM `{database}`.information_schema.column_tags WHERE schema_name = '{schema}';
"""

View File

@ -318,7 +318,7 @@ MOCK_TABLE_INFO = TableInfo(
type_text="struct<a:int,b:string,c:array<string>,d:struct<abc:int>>", type_text="struct<a:int,b:string,c:array<string>,d:struct<abc:int>>",
), ),
], ],
comment=None, comment="this is a description for dataset input",
created_at=1713519443052, created_at=1713519443052,
created_by="test@open-metadata.org", created_by="test@open-metadata.org",
data_access_configuration_id="00000000-0000-0000-0000-000000000000", data_access_configuration_id="00000000-0000-0000-0000-000000000000",
@ -478,7 +478,7 @@ EXPTECTED_TABLE = [
right=CreateTableRequest( right=CreateTableRequest(
name=EntityName(root="complex_data"), name=EntityName(root="complex_data"),
displayName=None, displayName=None,
description=None, description="this is a description for dataset input",
tableType=TableType.Regular, tableType=TableType.Regular,
columns=[ columns=[
Column( Column(
@ -565,7 +565,7 @@ EXPTECTED_TABLE = [
dataTypeDisplay="int", dataTypeDisplay="int",
description=None, description=None,
fullyQualifiedName=None, fullyQualifiedName=None,
tags=None, tags=[],
constraint=None, constraint=None,
ordinalPosition=None, ordinalPosition=None,
jsonSchema=None, jsonSchema=None,
@ -584,7 +584,7 @@ EXPTECTED_TABLE = [
dataTypeDisplay="string", dataTypeDisplay="string",
description=None, description=None,
fullyQualifiedName=None, fullyQualifiedName=None,
tags=None, tags=[],
constraint=None, constraint=None,
ordinalPosition=None, ordinalPosition=None,
jsonSchema=None, jsonSchema=None,
@ -603,7 +603,7 @@ EXPTECTED_TABLE = [
dataTypeDisplay="array<string>", dataTypeDisplay="array<string>",
description=None, description=None,
fullyQualifiedName=None, fullyQualifiedName=None,
tags=None, tags=[],
constraint=None, constraint=None,
ordinalPosition=None, ordinalPosition=None,
jsonSchema=None, jsonSchema=None,
@ -622,7 +622,7 @@ EXPTECTED_TABLE = [
dataTypeDisplay="struct<abc:int>", dataTypeDisplay="struct<abc:int>",
description=None, description=None,
fullyQualifiedName=None, fullyQualifiedName=None,
tags=None, tags=[],
constraint=None, constraint=None,
ordinalPosition=None, ordinalPosition=None,
jsonSchema=None, jsonSchema=None,
@ -638,7 +638,7 @@ EXPTECTED_TABLE = [
dataTypeDisplay="int", dataTypeDisplay="int",
description=None, description=None,
fullyQualifiedName=None, fullyQualifiedName=None,
tags=None, tags=[],
constraint=None, constraint=None,
ordinalPosition=None, ordinalPosition=None,
jsonSchema=None, jsonSchema=None,

View File

@ -7,8 +7,8 @@ slug: /connectors/database/unity-catalog
name="Unity Catalog" name="Unity Catalog"
stage="PROD" stage="PROD"
platform="OpenMetadata" platform="OpenMetadata"
availableFeatures=["Metadata", "Query Usage", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt", "Sample Data", "Reverse Metadata (Collate Only)"] availableFeatures=["Metadata", "Query Usage", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt", "Sample Data", "Reverse Metadata (Collate Only)", "Owners", "Tags"]
unavailableFeatures=["Owners", "Tags", "Stored Procedures"] unavailableFeatures=["Stored Procedures"]
/ %} / %}

View File

@ -7,8 +7,8 @@ slug: /connectors/database/unity-catalog/yaml
name="Unity Catalog" name="Unity Catalog"
stage="PROD" stage="PROD"
platform="OpenMetadata" platform="OpenMetadata"
availableFeatures=["Metadata", "Query Usage", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt", "Sample Data", "Reverse Metadata (Collate Only)"] availableFeatures=["Metadata", "Query Usage", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt", "Sample Data", "Reverse Metadata (Collate Only)", "Owners", "Tags"]
unavailableFeatures=["Owners", "Tags", "Stored Procedures"] unavailableFeatures=["Stored Procedures"]
/ %} / %}
In this section, we provide guides and references to use the Unity Catalog connector. In this section, we provide guides and references to use the Unity Catalog connector.

View File

@ -1,44 +1,50 @@
{ {
"name": "UnityCatalog", "name": "UnityCatalog",
"displayName": "UnityCatalog Test Connection", "displayName": "UnityCatalog Test Connection",
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.", "description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.",
"steps": [ "steps": [
{ {
"name": "CheckAccess", "name": "CheckAccess",
"description": "Validate that we can properly reach the database and authenticate with the given credentials.", "description": "Validate that we can properly reach the database and authenticate with the given credentials.",
"errorMessage": "Failed to connect to unity catalog, please validate to token, http path & hostport", "errorMessage": "Failed to connect to unity catalog, please validate to token, http path & hostport",
"shortCircuit": true, "shortCircuit": true,
"mandatory": true "mandatory": true
}, },
{ {
"name": "GetDatabases", "name": "GetDatabases",
"description": "List all the databases available to the user.", "description": "List all the databases available to the user.",
"errorMessage": "Failed to fetch databases, please validate if the user has enough privilege to fetch databases.", "errorMessage": "Failed to fetch databases, please validate if the user has enough privilege to fetch databases.",
"mandatory": true "mandatory": true
}, },
{ {
"name": "GetSchemas", "name": "GetSchemas",
"description": "List all the schemas available to the user.", "description": "List all the schemas available to the user.",
"errorMessage": "Failed to fetch schemas, please validate if the user has enough privilege to fetch schemas.", "errorMessage": "Failed to fetch schemas, please validate if the user has enough privilege to fetch schemas.",
"mandatory": true "mandatory": true
}, },
{ {
"name": "GetTables", "name": "GetTables",
"description": "From a given schema, list the tables belonging to that schema. If no schema is specified, we'll list the tables of a random schema.", "description": "From a given schema, list the tables belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.", "errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.",
"mandatory": true "mandatory": true
}, },
{ {
"name": "GetViews", "name": "GetViews",
"description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.", "description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.", "errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.",
"mandatory": false "mandatory": false
}, },
{ {
"name": "GetQueries", "name": "GetQueries",
"description": "Check if we can access the queries form `https://{your_host}/api/2.0/sql/history/queries` API. NOTE: To access this api you must have a premium subscription to unity catalog.", "description": "Check if we can access the queries form `https://{your_host}/api/2.0/sql/history/queries` API. NOTE: To access this api you must have a premium subscription to unity catalog.",
"errorMessage": "Failed to fetch queries, please validate if the user has access to `https://{your_host}/api/2.0/sql/history/queries` API.", "errorMessage": "Failed to fetch queries, please validate if the user has access to `https://{your_host}/api/2.0/sql/history/queries` API.",
"mandatory": false "mandatory": false
} },
] {
} "name": "GetTags",
"description": "Check if tags can be accessed via `information_schema.catalog_tags`, `information_schema.schema_tags`, `information_schema.table_tags`, and `information_schema.column_tags`.",
"errorMessage": "Failed to fetch tags. Please ensure the httpPath is configured and verify that the user has access to view Unity Catalog metadata tables: `information_schema.catalog_tags`, `information_schema.schema_tags`, `information_schema.table_tags`, and `information_schema.column_tags`.",
"mandatory": false
}
]
}