diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py index 64a6560bbc1..f21df24ce40 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py @@ -36,6 +36,12 @@ from metadata.ingestion.connections.test_connections import test_connection_step from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient from metadata.ingestion.source.database.unitycatalog.models import DatabricksTable +from metadata.ingestion.source.database.unitycatalog.queries import ( + UNITY_CATALOG_GET_ALL_SCHEMA_TAGS, + UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS, + UNITY_CATALOG_GET_ALL_TABLE_TAGS, + UNITY_CATALOG_GET_CATALOGS_TAGS, +) from metadata.utils.constants import THREE_MIN from metadata.utils.db_utils import get_host_from_host_port from metadata.utils.logger import ingestion_logger @@ -111,6 +117,32 @@ def test_connection( table_obj.name = table.name break + def get_tags( + service_connection: UnityCatalogConnection, table_obj: DatabricksTable + ): + engine = get_sqlalchemy_connection(service_connection) + with engine.connect() as connection: + connection.execute( + UNITY_CATALOG_GET_CATALOGS_TAGS.format( + database=table_obj.catalog_name + ).replace(";", " limit 1;") + ) + connection.execute( + UNITY_CATALOG_GET_ALL_SCHEMA_TAGS.format( + database=table_obj.catalog_name + ).replace(";", " limit 1;") + ) + connection.execute( + UNITY_CATALOG_GET_ALL_TABLE_TAGS.format( + database=table_obj.catalog_name, schema=table_obj.schema_name + ).replace(";", " limit 1;") + ) + connection.execute( + UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS.format( + database=table_obj.catalog_name, schema=table_obj.schema_name + ).replace(";", " limit 1;") + ) + test_fn = { "CheckAccess": 
connection.catalogs.list, "GetDatabases": partial(get_catalogs, connection, table_obj), @@ -118,6 +150,7 @@ def test_connection( "GetTables": partial(get_tables, connection, table_obj), "GetViews": partial(get_tables, connection, table_obj), "GetQueries": client.test_query_api_access, + "GetTags": partial(get_tags, service_connection, table_obj), } return test_connection_steps( diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index 5c1e74d10a9..39f3a11bba5 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -65,20 +65,33 @@ from metadata.ingestion.source.database.external_table_lineage_mixin import ( from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient -from metadata.ingestion.source.database.unitycatalog.connection import get_connection +from metadata.ingestion.source.database.unitycatalog.connection import ( + get_connection, + get_sqlalchemy_connection, +) from metadata.ingestion.source.database.unitycatalog.models import ( ColumnJson, ElementType, ForeignConstrains, Type, ) +from metadata.ingestion.source.database.unitycatalog.queries import ( + UNITY_CATALOG_GET_ALL_SCHEMA_TAGS, + UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS, + UNITY_CATALOG_GET_ALL_TABLE_TAGS, + UNITY_CATALOG_GET_CATALOGS_TAGS, +) from metadata.utils import fqn from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger +from metadata.utils.tag_utils import get_ometa_tag_and_classification logger = ingestion_logger() +UNITY_CATALOG_TAG = 
"UNITY CATALOG TAG" +UNITY_CATALOG_TAG_CLASSIFICATION = "UNITY CATALOG TAG CLASSIFICATION" + class UnitycatalogSource( ExternalTableLineageMixin, DatabaseServiceSource, MultiDBSource @@ -179,10 +192,14 @@ class UnitycatalogSource( From topology. Prepare a database request and pass it to the sink """ + catalog = self.client.catalogs.get(database_name) yield Either( right=CreateDatabaseRequest( name=database_name, service=self.context.get().database_service, + owners=self.get_owner_ref(catalog.owner), + description=catalog.comment, + tags=self.get_database_tag_labels(database_name), ) ) @@ -227,6 +244,9 @@ class UnitycatalogSource( From topology. Prepare a database schema request and pass it to the sink """ + schema = self.client.schemas.get( + full_name=f"{self.context.get().database}.{schema_name}" + ) yield Either( right=CreateDatabaseSchemaRequest( name=EntityName(schema_name), @@ -238,6 +258,9 @@ class UnitycatalogSource( database_name=self.context.get().database, ) ), + description=schema.comment, + owners=self.get_owner_ref(schema.owner), + tags=self.get_schema_tag_labels(schema_name), ) ) @@ -315,7 +338,7 @@ class UnitycatalogSource( (db_name, schema_name, table_name) ] = table.storage_location try: - columns = list(self.get_columns(table.columns)) + columns = list(self.get_columns(table_name, table.columns)) ( primary_constraints, foreign_constraints, @@ -341,6 +364,7 @@ class UnitycatalogSource( ) ), owners=self.get_owner_ref(table.owner), + tags=self.get_tag_labels(table_name), ) yield Either(right=table_request) @@ -477,11 +501,12 @@ class UnitycatalogSource( f"Unable to add description to complex datatypes for column [{column.name}]: {exc}" ) - def get_columns(self, column_data: List[ColumnInfo]) -> Iterable[Column]: + def get_columns( + self, table_name: str, column_data: List[ColumnInfo] + ) -> Iterable[Column]: """ process table regular columns info """ - for column in column_data: parsed_string = {} if column.type_text: @@ -500,6 +525,9 @@ class 
UnitycatalogSource( parsed_string["dataLength"] = parsed_string.get("dataLength", 1) if column.comment: parsed_string["description"] = Markdown(column.comment) + parsed_string["tags"] = self.get_column_tag_labels( + table_name=table_name, column={"name": column.name} + ) parsed_column = Column(**parsed_string) self.add_complex_datatype_descriptions( column=parsed_column, @@ -507,10 +535,97 @@ class UnitycatalogSource( ) yield parsed_column + def yield_database_tag( + self, database_name: str + ) -> Iterable[Either[OMetaTagAndClassification]]: + """Get Unity Catalog catalog and schema tags using SQL queries""" + query_tag_fqn_builder_mapping = ( + ( + UNITY_CATALOG_GET_CATALOGS_TAGS.format(database=database_name), + lambda tag: [self.context.get().database_service, database_name], + ), + ( + UNITY_CATALOG_GET_ALL_SCHEMA_TAGS.format(database=database_name), + lambda tag: [ + self.context.get().database_service, + database_name, + tag.schema_name, + ], + ), + ) + try: + with get_sqlalchemy_connection( + self.service_connection + ).connect() as connection: + for query, tag_fqn_builder in query_tag_fqn_builder_mapping: + for tag in connection.execute(query): + yield from get_ometa_tag_and_classification( + tag_fqn=FullyQualifiedEntityName( + fqn._build(*tag_fqn_builder(tag)) + ), # pylint: disable=protected-access + tags=[tag.tag_value], + classification_name=tag.tag_name, + tag_description=UNITY_CATALOG_TAG, + classification_description=UNITY_CATALOG_TAG_CLASSIFICATION, + metadata=self.metadata, + system_tags=True, + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error getting tags for catalog/schema {database_name}: {exc}" + ) + def yield_tag( self, schema_name: str ) -> Iterable[Either[OMetaTagAndClassification]]: - """No tags being processed""" + """Get Unity Catalog table and column tags of a schema using SQL queries""" + database = self.context.get().database + query_tag_fqn_builder_mapping = ( + ( + UNITY_CATALOG_GET_ALL_TABLE_TAGS.format( + 
database=database, schema=schema_name + ), + lambda tag: [ + self.context.get().database_service, + database, + schema_name, + tag.table_name, + ], + ), + ( + UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS.format( + database=database, schema=schema_name + ), + lambda tag: [ + self.context.get().database_service, + database, + schema_name, + tag.table_name, + tag.column_name, + ], + ), + ) + try: + with get_sqlalchemy_connection( + self.service_connection + ).connect() as connection: + for query, tag_fqn_builder in query_tag_fqn_builder_mapping: + for tag in connection.execute(query): + yield from get_ometa_tag_and_classification( + tag_fqn=FullyQualifiedEntityName( + fqn._build(*tag_fqn_builder(tag)) + ), # pylint: disable=protected-access + tags=[tag.tag_value], + classification_name=tag.tag_name, + tag_description=UNITY_CATALOG_TAG, + classification_description=UNITY_CATALOG_TAG_CLASSIFICATION, + metadata=self.metadata, + system_tags=True, + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error getting tags for schema {schema_name}: {exc}") def get_stored_procedures(self) -> Iterable[Any]: """Not implemented""" @@ -527,24 +642,22 @@ class UnitycatalogSource( """Nothing to close""" # pylint: disable=arguments-renamed - def get_owner_ref( - self, table_owner: Optional[str] - ) -> Optional[EntityReferenceList]: + def get_owner_ref(self, owner: Optional[str]) -> Optional[EntityReferenceList]: """ - Method to process the table owners + Method to process the owner of a catalog, schema or table """ if self.source_config.includeOwners is False: return None try: - if not table_owner or not isinstance(table_owner, str): + if not owner or not isinstance(owner, str): return None - owner_ref = self.metadata.get_reference_by_email(email=table_owner) + owner_ref = self.metadata.get_reference_by_email(email=owner) if owner_ref: return owner_ref - table_name = table_owner.split("@")[0] - owner_ref = self.metadata.get_reference_by_name(name=table_name) + owner_name = owner.split("@")[0] + owner_ref = 
self.metadata.get_reference_by_name(name=owner_name) return owner_ref except Exception as exc: logger.debug(traceback.format_exc()) - logger.warning(f"Error processing owner {table_owner}: {exc}") + logger.warning(f"Error processing owner {owner}: {exc}") return None diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py new file mode 100644 index 00000000000..2cd59e73c25 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py @@ -0,0 +1,19 @@ +""" +SQL queries for Unity Catalog +""" + +UNITY_CATALOG_GET_CATALOGS_TAGS = """ +SELECT * FROM `{database}`.information_schema.catalog_tags; +""" + +UNITY_CATALOG_GET_ALL_SCHEMA_TAGS = """ +SELECT * FROM `{database}`.information_schema.schema_tags; +""" + +UNITY_CATALOG_GET_ALL_TABLE_TAGS = """ +SELECT * FROM `{database}`.information_schema.table_tags WHERE schema_name = '{schema}'; +""" + +UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS = """ +SELECT * FROM `{database}`.information_schema.column_tags WHERE schema_name = '{schema}'; +""" diff --git a/ingestion/tests/unit/topology/database/test_unity_catalog.py b/ingestion/tests/unit/topology/database/test_unity_catalog.py index 1fb914cdb87..185f125a029 100644 --- a/ingestion/tests/unit/topology/database/test_unity_catalog.py +++ b/ingestion/tests/unit/topology/database/test_unity_catalog.py @@ -318,7 +318,7 @@ MOCK_TABLE_INFO = TableInfo( type_text="struct,d:struct>", ), ], - comment=None, + comment="this is a description for dataset input", created_at=1713519443052, created_by="test@open-metadata.org", data_access_configuration_id="00000000-0000-0000-0000-000000000000", @@ -478,7 +478,7 @@ EXPTECTED_TABLE = [ right=CreateTableRequest( name=EntityName(root="complex_data"), displayName=None, - description=None, + description="this is a description for dataset input", tableType=TableType.Regular, columns=[ Column( @@ -565,7 +565,7 @@ 
EXPTECTED_TABLE = [ dataTypeDisplay="int", description=None, fullyQualifiedName=None, - tags=None, + tags=[], constraint=None, ordinalPosition=None, jsonSchema=None, @@ -584,7 +584,7 @@ EXPTECTED_TABLE = [ dataTypeDisplay="string", description=None, fullyQualifiedName=None, - tags=None, + tags=[], constraint=None, ordinalPosition=None, jsonSchema=None, @@ -603,7 +603,7 @@ EXPTECTED_TABLE = [ dataTypeDisplay="array", description=None, fullyQualifiedName=None, - tags=None, + tags=[], constraint=None, ordinalPosition=None, jsonSchema=None, @@ -622,7 +622,7 @@ EXPTECTED_TABLE = [ dataTypeDisplay="struct", description=None, fullyQualifiedName=None, - tags=None, + tags=[], constraint=None, ordinalPosition=None, jsonSchema=None, @@ -638,7 +638,7 @@ EXPTECTED_TABLE = [ dataTypeDisplay="int", description=None, fullyQualifiedName=None, - tags=None, + tags=[], constraint=None, ordinalPosition=None, jsonSchema=None, diff --git a/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/database/unity-catalog/index.md b/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/database/unity-catalog/index.md index 6369b6cdada..9eab236d548 100644 --- a/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/database/unity-catalog/index.md +++ b/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/database/unity-catalog/index.md @@ -7,8 +7,8 @@ slug: /connectors/database/unity-catalog name="Unity Catalog" stage="PROD" platform="OpenMetadata" -availableFeatures=["Metadata", "Query Usage", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt", "Sample Data", "Reverse Metadata (Collate Only)"] -unavailableFeatures=["Owners", "Tags", "Stored Procedures"] +availableFeatures=["Metadata", "Query Usage", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt", "Sample Data", "Reverse Metadata (Collate Only)", "Owners", "Tags"] +unavailableFeatures=["Stored Procedures"] / %} diff --git 
a/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/database/unity-catalog/yaml.md b/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/database/unity-catalog/yaml.md index e36da0f03a2..37de12d83bf 100644 --- a/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/database/unity-catalog/yaml.md +++ b/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/database/unity-catalog/yaml.md @@ -7,8 +7,8 @@ slug: /connectors/database/unity-catalog/yaml name="Unity Catalog" stage="PROD" platform="OpenMetadata" -availableFeatures=["Metadata", "Query Usage", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt", "Sample Data", "Reverse Metadata (Collate Only)"] -unavailableFeatures=["Owners", "Tags", "Stored Procedures"] +availableFeatures=["Metadata", "Query Usage", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt", "Sample Data", "Reverse Metadata (Collate Only)", "Owners", "Tags"] +unavailableFeatures=["Stored Procedures"] / %} In this section, we provide guides and references to use the Unity Catalog connector. 
diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/database/unitycatalog.json b/openmetadata-service/src/main/resources/json/data/testConnections/database/unitycatalog.json index 65840470132..e9b78458da3 100644 --- a/openmetadata-service/src/main/resources/json/data/testConnections/database/unitycatalog.json +++ b/openmetadata-service/src/main/resources/json/data/testConnections/database/unitycatalog.json @@ -1,44 +1,50 @@ { - "name": "UnityCatalog", - "displayName": "UnityCatalog Test Connection", - "description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.", - "steps": [ - { - "name": "CheckAccess", - "description": "Validate that we can properly reach the database and authenticate with the given credentials.", - "errorMessage": "Failed to connect to unity catalog, please validate to token, http path & hostport", - "shortCircuit": true, - "mandatory": true - }, - { - "name": "GetDatabases", - "description": "List all the databases available to the user.", - "errorMessage": "Failed to fetch databases, please validate if the user has enough privilege to fetch databases.", - "mandatory": true - }, - { - "name": "GetSchemas", - "description": "List all the schemas available to the user.", - "errorMessage": "Failed to fetch schemas, please validate if the user has enough privilege to fetch schemas.", - "mandatory": true - }, - { - "name": "GetTables", - "description": "From a given schema, list the tables belonging to that schema. If no schema is specified, we'll list the tables of a random schema.", - "errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.", - "mandatory": true - }, - { - "name": "GetViews", - "description": "From a given schema, list the views belonging to that schema. 
If no schema is specified, we'll list the tables of a random schema.", - "errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.", - "mandatory": false - }, - { - "name": "GetQueries", - "description": "Check if we can access the queries form `https://{your_host}/api/2.0/sql/history/queries` API. NOTE: To access this api you must have a premium subscription to unity catalog.", - "errorMessage": "Failed to fetch queries, please validate if the user has access to `https://{your_host}/api/2.0/sql/history/queries` API.", - "mandatory": false - } - ] - } + { + "name": "UnityCatalog", + "displayName": "UnityCatalog Test Connection", + "description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.", + "steps": [ + { + "name": "CheckAccess", + "description": "Validate that we can properly reach the database and authenticate with the given credentials.", + "errorMessage": "Failed to connect to unity catalog, please validate the token, http path & hostport", + "shortCircuit": true, + "mandatory": true + }, + { + "name": "GetDatabases", + "description": "List all the databases available to the user.", + "errorMessage": "Failed to fetch databases, please validate if the user has enough privilege to fetch databases.", + "mandatory": true + }, + { + "name": "GetSchemas", + "description": "List all the schemas available to the user.", + "errorMessage": "Failed to fetch schemas, please validate if the user has enough privilege to fetch schemas.", + "mandatory": true + }, + { + "name": "GetTables", + "description": "From a given schema, list the tables belonging to that schema. 
If no schema is specified, we'll list the tables of a random schema.", + "errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.", + "mandatory": true + }, + { + "name": "GetViews", + "description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.", + "errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.", + "mandatory": false + }, + { + "name": "GetQueries", + "description": "Check if we can access the queries from `https://{your_host}/api/2.0/sql/history/queries` API. NOTE: To access this api you must have a premium subscription to unity catalog.", + "errorMessage": "Failed to fetch queries, please validate if the user has access to `https://{your_host}/api/2.0/sql/history/queries` API.", + "mandatory": false + }, + { + "name": "GetTags", + "description": "Check if tags can be accessed via `information_schema.catalog_tags`, `information_schema.schema_tags`, `information_schema.table_tags`, and `information_schema.column_tags`.", + "errorMessage": "Failed to fetch tags. Please ensure the httpPath is configured and verify that the user has access to view Unity Catalog metadata tables: `information_schema.catalog_tags`, `information_schema.schema_tags`, `information_schema.table_tags`, and `information_schema.column_tags`.", + "mandatory": false + } + ] +} \ No newline at end of file