Fix #15208: Lineage - log query parsing issue summary (#15921)

This commit is contained in:
Mayur Singal 2024-04-18 19:07:51 +05:30 committed by GitHub
parent b8dc12b30b
commit 18da8a5964
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 95 additions and 10 deletions

View File

@ -12,7 +12,9 @@
Models related to lineage parsing
"""
from enum import Enum
from typing import Dict
from typing import Dict, List, Optional
from pydantic import BaseModel, Extra, Field
from metadata.generated.schema.entity.services.connections.database.athenaConnection import (
AthenaType,
@ -62,6 +64,7 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon
from metadata.generated.schema.entity.services.connections.database.sqliteConnection import (
SQLiteType,
)
from metadata.utils.singleton import Singleton
class Dialect(Enum):
@ -126,3 +129,33 @@ class ConnectionTypeDialectMapper:
Returns: a dialect
"""
return MAP_CONNECTION_TYPE_DIALECT.get(connection_type, Dialect.ANSI)
class QueryParsingError(BaseModel):
    """
    Represents an error that occurs during query parsing.

    Attributes:
        query (str): The query text of the failed query.
        error (Optional[str]): The error message of the failed query.
    """

    class Config:
        extra = Extra.forbid

    query: str = Field(..., description="query text of the failed query")
    # Fix: the annotation says Optional[str] but Field(...) made the field
    # mandatory — contradictory. Default to None so an error message is
    # genuinely optional; callers that always pass one are unaffected.
    error: Optional[str] = Field(
        None, description="error message of the failed query"
    )
class QueryParsingFailures(metaclass=Singleton):
    """Singleton collector for queries that failed to parse.

    The Singleton metaclass makes every instantiation return the same
    object, so failures recorded anywhere in the process are all visible
    when the summary is printed at the end of the workflow.
    """

    def __init__(self):
        """Start with an empty list of parsing failures."""
        # Accumulated failures, kept in insertion order.
        self._query_list: List[QueryParsingError] = []

    def add(self, parsing_error: QueryParsingError):
        """Record one more parsing failure."""
        self._query_list.append(parsing_error)

    def __iter__(self):
        """Iterate over the recorded failures in insertion order."""
        yield from self._query_list

View File

