diff --git a/ingestion/src/metadata/ingestion/source/database/postgres_lineage.py b/ingestion/src/metadata/ingestion/source/database/postgres_lineage.py index 6cc5f730e59..21b9d7136d7 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres_lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres_lineage.py @@ -28,8 +28,8 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource): filters = """ AND ( - s.query ILIKE 'create table %% as select %%' - OR s.query ILIKE 'insert %%' + s.query ILIKE '%%create table %% as select %%' + OR s.query ILIKE '%%insert %%' ) """ diff --git a/ingestion/src/metadata/ingestion/source/database/postgres_query_parser.py b/ingestion/src/metadata/ingestion/source/database/postgres_query_parser.py index 78b7ef46577..d881e1f749e 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres_query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres_query_parser.py @@ -14,7 +14,7 @@ Postgres Query parser module import csv import traceback from abc import ABC -from datetime import datetime, timedelta +from datetime import datetime from typing import Iterable, Optional from sqlalchemy.engine.base import Engine @@ -64,13 +64,11 @@ class PostgresQueryParserSource(QueryParserSource, ABC): ) return cls(config, metadata_config) - def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str: + def get_sql_statement(self) -> str: """ returns sql statement to fetch query logs """ return self.sql_stmt.format( - start_time=start_time, - end_time=end_time, result_limit=self.config.sourceConfig.config.resultLimit, filters=self.filters, ) @@ -135,65 +133,42 @@ class PostgresQueryParserSource(QueryParserSource, ABC): logger.debug(traceback.format_exc()) def process_table_query(self) -> Optional[Iterable[TableQuery]]: - daydiff = self.end - self.start - for i in range(daydiff.days): - logger.info( - f"Scanning query logs for {(self.start + timedelta(days=i)).date()} - " - f"{(self.start + timedelta(days=i+1)).date()}" - ) - try: - with get_connection(self.connection).connect() as conn: - rows = conn.execute( - self.get_sql_statement( - start_time=self.start + timedelta(days=i), - end_time=self.start + timedelta(days=i + 2), + try: + with get_connection(self.connection).connect() as conn: + rows = conn.execute(self.get_sql_statement()) + queries = [] + for row in rows: + row = dict(row) + try: + if filter_by_database( + self.source_config.databaseFilterPattern, + self.get_database_name(row), + ) or filter_by_schema( + self.source_config.schemaFilterPattern, + schema_name=row.get("schema_name"), + ): + continue + queries.append( + TableQuery( + query=row["query_text"], + userName=row["usename"], + analysisDate=datetime.now(), + aborted=self.get_aborted_status(row), + databaseName=self.get_database_name(row), + serviceName=self.config.serviceName, + databaseSchema=self.get_schema_name(row), + ) ) - ) - queries = [] - for row in rows: - row = dict(row) - try: - if filter_by_database( - self.source_config.databaseFilterPattern, - self.get_database_name(row), - ) or filter_by_schema( - self.source_config.schemaFilterPattern, - schema_name=row.get("schema_name"), - ): - continue - - end_time = row["start_time"] + timedelta( - milliseconds=row["total_exec_time"] - ) - date_time = end_time.strftime("%m/%d/%Y, %H:%M:%S") - queries.append( - TableQuery( - query=row["query_text"], - userName=row["usename"], - startTime=str(row["start_time"]), - endTime=date_time, - analysisDate=row["start_time"], - aborted=self.get_aborted_status(row), - databaseName=self.get_database_name(row), - serviceName=self.config.serviceName, - databaseSchema=self.get_schema_name(row), - ) - ) - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(str(err)) - yield TableQueries(queries=queries) - except Exception as err: - logger.error(f"Source usage processing error - {err}") - logger.debug(traceback.format_exc()) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(str(err)) + yield TableQueries(queries=queries) + except Exception as err: + logger.error(f"Source usage processing error - {err}") + logger.debug(traceback.format_exc()) def get_database_name(self, data: dict) -> str: """ Method to get database name """ - key = "datname" - if self.config.sourceConfig.config.queryLogFilePath: - key = "database_name" - if not data[key] and self.connection.database: - return self.connection.database - return data[key] + return data.get("database_name") diff --git a/ingestion/src/metadata/ingestion/source/database/postgres_usage.py b/ingestion/src/metadata/ingestion/source/database/postgres_usage.py index 5c2c7bdaa6e..a5fd7788d78 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres_usage.py @@ -26,9 +26,4 @@ class PostgresUsageSource(PostgresQueryParserSource, UsageSource): """ sql_stmt = POSTGRES_SQL_STATEMENT - filters = """ - AND ( - NOT s.query ILIKE 'create table %% as select %%' - OR NOT s.query ILIKE 'insert %%' - ) - """ + filters = "" diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index 3730619e66a..42254da6171 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -468,22 +468,16 @@ POSTGRES_SQL_STATEMENT = textwrap.dedent( """ SELECT u.usename, - d.datname, + d.datname database_name, s.query query_text, - a.query_start start_time, - s.total_exec_time, - s.mean_exec_time, - s.calls + s.total_exec_time FROM pg_stat_statements s JOIN pg_catalog.pg_database d ON s.dbid = d.oid JOIN pg_catalog.pg_user u ON s.userid = u.usesysid - JOIN pg_catalog.pg_stat_activity a ON d.datname = a.datname WHERE - a.query_start >= '{start_time}' AND - a.state_change < current_timestamp - AND s.query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' - AND s.query NOT LIKE '/* {{"app": "dbt", %%}} */%%' + s.query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' AND + s.query NOT LIKE '/* {{"app": "dbt", %%}} */%%' {filters} LIMIT {result_limit} """ diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json index c0058c471df..6d068d87ffe 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json @@ -65,6 +65,12 @@ "title": "Supports Metadata Extraction", "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" }, + "supportsUsageExtraction": { + "$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction" + }, + "supportsLineageExtraction": { + "$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction" + }, "supportsProfiler": { "title": "Supports Profiler", "$ref": "../connectionBasicType.json#/definitions/supportsProfiler"