Add: function to get different catalogs (#9179)

* Add: function to get different catalogs

* Change Based on comments

* Change Based on comments

* Change Based on comments
This commit is contained in:
Milan Bariya 2022-12-08 15:42:58 +05:30 committed by GitHub
parent 96e77af8a4
commit 6f12c971e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 0 deletions

View File

@ -3,6 +3,7 @@ source:
serviceName: local_databricks serviceName: local_databricks
serviceConnection: serviceConnection:
config: config:
catalog: hive_metastore
token: <databricks token> token: <databricks token>
hostPort: localhost:443 hostPort: localhost:443
connectionArguments: connectionArguments:

View File

@ -11,13 +11,18 @@
"""Clickhouse source module""" """Clickhouse source module"""
import re import re
import traceback
from copy import deepcopy
from typing import Iterable
from pyhive.sqlalchemy_hive import _type_map from pyhive.sqlalchemy_hive import _type_map
from sqlalchemy import types, util from sqlalchemy import types, util
from sqlalchemy.engine import reflection from sqlalchemy.engine import reflection
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.sqltypes import String from sqlalchemy.sql.sqltypes import String
from sqlalchemy_databricks._dialect import DatabricksDialect from sqlalchemy_databricks._dialect import DatabricksDialect
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.database.databricksConnection import ( from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection, DatabricksConnection,
) )
@ -29,6 +34,12 @@ from metadata.generated.schema.metadataIngestion.workflow import (
) )
from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.utils import fqn
from metadata.utils.connections import get_connection
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class STRUCT(String): class STRUCT(String):
@ -121,7 +132,25 @@ def get_columns(self, connection, table_name, schema=None, **kw):
return result return result
@reflection.cache
def get_schema_names(self, connection, **kw): # pylint: disable=unused-argument
# Equivalent to SHOW DATABASES
connection.execute(f"USE CATALOG {kw.get('database')}")
return [row[0] for row in connection.execute("SHOW SCHEMAS")]
def get_schema_names_reflection(self, **kw):
"""Return all schema names."""
if hasattr(self.dialect, "get_schema_names"):
with self._operation_context() as conn: # pylint: disable=protected-access
return self.dialect.get_schema_names(conn, info_cache=self.info_cache, **kw)
return []
DatabricksDialect.get_columns = get_columns DatabricksDialect.get_columns = get_columns
DatabricksDialect.get_schema_names = get_schema_names
reflection.Inspector.get_schema_names = get_schema_names_reflection
class DatabricksSource(CommonDbSourceService): class DatabricksSource(CommonDbSourceService):
@ -139,3 +168,59 @@ class DatabricksSource(CommonDbSourceService):
f"Expected DatabricksConnection, but got {connection}" f"Expected DatabricksConnection, but got {connection}"
) )
return cls(config, metadata_config) return cls(config, metadata_config)
def set_inspector(self, database_name: str) -> None:
"""
When sources override `get_database_names`, they will need
to setup multiple inspectors. They can use this function.
:param database_name: new database to set
"""
logger.info(f"Ingesting from catalog: {database_name}")
new_service_connection = deepcopy(self.service_connection)
new_service_connection.catalog = database_name
self.engine = get_connection(new_service_connection)
self.inspector = inspect(self.engine)
def get_database_names(self) -> Iterable[str]:
configured_catalog = self.service_connection.__dict__.get("catalog")
if configured_catalog:
self.set_inspector(database_name=configured_catalog)
yield configured_catalog
else:
results = self.connection.execute("SHOW CATALOGS")
for res in results:
new_catalog = res[0]
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.context.database_service.name.__root__,
database_name=new_catalog,
)
if filter_by_database(
self.source_config.databaseFilterPattern,
database_fqn
if self.source_config.useFqnForFiltering
else new_catalog,
):
self.status.filter(database_fqn, "Database Filtered Out")
continue
try:
self.set_inspector(database_name=new_catalog)
yield new_catalog
except Exception as exc:
logger.error(traceback.format_exc())
logger.warning(
f"Error trying to process database {new_catalog}: {exc}"
)
def get_raw_database_schema_names(self) -> Iterable[str]:
if self.service_connection.__dict__.get("databaseSchema"):
yield self.service_connection.databaseSchema
else:
for schema_name in self.inspector.get_schema_names(
database=self.context.database.name.__root__
):
yield schema_name

View File

@ -48,6 +48,16 @@
"description": "Databricks compute resources URL.", "description": "Databricks compute resources URL.",
"type": "string" "type": "string"
}, },
"catalog": {
"title": "Catalog",
"description": "Catalog of the data source(Example: hive_metastore). This is optional parameter, if you would like to restrict the metadata reading to a single catalog. When left blank, OpenMetadata Ingestion attempts to scan all the catalog.",
"type": "string"
},
"databaseSchema": {
"title": "databaseSchema",
"description": "databaseSchema of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single databaseSchema. When left blank, OpenMetadata Ingestion attempts to scan all the databaseSchema.",
"type": "string"
},
"connectionOptions": { "connectionOptions": {
"title": "Connection Options", "title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions" "$ref": "../connectionBasicType.json#/definitions/connectionOptions"
@ -70,6 +80,10 @@
"title": "Supports Profiler", "title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler" "$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}, },
"supportsDatabase": {
"title": "Supports Database",
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
},
"supportsQueryComment": { "supportsQueryComment": {
"title": "Supports Query Comment", "title": "Supports Query Comment",
"$ref": "../connectionBasicType.json#/definitions/supportsQueryComment" "$ref": "../connectionBasicType.json#/definitions/supportsQueryComment"