From 5ea9f22492749867f9ea53465b817f52bd383ca2 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Thu, 17 Apr 2025 00:09:52 +0530 Subject: [PATCH] MINOR: Improve UDF Lineage Processing & Better Logging Time & MultiProcessing (#20848) --- .../src/metadata/ingestion/lineage/masker.py | 9 +- .../src/metadata/ingestion/lineage/parser.py | 8 +- .../metadata/ingestion/lineage/sql_lineage.py | 120 +++++-- .../ingestion/ometa/mixins/es_mixin.py | 2 + .../source/database/lineage_processors.py | 333 ++++++++++++++++++ .../source/database/lineage_source.py | 303 ++++++---------- .../database/stored_procedures_mixin.py | 178 +--------- ingestion/src/metadata/utils/db_utils.py | 12 +- 8 files changed, 588 insertions(+), 377 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/database/lineage_processors.py diff --git a/ingestion/src/metadata/ingestion/lineage/masker.py b/ingestion/src/metadata/ingestion/lineage/masker.py index 2e00bdd6ede..408d5a72958 100644 --- a/ingestion/src/metadata/ingestion/lineage/masker.py +++ b/ingestion/src/metadata/ingestion/lineage/masker.py @@ -20,6 +20,7 @@ from sqlparse.sql import Comparison from sqlparse.tokens import Literal, Number, String from metadata.ingestion.lineage.models import Dialect +from metadata.utils.execution_time_tracker import calculate_execution_time MASK_TOKEN = "?" @@ -112,8 +113,12 @@ def mask_literals_with_sqlfluff(query: str, parser: LineageRunner) -> str: return query +@calculate_execution_time(context="MaskQuery") def mask_query( - query: str, dialect: str = Dialect.ANSI.value, parser: LineageRunner = None + query: str, + dialect: str = Dialect.ANSI.value, + parser: LineageRunner = None, + parser_required: bool = False, ) -> str: """ Mask a query using sqlparse or sqlfluff. @@ -122,6 +127,8 @@ def mask_query( try: if masked_query_cache.get((query, dialect)): return masked_query_cache.get((query, dialect)) + if parser_required and not parser: + return None if not parser: try: parser = LineageRunner(query, dialect=dialect) diff --git a/ingestion/src/metadata/ingestion/lineage/parser.py b/ingestion/src/metadata/ingestion/lineage/parser.py index d1ee6602ede..beee41c9fc2 100644 --- a/ingestion/src/metadata/ingestion/lineage/parser.py +++ b/ingestion/src/metadata/ingestion/lineage/parser.py @@ -28,6 +28,7 @@ from sqlparse.sql import Comparison, Identifier, Parenthesis, Statement from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin from metadata.ingestion.lineage.masker import mask_query from metadata.ingestion.lineage.models import Dialect +from metadata.utils.execution_time_tracker import calculate_execution_time from metadata.utils.helpers import ( find_in_iter, get_formatted_entity_name, @@ -77,7 +78,10 @@ class LineageParser: self._clean_query, dialect=dialect, timeout_seconds=timeout_seconds ) if self.masked_query is None: - self.masked_query = mask_query(self._clean_query, parser=self.parser) + self.masked_query = ( + mask_query(self._clean_query, parser=self.parser, parser_required=True) + or self._clean_query + ) @cached_property def involved_tables(self) -> Optional[List[Table]]: @@ -409,6 +413,7 @@ class LineageParser: return clean_query.strip() + @calculate_execution_time(context="EvaluateBestParser") def _evaluate_best_parser( self, query: str, dialect: Dialect, timeout_seconds: int ) -> Optional[LineageRunner]: @@ -475,7 +480,6 @@ class LineageParser: logger.debug(f"Failed to parse query with sqlparse & sqlfluff: {query}") return lr_sqlfluff if lr_sqlfluff else None - self.masked_query = mask_query(self._clean_query, parser=lr_sqlparser) logger.debug( f"Using sqlparse for lineage parsing for query: {self.masked_query or self.query}" ) diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index 4bbd0321518..27bedef846b 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -11,12 +11,15 @@ """ Helper functions to handle SQL lineage operations """ +import functools import itertools import traceback from collections import defaultdict +from copy import deepcopy from typing import Any, Iterable, List, Optional, Tuple, Union import networkx as nx +from collate_sqllineage.core.holders import SQLLineageHolder from collate_sqllineage.core.models import Column, DataFunction from collate_sqllineage.core.models import Table as LineageTable from networkx import DiGraph @@ -48,6 +51,10 @@ from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT, LineagePa from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn from metadata.utils.elasticsearch import get_entity_from_es_result +from metadata.utils.execution_time_tracker import ( + calculate_execution_time, + calculate_execution_time_generator, +) from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.logger import utils_logger from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache @@ -56,7 +63,7 @@ logger = utils_logger() DEFAULT_SCHEMA_NAME = "" CUTOFF_NODES = 20 - +# pylint: disable=too-many-function-args,protected-access def get_column_fqn(table_entity: Table, column: str) -> Optional[str]: """ Get fqn of column if exist in table entity @@ -73,6 +80,7 @@ def get_column_fqn(table_entity: Table, column: str) -> Optional[str]: search_cache = LRUCache(LRU_CACHE_SIZE) +@calculate_execution_time(context="SearchTableEntities") def search_table_entities( metadata: OpenMetadata, service_name: str, @@ -236,6 +244,58 @@ def handle_udf_column_lineage( logger.error(f"Error handling UDF column lineage: {exc}") +@functools.lru_cache(maxsize=1000) +def _get_udf_parser( + code: str, dialect: Dialect, timeout_seconds: int +) -> Optional[LineageParser]: + if code: + return LineageParser( + f"create table dummy_table_name as {code}", + dialect=dialect, + timeout_seconds=timeout_seconds, + ) + return None + + +def _replace_target_table( + parser: LineageParser, expected_table_name: str +) -> LineageParser: + try: + # Create a new target table instead of modifying the existing one + new_table = Table(expected_table_name.replace(DEFAULT_SCHEMA_NAME, "")) + + # Create a new statement holder with the updated target table + stmt_holder = parser.parser._stmt_holders[0] + old_write = list(stmt_holder.write)[0] # Get the original target table + + # Remove old target table and add new one + stmt_holder.graph.remove_node(old_write) + stmt_holder.add_write(new_table) + + # Rebuild column lineage + for col_lineage in parser.parser.get_column_lineage(): + if col_lineage[-1].parent == old_write: + # Create new column with same name but parent is new table + tgt_col = col_lineage[-1] + new_tgt_col = Column(tgt_col.raw_name) + new_tgt_col.parent = new_table + + # Add the column lineage from source to new target + stmt_holder.add_column_lineage(col_lineage[-2], new_tgt_col) + try: + # remove the old edge + stmt_holder.graph.remove_edge(col_lineage[-2], tgt_col) + except Exception as _: + # if the edge is not present, pass + pass + + # Rebuild the SQL holder + parser.parser._sql_holder = SQLLineageHolder.of(*parser.parser._stmt_holders) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.debug(f"Error replacing target table: {exc}") + + # pylint: disable=too-many-arguments def __process_udf_es_results( metadata: OpenMetadata, @@ -255,27 +315,32 @@ def __process_udf_es_results( and entity.storedProcedureCode and entity.storedProcedureCode.language == Language.SQL ): - expected_table_name = str(source_table).replace( - f"{DEFAULT_SCHEMA_NAME}.", "" + + lineage_parser = _get_udf_parser( + entity.storedProcedureCode.code, dialect, timeout_seconds ) - lineage_parser = LineageParser( - f"create table {str(expected_table_name)} as {entity.storedProcedureCode.code}", - dialect=dialect, - timeout_seconds=timeout_seconds, - ) - handle_udf_column_lineage(column_lineage, lineage_parser.column_lineage) - for source in lineage_parser.source_tables or []: - yield from get_source_table_names( - metadata, - dialect, - source, - database_name, - schema_name, - service_name, - timeout_seconds, - column_lineage, - procedure or entity, + if lineage_parser and lineage_parser.parser: + expected_table_name = str(source_table).replace( + f"{DEFAULT_SCHEMA_NAME}.", "" ) + lineage_parser_copy = deepcopy(lineage_parser) + _replace_target_table(lineage_parser_copy, expected_table_name) + + handle_udf_column_lineage( + column_lineage, lineage_parser_copy.column_lineage + ) + for source in lineage_parser_copy.source_tables or []: + yield from get_source_table_names( + metadata, + dialect, + source, + database_name, + schema_name, + service_name, + timeout_seconds, + column_lineage, + procedure or entity, + ) def __process_udf_table_names( @@ -317,6 +382,7 @@ def __process_udf_table_names( ) +@calculate_execution_time_generator(context="GetSourceTableNames") def get_source_table_names( metadata: OpenMetadata, dialect: Dialect, @@ -617,6 +683,7 @@ def populate_column_lineage_map(raw_column_lineage): # pylint: disable=too-many-locals +@calculate_execution_time_generator(context="GetLineageByQuery") def get_lineage_by_query( metadata: OpenMetadata, service_name: str, @@ -627,6 +694,7 @@ def get_lineage_by_query( timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, lineage_source: LineageSource = LineageSource.QueryLineage, graph: DiGraph = None, + lineage_parser: Optional[LineageParser] = None, ) -> Iterable[Either[AddLineageRequest]]: """ This method parses the query to get source, target and intermediate table names to create lineage, @@ -636,7 +704,10 @@ def get_lineage_by_query( query_parsing_failures = QueryParsingFailures() try: - lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds) + if not lineage_parser: + lineage_parser = LineageParser( + query, dialect, timeout_seconds=timeout_seconds + ) masked_query = lineage_parser.masked_query logger.debug(f"Running lineage with query: {masked_query or query}") @@ -723,6 +794,7 @@ def get_lineage_by_query( ) +@calculate_execution_time_generator(context="GetLineageViaTableEntity") def get_lineage_via_table_entity( metadata: OpenMetadata, table_entity: Table, @@ -734,13 +806,17 @@ def get_lineage_via_table_entity( timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, lineage_source: LineageSource = LineageSource.QueryLineage, graph: DiGraph = None, + lineage_parser: Optional[LineageParser] = None, ) -> Iterable[Either[AddLineageRequest]]: """Get lineage from table entity""" column_lineage = {} query_parsing_failures = QueryParsingFailures() try: - lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds) + if not lineage_parser: + lineage_parser = LineageParser( + query, dialect, timeout_seconds=timeout_seconds + ) masked_query = lineage_parser.masked_query logger.debug( f"Getting lineage via table entity using query: {masked_query or query}" diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 72c35f4da7c..7d5411852ce 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -40,6 +40,7 @@ from metadata.ingestion.ometa.client import REST, APIError from metadata.ingestion.ometa.utils import quote from metadata.ingestion.source.models import TableView from metadata.utils.elasticsearch import ES_INDEX_MAP, get_entity_from_es_result +from metadata.utils.execution_time_tracker import calculate_execution_time_generator from metadata.utils.logger import ometa_logger logger = ometa_logger() @@ -398,6 +399,7 @@ class ESMixin(Generic[T]): f"Error while getting {hit.source['fullyQualifiedName']} - {exc}" ) + @calculate_execution_time_generator(context="ES.FetchViewDefinition") def yield_es_view_def( self, service_name: str, diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_processors.py b/ingestion/src/metadata/ingestion/source/database/lineage_processors.py new file mode 100644 index 00000000000..a9192b0bb10 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/lineage_processors.py @@ -0,0 +1,333 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Mixin class with common Stored Procedures logic aimed at lineage. +""" +import re +import time +import traceback +from datetime import datetime +from multiprocessing import Queue +from typing import Iterable, List, Optional, Union + +import networkx as nx +from pydantic import BaseModel, ConfigDict, Field + +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.type.basic import SqlQuery, Timestamp +from metadata.generated.schema.type.entityLineage import Source as LineageSource +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.tableQuery import TableQuery +from metadata.ingestion.api.models import Either +from metadata.ingestion.lineage.models import Dialect +from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query +from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.models import TableView +from metadata.utils import fqn +from metadata.utils.db_utils import get_view_lineage +from metadata.utils.logger import ingestion_logger +from metadata.utils.time_utils import datetime_to_timestamp + +logger = ingestion_logger() + +# pylint: disable=invalid-name + + +class QueryByProcedure(BaseModel): + """ + Query(ies) executed by each stored procedure + """ + + procedure_name: str = Field(None, alias="PROCEDURE_NAME") + query_type: str = Field(..., alias="QUERY_TYPE") + query_database_name: Optional[str] = Field(None, alias="QUERY_DATABASE_NAME") + query_schema_name: Optional[str] = Field(None, alias="QUERY_SCHEMA_NAME") + procedure_text: str = Field(..., alias="PROCEDURE_TEXT") + procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME") + procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME") + query_start_time: Optional[datetime] = Field(None, alias="QUERY_START_TIME") + query_duration: Optional[float] = Field(None, alias="QUERY_DURATION") + query_text: str = Field(..., alias="QUERY_TEXT") + query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME") + + model_config = ConfigDict(populate_by_name=True) + + +class ProcedureAndQuery(BaseModel): + """ + Model to hold the procedure and its queries + """ + + procedure: StoredProcedure + query_by_procedure: QueryByProcedure + + model_config = ConfigDict(populate_by_name=True) + + +def is_lineage_query(query_type: str, query_text: str) -> bool: + """Check if it's worth it to parse the query for lineage""" + + logger.debug( + f"Validating query lineage for type [{query_type}] and text [{query_text}]" + ) + + if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"): + return True + + if query_type == "INSERT" and re.search( + "^.*insert.*into.*select.*$", query_text.replace("\n", " "), re.IGNORECASE + ): + return True + + return False + + +def _yield_procedure_lineage( + metadata: OpenMetadata, + service_name: str, + dialect: Dialect, + parsingTimeoutLimit: int, + query_by_procedure: QueryByProcedure, + procedure: StoredProcedure, +) -> Iterable[Either[AddLineageRequest]]: + """Add procedure lineage from its query""" + if is_lineage_query( + query_type=query_by_procedure.query_type, + query_text=query_by_procedure.query_text, + ): + for either_lineage in get_lineage_by_query( + metadata, + query=query_by_procedure.query_text, + service_name=service_name, + database_name=query_by_procedure.query_database_name, + schema_name=query_by_procedure.query_schema_name, + dialect=dialect, + timeout_seconds=parsingTimeoutLimit, + lineage_source=LineageSource.QueryLineage, + ): + if either_lineage.left is None and either_lineage.right.edge.lineageDetails: + either_lineage.right.edge.lineageDetails.pipeline = EntityReference( + id=procedure.id, + type="storedProcedure", + ) + + yield either_lineage + + +def procedure_lineage_processor( + procedure_and_queries: List[ProcedureAndQuery], + queue: Queue, + metadata: OpenMetadata, + service_name: str, + dialect: Dialect, + parsingTimeoutLimit: int, +) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: + """ + Process the procedure and its queries to add lineage + """ + for procedure_and_query in procedure_and_queries: + try: + for lineage in _yield_procedure_lineage( + query_by_procedure=procedure_and_query.query_by_procedure, + procedure=procedure_and_query.procedure, + metadata=metadata, + service_name=service_name, + dialect=dialect, + parsingTimeoutLimit=parsingTimeoutLimit, + ): + if lineage and lineage.right is not None: + queue.put( + Either( + right=OMetaLineageRequest( + override_lineage=False, + lineage_request=lineage.right, + entity=StoredProcedure, + entity_fqn=procedure_and_query.procedure.fullyQualifiedName.root, + ) + ) + ) + else: + queue.put(lineage) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Could not get lineage for store procedure " + f"'{procedure_and_query.procedure.fullyQualifiedName}' due to [{exc}]." + ) + try: + for lineage in yield_procedure_query( + query_by_procedure=procedure_and_query.query_by_procedure, + procedure=procedure_and_query.procedure, + service_name=service_name, + ): + queue.put(lineage) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Could not get query for store procedure " + f"'{procedure_and_query.procedure.fullyQualifiedName}' due to [{exc}]." + ) + + +def yield_procedure_query( + query_by_procedure: QueryByProcedure, procedure: StoredProcedure, service_name: str +) -> Iterable[Either[CreateQueryRequest]]: + """Check the queries triggered by the procedure and add their lineage, if any""" + stored_procedure_query_lineage = is_lineage_query( + query_type=query_by_procedure.query_type, + query_text=query_by_procedure.query_text, + ) + + yield Either( + right=CreateQueryRequest( + query=SqlQuery(query_by_procedure.query_text), + query_type=query_by_procedure.query_type, + duration=query_by_procedure.query_duration, + queryDate=Timestamp( + root=datetime_to_timestamp(query_by_procedure.query_start_time, True) + ), + triggeredBy=EntityReference( + id=procedure.id, + type="storedProcedure", + ), + processedLineage=bool(stored_procedure_query_lineage), + service=service_name, + ) + ) + + +# Function that will run in separate processes - defined at module level for pickling +def _process_chunk_in_subprocess(chunk, processor_fn, queue, *args): + """ + Process a chunk of data in a subprocess. + + Args: + chunk_and_processor_fn: Tuple containing (chunk, processor_fn, queue, *args) + + Returns: + True if processing succeeded, False otherwise + """ + try: + # Process each item in the chunk + processor_fn(chunk, queue, *args) + time.sleep(0.1) + return True + except Exception as e: + logger.error(f"Error processing chunk in subprocess: {e}") + logger.error(traceback.format_exc()) + return False + + +def _query_already_processed(metadata: OpenMetadata, table_query: TableQuery) -> bool: + """ + Check if a query has already been processed by validating if exists + in ES with lineageProcessed as True + """ + checksums = metadata.es_get_queries_with_lineage( + service_name=table_query.serviceName, + ) + return fqn.get_query_checksum(table_query.query) in checksums or {} + + +def query_lineage_generator( + table_queries: List[TableQuery], + queue: Queue, + metadata: OpenMetadata, + dialect: Dialect, + graph: nx.DiGraph, + parsingTimeoutLimit: int, + serviceName: str, +) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: + """ + Generate lineage for a list of table queries + """ + + for table_query in table_queries or []: + if not _query_already_processed(metadata, table_query): + lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query( + metadata, + query=table_query.query, + service_name=table_query.serviceName, + database_name=table_query.databaseName, + schema_name=table_query.databaseSchema, + dialect=dialect, + timeout_seconds=parsingTimeoutLimit, + graph=graph, + ) + + for lineage_request in lineages or []: + queue.put(lineage_request) + + # If we identified lineage properly, ingest the original query + if lineage_request.right: + queue.put( + Either( + right=CreateQueryRequest( + query=SqlQuery(table_query.query), + query_type=table_query.query_type, + duration=table_query.duration, + processedLineage=True, + service=serviceName, + ) + ) + ) + + +def view_lineage_generator( + views: List[TableView], + queue: Queue, + metadata: OpenMetadata, + serviceName: str, + connectionType: str, + parsingTimeoutLimit: int, + overrideViewLineage: bool, +) -> Iterable[Either[AddLineageRequest]]: + """ + Generate lineage for a list of views + """ + try: + for view in views: + for lineage in get_view_lineage( + view=view, + metadata=metadata, + service_name=serviceName, + connection_type=connectionType, + timeout_seconds=parsingTimeoutLimit, + ): + if lineage.right is not None: + view_fqn = fqn.build( + metadata=metadata, + entity_type=Table, + service_name=serviceName, + database_name=view.db_name, + schema_name=view.schema_name, + table_name=view.table_name, + skip_es_search=True, + ) + queue.put( + Either( + right=OMetaLineageRequest( + lineage_request=lineage.right, + override_lineage=overrideViewLineage, + entity_fqn=view_fqn, + entity=Table, + ) + ) + ) + else: + queue.put(lineage) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error processing view {view}: {exc}") diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index 7980ce0d1dd..755f757ca52 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -12,22 +12,21 @@ Lineage Source Module """ import csv +import multiprocessing import os -import threading import time import traceback from abc import ABC from functools import partial -from typing import Any, Callable, Iterable, Iterator, List, Optional, Union +from multiprocessing import Process, Queue +from typing import Any, Callable, Iterable, Iterator, List, Optional, Tuple, Union + +import networkx as nx from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.type.basic import ( - FullyQualifiedEntityName, - SqlQuery, - Uuid, -) +from metadata.generated.schema.type.basic import Uuid from metadata.generated.schema.type.entityLineage import ( ColumnLineage, EntitiesEdge, @@ -38,25 +37,24 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.api.models import Either from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect -from metadata.ingestion.lineage.sql_lineage import ( - get_column_fqn, - get_lineage_by_graph, - get_lineage_by_query, +from metadata.ingestion.lineage.sql_lineage import get_column_fqn, get_lineage_by_graph +from metadata.ingestion.source.database.lineage_processors import ( + _process_chunk_in_subprocess, + query_lineage_generator, + view_lineage_generator, ) -from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest -from metadata.ingestion.models.topology import Queue from metadata.ingestion.source.database.query_parser_source import QueryParserSource -from metadata.ingestion.source.models import TableView -from metadata.utils import fqn -from metadata.utils.db_utils import get_view_lineage from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -CHUNK_SIZE = 100 +CHUNK_SIZE = 200 -THREAD_TIMEOUT = 3 * 60 * 10 # 30 minutes in seconds +THREAD_TIMEOUT = 10 * 60 +PROCESS_TIMEOUT = 10 * 60 +# Maximum number of processes to use for parallel processing +MAX_PROCESSES = min(multiprocessing.cpu_count(), 8) # Limit to 8 or available CPUs class LineageSource(QueryParserSource, ABC): @@ -120,94 +118,100 @@ class LineageSource(QueryParserSource, ABC): yield from self.yield_table_query() @staticmethod - def generate_lineage_in_thread( + def generate_lineage_with_processes( producer_fn: Callable[[], Iterable[Any]], processor_fn: Callable[[Any, Queue], None], + args: Tuple[Any, ...], chunk_size: int = CHUNK_SIZE, - thread_timeout: int = THREAD_TIMEOUT, - max_threads: int = 10, # Default maximum number of concurrent threads + processor_timeout: int = PROCESS_TIMEOUT, ): """ - Process data in separate daemon threads with timeout control. + Process data in separate processes with timeout control. Args: producer_fn: Function that yields data chunks processor_fn: Function that processes data and adds results to the queue chunk_size: Size of chunks to process - thread_timeout: Maximum time in seconds to wait for a processor thread - max_threads: Maximum number of concurrent threads to run + processor_timeout: Maximum time in seconds to wait for a processor process """ - queue = Queue() - active_threads = [] - def process_chunk(chunk): - """Process a chunk of data in a thread.""" + def chunk_generator(): + """Group items from producer into chunks of specified size.""" + temp_chunk = [] + for item in producer_fn(): + temp_chunk.append(item) + if len(temp_chunk) >= chunk_size: + yield temp_chunk + temp_chunk = [] + if temp_chunk: + yield temp_chunk + + # Use multiprocessing Queue instead of threading Queue + queue = Queue() + active_processes = [] + + # Dictionary to track process start times + process_start_times = {} + + # Start processing each chunk in a separate process + for _, chunk in enumerate(chunk_generator()): + # Start a process for processing + process = Process( + target=_process_chunk_in_subprocess, + args=(chunk, processor_fn, queue, *args), + ) + process.daemon = True + process_start_times[ + process.name + ] = time.time() # Track when the process started + logger.info( + f"Process {process.name} started at {process_start_times[process.name]}" + ) + active_processes.append(process) + process.start() + + # Process results from the queue and check for timed-out processes + while active_processes or not queue.empty(): + # Process any available results try: - processor_fn(chunk, queue) - except Exception as e: - logger.error(f"Error processing chunk: {e}") + while not queue.empty(): + yield queue.get_nowait() + except Exception as exc: + logger.warning(f"Error processing queue: {exc}") logger.debug(traceback.format_exc()) - # Create an iterator for the chunks but don't consume it all at once - chunk_iterator = iter(chunk_generator(producer_fn, chunk_size)) - - # Process results from the queue and check for timed-out threads - chunk_processed = False # Flag to track if all chunks have been processed - ignored_threads = 0 - - while True: - # Start new threads until we reach the max_threads limit - while ( - len(active_threads) + ignored_threads - ) < max_threads and not chunk_processed: - try: - # Only fetch a new chunk when we're ready to create a thread - chunk = next(chunk_iterator) - thread = threading.Thread(target=process_chunk, args=(chunk,)) - thread.start_time = time.time() # Track when the thread started - thread.daemon = True - active_threads.append(thread) - thread.start() - except StopIteration: - # No more chunks to process - chunk_processed = True - break - - if ignored_threads == max_threads: - logger.warning(f"Max threads reached, skipping remaining threads") - break - - # Process any available results - if queue.has_tasks(): - yield from queue.process() - - # Check for completed or timed-out threads + # Check for completed or timed-out processes still_active = [] - for thread in active_threads: - if thread.is_alive(): - # Check if the thread has timed out - if time.time() - thread.start_time > thread_timeout: + for process in active_processes: + if process.is_alive(): + # Check if the process has timed out + if ( + time.time() - process_start_times[process.name] + > processor_timeout + ): logger.warning( - f"Thread {thread.name} timed out after {thread_timeout}s" + f"Process {process.name} timed out after {processor_timeout}s" ) - ignored_threads += 1 + process.terminate() # Force terminate the timed out process else: - still_active.append(thread) - # If thread is not alive, it has completed normally + still_active.append(process) + else: + # Clean up completed process + process.join() - active_threads = still_active - - # Exit conditions: no more active threads and no more chunks to process - if not active_threads and chunk_processed: - break + active_processes = still_active # Small pause to prevent CPU spinning - if active_threads: + if active_processes: time.sleep(0.1) # Final check for any remaining results - while queue.has_tasks(): - yield from queue.process() + try: + while not queue.empty(): + yield queue.get_nowait() + except Exception as exc: + logger.warning(f"Error processing queue: {exc}") + logger.debug(traceback.format_exc()) def yield_table_query(self) -> Iterator[TableQuery]: """ @@ -239,57 +243,6 @@ class LineageSource(QueryParserSource, ABC): f"Error processing query_dict {query_dict}: {exc}" ) - def _query_already_processed(self, table_query: TableQuery) -> bool: - """ - Check if a query has already been processed by validating if exists - in ES with lineageProcessed as True - """ - checksums = self.metadata.es_get_queries_with_lineage( - service_name=table_query.serviceName, - ) - return fqn.get_query_checksum(table_query.query) in checksums or {} - - def query_lineage_generator( - self, table_queries: List[TableQuery], queue: Queue - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - if self.graph is None and self.source_config.enableTempTableLineage: - import networkx as nx - - # Create a directed graph - self.graph = nx.DiGraph() - - for table_query in table_queries or []: - if not self._query_already_processed(table_query): - lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query( - self.metadata, - query=table_query.query, - service_name=table_query.serviceName, - database_name=table_query.databaseName, - schema_name=table_query.databaseSchema, - dialect=self.dialect, - timeout_seconds=self.source_config.parsingTimeoutLimit, - graph=self.graph, - ) - - for lineage_request in lineages or []: - queue.put(lineage_request) - - # If we identified lineage properly, ingest the original query - if lineage_request.right: - queue.put( - Either( - right=CreateQueryRequest( - query=SqlQuery(table_query.query), - query_type=table_query.query_type, - duration=table_query.duration, - processedLineage=True, - service=FullyQualifiedEntityName( - self.config.serviceName - ), - ) - ) - ) - def yield_query_lineage( self, ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: @@ -301,51 +254,20 @@ class LineageSource(QueryParserSource, ABC): connection_type = str(self.service_connection.type.value) self.dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) producer_fn = self.get_table_query - processor_fn = self.query_lineage_generator - yield from self.generate_lineage_in_thread( + processor_fn = query_lineage_generator + args = ( + self.metadata, + self.dialect, + self.graph, + self.source_config.parsingTimeoutLimit, + self.config.serviceName, + ) + yield from self.generate_lineage_with_processes( producer_fn, processor_fn, - max_threads=self.source_config.threads, + args, ) - def view_lineage_generator( - self, views: List[TableView], queue: Queue - ) -> Iterable[Either[AddLineageRequest]]: - try: - for view in views: - for lineage in get_view_lineage( - view=view, - metadata=self.metadata, - service_name=self.config.serviceName, - connection_type=self.service_connection.type.value, - timeout_seconds=self.source_config.parsingTimeoutLimit, - ): - if lineage.right is not None: - view_fqn = fqn.build( - metadata=self.metadata, - entity_type=Table, - service_name=self.service_name, - database_name=view.db_name, - schema_name=view.schema_name, - table_name=view.table_name, - skip_es_search=True, - ) - queue.put( - Either( - right=OMetaLineageRequest( - lineage_request=lineage.right, - override_lineage=self.source_config.overrideViewLineage, - entity_fqn=view_fqn, - entity=Table, - ) - ) - ) - else: - queue.put(lineage) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Error processing view {view}: {exc}") - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: logger.info("Processing View Lineage") producer_fn = partial( @@ -353,16 +275,21 @@ class LineageSource(QueryParserSource, ABC): self.config.serviceName, self.source_config.incrementalLineageProcessing, ) - processor_fn = self.view_lineage_generator - yield from self.generate_lineage_in_thread( - producer_fn, processor_fn, max_threads=self.source_config.threads + processor_fn = view_lineage_generator + args = ( + self.metadata, + self.config.serviceName, + self.service_connection.type.value, + self.source_config.parsingTimeoutLimit, + self.source_config.overrideViewLineage, ) + yield from self.generate_lineage_with_processes(producer_fn, processor_fn, args) def yield_procedure_lineage( self, ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: """ - By default stored procedure lineage is not supported. + By default stored procedure lineage is not supported. """ logger.info( f"Processing Procedure Lineage not supported for {str(self.service_connection.type.value)}" @@ -389,6 +316,7 @@ class LineageSource(QueryParserSource, ABC): except Exception as exc: logger.debug(f"Error to get column lineage: {exc}") logger.debug(traceback.format_exc()) + return [] def get_add_cross_database_lineage_request( self, @@ -396,6 +324,9 @@ class LineageSource(QueryParserSource, ABC): to_entity: Table, column_lineage: List[ColumnLineage] = None, ) -> Optional[Either[AddLineageRequest]]: + """ + Get the add cross database lineage request + """ if from_entity and to_entity: return Either( right=AddLineageRequest( @@ -431,6 +362,9 @@ class LineageSource(QueryParserSource, ABC): Based on the query logs, prepare the lineage and send it to the sink """ + if self.graph is None and self.source_config.enableTempTableLineage: + # Create a directed graph + self.graph = nx.DiGraph() if self.source_config.processViewLineage: yield from self.yield_view_lineage() or [] if self.source_config.processStoredProcedureLineage: @@ -450,18 +384,3 @@ class LineageSource(QueryParserSource, ABC): and self.source_config.crossDatabaseServiceNames ): yield from self.yield_cross_database_lineage() or [] - - -def chunk_generator(producer_fn, chunk_size): - """ - Group items from producer into chunks of specified size. - This is a separate function to allow for better lazy evaluation. - """ - temp_chunk = [] - for item in producer_fn(): - temp_chunk.append(item) - if len(temp_chunk) >= chunk_size: - yield temp_chunk - temp_chunk = [] - if temp_chunk: - yield temp_chunk diff --git a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py index 0d5ee51fcc8..a7f94f254fd 100644 --- a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py +++ b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py @@ -12,14 +12,11 @@ Mixin class with common Stored Procedures logic aimed at lineage. """ import json -import re import traceback from abc import ABC, abstractmethod from collections import defaultdict -from datetime import datetime -from typing import Dict, Iterable, List, Optional, Union +from typing import Dict, Iterable, List, Union -from pydantic import BaseModel, ConfigDict, Field from sqlalchemy.engine import Engine from metadata.generated.schema.api.data.createQuery import CreateQueryRequest @@ -31,54 +28,21 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import ( DatabaseServiceQueryLineagePipeline, ) -from metadata.generated.schema.type.basic import SqlQuery, Timestamp -from metadata.generated.schema.type.entityLineage import Source as LineageSource -from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.status import Status from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper -from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query -from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest -from metadata.ingestion.models.topology import Queue from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.database.lineage_processors import ( + ProcedureAndQuery, + QueryByProcedure, + procedure_lineage_processor, +) from metadata.utils.logger import ingestion_logger from metadata.utils.stored_procedures import get_procedure_name_from_call -from metadata.utils.time_utils import datetime_to_timestamp logger = ingestion_logger() -class QueryByProcedure(BaseModel): - """ - Query(ies) executed by each stored procedure - """ - - procedure_name: str = Field(None, alias="PROCEDURE_NAME") - query_type: str = Field(..., alias="QUERY_TYPE") - query_database_name: Optional[str] = Field(None, alias="QUERY_DATABASE_NAME") - query_schema_name: Optional[str] = Field(None, alias="QUERY_SCHEMA_NAME") - procedure_text: str = Field(..., alias="PROCEDURE_TEXT") - procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME") - procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME") - query_start_time: Optional[datetime] = Field(None, alias="QUERY_START_TIME") - query_duration: Optional[float] = Field(None, alias="QUERY_DURATION") - query_text: str = Field(..., alias="QUERY_TEXT") - query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME") - - model_config = ConfigDict(populate_by_name=True) - - -class ProcedureAndQuery(BaseModel): - """ - Model to hold the procedure and its queries - """ - - procedure: StoredProcedure - query_by_procedure: QueryByProcedure - - model_config = ConfigDict(populate_by_name=True) - - class StoredProcedureLineageMixin(ABC): """ The full flow is: @@ -138,121 +102,10 @@ class StoredProcedureLineageMixin(ABC): return queries_dict - @staticmethod - def is_lineage_query(query_type: str, query_text: str) -> bool: - """Check if it's worth it to parse the query for lineage""" - - logger.debug( - f"Validating query lineage for type [{query_type}] and text [{query_text}]" - ) - - if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"): - return True - - if query_type == "INSERT" and re.search( - "^.*insert.*into.*select.*$", query_text.replace("\n", " "), re.IGNORECASE - ): - return True - - return False - - def _yield_procedure_lineage( - self, query_by_procedure: QueryByProcedure, procedure: StoredProcedure - ) -> Iterable[Either[AddLineageRequest]]: - """Add procedure lineage from its query""" - self.stored_procedure_query_lineage = False - if self.is_lineage_query( - query_type=query_by_procedure.query_type, - query_text=query_by_procedure.query_text, - ): - self.stored_procedure_query_lineage = True - for either_lineage in get_lineage_by_query( - self.metadata, - query=query_by_procedure.query_text, - service_name=self.service_name, - database_name=query_by_procedure.query_database_name, - schema_name=query_by_procedure.query_schema_name, - dialect=ConnectionTypeDialectMapper.dialect_of( - self.service_connection.type.value - ), - timeout_seconds=self.source_config.parsingTimeoutLimit, - lineage_source=LineageSource.QueryLineage, - ): - if ( - either_lineage.left is None - and either_lineage.right.edge.lineageDetails - ): - either_lineage.right.edge.lineageDetails.pipeline = EntityReference( - id=procedure.id, - type="storedProcedure", - ) - - yield either_lineage - - def yield_procedure_query( - self, query_by_procedure: QueryByProcedure, procedure: StoredProcedure - ) -> Iterable[Either[CreateQueryRequest]]: - """Check the queries triggered by the procedure and add their lineage, if any""" - - yield Either( - right=CreateQueryRequest( - query=SqlQuery(query_by_procedure.query_text), - query_type=query_by_procedure.query_type, - duration=query_by_procedure.query_duration, - queryDate=Timestamp( - root=datetime_to_timestamp( - query_by_procedure.query_start_time, True - ) - ), - triggeredBy=EntityReference( - id=procedure.id, - type="storedProcedure", - ), - processedLineage=bool(self.stored_procedure_query_lineage), - service=self.service_name, - ) - ) - - def procedure_lineage_processor( - self, procedure_and_queries: List[ProcedureAndQuery], queue: Queue - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - for procedure_and_query in procedure_and_queries: - try: - for lineage in self._yield_procedure_lineage( - query_by_procedure=procedure_and_query.query_by_procedure, - procedure=procedure_and_query.procedure, - ): - if lineage and lineage.right is not None: - queue.put( - Either( - right=OMetaLineageRequest( - override_lineage=False, - lineage_request=lineage.right, - entity=StoredProcedure, - entity_fqn=procedure_and_query.procedure.fullyQualifiedName.root, - ) - ) - ) - else: - queue.put(lineage) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning( - f"Could not get lineage for store procedure '{procedure_and_query.procedure.fullyQualifiedName}' due to [{exc}]." - ) - try: - for lineage in self.yield_procedure_query( - query_by_procedure=procedure_and_query.query_by_procedure, - procedure=procedure_and_query.procedure, - ): - queue.put(lineage) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning( - f"Could not get query for store procedure '{procedure_and_query.procedure.fullyQualifiedName}' due to [{exc}]." - ) - def procedure_lineage_generator(self) -> Iterable[ProcedureAndQuery]: + """ + Generate lineage for a list of stored procedures + """ query = { "query": { "bool": { @@ -303,7 +156,14 @@ class StoredProcedureLineageMixin(ABC): """Get all the queries and procedures list and yield them""" logger.info("Processing Lineage for Stored Procedures") producer_fn = self.procedure_lineage_generator - processor_fn = self.procedure_lineage_processor - yield from self.generate_lineage_in_thread( - producer_fn, processor_fn, max_threads=self.source_config.threads + processor_fn = procedure_lineage_processor + dialect = ConnectionTypeDialectMapper.dialect_of( + self.service_connection.type.value ) + args = ( + self.metadata, + self.service_name, + dialect, + self.source_config.parsingTimeoutLimit, + ) + yield from self.generate_lineage_with_processes(producer_fn, processor_fn, args) diff --git a/ingestion/src/metadata/utils/db_utils.py b/ingestion/src/metadata/utils/db_utils.py index a41cdc413b7..4cbfb6136c0 100644 --- a/ingestion/src/metadata/utils/db_utils.py +++ b/ingestion/src/metadata/utils/db_utils.py @@ -12,7 +12,7 @@ """ Helpers module for db sources """ - +import time import traceback from typing import Iterable @@ -32,6 +32,7 @@ from metadata.ingestion.lineage.sql_lineage import ( from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.models import TableView from metadata.utils import fqn +from metadata.utils.execution_time_tracker import calculate_execution_time_generator from metadata.utils.logger import utils_logger logger = utils_logger() @@ -47,6 +48,7 @@ def get_host_from_host_port(uri: str) -> str: return uri.split(":")[0] +@calculate_execution_time_generator() def get_view_lineage( view: TableView, metadata: OpenMetadata, @@ -81,6 +83,8 @@ def get_view_lineage( try: connection_type = str(connection_type) dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) + start_time = time.time() + logger.debug(f"Processing view lineage for: {table_fqn}") lineage_parser = LineageParser( view_definition, dialect, timeout_seconds=timeout_seconds ) @@ -89,6 +93,10 @@ def get_view_lineage( # For Postgres, if schema is not defined, we need to use the public schema schema_name = PUBLIC_SCHEMA + end_time = time.time() + logger.debug( + f"Time taken to parse view lineage for: {table_fqn} is {end_time - start_time} seconds" + ) if lineage_parser.source_tables and lineage_parser.target_tables: yield from get_lineage_by_query( metadata, @@ -99,6 +107,7 @@ def get_view_lineage( dialect=dialect, timeout_seconds=timeout_seconds, lineage_source=LineageSource.ViewLineage, + lineage_parser=lineage_parser, ) or [] else: @@ -112,6 +121,7 @@ def get_view_lineage( dialect=dialect, timeout_seconds=timeout_seconds, lineage_source=LineageSource.ViewLineage, + lineage_parser=lineage_parser, ) or [] except Exception as exc: logger.debug(traceback.format_exc())