@ -67,6 +67,8 @@ class LineageParser:
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
):
self.query = query
self.query_parsing_success = True
self.query_parsing_failure_reason = None
self._clean_query = self.clean_raw_query(query)
self.parser = self._evaluate_best_parser(
self._clean_query, dialect=dialect, timeout_seconds=timeout_seconds
@ -400,9 +402,8 @@ class LineageParser:
return clean_query.strip()
@staticmethod
def _evaluate_best_parser(
query: str, dialect: Dialect, timeout_seconds: int
self, query: str, dialect: Dialect, timeout_seconds: int
) -> Optional[LineageRunner]:
if query is None:
return None
@ -424,15 +425,19 @@ class LineageParser:
)
)
except TimeoutError:
logger.debug(
f"Lineage with SqlFluff failed for the [{dialect.value}] query: [{query}]: "
self.query_parsing_success = False
self.query_parsing_failure_reason = (
f"Lineage with SqlFluff failed for the [{dialect.value}]. "
f"Parser has been running for more than {timeout_seconds} seconds."
)
logger.debug(f"{self.query_parsing_failure_reason}] query: [{query}]")
lr_sqlfluff = None
except Exception:
logger.debug(
f"Lineage with SqlFluff failed for the [{dialect.value}] query: [{query}]"
self.query_parsing_success = False
self.query_parsing_failure_reason = (
f"Lineage with SqlFluff failed for the [{dialect.value}]"
)
logger.debug(f"{self.query_parsing_failure_reason} query: [{query}]")
lr_sqlfluff = None
lr_sqlparser = LineageRunner(query)
@ -451,10 +456,12 @@ class LineageParser:
if lr_sqlfluff:
# if sqlparser retrieves more lineage info than sqlfluff
if sqlparser_count > sqlfluff_count:
logger.debug(
self.query_parsing_success = False
self.query_parsing_failure_reason = (
"Lineage computed with SqlFluff did not perform as expected "
f"for the [{dialect.value}] query: [{query}]"
f"for the [{dialect.value}]"
)
logger.debug(f"{self.query_parsing_failure_reason} query: [{query}]")
return lr_sqlparser
return lr_sqlfluff
return lr_sqlparser

View File

@ -27,7 +27,11 @@ from metadata.generated.schema.type.entityLineage import (
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.models import (
Dialect,
QueryParsingError,
QueryParsingFailures,
)
from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT, LineageParser
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
@ -365,6 +369,7 @@ def populate_column_lineage_map(raw_column_lineage):
return lineage_map
# pylint: disable=too-many-locals
def get_lineage_by_query(
metadata: OpenMetadata,
service_name: str,
@ -380,6 +385,7 @@ def get_lineage_by_query(
and returns True if target table is found to create lineage otherwise returns False.
"""
column_lineage = {}
query_parsing_failures = QueryParsingFailures()
try:
logger.debug(f"Running lineage with query: {query}")
@ -427,6 +433,12 @@ def get_lineage_by_query(
column_lineage_map=column_lineage,
lineage_source=lineage_source,
)
if not lineage_parser.query_parsing_success:
query_parsing_failures.add(
QueryParsingError(
query=query, error=lineage_parser.query_parsing_failure_reason
)
)
except Exception as exc:
yield Either(
left=StackTraceError(
@ -450,6 +462,7 @@ def get_lineage_via_table_entity(
) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage from table entity"""
column_lineage = {}
query_parsing_failures = QueryParsingFailures()
try:
logger.debug(f"Getting lineage via table entity using query: {query}")
@ -468,6 +481,12 @@ def get_lineage_via_table_entity(
column_lineage_map=column_lineage,
lineage_source=lineage_source,
) or []
if not lineage_parser.query_parsing_success:
query_parsing_failures.add(
QueryParsingError(
query=query, error=lineage_parser.query_parsing_failure_reason
)
)
except Exception as exc: # pylint: disable=broad-except
Either(
left=StackTraceError(

View File

@ -184,3 +184,7 @@ class OMetaLineageMixin(Generic[T]):
logger.info(
f"added lineage between table {node.get('name')} and {entity_name} "
)
elif lineage_request.left:
logger.error(
f"Error while adding lineage: {lineage_request.left.error}"
)

View File

@ -26,6 +26,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.ingestion.api.step import Summary
from metadata.ingestion.lineage.models import QueryParsingFailures
from metadata.utils.execution_time_tracker import ExecutionTimeTracker
from metadata.utils.helpers import pretty_print_time_duration
from metadata.utils.logger import ANSI, log_ansi_encoded_string
@ -158,6 +159,26 @@ def print_execution_time_summary():
log_ansi_encoded_string(message=f"\n{tabulate(summary_table, tablefmt='grid')}")
def print_query_parsing_issues():
    """Log the QueryParsingFailures Summary."""
    # The singleton accumulates every parsing failure seen during the run.
    query_failures = QueryParsingFailures()
    summary_table = {
        "Query": [failure.query for failure in query_failures],
        "Error": [failure.error for failure in query_failures],
    }
    # Only print the section when at least one failure was recorded.
    if summary_table["Query"]:
        log_ansi_encoded_string(bold=True, message="Query Parsing Error Summary")
        log_ansi_encoded_string(
            message=f"\n{tabulate(summary_table, tablefmt='grid', headers=summary_table.keys())}"
        )
def print_workflow_summary(workflow: "BaseWorkflow") -> None:
"""
Args:
@ -170,6 +191,7 @@ def print_workflow_summary(workflow: "BaseWorkflow") -> None:
if is_debug_enabled(workflow):
print_workflow_status_debug(workflow)
print_execution_time_summary()
print_query_parsing_issues()
failures = []
total_records = 0