postgres-sql-query-updated (#7421)

This commit is contained in:
Abhishek Pandey 2022-09-21 07:04:06 +05:30 committed by GitHub
parent 301433e0d0
commit d40be7a9aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 48 additions and 78 deletions

View File

@@ -28,8 +28,8 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource):
filters = """
AND (
s.query ILIKE 'create table %% as select %%'
OR s.query ILIKE 'insert %%'
s.query ILIKE '%%create table %% as select %%'
OR s.query ILIKE '%%insert %%'
)
"""

View File

@@ -14,7 +14,7 @@ Postgres Query parser module
import csv
import traceback
from abc import ABC
from datetime import datetime, timedelta
from datetime import datetime
from typing import Iterable, Optional
from sqlalchemy.engine.base import Engine
@@ -64,13 +64,11 @@ class PostgresQueryParserSource(QueryParserSource, ABC):
)
return cls(config, metadata_config)
def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:
def get_sql_statement(self) -> str:
"""
returns sql statement to fetch query logs
"""
return self.sql_stmt.format(
start_time=start_time,
end_time=end_time,
result_limit=self.config.sourceConfig.config.resultLimit,
filters=self.filters,
)
@@ -135,65 +133,42 @@ class PostgresQueryParserSource(QueryParserSource, ABC):
logger.debug(traceback.format_exc())
def process_table_query(self) -> Optional[Iterable[TableQuery]]:
daydiff = self.end - self.start
for i in range(daydiff.days):
logger.info(
f"Scanning query logs for {(self.start + timedelta(days=i)).date()} - "
f"{(self.start + timedelta(days=i+1)).date()}"
)
try:
with get_connection(self.connection).connect() as conn:
rows = conn.execute(
self.get_sql_statement(
start_time=self.start + timedelta(days=i),
end_time=self.start + timedelta(days=i + 2),
try:
with get_connection(self.connection).connect() as conn:
rows = conn.execute(self.get_sql_statement())
queries = []
for row in rows:
row = dict(row)
try:
if filter_by_database(
self.source_config.databaseFilterPattern,
self.get_database_name(row),
) or filter_by_schema(
self.source_config.schemaFilterPattern,
schema_name=row.get("schema_name"),
):
continue
queries.append(
TableQuery(
query=row["query_text"],
userName=row["usename"],
analysisDate=datetime.now(),
aborted=self.get_aborted_status(row),
databaseName=self.get_database_name(row),
serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(row),
)
)
)
queries = []
for row in rows:
row = dict(row)
try:
if filter_by_database(
self.source_config.databaseFilterPattern,
self.get_database_name(row),
) or filter_by_schema(
self.source_config.schemaFilterPattern,
schema_name=row.get("schema_name"),
):
continue
end_time = row["start_time"] + timedelta(
milliseconds=row["total_exec_time"]
)
date_time = end_time.strftime("%m/%d/%Y, %H:%M:%S")
queries.append(
TableQuery(
query=row["query_text"],
userName=row["usename"],
startTime=str(row["start_time"]),
endTime=date_time,
analysisDate=row["start_time"],
aborted=self.get_aborted_status(row),
databaseName=self.get_database_name(row),
serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(row),
)
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(str(err))
yield TableQueries(queries=queries)
except Exception as err:
logger.error(f"Source usage processing error - {err}")
logger.debug(traceback.format_exc())
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(str(err))
yield TableQueries(queries=queries)
except Exception as err:
logger.error(f"Source usage processing error - {err}")
logger.debug(traceback.format_exc())
def get_database_name(self, data: dict) -> str:
"""
Method to get database name
"""
key = "datname"
if self.config.sourceConfig.config.queryLogFilePath:
key = "database_name"
if not data[key] and self.connection.database:
return self.connection.database
return data[key]
return data.get("database_name")

View File

@@ -26,9 +26,4 @@ class PostgresUsageSource(PostgresQueryParserSource, UsageSource):
"""
sql_stmt = POSTGRES_SQL_STATEMENT
filters = """
AND (
NOT s.query ILIKE 'create table %% as select %%'
OR NOT s.query ILIKE 'insert %%'
)
"""
filters = ""

View File

@@ -468,22 +468,16 @@ POSTGRES_SQL_STATEMENT = textwrap.dedent(
"""
SELECT
u.usename,
d.datname,
d.datname database_name,
s.query query_text,
a.query_start start_time,
s.total_exec_time,
s.mean_exec_time,
s.calls
s.total_exec_time
FROM
pg_stat_statements s
JOIN pg_catalog.pg_database d ON s.dbid = d.oid
JOIN pg_catalog.pg_user u ON s.userid = u.usesysid
JOIN pg_catalog.pg_stat_activity a ON d.datname = a.datname
WHERE
a.query_start >= '{start_time}' AND
a.state_change < current_timestamp
AND s.query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND s.query NOT LIKE '/* {{"app": "dbt", %%}} */%%'
s.query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' AND
s.query NOT LIKE '/* {{"app": "dbt", %%}} */%%'
{filters}
LIMIT {result_limit}
"""

View File

@@ -65,6 +65,12 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsUsageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction"
},
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"