Fixes #9919: Cleaned up postgres lineage and fixed function in postgres query_parser (#13678)

* Replaced s.query with query_text in filters for clarity

* Removed redundant _iter function

* Updated process_table_query to return None when there are no queries to process

* Modified process_table_query to yield None if there are no queries

* Resolved formatting issues and modified process_table_query to return None instead of yielding None (see the short generator sketch below)

* pylint, fix bugs and improve code

* fix import

* pyformat

---------

Co-authored-by: ulixius9 <mayursingal9@gmail.com>
Tom Bushell 2023-12-16 01:44:19 +11:00 committed by GitHub
parent 33f2fdd49c
commit bb3f9a4441
3 changed files with 77 additions and 86 deletions
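
Side note on the "return None" vs "yield None" back-and-forth in the bullets above: inside a Python generator, yield None places a literal None item into the stream that every consumer must then filter out, whereas a bare return (or simply falling off the end of the function) just closes the generator and produces no items at all. A minimal sketch of the difference, with invented function names:

from typing import Iterable, Optional

def yields_none() -> Iterable[Optional[int]]:
    # Emits an actual None item; downstream code has to guard against it.
    yield None

def returns_early() -> Iterable[int]:
    # Ends the generator immediately; iteration simply produces nothing.
    return

print(list(yields_none()))    # [None]
print(list(returns_early()))  # []

This is why the final process_table_query in the usage source below guards with "if queries:" and otherwise simply falls through, rather than yielding None.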


@@ -11,15 +11,17 @@
"""
Postgres lineage module
"""
import traceback
from datetime import datetime
from typing import Iterable
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
    PostgresScheme,
)
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.postgres.pgspider.lineage import (
    get_lineage_from_multi_tenant_table,
@@ -55,19 +57,8 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource):
        Based on the query logs, prepare the lineage
        and send it to the sink
        """
        for table_queries in self.get_table_query():
            for table_query in table_queries.queries:
                lineages = get_lineage_by_query(
                    self.metadata,
                    query=table_query.query,
                    service_name=table_query.serviceName,
                    database_name=table_query.databaseName,
                    schema_name=table_query.databaseSchema,
                    dialect=Dialect.POSTGRES,
                )
                for lineage_request in lineages or []:
                    yield lineage_request
        yield from super()._iter()
        if self.service_connection.scheme == PostgresScheme.pgspider_psycopg2:
            lineages = get_lineage_from_multi_tenant_table(
@@ -78,3 +69,30 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource):
            for lineage_request in lineages or []:
                yield lineage_request

    def process_table_query(self) -> Iterable[TableQuery]:
        """
        Process Query
        """
        try:
            with get_connection(self.service_connection).connect() as conn:
                rows = conn.execute(self.get_sql_statement())
                for row in rows:
                    row = dict(row)
                    try:
                        yield TableQuery(
                            query=row["query_text"],
                            userName=row["usename"],
                            analysisDate=datetime.now(),
                            aborted=self.get_aborted_status(row),
                            databaseName=self.get_database_name(row),
                            serviceName=self.config.serviceName,
                            databaseSchema=self.get_schema_name(row),
                            duration=row.get("duration"),
                        )
                    except Exception as err:
                        logger.debug(traceback.format_exc())
                        logger.error(str(err))
        except Exception as err:
            logger.error(f"Source usage processing error - {err}")
            logger.debug(traceback.format_exc())
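
For reference, the per-query lineage extraction that the removed loop above performed (and that the shared _iter in the parent class presumably drives now) comes down to one get_lineage_by_query call per query. A standalone sketch using the same arguments as the removed code; the service, database, and schema names are placeholders, not values from this commit:

from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.ometa.ometa_api import OpenMetadata

def lineage_for_one_query(metadata: OpenMetadata, query: str):
    """One query string in, zero or more lineage requests out."""
    lineages = get_lineage_by_query(
        metadata,                            # OpenMetadata client instance
        query=query,
        service_name="my_postgres_service",  # placeholder service name
        database_name="ecommerce",           # placeholder database
        schema_name="sales",                 # placeholder schema
        dialect=Dialect.POSTGRES,
    )
    for lineage_request in lineages or []:
        yield lineage_request                # the workflow hands these to the sink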


@@ -11,11 +11,9 @@
"""
Postgres Query parser module
"""
import csv
import traceback
from abc import ABC
from datetime import datetime
from typing import Iterable, Optional
from typing import Iterable
from sqlalchemy.engine.base import Engine
@@ -25,7 +23,7 @@ from metadata.generated.schema.entity.services.connections.database.postgresConn
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
@@ -75,46 +73,11 @@ class PostgresQueryParserSource(QueryParserSource, ABC):
            time_column_name=get_postgres_time_column_name(engine=self.engine),
        )

    # pylint: disable=no-member
    def get_table_query(self) -> Iterable[TableQuery]:
        try:
            if self.config.sourceConfig.config.queryLogFilePath:
                table_query_list = []
                with open(
                    self.config.sourceConfig.config.queryLogFilePath,
                    "r",
                    encoding="utf-8",
                ) as query_log_file:
                    for record in csv.DictReader(query_log_file):
                        query_dict = dict(record)
                        analysis_date = (
                            datetime.utcnow()
                            if not query_dict.get("session_start_time")
                            else datetime.strptime(
                                query_dict.get("session_start_time"),
                                "%Y-%m-%d %H:%M:%S+%f",
                            )
                        )
                        query_dict["aborted"] = query_dict["sql_state_code"] == "00000"
                        if "statement" in query_dict["message"]:
                            query_dict["message"] = query_dict["message"].split(":")[1]
                        table_query_list.append(
                            TableQuery(
                                query=query_dict["message"],
                                userName=query_dict.get("user_name", ""),
                                startTime=query_dict.get("session_start_time", ""),
                                endTime=query_dict.get("log_time", ""),
                                analysisDate=analysis_date,
                                aborted=self.get_aborted_status(query_dict),
                                databaseName=self.get_database_name(query_dict),
                                serviceName=self.config.serviceName,
                                databaseSchema=self.get_schema_name(query_dict),
                            )
                        )
                yield TableQueries(queries=table_query_list)
                yield from super().yield_table_queries_from_logs()
            else:
                database = self.config.serviceConnection.__root__.config.database
                if database:
@@ -133,37 +96,6 @@ class PostgresQueryParserSource(QueryParserSource, ABC):
            logger.error(f"Source usage processing error - {err}")
            logger.debug(traceback.format_exc())
    def process_table_query(self) -> Optional[Iterable[TableQuery]]:
        """
        Process Query
        """
        try:
            with get_connection(self.service_connection).connect() as conn:
                rows = conn.execute(self.get_sql_statement())
                queries = []
                for row in rows:
                    row = dict(row)
                    try:
                        queries.append(
                            TableQuery(
                                query=row["query_text"],
                                userName=row["usename"],
                                analysisDate=datetime.now(),
                                aborted=self.get_aborted_status(row),
                                databaseName=self.get_database_name(row),
                                serviceName=self.config.serviceName,
                                databaseSchema=self.get_schema_name(row),
                                duration=row.get("duration"),
                            )
                        )
                    except Exception as err:
                        logger.debug(traceback.format_exc())
                        logger.error(str(err))
            yield TableQueries(queries=queries)
        except Exception as err:
            logger.error(f"Source usage processing error - {err}")
            logger.debug(traceback.format_exc())

    @staticmethod
    def get_database_name(data: dict) -> str:
        """


@@ -11,11 +11,20 @@
"""
Postgres usage module
"""
import traceback
from datetime import datetime
from typing import Iterable
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.postgres.queries import POSTGRES_SQL_STATEMENT
from metadata.ingestion.source.database.postgres.query_parser import (
    PostgresQueryParserSource,
)
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class PostgresUsageSource(PostgresQueryParserSource, UsageSource):
@@ -25,3 +34,35 @@ class PostgresUsageSource(PostgresQueryParserSource, UsageSource):
    sql_stmt = POSTGRES_SQL_STATEMENT

    filters = ""

    def process_table_query(self) -> Iterable[TableQueries]:
        """
        Process Query
        """
        try:
            with get_connection(self.service_connection).connect() as conn:
                rows = conn.execute(self.get_sql_statement())
                queries = []
                for row in rows:
                    row = dict(row)
                    try:
                        queries.append(
                            TableQuery(
                                query=row["query_text"],
                                userName=row["usename"],
                                analysisDate=datetime.now(),
                                aborted=self.get_aborted_status(row),
                                databaseName=self.get_database_name(row),
                                serviceName=self.config.serviceName,
                                databaseSchema=self.get_schema_name(row),
                                duration=row.get("duration"),
                            )
                        )
                    except Exception as err:
                        logger.debug(traceback.format_exc())
                        logger.error(str(err))
            if queries:
                yield TableQueries(queries=queries)
        except Exception as err:
            logger.error(f"Source usage processing error - {err}")
            logger.debug(traceback.format_exc())
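
A consequence of the trailing "if queries:" guard is that a run with no matching query rows produces an empty generator rather than a TableQueries batch wrapping an empty list (or a stray None). A self-contained sketch of the same pattern, using simplified stand-ins for the generated models and skipping the database connection entirely:

from typing import Iterable, List

class TableQuery:                      # simplified stand-in for the generated model
    def __init__(self, query: str) -> None:
        self.query = query

class TableQueries:                    # simplified stand-in for the generated model
    def __init__(self, queries: List[TableQuery]) -> None:
        self.queries = queries

def process_table_query(rows: List[dict]) -> Iterable[TableQueries]:
    """Yield a single batch only if at least one row could be parsed."""
    queries = []
    for row in rows:
        if "query_text" in row:        # malformed rows are skipped, mirroring the try/except above
            queries.append(TableQuery(query=row["query_text"]))
    if queries:
        yield TableQueries(queries=queries)

# With rows, the consumer sees exactly one batch ...
assert [len(b.queries) for b in process_table_query([{"query_text": "SELECT 1"}])] == [1]
# ... and with no rows, the loop body never runs and there is no None to guard against.
assert list(process_table_query([])) == []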