From 18da8a5964c451ca1fb54d65bab703440f0371e5 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Thu, 18 Apr 2024 19:07:51 +0530 Subject: [PATCH] Fix #15208: Lineage - log query parsing issue summary (#15921) --- .../src/metadata/ingestion/lineage/models.py | 35 ++++++++++++++++++- .../src/metadata/ingestion/lineage/parser.py | 23 +++++++----- .../metadata/ingestion/lineage/sql_lineage.py | 21 ++++++++++- .../ingestion/ometa/mixins/lineage_mixin.py | 4 +++ .../src/metadata/workflow/output_handler.py | 22 ++++++++++++ 5 files changed, 95 insertions(+), 10 deletions(-) diff --git a/ingestion/src/metadata/ingestion/lineage/models.py b/ingestion/src/metadata/ingestion/lineage/models.py index 419c788b747..d62855809f3 100644 --- a/ingestion/src/metadata/ingestion/lineage/models.py +++ b/ingestion/src/metadata/ingestion/lineage/models.py @@ -12,7 +12,9 @@ Models related to lineage parsing """ from enum import Enum -from typing import Dict +from typing import Dict, List, Optional + +from pydantic import BaseModel, Extra, Field from metadata.generated.schema.entity.services.connections.database.athenaConnection import ( AthenaType, @@ -62,6 +64,7 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( SQLiteType, ) +from metadata.utils.singleton import Singleton class Dialect(Enum): @@ -126,3 +129,33 @@ class ConnectionTypeDialectMapper: Returns: a dialect """ return MAP_CONNECTION_TYPE_DIALECT.get(connection_type, Dialect.ANSI) + + +class QueryParsingError(BaseModel): + """ + Represents an error that occurs during query parsing. + + Attributes: + query (str): The query text of the failed query. + error (str): The error message of the failed query. + """ + + class Config: + extra = Extra.forbid + + query: str = Field(..., description="query text of the failed query") + error: Optional[str] = Field(..., description="error message of the failed query") + + +class QueryParsingFailures(metaclass=Singleton): + """Tracks the Queries that failed to parse.""" + + def __init__(self): + """Initializes the list of parsing failures.""" + self._query_list: List[QueryParsingError] = [] + + def add(self, parsing_error: QueryParsingError): + self._query_list.append(parsing_error) + + def __iter__(self): + return iter(self._query_list) diff --git a/ingestion/src/metadata/ingestion/lineage/parser.py b/ingestion/src/metadata/ingestion/lineage/parser.py index 368dd738aba..812207ff070 100644 --- a/ingestion/src/metadata/ingestion/lineage/parser.py +++ b/ingestion/src/metadata/ingestion/lineage/parser.py @@ -67,6 +67,8 @@ class LineageParser: timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, ): self.query = query + self.query_parsing_success = True + self.query_parsing_failure_reason = None self._clean_query = self.clean_raw_query(query) self.parser = self._evaluate_best_parser( self._clean_query, dialect=dialect, timeout_seconds=timeout_seconds @@ -400,9 +402,8 @@ class LineageParser: return clean_query.strip() - @staticmethod def _evaluate_best_parser( - query: str, dialect: Dialect, timeout_seconds: int + self, query: str, dialect: Dialect, timeout_seconds: int ) -> Optional[LineageRunner]: if query is None: return None @@ -424,15 +425,19 @@ class LineageParser: ) ) except TimeoutError: - logger.debug( - f"Lineage with SqlFluff failed for the [{dialect.value}] query: [{query}]: " + self.query_parsing_success = False + self.query_parsing_failure_reason = ( + f"Lineage with SqlFluff failed for the [{dialect.value}]. " f"Parser has been running for more than {timeout_seconds} seconds." ) + logger.debug(f"{self.query_parsing_failure_reason}] query: [{query}]") lr_sqlfluff = None except Exception: - logger.debug( - f"Lineage with SqlFluff failed for the [{dialect.value}] query: [{query}]" + self.query_parsing_success = False + self.query_parsing_failure_reason = ( + f"Lineage with SqlFluff failed for the [{dialect.value}]" ) + logger.debug(f"{self.query_parsing_failure_reason} query: [{query}]") lr_sqlfluff = None lr_sqlparser = LineageRunner(query) @@ -451,10 +456,12 @@ class LineageParser: if lr_sqlfluff: # if sqlparser retrieve more lineage info that sqlfluff if sqlparser_count > sqlfluff_count: - logger.debug( + self.query_parsing_success = False + self.query_parsing_failure_reason = ( "Lineage computed with SqlFluff did not perform as expected " - f"for the [{dialect.value}] query: [{query}]" + f"for the [{dialect.value}]" ) + logger.debug(f"{self.query_parsing_failure_reason} query: [{query}]") return lr_sqlparser return lr_sqlfluff return lr_sqlparser diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index 3bd41a13e46..b1e3b989a5f 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -27,7 +27,11 @@ from metadata.generated.schema.type.entityLineage import ( from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either -from metadata.ingestion.lineage.models import Dialect +from metadata.ingestion.lineage.models import ( + Dialect, + QueryParsingError, + QueryParsingFailures, +) from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT, LineageParser from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn @@ -365,6 +369,7 @@ def populate_column_lineage_map(raw_column_lineage): return lineage_map +# pylint: disable=too-many-locals def get_lineage_by_query( metadata: OpenMetadata, service_name: str, @@ -380,6 +385,7 @@ def get_lineage_by_query( and returns True if target table is found to create lineage otherwise returns False. """ column_lineage = {} + query_parsing_failures = QueryParsingFailures() try: logger.debug(f"Running lineage with query: {query}") @@ -427,6 +433,12 @@ def get_lineage_by_query( column_lineage_map=column_lineage, lineage_source=lineage_source, ) + if not lineage_parser.query_parsing_success: + query_parsing_failures.add( + QueryParsingError( + query=query, error=lineage_parser.query_parsing_failure_reason + ) + ) except Exception as exc: yield Either( left=StackTraceError( @@ -450,6 +462,7 @@ def get_lineage_via_table_entity( ) -> Iterable[Either[AddLineageRequest]]: """Get lineage from table entity""" column_lineage = {} + query_parsing_failures = QueryParsingFailures() try: logger.debug(f"Getting lineage via table entity using query: {query}") @@ -468,6 +481,12 @@ def get_lineage_via_table_entity( column_lineage_map=column_lineage, lineage_source=lineage_source, ) or [] + if not lineage_parser.query_parsing_success: + query_parsing_failures.add( + QueryParsingError( + query=query, error=lineage_parser.query_parsing_failure_reason + ) + ) except Exception as exc: # pylint: disable=broad-except Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py index b0ac257b36f..cd8d55d795f 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py @@ -184,3 +184,7 @@ class OMetaLineageMixin(Generic[T]): logger.info( f"added lineage between table {node.get('name')} and {entity_name} " ) + elif lineage_request.left: + logger.error( + f"Error while adding lineage: {lineage_request.left.error}" + ) diff --git a/ingestion/src/metadata/workflow/output_handler.py b/ingestion/src/metadata/workflow/output_handler.py index 51c3d849ad2..a8c0e6000d0 100644 --- a/ingestion/src/metadata/workflow/output_handler.py +++ b/ingestion/src/metadata/workflow/output_handler.py @@ -26,6 +26,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import ) from metadata.generated.schema.metadataIngestion.workflow import LogLevels from metadata.ingestion.api.step import Summary +from metadata.ingestion.lineage.models import QueryParsingFailures from metadata.utils.execution_time_tracker import ExecutionTimeTracker from metadata.utils.helpers import pretty_print_time_duration from metadata.utils.logger import ANSI, log_ansi_encoded_string @@ -158,6 +159,26 @@ def print_execution_time_summary(): log_ansi_encoded_string(message=f"\n{tabulate(summary_table, tablefmt='grid')}") +def print_query_parsing_issues(): + """Log the QueryParsingFailures Summary.""" + query_failures = QueryParsingFailures() + + summary_table = { + "Query": [], + "Error": [], + } + + for failure in query_failures: + summary_table["Query"].append(failure.query) + summary_table["Error"].append(failure.error) + + if summary_table["Query"]: + log_ansi_encoded_string(bold=True, message="Query Parsing Error Summary") + log_ansi_encoded_string( + message=f"\n{tabulate(summary_table, tablefmt='grid', headers=summary_table.keys())}" + ) + + def print_workflow_summary(workflow: "BaseWorkflow") -> None: """ Args: @@ -170,6 +191,7 @@ def print_workflow_summary(workflow: "BaseWorkflow") -> None: if is_debug_enabled(workflow): print_workflow_status_debug(workflow) print_execution_time_summary() + print_query_parsing_issues() failures = [] total_records = 0