From fde53809ebfeee7f44b13777d7d91aad384404e3 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Mon, 29 Aug 2022 10:18:29 +0530 Subject: [PATCH] added-query-log-parser-in-postgres (#6750) added-query-log-parser-in-postgres (#6750) --- .../examples/workflows/postgres_lineage.yaml | 16 ++ .../examples/workflows/postgres_usage.yaml | 29 +++ .../source/database/postgres_lineage.py | 52 +++++ .../source/database/postgres_query_parser.py | 199 ++++++++++++++++++ .../source/database/postgres_usage.py | 34 +++ ingestion/src/metadata/utils/sql_queries.py | 26 +++ .../ingestion/connectors/postgres/usage.yaml | 30 +++ 7 files changed, 386 insertions(+) create mode 100644 ingestion/examples/workflows/postgres_lineage.yaml create mode 100644 ingestion/examples/workflows/postgres_usage.yaml create mode 100644 ingestion/src/metadata/ingestion/source/database/postgres_lineage.py create mode 100644 ingestion/src/metadata/ingestion/source/database/postgres_query_parser.py create mode 100644 ingestion/src/metadata/ingestion/source/database/postgres_usage.py create mode 100644 openmetadata-docs/ingestion/connectors/postgres/usage.yaml diff --git a/ingestion/examples/workflows/postgres_lineage.yaml b/ingestion/examples/workflows/postgres_lineage.yaml new file mode 100644 index 00000000000..7a285d7f203 --- /dev/null +++ b/ingestion/examples/workflows/postgres_lineage.yaml @@ -0,0 +1,16 @@ +source: + type: postgres-lineage + serviceName: local_postgres11 + sourceConfig: + config: + type: DatabaseLineage + queryLogDuration: 1 + resultLimit: 10000 +sink: + type: metadata-rest + config: {} +workflowConfig: + loggerLevel: INFO + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: no-auth diff --git a/ingestion/examples/workflows/postgres_usage.yaml b/ingestion/examples/workflows/postgres_usage.yaml new file mode 100644 index 00000000000..1ae1b3ea013 --- /dev/null +++ b/ingestion/examples/workflows/postgres_usage.yaml @@ -0,0 +1,29 @@ +source: + type: postgres-usage + serviceName: local_postgres + serviceConnection: + config: + type: Postgres + username: username + password: password + hostPort: localhost:5432 + database: database_name + sourceConfig: + config: + queryLogDuration: 1 +processor: + type: query-parser + config: {} +stage: + type: table-usage + config: + filename: /tmp/postgres_usage +bulkSink: + type: metadata-usage + config: + filename: /tmp/postgres_usage +workflowConfig: + loggerLevel: DEBUG + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: no-auth diff --git a/ingestion/src/metadata/ingestion/source/database/postgres_lineage.py b/ingestion/src/metadata/ingestion/source/database/postgres_lineage.py new file mode 100644 index 00000000000..6cc5f730e59 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/postgres_lineage.py @@ -0,0 +1,52 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Postgres lineage module +""" +from typing import Iterable + +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query +from metadata.ingestion.source.database.lineage_source import LineageSource +from metadata.ingestion.source.database.postgres_query_parser import ( + PostgresQueryParserSource, +) +from metadata.utils.sql_queries import POSTGRES_SQL_STATEMENT + + +class PostgresLineageSource(PostgresQueryParserSource, LineageSource): + + sql_stmt = POSTGRES_SQL_STATEMENT + + filters = """ + AND ( + s.query ILIKE 'create table %% as select %%' + OR s.query ILIKE 'insert %%' + ) + """ + + def next_record(self) -> Iterable[AddLineageRequest]: + """ + Based on the query logs, prepare the lineage + and send it to the sink + """ + for table_queries in self.get_table_query(): + for table_query in table_queries.queries: + lineages = get_lineage_by_query( + self.metadata, + query=table_query.query, + service_name=table_query.serviceName, + database_name=table_query.databaseName, + schema_name=table_query.databaseSchema, + ) + + for lineage_request in lineages or []: + yield lineage_request diff --git a/ingestion/src/metadata/ingestion/source/database/postgres_query_parser.py b/ingestion/src/metadata/ingestion/source/database/postgres_query_parser.py new file mode 100644 index 00000000000..78b7ef46577 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/postgres_query_parser.py @@ -0,0 +1,199 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Postgres Query parser module +""" +import csv +import traceback +from abc import ABC +from datetime import datetime, timedelta +from typing import Iterable, Optional + +from sqlalchemy.engine.base import Engine + +from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( + PostgresConnection, +) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.source.database.query_parser_source import QueryParserSource +from metadata.utils.connections import get_connection +from metadata.utils.filters import filter_by_database, filter_by_schema +from metadata.utils.helpers import get_start_and_end +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class PostgresQueryParserSource(QueryParserSource, ABC): + """ + Postgres base for Usage and Lineage + """ + + filters: str + + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): + super().__init__(config, metadata_config) + + # Postgres does not allow retrieval of data older than 7 days + # Update start and end based on this + duration = min(self.source_config.queryLogDuration, 6) + self.start, self.end = get_start_and_end(duration) + + @classmethod + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: PostgresConnection = config.serviceConnection.__root__.config + if not isinstance(connection, PostgresConnection): + raise InvalidSourceException( + f"Expected PostgresConnection, but got {connection}" + ) + return cls(config, metadata_config) + + def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str: + """ + returns sql statement to fetch query logs + """ + return self.sql_stmt.format( + start_time=start_time, + end_time=end_time, + result_limit=self.config.sourceConfig.config.resultLimit, + filters=self.filters, + ) + + def get_table_query(self) -> Iterable[TableQuery]: + + try: + if self.config.sourceConfig.config.queryLogFilePath: + table_query_list = [] + with open( + self.config.sourceConfig.config.queryLogFilePath, "r" + ) as query_log_file: + + for i in csv.DictReader(query_log_file): + query_dict = dict(i) + + analysis_date = ( + datetime.utcnow() + if not query_dict.get("session_start_time") + else datetime.strptime( + query_dict.get("session_start_time"), + "%Y-%m-%d %H:%M:%S+%f", + ) + ) + + query_dict["aborted"] = query_dict["sql_state_code"] == "00000" + if "statement" in query_dict["message"]: + query_dict["message"] = query_dict["message"].split(":")[1] + + table_query_list.append( + TableQuery( + query=query_dict["message"], + userName=query_dict.get("user_name", ""), + startTime=query_dict.get("session_start_time", ""), + endTime=query_dict.get("log_time", ""), + analysisDate=analysis_date, + aborted=self.get_aborted_status(query_dict), + databaseName=self.get_database_name(query_dict), + serviceName=self.config.serviceName, + databaseSchema=self.get_schema_name(query_dict), + ) + ) + yield TableQueries(queries=table_query_list) + + else: + database = self.config.serviceConnection.__root__.config.database + if database: + self.engine: Engine = get_connection(self.connection) + yield from self.process_table_query() + else: + query = "select datname from pg_catalog.pg_database" + results = self.engine.execute(query) + for res in results: + row = list(res) + logger.info(f"Ingesting from database: {row[0]}") + self.config.serviceConnection.__root__.config.database = row[0] + self.engine = get_connection(self.connection) + yield from self.process_table_query() + + except Exception as err: + logger.error(f"Source usage processing error - {err}") + logger.debug(traceback.format_exc()) + + def process_table_query(self) -> Optional[Iterable[TableQuery]]: + daydiff = self.end - self.start + for i in range(daydiff.days): + logger.info( + f"Scanning query logs for {(self.start + timedelta(days=i)).date()} - " + f"{(self.start + timedelta(days=i+1)).date()}" + ) + try: + with get_connection(self.connection).connect() as conn: + rows = conn.execute( + self.get_sql_statement( + start_time=self.start + timedelta(days=i), + end_time=self.start + timedelta(days=i + 2), + ) + ) + queries = [] + for row in rows: + row = dict(row) + try: + if filter_by_database( + self.source_config.databaseFilterPattern, + self.get_database_name(row), + ) or filter_by_schema( + self.source_config.schemaFilterPattern, + schema_name=row.get("schema_name"), + ): + continue + + end_time = row["start_time"] + timedelta( + milliseconds=row["total_exec_time"] + ) + date_time = end_time.strftime("%m/%d/%Y, %H:%M:%S") + queries.append( + TableQuery( + query=row["query_text"], + userName=row["usename"], + startTime=str(row["start_time"]), + endTime=date_time, + analysisDate=row["start_time"], + aborted=self.get_aborted_status(row), + databaseName=self.get_database_name(row), + serviceName=self.config.serviceName, + databaseSchema=self.get_schema_name(row), + ) + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(str(err)) + yield TableQueries(queries=queries) + except Exception as err: + logger.error(f"Source usage processing error - {err}") + logger.debug(traceback.format_exc()) + + def get_database_name(self, data: dict) -> str: + """ + Method to get database name + """ + key = "datname" + if self.config.sourceConfig.config.queryLogFilePath: + key = "database_name" + if not data[key] and self.connection.database: + return self.connection.database + return data[key] diff --git a/ingestion/src/metadata/ingestion/source/database/postgres_usage.py b/ingestion/src/metadata/ingestion/source/database/postgres_usage.py new file mode 100644 index 00000000000..5c2c7bdaa6e --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/postgres_usage.py @@ -0,0 +1,34 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Postgres usage module +""" + + +from metadata.ingestion.source.database.postgres_query_parser import ( + PostgresQueryParserSource, +) +from metadata.ingestion.source.database.usage_source import UsageSource +from metadata.utils.sql_queries import POSTGRES_SQL_STATEMENT + + +class PostgresUsageSource(PostgresQueryParserSource, UsageSource): + """ + Postgres class for Usage + """ + + sql_stmt = POSTGRES_SQL_STATEMENT + filters = """ + AND ( + NOT s.query ILIKE 'create table %% as select %%' + OR NOT s.query ILIKE 'insert %%' + ) + """ diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index 72409a1eeca..829238fe2e8 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -461,3 +461,29 @@ TRINO_GET_COLUMNS = textwrap.dedent( ORDER BY "ordinal_position" ASC """ ) + + +POSTGRES_SQL_STATEMENT = textwrap.dedent( + """ + SELECT + u.usename, + d.datname, + s.query query_text, + a.query_start start_time, + s.total_exec_time, + s.mean_exec_time, + s.calls + FROM + pg_stat_statements s + JOIN pg_catalog.pg_database d ON s.dbid = d.oid + JOIN pg_catalog.pg_user u ON s.userid = u.usesysid + JOIN pg_catalog.pg_stat_activity a ON d.datname = a.datname + WHERE + a.query_start >= '{start_time}' AND + a.state_change < current_timestamp + AND s.query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' + AND s.query NOT LIKE '/* {{"app": "dbt", %%}} */%%' + {filters} + LIMIT {result_limit} + """ +) diff --git a/openmetadata-docs/ingestion/connectors/postgres/usage.yaml b/openmetadata-docs/ingestion/connectors/postgres/usage.yaml new file mode 100644 index 00000000000..7b2189e8247 --- /dev/null +++ b/openmetadata-docs/ingestion/connectors/postgres/usage.yaml @@ -0,0 +1,30 @@ +source: + type: postgres-usage + serviceName: "" + serviceConnection: + config: + type: Postgres + username: openmetadata_user + password: openmetadata_password + hostPort: localhost:5432 + database: + sourceConfig: + config: + queryLogDuration: "" + stageFileLocation: "" + resultLimit: "" +processor: + type: query-parser + config: {} +stage: + type: table-usage + config: + filename: "/tmp/postgres_usage" +bulkSink: + type: metadata-usage + config: + filename: "/tmp/postgres_usage" +workflowConfig: + openMetadataServerConfig: + hostPort: "" + authProvider: ""