diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json
index f78cedd772b..c675d8e42f0 100644
--- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json
+++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json
@@ -7,9 +7,7 @@
     "databaseUsageConfigType": {
       "description": "Database Source Config Usage Pipeline type",
       "type": "string",
-      "enum": [
-        "DatabaseUsage"
-      ],
+      "enum": ["DatabaseUsage"],
       "default": "DatabaseUsage"
     }
   },
@@ -33,6 +31,10 @@
       "description": "Configuration to set the limit for query logs",
       "type": "integer",
       "default": "100"
+    },
+    "queryLogFilePath": {
+      "description": "Configuration to set the file path for query logs",
+      "type": "string"
     }
   },
   "additionalProperties": false
diff --git a/ingestion/examples/workflows/mssql_usage.json b/ingestion/examples/workflows/mssql_usage.json
index 8c2380ee90d..931e11f5a59 100644
--- a/ingestion/examples/workflows/mssql_usage.json
+++ b/ingestion/examples/workflows/mssql_usage.json
@@ -13,7 +13,8 @@
     },
     "sourceConfig": {
       "config": {
-        "queryLogDuration": "1"
+        "queryLogDuration": "1",
+        "queryLogFilePath": ""
       }
     }
   },
diff --git a/ingestion/examples/workflows/query_log_usage.json b/ingestion/examples/workflows/query_log_usage.json
new file mode 100644
index 00000000000..e0ccecf461d
--- /dev/null
+++ b/ingestion/examples/workflows/query_log_usage.json
@@ -0,0 +1,46 @@
+{
+  "source": {
+    "type": "query-log-usage",
+    "serviceName": "local_mysql",
+    "serviceConnection": {
+      "config": {
+        "type": "Mysql",
+        "username": "openmetadata_user",
+        "password": "openmetadata_password",
+        "hostPort": "localhost:3306",
+        "connectionOptions": {},
+        "connectionArguments": {}
+      }
+    },
+    "sourceConfig": {
+      "config": {
+        "queryLogDuration": "1",
+        "queryLogFilePath": ""
+      }
+    }
+  },
+  "processor": {
+    "type": "query-parser",
+    "config": {
+      "filter": ""
+    }
+  },
+  "stage": {
+    "type": "table-usage",
+    "config": {
+      "filename": "/tmp/query_log_usage"
+    }
+  },
+  "bulkSink": {
+    "type": "metadata-usage",
+    "config": {
+      "filename": "/tmp/query_log_usage"
+    }
+  },
+  "workflowConfig": {
+    "openMetadataServerConfig": {
+      "hostPort": "http://localhost:8585/api",
+      "authProvider": "no-auth"
+    }
+  }
+}
\ No newline at end of file
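The new `queryLogFilePath` setting points a usage workflow at a file of previously captured queries instead of querying the database directly. A minimal sketch of a compatible log file, inferred from the `csv.DictReader` keys consumed by `UsageSource._get_raw_extract_iter` further down in this diff (the path and values are hypothetical; only the `query` column is strictly required, the others fall back to defaults):

```python
import csv

# Hypothetical sample query log. The header row provides the keys that
# csv.DictReader hands to UsageSource; absent columns fall back to
# dict.get() defaults ("", False, or the connection's database).
rows = [
    {
        "query": "SELECT id, total FROM orders WHERE total > 100",
        "user_name": "analyst",
        "start_time": "2022-04-01 10:00:00",
        "end_time": "2022-04-01 10:00:03",
        "aborted": False,
        "database_name": "default",
        "schema_name": "sales",
    }
]

with open("/tmp/query_log.csv", "w", newline="") as fout:
    writer = csv.DictWriter(fout, fieldnames=list(rows[0]))
    writer.writeheader()
    writer.writerows(rows)
```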
diff --git a/ingestion/src/metadata/ingestion/source/clickhouse_usage.py b/ingestion/src/metadata/ingestion/source/clickhouse_usage.py
index f213e918f8c..b009648bcc9 100644
--- a/ingestion/src/metadata/ingestion/source/clickhouse_usage.py
+++ b/ingestion/src/metadata/ingestion/source/clickhouse_usage.py
@@ -13,7 +13,7 @@ Clickhouse usage module
 """
 import ast
-from typing import Any, Dict, Iterable
+from typing import Iterable
 
 from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import (
     ClickhouseConnection,
 )
@@ -27,14 +27,15 @@ from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
 # This import verifies that the dependencies are available.
 from metadata.ingestion.models.table_queries import TableQuery
 from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
+from metadata.ingestion.source.usage_source import UsageSource
 from metadata.utils.connections import get_connection, test_connection
 from metadata.utils.helpers import get_start_and_end
 from metadata.utils.sql_queries import CLICKHOUSE_SQL_USAGE_STATEMENT
 
 
-class ClickhouseUsageSource(Source[TableQuery]):
+class ClickhouseUsageSource(UsageSource):
     def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
-        super().__init__()
+        super().__init__(config, metadata_config)
         self.config = config
         self.connection = config.serviceConnection.__root__.config
         start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
@@ -56,15 +57,6 @@ class ClickhouseUsageSource(Source[TableQuery]):
 
         return cls(config, metadata_config)
 
-    def prepare(self):
-        return super().prepare()
-
-    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
-
-        rows = self.engine.execute(self.sql_stmt)
-        for row in rows:
-            yield row
-
     def next_record(self) -> Iterable[TableQuery]:
         """
         Using itertools.groupby and raw level iterator,
@@ -89,20 +81,3 @@ class ClickhouseUsageSource(Source[TableQuery]):
                 service_name=self.config.serviceName,
             )
             yield table_query
-
-    def get_report(self):
-        """
-        get report
-
-        Returns:
-        """
-        return self.report
-
-    def close(self):
-        pass
-
-    def get_status(self) -> SourceStatus:
-        return self.report
-
-    def test_connection(self) -> None:
-        test_connection(self.engine)
diff --git a/ingestion/src/metadata/ingestion/source/mssql_usage.py b/ingestion/src/metadata/ingestion/source/mssql_usage.py
index 95f31392c08..7d6970eaf2c 100644
--- a/ingestion/src/metadata/ingestion/source/mssql_usage.py
+++ b/ingestion/src/metadata/ingestion/source/mssql_usage.py
@@ -11,36 +11,30 @@
 """
 MSSQL usage module
 """
-
-from typing import Any, Dict, Iterable
-
 from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
     MssqlConnection,
 )
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
 from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
-from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
+from metadata.ingestion.api.source import InvalidSourceException
+from metadata.ingestion.source.usage_source import UsageSource
 
 # This import verifies that the dependencies are available.
-from metadata.ingestion.models.table_queries import TableQuery
-from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
-from metadata.utils.connections import get_connection, test_connection
 from metadata.utils.helpers import get_start_and_end
 from metadata.utils.sql_queries import MSSQL_SQL_USAGE_STATEMENT
 
 
-class MssqlUsageSource(Source[TableQuery]):
-    def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
-        super().__init__()
-        self.config = config
-        self.connection = config.serviceConnection.__root__.config
-        start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
+class MssqlUsageSource(UsageSource):
+    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
+        super().__init__(config, metadata_config)
+        start, end = get_start_and_end(config.sourceConfig.config.queryLogDuration)
         self.analysis_date = start
         self.sql_stmt = MSSQL_SQL_USAGE_STATEMENT.format(start_date=start, end_date=end)
-        self.report = SQLSourceStatus()
-        self.engine = get_connection(self.connection)
 
     @classmethod
     def create(cls, config_dict, metadata_config: WorkflowConfig):
@@ -52,53 +46,3 @@
                 f"Expected MssqlConnection, but got {connection}"
             )
         return cls(config, metadata_config)
-
-    def prepare(self):
-        return super().prepare()
-
-    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
-
-        rows = self.engine.execute(self.sql_stmt)
-        for row in rows:
-            yield row
-
-    def next_record(self) -> Iterable[TableQuery]:
-        """
-        Using itertools.groupby and raw level iterator,
-        it groups to table and yields TableMetadata
-        :return:
-        """
-        for row in self._get_raw_extract_iter():
-            table_query = TableQuery(
-                query=row["query_type"],
-                user_name=row["user_name"],
-                starttime=str(row["start_time"]),
-                endtime=str(row["end_time"]),
-                analysis_date=self.analysis_date,
-                aborted=row["aborted"],
-                database=row["database_name"],
-                sql=row["query_text"],
-                service_name=self.config.serviceName,
-            )
-            if row["schema_name"] is not None:
-                self.report.scanned(f"{row['database_name']}.{row['schema_name']}")
-            else:
-                self.report.scanned(f"{row['database_name']}")
-            yield table_query
-
-    def get_report(self):
-        """
-        get report
-
-        Returns:
-        """
-        return self.report
-
-    def close(self):
-        pass
-
-    def get_status(self) -> SourceStatus:
-        return self.report
-
-    def test_connection(self) -> None:
-        test_connection(self.engine)
+""" +Common Query Log Connector +""" +from datetime import datetime + +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig +from metadata.ingestion.source.usage_source import UsageSource + + +class QueryLogUsageSource(UsageSource): + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): + super().__init__(config, metadata_config) + self.analysis_date = datetime.today().strftime("%Y-%m-%d %H:%M:%S") + + @classmethod + def create(cls, config_dict, metadata_config: WorkflowConfig): + """Create class instance""" + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/redshift_usage.py b/ingestion/src/metadata/ingestion/source/redshift_usage.py index bfeb7f8e856..09038b2ab30 100644 --- a/ingestion/src/metadata/ingestion/source/redshift_usage.py +++ b/ingestion/src/metadata/ingestion/source/redshift_usage.py @@ -13,7 +13,7 @@ Redshift usage module """ # This import verifies that the dependencies are available. -from typing import Any, Dict, Iterable, Iterator, Union +from typing import Iterator, Union from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( RedshiftConnection, @@ -25,11 +25,8 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig -from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus -from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus -from metadata.utils.connections import get_connection, test_connection +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.source.usage_source import UsageSource from metadata.utils.helpers import get_start_and_end # pylint: disable=useless-super-delegation @@ -39,7 +36,7 @@ from metadata.utils.sql_queries import REDSHIFT_SQL_STATEMENT logger = ingestion_logger() -class RedshiftUsageSource(Source[TableQuery]): +class RedshiftUsageSource(UsageSource): # SELECT statement from mysql information_schema to extract table and column metadata SQL_STATEMENT = REDSHIFT_SQL_STATEMENT # CONFIG KEYS @@ -52,11 +49,7 @@ class RedshiftUsageSource(Source[TableQuery]): DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()" def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig): - super().__init__() - self.config = config - self.service_connection = config.serviceConnection.__root__.config - self.metadata_config = metadata_config - self.metadata = OpenMetadata(metadata_config) + super().__init__(config, metadata_config) start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format( start_time=start, end_time=end @@ -64,8 +57,6 @@ class RedshiftUsageSource(Source[TableQuery]): self.analysis_date = start self._extract_iter: Union[None, Iterator] = None self._database = "redshift" - self.status = SQLSourceStatus() - self.engine = get_connection(self.service_connection) @classmethod def create(cls, config_dict, metadata_config: 
diff --git a/ingestion/src/metadata/ingestion/source/redshift_usage.py b/ingestion/src/metadata/ingestion/source/redshift_usage.py
index bfeb7f8e856..09038b2ab30 100644
--- a/ingestion/src/metadata/ingestion/source/redshift_usage.py
+++ b/ingestion/src/metadata/ingestion/source/redshift_usage.py
@@ -13,7 +13,7 @@ Redshift usage module
 """
 # This import verifies that the dependencies are available.
-from typing import Any, Dict, Iterable, Iterator, Union
+from typing import Iterator, Union
 
 from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
     RedshiftConnection,
 )
@@ -25,11 +25,8 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
 from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
-from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
-from metadata.ingestion.models.table_queries import TableQuery
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
-from metadata.utils.connections import get_connection, test_connection
+from metadata.ingestion.api.source import InvalidSourceException
+from metadata.ingestion.source.usage_source import UsageSource
 from metadata.utils.helpers import get_start_and_end
 
 # pylint: disable=useless-super-delegation
@@ -39,7 +36,7 @@ from metadata.utils.sql_queries import REDSHIFT_SQL_STATEMENT
 logger = ingestion_logger()
 
 
-class RedshiftUsageSource(Source[TableQuery]):
+class RedshiftUsageSource(UsageSource):
     # SELECT statement from mysql information_schema to extract table and column metadata
     SQL_STATEMENT = REDSHIFT_SQL_STATEMENT
     # CONFIG KEYS
@@ -52,11 +49,7 @@
     DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()"
 
     def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
-        super().__init__()
-        self.config = config
-        self.service_connection = config.serviceConnection.__root__.config
-        self.metadata_config = metadata_config
-        self.metadata = OpenMetadata(metadata_config)
+        super().__init__(config, metadata_config)
         start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
         self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format(
             start_time=start, end_time=end
         )
         self.analysis_date = start
         self._extract_iter: Union[None, Iterator] = None
         self._database = "redshift"
-        self.status = SQLSourceStatus()
-        self.engine = get_connection(self.service_connection)
 
     @classmethod
     def create(cls, config_dict, metadata_config: WorkflowConfig):
@@ -76,40 +67,3 @@
                 f"Expected RedshiftConnection, but got {connection}"
             )
         return cls(config, metadata_config)
-
-    def prepare(self):
-        pass
-
-    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
-
-        rows = self.engine.execute(self.sql_stmt)
-        for row in rows:
-            yield row
-
-    def next_record(self) -> Iterable[TableQuery]:
-        """
-        Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
-        :return:
-        """
-        for row in self._get_raw_extract_iter():
-            tq = TableQuery(
-                query=row["query"],
-                user_name=row["usename"],
-                starttime=str(row["starttime"]),
-                endtime=str(row["endtime"]),
-                analysis_date=str(self.analysis_date),
-                database=self.service_connection.database,
-                aborted=row["aborted"],
-                sql=row["querytxt"],
-                service_name=self.config.serviceName,
-            )
-            yield tq
-
-    def close(self):
-        pass
-
-    def get_status(self) -> SourceStatus:
-        return self.status
-
-    def test_connection(self) -> None:
-        test_connection(self.engine)
diff --git a/ingestion/src/metadata/ingestion/source/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/snowflake_usage.py
index 6e7970af6f1..049f34399d2 100644
--- a/ingestion/src/metadata/ingestion/source/snowflake_usage.py
+++ b/ingestion/src/metadata/ingestion/source/snowflake_usage.py
@@ -14,7 +14,7 @@ Snowflake usage module
 import traceback
 from datetime import timedelta
-from typing import Any, Dict, Iterable, Iterator, Union
+from typing import Iterable, Iterator, Union
 
 from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
     SnowflakeConnection,
 )
@@ -26,12 +26,11 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
 from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
-from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
+from metadata.ingestion.api.source import InvalidSourceException
 
 # This import verifies that the dependencies are available.
 from metadata.ingestion.models.table_queries import TableQuery
-from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
-from metadata.utils.connections import get_connection, test_connection
+from metadata.ingestion.source.usage_source import UsageSource
 from metadata.utils.helpers import get_start_and_end
 from metadata.utils.logger import ingestion_logger
 from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT
@@ -39,8 +38,7 @@ from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT
 logger = ingestion_logger()
 
 
-class SnowflakeUsageSource(Source[TableQuery]):
-
+class SnowflakeUsageSource(UsageSource):
     # SELECT statement from mysql information_schema
     # to extract table and column metadata
     SQL_STATEMENT = SNOWFLAKE_SQL_STATEMENT
@@ -55,13 +53,10 @@
     DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()"
 
     def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
-        super().__init__()
-        self.config = config
-        self.service_connection = config.serviceConnection.__root__.config
+        super().__init__(config, metadata_config)
         start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
         end = end + timedelta(days=1)
         self.analysis_date = start
-        self.metadata_config = metadata_config
         self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format(
             start_date=start,
             end_date=end,
         )
         self._extract_iter: Union[None, Iterator] = None
         self._database = "Snowflake"
-        self.report = SQLSourceStatus()
-        self.engine = get_connection(self.service_connection)
 
     @classmethod
     def create(cls, config_dict, metadata_config: WorkflowConfig):
@@ -82,15 +75,6 @@
             )
         return cls(config, metadata_config)
 
-    def prepare(self):
-        pass
-
-    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
-
-        rows = self.engine.execute(self.sql_stmt)
-        for row in rows:
-            yield row
-
     def next_record(self) -> Iterable[TableQuery]:
         """
         Using itertools.groupby and raw level iterator,
@@ -121,20 +105,3 @@
         except Exception as err:
             logger.debug(traceback.format_exc())
             logger.debug(repr(err))
-
-    def get_report(self):
-        """
-        get report
-
-        Returns:
-        """
-        return self.report
-
-    def test_connection(self) -> None:
-        test_connection(self.engine)
-
-    def close(self):
-        pass
-
-    def get_status(self) -> SourceStatus:
-        return self.report
+""" +Usage Souce Module +""" +import csv +from typing import Any, Dict, Iterable + +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus + +# This import verifies that the dependencies are available. +from metadata.ingestion.models.table_queries import TableQuery +from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus +from metadata.utils.connections import get_connection, test_connection +from metadata.utils.helpers import get_start_and_end + + +class UsageSource(Source[TableQuery]): + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): + super().__init__() + self.config = config + self.metadata_config = metadata_config + self.connection = config.serviceConnection.__root__.config + start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) + self.analysis_date = start + self.report = SQLSourceStatus() + self.engine = get_connection(self.connection) + + def prepare(self): + return super().prepare() + + def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]: + if self.config.sourceConfig.config.queryLogFilePath: + with open(self.config.sourceConfig.config.queryLogFilePath, "r") as fin: + for i in csv.DictReader(fin): + query_dict = dict(i) + row = { + "query_type": query_dict.get("query"), + "user_name": query_dict.get("user_name", ""), + "start_time": query_dict.get("start_time", ""), + "end_time": query_dict.get("end_time", ""), + "aborted": query_dict.get("aborted", False), + "database_name": query_dict.get( + "database_name", + self.connection.database + if self.connection.database + else "default", + ), + "query_text": query_dict.get("query"), + "schema_name": query_dict.get("schema_name"), + } + yield row + else: + rows = self.engine.execute(self.sql_stmt) + for row in rows: + yield row + + def next_record(self) -> Iterable[TableQuery]: + """ + Using itertools.groupby and raw level iterator, + it groups to table and yields TableMetadata + :return: + """ + + for row in self._get_raw_extract_iter(): + table_query = TableQuery( + query=row["query_type"], + user_name=row["user_name"], + starttime=str(row["start_time"]), + endtime=str(row["end_time"]), + analysis_date=self.analysis_date, + aborted=row["aborted"], + database=row["database_name"], + sql=row["query_text"], + service_name=self.config.serviceName, + ) + if not row["schema_name"]: + self.report.scanned(f"{row['database_name']}.{row['schema_name']}") + else: + self.report.scanned(f"{row['database_name']}") + yield table_query + + def get_report(self): + """ + get report + + Returns: + """ + return self.report + + def close(self): + pass + + def get_status(self) -> SourceStatus: + return self.report + + def test_connection(self) -> None: + test_connection(self.engine) diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index 6347ba9a542..2c09caeeaa7 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -2,16 +2,16 @@ import textwrap REDSHIFT_SQL_STATEMENT = """ SELECT DISTINCT ss.userid, - ss.query, - sui.usename, + ss.query query_type, + sui.usename user_name, ss.tbl, - sq.querytxt, - sti.database, - 
diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py
index 6347ba9a542..2c09caeeaa7 100644
--- a/ingestion/src/metadata/utils/sql_queries.py
+++ b/ingestion/src/metadata/utils/sql_queries.py
@@ -2,16 +2,16 @@ import textwrap
 
 REDSHIFT_SQL_STATEMENT = """
 SELECT DISTINCT ss.userid,
-       ss.query,
-       sui.usename,
+       ss.query query_type,
+       sui.usename user_name,
        ss.tbl,
-       sq.querytxt,
-       sti.database,
-       sti.schema,
+       sq.querytxt query_text,
+       sti.database database_name,
+       sti.schema schema_name,
        sti.table,
-       sq.starttime,
-       sq.endtime,
-       sq.aborted
+       sq.starttime start_time,
+       sq.endtime end_time,
+       sq.aborted aborted
 FROM stl_scan ss
   JOIN svv_table_info sti ON ss.tbl = sti.table_id
   JOIN stl_query sq ON ss.query = sq.query
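The column aliases added here line up with the keys the inherited `UsageSource.next_record` reads off each result row, which is what lets the Redshift source drop its bespoke `next_record` above. Roughly, each row now comes back shaped like this (values illustrative):

```python
# Shape of a row produced by engine.execute(REDSHIFT_SQL_STATEMENT), keyed by
# the new aliases and consumed directly by UsageSource.next_record:
row = {
    "query_type": "SELECT ...",           # ss.query query_type
    "user_name": "awsuser",               # sui.usename user_name
    "query_text": "SELECT ...",           # sq.querytxt query_text
    "database_name": "dev",               # sti.database database_name
    "schema_name": "public",              # sti.schema schema_name
    "start_time": "2022-04-01 10:00:00",  # sq.starttime start_time
    "end_time": "2022-04-01 10:00:03",    # sq.endtime end_time
    "aborted": False,                     # sq.aborted aborted
}
```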