diff --git a/ingestion/examples/workflows/clickhouse_usage.json b/ingestion/examples/workflows/clickhouse_usage.json index abd26617987..3b3e55e75fb 100644 --- a/ingestion/examples/workflows/clickhouse_usage.json +++ b/ingestion/examples/workflows/clickhouse_usage.json @@ -1,17 +1,22 @@ { - "source": { - "type": "clickhouse-usage", + "source": { + "type": "clickhouse_usage", + "serviceName": "local_clickhouse", + "serviceConnection": { "config": { + "type": "ClickHouse", "username":"default", "password":"", - "database": "default", - "service_name": "local_clickhouse", - "schema_filter_pattern": { - "excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"] - }, - "duration":2 + "hostPort": "localhost:8123", + "database": "default" } }, + "sourceConfig": { + "config": { + "queryLogDuration": "1" + } + } + }, "processor": { "type": "query-parser", "config": { @@ -24,17 +29,16 @@ "filename": "/tmp/clickhouse_usage" } }, - "bulk_sink": { + "bulkSink": { "type": "metadata-usage", "config": { "filename": "/tmp/clickhouse_usage" } }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } diff --git a/ingestion/examples/workflows/mssql_usage.json b/ingestion/examples/workflows/mssql_usage.json index cfa7b304264..85e95cc9568 100644 --- a/ingestion/examples/workflows/mssql_usage.json +++ b/ingestion/examples/workflows/mssql_usage.json @@ -1,16 +1,19 @@ { "source": { "type": "mssql-usage", - "config": { - "host_port": "localhost:1433", - "service_name": "local_mssql", - "database": "catalog_test", - "query": "select top 50 * from [{}].[{}]", - "username": "sa", - "password": "test!Password", - "duration":2, - "table_filter_pattern": { - "excludes": ["catalog_test.*"] + "serviceName": "local_mssql", + "serviceConnection": { + "config": { + "type": "MSSQL", + "database": "catalog_test", + "username": "sa", + "password": "test!Password", + "hostPort": "localhost:1433" + } + }, + "sourceConfig": { + "config": { + "queryLogDuration": "1" } } }, @@ -26,17 +29,16 @@ "filename": "/tmp/mssql_usage" } }, - "bulk_sink": { + "bulkSink": { "type": "metadata-usage", "config": { "filename": "/tmp/mssql_usage" } }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } diff --git a/ingestion/src/metadata/config/common.py b/ingestion/src/metadata/config/common.py index 43d5954075d..a7cb847ab13 100644 --- a/ingestion/src/metadata/config/common.py +++ b/ingestion/src/metadata/config/common.py @@ -18,7 +18,7 @@ from typing import IO, Any, Optional from pydantic import BaseModel -FQDN_SEPARATOR: str = ":" +FQDN_SEPARATOR: str = "." class ConfigModel(BaseModel): diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 89158356e03..966e3919633 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -28,6 +28,7 @@ from metadata.ingestion.models.table_queries import ( ) from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.helpers import _get_formmated_table_name logger = logging.getLogger(__name__) @@ -72,12 +73,13 @@ class MetadataUsageBulkSink(BulkSink): for record in usage_records: table_usage = TableUsageCount(**json.loads(record)) if "." in table_usage.table: - table_usage.database, table_usage.table = table_usage.table.split(".")[ - -2: - ] + ( + table_usage.database_schema, + table_usage.table, + ) = table_usage.table.split(".")[-2:] self.service_name = table_usage.service_name table_entity = self.__get_table_entity( - table_usage.database, table_usage.table + table_usage.database, table_usage.database_schema, table_usage.table ) if table_entity is not None: if not table_usage_map.get(table_entity.id.__root__): @@ -168,11 +170,13 @@ class MetadataUsageBulkSink(BulkSink): else: column_joins_dict[column_join.table_column.column] = {} main_column_fqdn = self.__get_column_fqdn( - table_usage.database, column_join.table_column + table_usage.database, + table_usage.database_schema, + column_join.table_column, ) for column in column_join.joined_with: joined_column_fqdn = self.__get_column_fqdn( - table_usage.database, column + table_usage.database, table_usage.database_schema, column ) if str(joined_column_fqdn) in joined_with.keys(): column_joined_with = joined_with[str(joined_column_fqdn)] @@ -194,16 +198,23 @@ class MetadataUsageBulkSink(BulkSink): ) return table_joins - def __get_column_fqdn(self, database: str, table_column: TableColumn): - table_entity = self.__get_table_entity(database, table_column.table) + def __get_column_fqdn( + self, database: str, database_schema: str, table_column: TableColumn + ): + table_entity = self.__get_table_entity( + database, database_schema, table_column.table + ) if table_entity is None: return None for tbl_column in table_entity.columns: if table_column.column.lower() == tbl_column.name.__root__.lower(): - return tbl_column.fullyQualifiedName.__root__ + return tbl_column.fullyQualifiedName.__root__.__root__ - def __get_table_entity(self, database_name: str, table_name: str) -> Table: - table_fqn = f"{self.service_name}{FQDN_SEPARATOR}{database_name}{FQDN_SEPARATOR}{table_name}" + def __get_table_entity( + self, database_name: str, database_schema: str, table_name: str + ) -> Table: + table_fqn = f"{self.service_name}{FQDN_SEPARATOR}{database_name}{FQDN_SEPARATOR}{database_schema}{FQDN_SEPARATOR}{table_name}" + table_fqn = _get_formmated_table_name(table_fqn) table_entity = self.metadata.get_by_name(Table, fqdn=table_fqn) return table_entity diff --git a/ingestion/src/metadata/ingestion/models/table_queries.py b/ingestion/src/metadata/ingestion/models/table_queries.py index b899986a9a4..39225f62295 100644 --- a/ingestion/src/metadata/ingestion/models/table_queries.py +++ b/ingestion/src/metadata/ingestion/models/table_queries.py @@ -61,6 +61,7 @@ class TableUsageCount(BaseModel): table: str date: str database: str + database_schema: Optional[str] sql_queries: List[SqlQuery] count: int = 1 joins: TableColumnJoins diff --git a/ingestion/src/metadata/ingestion/source/clickhouse_usage.py b/ingestion/src/metadata/ingestion/source/clickhouse_usage.py index 3bdf720ba6a..1a9d3123d16 100644 --- a/ingestion/src/metadata/ingestion/source/clickhouse_usage.py +++ b/ingestion/src/metadata/ingestion/source/clickhouse_usage.py @@ -12,19 +12,25 @@ Clickhouse usage module """ +import ast from typing import Any, Dict, Iterable +from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import ( + ClickhouseConnection, +) from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus # This import verifies that the dependencies are available. from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.source.clickhouse import ClickhouseConfig from metadata.ingestion.source.sql_alchemy_helper import ( SQLAlchemyHelper, SQLSourceStatus, @@ -50,16 +56,19 @@ class ClickhouseUsageSource(Source[TableQuery]): report: """ - def __init__(self, config, metadata_config): + def __init__( + self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig + ): super().__init__() self.config = config - start, end = get_start_and_end(config.duration) + self.connection = config.serviceConnection.__root__.config + start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) self.analysis_date = start self.sql_stmt = CLICKHOUSE_SQL_USAGE_STATEMENT.format( start_time=start, end_time=end ) self.alchemy_helper = SQLAlchemyHelper( - config, + self.connection, metadata_config, DatabaseServiceType.ClickHouse.value, self.sql_stmt, @@ -68,7 +77,13 @@ class ClickhouseUsageSource(Source[TableQuery]): @classmethod def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): - config = ClickhouseConfig.parse_obj(config_dict) + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: ClickhouseConnection = config.serviceConnection.__root__.config + if not isinstance(connection, ClickhouseConnection): + raise InvalidSourceException( + f"Expected ClickhouseConnection, but got {connection}" + ) + return cls(config, metadata_config) def prepare(self): @@ -90,6 +105,11 @@ class ClickhouseUsageSource(Source[TableQuery]): :return: """ for row in get_raw_extract_iter(self.alchemy_helper): + database = "default" + if row["database_name"]: + database_list = ast.literal_eval(row["database_name"]) + database = database_list[0] if len(database_list) == 1 else "default" + table_query = TableQuery( query=row["query_id"], user_name=row["user_name"], @@ -97,11 +117,9 @@ class ClickhouseUsageSource(Source[TableQuery]): endtime=str(row["end_time"]), analysis_date=self.analysis_date, aborted=row["aborted"], - database=row["database_name"][0] - if len(row["database_name"]) >= 1 - else "default", + database=database, sql=row["query_text"], - service_name=self.config.service_name, + service_name=self.config.serviceName, ) yield table_query diff --git a/ingestion/src/metadata/ingestion/source/mssql_usage.py b/ingestion/src/metadata/ingestion/source/mssql_usage.py index 32105f8d451..a4f54f448f9 100644 --- a/ingestion/src/metadata/ingestion/source/mssql_usage.py +++ b/ingestion/src/metadata/ingestion/source/mssql_usage.py @@ -14,17 +14,22 @@ MSSQL usage module from typing import Any, Dict, Iterable +from metadata.generated.schema.entity.services.connections.database.mssqlConnection import ( + MssqlConnection, +) from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus # This import verifies that the dependencies are available. from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.source.mssql import MssqlConfig from metadata.ingestion.source.sql_alchemy_helper import ( SQLAlchemyHelper, SQLSourceStatus, @@ -49,20 +54,32 @@ class MssqlUsageSource(Source[TableQuery]): report: """ - def __init__(self, config, metadata_config): + def __init__( + self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig + ): super().__init__() self.config = config - start, end = get_start_and_end(config.duration) + self.connection = config.serviceConnection.__root__.config + start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) self.analysis_date = start self.sql_stmt = MSSQL_SQL_USAGE_STATEMENT.format(start_date=start, end_date=end) self.alchemy_helper = SQLAlchemyHelper( - config, metadata_config, DatabaseServiceType.MSSQL.value, self.sql_stmt + self.connection, + metadata_config, + DatabaseServiceType.MSSQL.value, + self.sql_stmt, ) self.report = SQLSourceStatus() @classmethod def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): - config = MssqlConfig.parse_obj(config_dict) + """Create class instance""" + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: MssqlConnection = config.serviceConnection.__root__.config + if not isinstance(connection, MssqlConnection): + raise InvalidSourceException( + f"Expected MssqlConnection, but got {connection}" + ) return cls(config, metadata_config) def prepare(self): @@ -93,7 +110,7 @@ class MssqlUsageSource(Source[TableQuery]): aborted=row["aborted"], database=row["database_name"], sql=row["query_text"], - service_name=self.config.service_name, + service_name=self.config.serviceName, ) if row["schema_name"] is not None: self.report.scanned(f"{row['database_name']}.{row['schema_name']}") diff --git a/ingestion/src/metadata/ingestion/source/sql_alchemy_helper.py b/ingestion/src/metadata/ingestion/source/sql_alchemy_helper.py index 8f43405cdba..a9adc496474 100644 --- a/ingestion/src/metadata/ingestion/source/sql_alchemy_helper.py +++ b/ingestion/src/metadata/ingestion/source/sql_alchemy_helper.py @@ -15,7 +15,8 @@ from typing import Any, Iterable from sqlalchemy import create_engine from ...generated.schema.metadataIngestion.workflow import OpenMetadataServerConfig -from .sql_source import SQLConnectionConfig, SQLSourceStatus +from ...utils.source_connections import get_connection_url +from .sql_source import SQLSourceStatus class SQLAlchemyHelper: @@ -23,7 +24,7 @@ class SQLAlchemyHelper: def __init__( self, - config: SQLConnectionConfig, + config, metadata_config: OpenMetadataServerConfig, platform: str, query: str, @@ -39,9 +40,11 @@ class SQLAlchemyHelper: Create a SQLAlchemy connection to Database """ engine = create_engine( - self.config.get_connection_url(), - **self.config.options, - connect_args=self.config.connect_args + get_connection_url(self.config), + **self.config.connectionOptions if self.config.connectionOptions else {}, + connect_args=self.config.connectionArguments + if self.config.connectionArguments + else {} ) conn = engine.connect() return conn diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index 8fc94eaf355..8aa968131f3 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ b/ingestion/src/metadata/utils/source_connections.py @@ -62,12 +62,11 @@ def get_connection_url_common(connection): if connection.username: url += f"{connection.username}" - if connection.password: - url += ( - f":{quote_plus(connection.password.get_secret_value())}" - if connection - else "" - ) + url += ( + f":{quote_plus(connection.password.get_secret_value())}" + if connection + else "" + ) url += "@" url += connection.hostPort