added-query-log-parser-in-postgres (#6750)

Abhishek Pandey 2022-08-29 10:18:29 +05:30 committed by GitHub
parent 4c6f0c7bda
commit fde53809eb
7 changed files with 386 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
source:
  type: postgres-lineage
  serviceName: local_postgres11
  sourceConfig:
    config:
      type: DatabaseLineage
      queryLogDuration: 1
      resultLimit: 10000
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  loggerLevel: INFO
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: no-auth

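For reference, a definition like the one above is what the ingestion runner consumes. A minimal sketch of running it programmatically, assuming the standard `metadata.ingestion.api.workflow.Workflow` entrypoint and that the YAML above is saved as `lineage.yaml` (the file name is illustrative):

import yaml

from metadata.ingestion.api.workflow import Workflow

# Load the postgres-lineage workflow definition shown above
with open("lineage.yaml", "r", encoding="utf-8") as config_file:
    config_dict = yaml.safe_load(config_file)

# Create the workflow, run it, surface any failures, and tear it down
workflow = Workflow.create(config_dict)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
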
View File

@@ -0,0 +1,29 @@
source:
  type: postgres-usage
  serviceName: local_postgres
  serviceConnection:
    config:
      type: Postgres
      username: username
      password: password
      hostPort: localhost:5432
      database: database_name
  sourceConfig:
    config:
      queryLogDuration: 1
processor:
  type: query-parser
  config: {}
stage:
  type: table-usage
  config:
    filename: /tmp/postgres_usage
bulkSink:
  type: metadata-usage
  config:
    filename: /tmp/postgres_usage
workflowConfig:
  loggerLevel: DEBUG
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: no-auth

View File

@@ -0,0 +1,52 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Postgres lineage module
"""
from typing import Iterable

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.postgres_query_parser import (
    PostgresQueryParserSource,
)
from metadata.utils.sql_queries import POSTGRES_SQL_STATEMENT


class PostgresLineageSource(PostgresQueryParserSource, LineageSource):

    sql_stmt = POSTGRES_SQL_STATEMENT

    filters = """
        AND (
            s.query ILIKE 'create table %% as select %%'
            OR s.query ILIKE 'insert %%'
        )
    """

    def next_record(self) -> Iterable[AddLineageRequest]:
        """
        Based on the query logs, prepare the lineage
        and send it to the sink
        """
        for table_queries in self.get_table_query():
            for table_query in table_queries.queries:
                lineages = get_lineage_by_query(
                    self.metadata,
                    query=table_query.query,
                    service_name=table_query.serviceName,
                    database_name=table_query.databaseName,
                    schema_name=table_query.databaseSchema,
                )
                for lineage_request in lineages or []:
                    yield lineage_request

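The `filters` fragment above is what restricts lineage parsing to CTAS and INSERT statements, since only those move data between tables. A rough Python mirror of the two ILIKE patterns, for illustration only (the real filtering happens inside Postgres):

import re

def is_lineage_query(query: str) -> bool:
    # Mirrors: s.query ILIKE 'create table % as select %'
    #          OR s.query ILIKE 'insert %'
    return bool(
        re.match(r"create table .* as select .*", query, re.IGNORECASE | re.DOTALL)
        or re.match(r"insert .*", query, re.IGNORECASE | re.DOTALL)
    )

assert is_lineage_query("CREATE TABLE t2 AS SELECT * FROM t1")
assert is_lineage_query("insert into t2 select * from t1")
assert not is_lineage_query("SELECT count(*) FROM t1")
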
View File

@@ -0,0 +1,199 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Postgres Query parser module
"""
import csv
import traceback
from abc import ABC
from datetime import datetime, timedelta
from typing import Iterable

from sqlalchemy.engine.base import Engine

from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
    PostgresConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.utils.connections import get_connection
from metadata.utils.filters import filter_by_database, filter_by_schema
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class PostgresQueryParserSource(QueryParserSource, ABC):
    """
    Postgres base for Usage and Lineage
    """

    filters: str

    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
        super().__init__(config, metadata_config)
        # Postgres does not allow retrieval of data older than 7 days
        # Update start and end based on this
        duration = min(self.source_config.queryLogDuration, 6)
        self.start, self.end = get_start_and_end(duration)

    @classmethod
    def create(cls, config_dict, metadata_config: OpenMetadataConnection):
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: PostgresConnection = config.serviceConnection.__root__.config
        if not isinstance(connection, PostgresConnection):
            raise InvalidSourceException(
                f"Expected PostgresConnection, but got {connection}"
            )
        return cls(config, metadata_config)

    def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:
        """
        Returns the SQL statement to fetch query logs
        """
        return self.sql_stmt.format(
            start_time=start_time,
            end_time=end_time,
            result_limit=self.config.sourceConfig.config.resultLimit,
            filters=self.filters,
        )
    def get_table_query(self) -> Iterable[TableQueries]:
        try:
            if self.config.sourceConfig.config.queryLogFilePath:
                table_query_list = []
                with open(
                    self.config.sourceConfig.config.queryLogFilePath, "r"
                ) as query_log_file:

                    for record in csv.DictReader(query_log_file):
                        query_dict = dict(record)

                        analysis_date = (
                            datetime.utcnow()
                            if not query_dict.get("session_start_time")
                            else datetime.strptime(
                                query_dict.get("session_start_time"),
                                "%Y-%m-%d %H:%M:%S+%f",
                            )
                        )

                        # SQLSTATE 00000 means successful completion, so anything
                        # else marks the query as aborted
                        query_dict["aborted"] = query_dict["sql_state_code"] != "00000"

                        # Strip the "statement: " prefix while keeping any colons
                        # inside the query text itself
                        if "statement" in query_dict["message"]:
                            query_dict["message"] = query_dict["message"].split(":", 1)[1]

                        table_query_list.append(
                            TableQuery(
                                query=query_dict["message"],
                                userName=query_dict.get("user_name", ""),
                                startTime=query_dict.get("session_start_time", ""),
                                endTime=query_dict.get("log_time", ""),
                                analysisDate=analysis_date,
                                aborted=self.get_aborted_status(query_dict),
                                databaseName=self.get_database_name(query_dict),
                                serviceName=self.config.serviceName,
                                databaseSchema=self.get_schema_name(query_dict),
                            )
                        )
                yield TableQueries(queries=table_query_list)
            else:
                database = self.config.serviceConnection.__root__.config.database
                if database:
                    self.engine: Engine = get_connection(self.connection)
                    yield from self.process_table_query()
                else:
                    # No database pinned in the connection: connect with the
                    # default database first so we can list and iterate over all
                    self.engine: Engine = get_connection(self.connection)
                    query = "select datname from pg_catalog.pg_database"
                    results = self.engine.execute(query)
                    for res in results:
                        row = list(res)
                        logger.info(f"Ingesting from database: {row[0]}")
                        self.config.serviceConnection.__root__.config.database = row[0]
                        self.engine = get_connection(self.connection)
                        yield from self.process_table_query()
        except Exception as err:
            logger.error(f"Source usage processing error - {err}")
            logger.debug(traceback.format_exc())
    def process_table_query(self) -> Iterable[TableQueries]:
        daydiff = self.end - self.start
        for i in range(daydiff.days):
            logger.info(
                f"Scanning query logs for {(self.start + timedelta(days=i)).date()} - "
                f"{(self.start + timedelta(days=i + 1)).date()}"
            )
            try:
                with get_connection(self.connection).connect() as conn:
                    rows = conn.execute(
                        self.get_sql_statement(
                            start_time=self.start + timedelta(days=i),
                            end_time=self.start + timedelta(days=i + 1),
                        )
                    )
                    queries = []
                    for row in rows:
                        row = dict(row)
                        try:
                            if filter_by_database(
                                self.source_config.databaseFilterPattern,
                                self.get_database_name(row),
                            ) or filter_by_schema(
                                self.source_config.schemaFilterPattern,
                                schema_name=row.get("schema_name"),
                            ):
                                continue

                            end_time = row["start_time"] + timedelta(
                                milliseconds=row["total_exec_time"]
                            )
                            date_time = end_time.strftime("%m/%d/%Y, %H:%M:%S")
                            queries.append(
                                TableQuery(
                                    query=row["query_text"],
                                    userName=row["usename"],
                                    startTime=str(row["start_time"]),
                                    endTime=date_time,
                                    analysisDate=row["start_time"],
                                    aborted=self.get_aborted_status(row),
                                    databaseName=self.get_database_name(row),
                                    serviceName=self.config.serviceName,
                                    databaseSchema=self.get_schema_name(row),
                                )
                            )
                        except Exception as err:
                            logger.debug(traceback.format_exc())
                            logger.error(str(err))
                yield TableQueries(queries=queries)
            except Exception as err:
                logger.error(f"Source usage processing error - {err}")
                logger.debug(traceback.format_exc())
    def get_database_name(self, data: dict) -> str:
        """
        Method to get database name
        """
        key = "datname"
        if self.config.sourceConfig.config.queryLogFilePath:
            key = "database_name"
        if not data[key] and self.connection.database:
            return self.connection.database
        return data[key]

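When `queryLogFilePath` is set, `get_table_query` reads Postgres csvlog output instead of querying the server. A minimal sketch of that branch against one hypothetical log row (the column names follow the code above; the values are invented):

import csv
import io

# One invented csvlog row with the columns the parser reads
sample_log = (
    "log_time,user_name,database_name,schema_name,session_start_time,"
    "sql_state_code,message\n"
    "2022-08-29 10:18:29 UTC,postgres,sales,public,2022-08-29 10:00:00+00,"
    '00000,"statement: select * from orders"\n'
)

for record in csv.DictReader(io.StringIO(sample_log)):
    query_dict = dict(record)
    # Same prefix handling as the source: keep everything after the first colon
    if "statement" in query_dict["message"]:
        query_dict["message"] = query_dict["message"].split(":", 1)[1]
    print(query_dict["message"].strip())  # select * from orders
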
View File

@@ -0,0 +1,34 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Postgres usage module
"""
from metadata.ingestion.source.database.postgres_query_parser import (
    PostgresQueryParserSource,
)
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.sql_queries import POSTGRES_SQL_STATEMENT


class PostgresUsageSource(PostgresQueryParserSource, UsageSource):
    """
    Postgres class for Usage
    """

    sql_stmt = POSTGRES_SQL_STATEMENT

    # Exclude the CTAS and INSERT statements handled by the lineage source;
    # both conditions must hold for a query to be kept out of usage
    filters = """
        AND (
            NOT s.query ILIKE 'create table %% as select %%'
            AND NOT s.query ILIKE 'insert %%'
        )
    """

View File

@@ -461,3 +461,29 @@ TRINO_GET_COLUMNS = textwrap.dedent(
        ORDER BY "ordinal_position" ASC
    """
)

POSTGRES_SQL_STATEMENT = textwrap.dedent(
    """
      SELECT
        u.usename,
        d.datname,
        s.query query_text,
        a.query_start start_time,
        s.total_exec_time,
        s.mean_exec_time,
        s.calls
      FROM
        pg_stat_statements s
        JOIN pg_catalog.pg_database d ON s.dbid = d.oid
        JOIN pg_catalog.pg_user u ON s.userid = u.usesysid
        JOIN pg_catalog.pg_stat_activity a ON d.datname = a.datname
      WHERE
        a.query_start >= '{start_time}'
        AND a.query_start < '{end_time}'
        AND a.state_change < current_timestamp
        AND s.query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
        AND s.query NOT LIKE '/* {{"app": "dbt", %%}} */%%'
        {filters}
      LIMIT {result_limit}
    """
)

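Because `get_sql_statement` renders this template with `str.format`, the doubled braces and percent signs are deliberate: `{{` and `}}` collapse to literal braces, while `%%` passes through untouched. An illustrative sketch of that escaping on a slice of the template:

template = (
    "AND s.query NOT LIKE '/* {{\"app\": \"OpenMetadata\", %%}} */%%'\n"
    "{filters}\n"
    "LIMIT {result_limit}"
)
rendered = template.format(
    filters="AND s.query ILIKE 'insert %%'",
    result_limit=10000,
)
print(rendered)
# AND s.query NOT LIKE '/* {"app": "OpenMetadata", %%} */%%'
# AND s.query ILIKE 'insert %%'
# LIMIT 10000

In a LIKE/ILIKE pattern, two consecutive `%` wildcards match exactly what one does, so the doubled form stays correct whether or not the DBAPI layer later unescapes `%%` to `%`.
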
View File

@@ -0,0 +1,30 @@
source:
  type: postgres-usage
  serviceName: "<service name>"
  serviceConnection:
    config:
      type: Postgres
      username: openmetadata_user
      password: openmetadata_password
      hostPort: localhost:5432
      database: "<database name>"
  sourceConfig:
    config:
      queryLogDuration: "<query log duration integer>"
      stageFileLocation: "<path to store the stage file>"
      resultLimit: "<query log limit integer>"
processor:
  type: query-parser
  config: {}
stage:
  type: table-usage
  config:
    filename: "/tmp/postgres_usage"
bulkSink:
  type: metadata-usage
  config:
    filename: "/tmp/postgres_usage"
workflowConfig:
  openMetadataServerConfig:
    hostPort: "<OpenMetadata host and port>"
    authProvider: "<OpenMetadata auth provider>"