diff --git a/ingestion/src/metadata/examples/workflows/databricks.yaml b/ingestion/src/metadata/examples/workflows/databricks.yaml index 2a968dd780a..a7c3cfa42e6 100644 --- a/ingestion/src/metadata/examples/workflows/databricks.yaml +++ b/ingestion/src/metadata/examples/workflows/databricks.yaml @@ -3,6 +3,7 @@ source: serviceName: local_databricks serviceConnection: config: + catalog: hive_metastore token: hostPort: localhost:443 connectionArguments: diff --git a/ingestion/src/metadata/ingestion/source/database/databricks.py b/ingestion/src/metadata/ingestion/source/database/databricks.py index e8010714c55..2db9a2b9879 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks.py @@ -11,13 +11,18 @@ """Clickhouse source module""" import re +import traceback +from copy import deepcopy +from typing import Iterable from pyhive.sqlalchemy_hive import _type_map from sqlalchemy import types, util from sqlalchemy.engine import reflection +from sqlalchemy.inspection import inspect from sqlalchemy.sql.sqltypes import String from sqlalchemy_databricks._dialect import DatabricksDialect +from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.services.connections.database.databricksConnection import ( DatabricksConnection, ) @@ -29,6 +34,12 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.database.common_db_source import CommonDbSourceService +from metadata.utils import fqn +from metadata.utils.connections import get_connection +from metadata.utils.filters import filter_by_database +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() class STRUCT(String): @@ -121,7 +132,25 @@ def get_columns(self, connection, table_name, schema=None, **kw): return result +@reflection.cache +def 
get_schema_names(self, connection, **kw): # pylint: disable=unused-argument + # Equivalent to SHOW DATABASES + connection.execute(f"USE CATALOG {kw.get('database')}") + return [row[0] for row in connection.execute("SHOW SCHEMAS")] + + +def get_schema_names_reflection(self, **kw): + """Return all schema names.""" + + if hasattr(self.dialect, "get_schema_names"): + with self._operation_context() as conn: # pylint: disable=protected-access + return self.dialect.get_schema_names(conn, info_cache=self.info_cache, **kw) + return [] + + DatabricksDialect.get_columns = get_columns +DatabricksDialect.get_schema_names = get_schema_names +reflection.Inspector.get_schema_names = get_schema_names_reflection class DatabricksSource(CommonDbSourceService): @@ -139,3 +168,59 @@ class DatabricksSource(CommonDbSourceService): f"Expected DatabricksConnection, but got {connection}" ) return cls(config, metadata_config) + + def set_inspector(self, database_name: str) -> None: + """ + When sources override `get_database_names`, they will need + to setup multiple inspectors. They can use this function. 
+ :param database_name: new database to set + """ + logger.info(f"Ingesting from catalog: {database_name}") + + new_service_connection = deepcopy(self.service_connection) + new_service_connection.catalog = database_name + self.engine = get_connection(new_service_connection) + self.inspector = inspect(self.engine) + + def get_database_names(self) -> Iterable[str]: + configured_catalog = self.service_connection.__dict__.get("catalog") + if configured_catalog: + self.set_inspector(database_name=configured_catalog) + yield configured_catalog + else: + results = self.connection.execute("SHOW CATALOGS") + for res in results: + new_catalog = res[0] + database_fqn = fqn.build( + self.metadata, + entity_type=Database, + service_name=self.context.database_service.name.__root__, + database_name=new_catalog, + ) + if filter_by_database( + self.source_config.databaseFilterPattern, + database_fqn + if self.source_config.useFqnForFiltering + else new_catalog, + ): + self.status.filter(database_fqn, "Database Filtered Out") + continue + + try: + + self.set_inspector(database_name=new_catalog) + yield new_catalog + except Exception as exc: + logger.error(traceback.format_exc()) + logger.warning( + f"Error trying to process database {new_catalog}: {exc}" + ) + + def get_raw_database_schema_names(self) -> Iterable[str]: + if self.service_connection.__dict__.get("databaseSchema"): + yield self.service_connection.databaseSchema + else: + for schema_name in self.inspector.get_schema_names( + database=self.context.database.name.__root__ + ): + yield schema_name diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/databricksConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/databricksConnection.json index d1cf3d6c283..579bd90a2af 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/databricksConnection.json +++ 
b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/databricksConnection.json @@ -48,6 +48,16 @@ "description": "Databricks compute resources URL.", "type": "string" }, + "catalog": { + "title": "Catalog", + "description": "Catalog of the data source (Example: hive_metastore). This is an optional parameter, if you would like to restrict the metadata reading to a single catalog. When left blank, OpenMetadata Ingestion attempts to scan all the catalogs.", + "type": "string" + }, + "databaseSchema": { + "title": "databaseSchema", + "description": "databaseSchema of the data source. This is an optional parameter, if you would like to restrict the metadata reading to a single databaseSchema. When left blank, OpenMetadata Ingestion attempts to scan all the databaseSchemas.", + "type": "string" + }, "connectionOptions": { "title": "Connection Options", "$ref": "../connectionBasicType.json#/definitions/connectionOptions" }, @@ -70,6 +80,10 @@ "title": "Supports Profiler", "$ref": "../connectionBasicType.json#/definitions/supportsProfiler" }, + "supportsDatabase": { + "title": "Supports Database", + "$ref": "../connectionBasicType.json#/definitions/supportsDatabase" + }, "supportsQueryComment": { "title": "Supports Query Comment", "$ref": "../connectionBasicType.json#/definitions/supportsQueryComment"