MINOR: Improve UDF lineage processing, execution-time logging, and multiprocessing (#20848)

This commit is contained in:
Mayur Singal 2025-04-17 00:09:52 +05:30 committed by GitHub
parent 03abcb60f7
commit 5ea9f22492
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 588 additions and 377 deletions

View File

@ -20,6 +20,7 @@ from sqlparse.sql import Comparison
from sqlparse.tokens import Literal, Number, String
from metadata.ingestion.lineage.models import Dialect
from metadata.utils.execution_time_tracker import calculate_execution_time
MASK_TOKEN = "?"
@ -112,8 +113,12 @@ def mask_literals_with_sqlfluff(query: str, parser: LineageRunner) -> str:
return query
@calculate_execution_time(context="MaskQuery")
def mask_query(
query: str, dialect: str = Dialect.ANSI.value, parser: LineageRunner = None
query: str,
dialect: str = Dialect.ANSI.value,
parser: LineageRunner = None,
parser_required: bool = False,
) -> str:
"""
Mask a query using sqlparse or sqlfluff.
@ -122,6 +127,8 @@ def mask_query(
try:
if masked_query_cache.get((query, dialect)):
return masked_query_cache.get((query, dialect))
if parser_required and not parser:
return None
if not parser:
try:
parser = LineageRunner(query, dialect=dialect)

View File

@ -28,6 +28,7 @@ from sqlparse.sql import Comparison, Identifier, Parenthesis, Statement
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin
from metadata.ingestion.lineage.masker import mask_query
from metadata.ingestion.lineage.models import Dialect
from metadata.utils.execution_time_tracker import calculate_execution_time
from metadata.utils.helpers import (
find_in_iter,
get_formatted_entity_name,
@ -77,7 +78,10 @@ class LineageParser:
self._clean_query, dialect=dialect, timeout_seconds=timeout_seconds
)
if self.masked_query is None:
self.masked_query = mask_query(self._clean_query, parser=self.parser)
self.masked_query = (
mask_query(self._clean_query, parser=self.parser, parser_required=True)
or self._clean_query
)
@cached_property
def involved_tables(self) -> Optional[List[Table]]:
@ -409,6 +413,7 @@ class LineageParser:
return clean_query.strip()
@calculate_execution_time(context="EvaluateBestParser")
def _evaluate_best_parser(
self, query: str, dialect: Dialect, timeout_seconds: int
) -> Optional[LineageRunner]:
@ -475,7 +480,6 @@ class LineageParser:
logger.debug(f"Failed to parse query with sqlparse & sqlfluff: {query}")
return lr_sqlfluff if lr_sqlfluff else None
self.masked_query = mask_query(self._clean_query, parser=lr_sqlparser)
logger.debug(
f"Using sqlparse for lineage parsing for query: {self.masked_query or self.query}"
)

View File

@ -11,12 +11,15 @@
"""
Helper functions to handle SQL lineage operations
"""
import functools
import itertools
import traceback
from collections import defaultdict
from copy import deepcopy
from typing import Any, Iterable, List, Optional, Tuple, Union
import networkx as nx
from collate_sqllineage.core.holders import SQLLineageHolder
from collate_sqllineage.core.models import Column, DataFunction
from collate_sqllineage.core.models import Table as LineageTable
from networkx import DiGraph
@ -48,6 +51,10 @@ from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT, LineagePa
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.execution_time_tracker import (
calculate_execution_time,
calculate_execution_time_generator,
)
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.logger import utils_logger
from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache
@ -56,7 +63,7 @@ logger = utils_logger()
DEFAULT_SCHEMA_NAME = "<default>"
CUTOFF_NODES = 20
# pylint: disable=too-many-function-args,protected-access
def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
"""
Get fqn of column if exist in table entity
@ -73,6 +80,7 @@ def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
search_cache = LRUCache(LRU_CACHE_SIZE)
@calculate_execution_time(context="SearchTableEntities")
def search_table_entities(
metadata: OpenMetadata,
service_name: str,
@ -236,6 +244,58 @@ def handle_udf_column_lineage(
logger.error(f"Error handling UDF column lineage: {exc}")
@functools.lru_cache(maxsize=1000)
def _get_udf_parser(
    code: str, dialect: Dialect, timeout_seconds: int
) -> Optional[LineageParser]:
    """
    Build (and memoize) a LineageParser for a UDF body.

    The UDF code is wrapped in a synthetic CTAS statement against a dummy
    target so the parser can resolve its source tables. Results are cached
    (lru_cache) since the same UDF code is typically parsed many times.
    Returns None when no code is provided.
    """
    if not code:
        return None
    wrapped_query = f"create table dummy_table_name as {code}"
    return LineageParser(
        wrapped_query,
        dialect=dialect,
        timeout_seconds=timeout_seconds,
    )
def _replace_target_table(
    parser: LineageParser, expected_table_name: str
) -> None:
    """
    Rewrite the parser's target table in place.

    The cached UDF parser is built against a placeholder target
    (``dummy_table_name``); this swaps that placeholder for the real
    ``expected_table_name`` and rewires the column-lineage edges so they
    point at the new target. ``parser`` is mutated in place and nothing is
    returned (the previous ``-> LineageParser`` annotation was misleading).
    Any failure is logged at debug level and the parser is left as-is.
    """
    try:
        # Create a new target table instead of modifying the existing one.
        # This must be the collate_sqllineage Table model (LineageTable) -
        # the OpenMetadata `Table` entity is a pydantic model that cannot be
        # constructed from a raw name and is never a lineage-graph node.
        new_table = LineageTable(expected_table_name.replace(DEFAULT_SCHEMA_NAME, ""))
        # Create a new statement holder with the updated target table
        stmt_holder = parser.parser._stmt_holders[0]
        old_write = list(stmt_holder.write)[0]  # Get the original target table
        # Remove old target table and add new one
        stmt_holder.graph.remove_node(old_write)
        stmt_holder.add_write(new_table)
        # Rebuild column lineage
        for col_lineage in parser.parser.get_column_lineage():
            if col_lineage[-1].parent == old_write:
                # Create new column with same name but parent is new table
                tgt_col = col_lineage[-1]
                new_tgt_col = Column(tgt_col.raw_name)
                new_tgt_col.parent = new_table
                # Add the column lineage from source to new target
                stmt_holder.add_column_lineage(col_lineage[-2], new_tgt_col)
                try:
                    # remove the old edge
                    stmt_holder.graph.remove_edge(col_lineage[-2], tgt_col)
                except Exception as _:
                    # if the edge is not present, pass
                    pass
        # Rebuild the SQL holder so downstream consumers see the new target
        parser.parser._sql_holder = SQLLineageHolder.of(*parser.parser._stmt_holders)
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.debug(f"Error replacing target table: {exc}")
# pylint: disable=too-many-arguments
def __process_udf_es_results(
metadata: OpenMetadata,
@ -255,27 +315,32 @@ def __process_udf_es_results(
and entity.storedProcedureCode
and entity.storedProcedureCode.language == Language.SQL
):
expected_table_name = str(source_table).replace(
f"{DEFAULT_SCHEMA_NAME}.", ""
lineage_parser = _get_udf_parser(
entity.storedProcedureCode.code, dialect, timeout_seconds
)
lineage_parser = LineageParser(
f"create table {str(expected_table_name)} as {entity.storedProcedureCode.code}",
dialect=dialect,
timeout_seconds=timeout_seconds,
)
handle_udf_column_lineage(column_lineage, lineage_parser.column_lineage)
for source in lineage_parser.source_tables or []:
yield from get_source_table_names(
metadata,
dialect,
source,
database_name,
schema_name,
service_name,
timeout_seconds,
column_lineage,
procedure or entity,
if lineage_parser and lineage_parser.parser:
expected_table_name = str(source_table).replace(
f"{DEFAULT_SCHEMA_NAME}.", ""
)
lineage_parser_copy = deepcopy(lineage_parser)
_replace_target_table(lineage_parser_copy, expected_table_name)
handle_udf_column_lineage(
column_lineage, lineage_parser_copy.column_lineage
)
for source in lineage_parser_copy.source_tables or []:
yield from get_source_table_names(
metadata,
dialect,
source,
database_name,
schema_name,
service_name,
timeout_seconds,
column_lineage,
procedure or entity,
)
def __process_udf_table_names(
@ -317,6 +382,7 @@ def __process_udf_table_names(
)
@calculate_execution_time_generator(context="GetSourceTableNames")
def get_source_table_names(
metadata: OpenMetadata,
dialect: Dialect,
@ -617,6 +683,7 @@ def populate_column_lineage_map(raw_column_lineage):
# pylint: disable=too-many-locals
@calculate_execution_time_generator(context="GetLineageByQuery")
def get_lineage_by_query(
metadata: OpenMetadata,
service_name: str,
@ -627,6 +694,7 @@ def get_lineage_by_query(
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
lineage_source: LineageSource = LineageSource.QueryLineage,
graph: DiGraph = None,
lineage_parser: Optional[LineageParser] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""
This method parses the query to get source, target and intermediate table names to create lineage,
@ -636,7 +704,10 @@ def get_lineage_by_query(
query_parsing_failures = QueryParsingFailures()
try:
lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds)
if not lineage_parser:
lineage_parser = LineageParser(
query, dialect, timeout_seconds=timeout_seconds
)
masked_query = lineage_parser.masked_query
logger.debug(f"Running lineage with query: {masked_query or query}")
@ -723,6 +794,7 @@ def get_lineage_by_query(
)
@calculate_execution_time_generator(context="GetLineageViaTableEntity")
def get_lineage_via_table_entity(
metadata: OpenMetadata,
table_entity: Table,
@ -734,13 +806,17 @@ def get_lineage_via_table_entity(
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
lineage_source: LineageSource = LineageSource.QueryLineage,
graph: DiGraph = None,
lineage_parser: Optional[LineageParser] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage from table entity"""
column_lineage = {}
query_parsing_failures = QueryParsingFailures()
try:
lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds)
if not lineage_parser:
lineage_parser = LineageParser(
query, dialect, timeout_seconds=timeout_seconds
)
masked_query = lineage_parser.masked_query
logger.debug(
f"Getting lineage via table entity using query: {masked_query or query}"

View File

@ -40,6 +40,7 @@ from metadata.ingestion.ometa.client import REST, APIError
from metadata.ingestion.ometa.utils import quote
from metadata.ingestion.source.models import TableView
from metadata.utils.elasticsearch import ES_INDEX_MAP, get_entity_from_es_result
from metadata.utils.execution_time_tracker import calculate_execution_time_generator
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
@ -398,6 +399,7 @@ class ESMixin(Generic[T]):
f"Error while getting {hit.source['fullyQualifiedName']} - {exc}"
)
@calculate_execution_time_generator(context="ES.FetchViewDefinition")
def yield_es_view_def(
self,
service_name: str,

View File

@ -0,0 +1,333 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Mixin class with common Stored Procedures logic aimed at lineage.
"""
import re
import time
import traceback
from datetime import datetime
from multiprocessing import Queue
from typing import Iterable, List, Optional, Union
import networkx as nx
from pydantic import BaseModel, ConfigDict, Field
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.basic import SqlQuery, Timestamp
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.logger import ingestion_logger
from metadata.utils.time_utils import datetime_to_timestamp
logger = ingestion_logger()
# pylint: disable=invalid-name
class QueryByProcedure(BaseModel):
    """
    Query(ies) executed by each stored procedure.

    Field aliases match the upper-case column names returned by the
    warehouse query-history views.
    """

    # Optional[str] (not bare str): not every warehouse row carries the
    # procedure name, and the default is None
    procedure_name: Optional[str] = Field(None, alias="PROCEDURE_NAME")
    query_type: str = Field(..., alias="QUERY_TYPE")
    query_database_name: Optional[str] = Field(None, alias="QUERY_DATABASE_NAME")
    query_schema_name: Optional[str] = Field(None, alias="QUERY_SCHEMA_NAME")
    procedure_text: str = Field(..., alias="PROCEDURE_TEXT")
    procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME")
    procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME")
    query_start_time: Optional[datetime] = Field(None, alias="QUERY_START_TIME")
    query_duration: Optional[float] = Field(None, alias="QUERY_DURATION")
    query_text: str = Field(..., alias="QUERY_TEXT")
    query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME")

    model_config = ConfigDict(populate_by_name=True)
class ProcedureAndQuery(BaseModel):
    """
    Model to hold the procedure and its queries
    """

    # The stored procedure entity the query was executed by
    procedure: StoredProcedure
    # One query triggered by that procedure (one model instance per query)
    query_by_procedure: QueryByProcedure

    model_config = ConfigDict(populate_by_name=True)
def is_lineage_query(query_type: str, query_text: str) -> bool:
    """Check if it's worth it to parse the query for lineage"""
    logger.debug(
        f"Validating query lineage for type [{query_type}] and text [{query_text}]"
    )
    # MERGE / UPDATE / CTAS statements always move data between tables
    if query_type in {"MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"}:
        return True
    # INSERT only carries lineage when it selects from another table
    if query_type == "INSERT":
        flattened = query_text.replace("\n", " ")
        return bool(
            re.search("^.*insert.*into.*select.*$", flattened, re.IGNORECASE)
        )
    return False
def _yield_procedure_lineage(
    metadata: OpenMetadata,
    service_name: str,
    dialect: Dialect,
    parsingTimeoutLimit: int,
    query_by_procedure: QueryByProcedure,
    procedure: StoredProcedure,
) -> Iterable[Either[AddLineageRequest]]:
    """Add procedure lineage from its query"""
    # Guard clause: skip queries that cannot produce lineage
    if not is_lineage_query(
        query_type=query_by_procedure.query_type,
        query_text=query_by_procedure.query_text,
    ):
        return
    for either_lineage in get_lineage_by_query(
        metadata,
        query=query_by_procedure.query_text,
        service_name=service_name,
        database_name=query_by_procedure.query_database_name,
        schema_name=query_by_procedure.query_schema_name,
        dialect=dialect,
        timeout_seconds=parsingTimeoutLimit,
        lineage_source=LineageSource.QueryLineage,
    ):
        # On success, tag the edge with the procedure as its pipeline
        details = (
            either_lineage.right.edge.lineageDetails
            if either_lineage.left is None
            else None
        )
        if details:
            details.pipeline = EntityReference(
                id=procedure.id,
                type="storedProcedure",
            )
        yield either_lineage
def procedure_lineage_processor(
    procedure_and_queries: List[ProcedureAndQuery],
    queue: Queue,
    metadata: OpenMetadata,
    service_name: str,
    dialect: Dialect,
    parsingTimeoutLimit: int,
) -> None:
    """
    Process the procedure and its queries to add lineage.

    Runs inside a worker subprocess: results are pushed onto ``queue``
    rather than yielded, so this is not a generator (the previous
    ``-> Iterable[...]`` annotation was wrong - the function returns None).
    Lineage failures are logged and skipped per procedure; the raw query
    is still ingested afterwards on a best-effort basis.
    """
    for procedure_and_query in procedure_and_queries:
        try:
            for lineage in _yield_procedure_lineage(
                query_by_procedure=procedure_and_query.query_by_procedure,
                procedure=procedure_and_query.procedure,
                metadata=metadata,
                service_name=service_name,
                dialect=dialect,
                parsingTimeoutLimit=parsingTimeoutLimit,
            ):
                if lineage and lineage.right is not None:
                    # Wrap successful lineage so the sink attributes it to
                    # the stored procedure entity
                    queue.put(
                        Either(
                            right=OMetaLineageRequest(
                                override_lineage=False,
                                lineage_request=lineage.right,
                                entity=StoredProcedure,
                                entity_fqn=procedure_and_query.procedure.fullyQualifiedName.root,
                            )
                        )
                    )
                else:
                    queue.put(lineage)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Could not get lineage for store procedure "
                f"'{procedure_and_query.procedure.fullyQualifiedName}' due to [{exc}]."
            )
        # Ingest the query itself even if lineage extraction failed above
        try:
            for lineage in yield_procedure_query(
                query_by_procedure=procedure_and_query.query_by_procedure,
                procedure=procedure_and_query.procedure,
                service_name=service_name,
            ):
                queue.put(lineage)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Could not get query for store procedure "
                f"'{procedure_and_query.procedure.fullyQualifiedName}' due to [{exc}]."
            )
def yield_procedure_query(
    query_by_procedure: QueryByProcedure, procedure: StoredProcedure, service_name: str
) -> Iterable[Either[CreateQueryRequest]]:
    """Check the queries triggered by the procedure and add their lineage, if any"""
    # Mark the query as lineage-processed when it is a lineage-worthy type,
    # so downstream stages don't re-parse it
    processed = is_lineage_query(
        query_type=query_by_procedure.query_type,
        query_text=query_by_procedure.query_text,
    )
    create_request = CreateQueryRequest(
        query=SqlQuery(query_by_procedure.query_text),
        query_type=query_by_procedure.query_type,
        duration=query_by_procedure.query_duration,
        queryDate=Timestamp(
            root=datetime_to_timestamp(query_by_procedure.query_start_time, True)
        ),
        triggeredBy=EntityReference(
            id=procedure.id,
            type="storedProcedure",
        ),
        processedLineage=bool(processed),
        service=service_name,
    )
    yield Either(right=create_request)
# Function that will run in separate processes - defined at module level for pickling
def _process_chunk_in_subprocess(chunk, processor_fn, queue, *args):
"""
Process a chunk of data in a subprocess.
Args:
chunk_and_processor_fn: Tuple containing (chunk, processor_fn, queue, *args)
Returns:
True if processing succeeded, False otherwise
"""
try:
# Process each item in the chunk
processor_fn(chunk, queue, *args)
time.sleep(0.1)
return True
except Exception as e:
logger.error(f"Error processing chunk in subprocess: {e}")
logger.error(traceback.format_exc())
return False
def _query_already_processed(metadata: OpenMetadata, table_query: TableQuery) -> bool:
    """
    Check if a query has already been processed by validating if exists
    in ES with lineageProcessed as True
    """
    checksums = metadata.es_get_queries_with_lineage(
        service_name=table_query.serviceName,
    )
    # Parenthesize the None-guard: the previous `x in checksums or {}`
    # parsed as `(x in checksums) or {}`, which raised TypeError when
    # checksums was None and returned {} (not False) on a miss.
    return fqn.get_query_checksum(table_query.query) in (checksums or {})
def query_lineage_generator(
    table_queries: List[TableQuery],
    queue: Queue,
    metadata: OpenMetadata,
    dialect: Dialect,
    graph: nx.DiGraph,
    parsingTimeoutLimit: int,
    serviceName: str,
) -> None:
    """
    Generate lineage for a list of table queries.

    Runs inside a worker subprocess: lineage requests (and, when lineage is
    found, the originating query) are pushed onto ``queue`` rather than
    yielded, so this is not a generator (the previous ``-> Iterable[...]``
    annotation was wrong - the function returns None).
    """
    for table_query in table_queries or []:
        # Skip queries already flagged in ES as lineage-processed
        if not _query_already_processed(metadata, table_query):
            lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
                metadata,
                query=table_query.query,
                service_name=table_query.serviceName,
                database_name=table_query.databaseName,
                schema_name=table_query.databaseSchema,
                dialect=dialect,
                timeout_seconds=parsingTimeoutLimit,
                graph=graph,
            )
            for lineage_request in lineages or []:
                queue.put(lineage_request)

                # If we identified lineage properly, ingest the original query
                if lineage_request.right:
                    queue.put(
                        Either(
                            right=CreateQueryRequest(
                                query=SqlQuery(table_query.query),
                                query_type=table_query.query_type,
                                duration=table_query.duration,
                                processedLineage=True,
                                service=serviceName,
                            )
                        )
                    )
def view_lineage_generator(
    views: List[TableView],
    queue: Queue,
    metadata: OpenMetadata,
    serviceName: str,
    connectionType: str,
    parsingTimeoutLimit: int,
    overrideViewLineage: bool,
) -> None:
    """
    Generate lineage for a list of views.

    Runs inside a worker subprocess: results are pushed onto ``queue``
    rather than yielded, so this is not a generator (the previous
    ``-> Iterable[...]`` annotation was wrong - the function returns None).

    Each view is handled independently: previously the ``try`` wrapped the
    whole loop, so one failing view aborted all remaining views, and the
    except handler referenced ``view`` which was unbound for empty input.
    """
    for view in views:
        try:
            for lineage in get_view_lineage(
                view=view,
                metadata=metadata,
                service_name=serviceName,
                connection_type=connectionType,
                timeout_seconds=parsingTimeoutLimit,
            ):
                if lineage.right is not None:
                    # Build the view FQN locally (skip ES search) so the sink
                    # can attribute the lineage to the view entity
                    view_fqn = fqn.build(
                        metadata=metadata,
                        entity_type=Table,
                        service_name=serviceName,
                        database_name=view.db_name,
                        schema_name=view.schema_name,
                        table_name=view.table_name,
                        skip_es_search=True,
                    )
                    queue.put(
                        Either(
                            right=OMetaLineageRequest(
                                lineage_request=lineage.right,
                                override_lineage=overrideViewLineage,
                                entity_fqn=view_fqn,
                                entity=Table,
                            )
                        )
                    )
                else:
                    queue.put(lineage)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Error processing view {view}: {exc}")

View File

@ -12,22 +12,21 @@
Lineage Source Module
"""
import csv
import multiprocessing
import os
import threading
import time
import traceback
from abc import ABC
from functools import partial
from typing import Any, Callable, Iterable, Iterator, List, Optional, Union
from multiprocessing import Process, Queue
from typing import Any, Callable, Iterable, Iterator, List, Optional, Tuple, Union
import networkx as nx
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.basic import (
FullyQualifiedEntityName,
SqlQuery,
Uuid,
)
from metadata.generated.schema.type.basic import Uuid
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
@ -38,25 +37,24 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.sql_lineage import (
get_column_fqn,
get_lineage_by_graph,
get_lineage_by_query,
from metadata.ingestion.lineage.sql_lineage import get_column_fqn, get_lineage_by_graph
from metadata.ingestion.source.database.lineage_processors import (
_process_chunk_in_subprocess,
query_lineage_generator,
view_lineage_generator,
)
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.models.topology import Queue
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
CHUNK_SIZE = 100
CHUNK_SIZE = 200
THREAD_TIMEOUT = 3 * 60 * 10 # 30 minutes in seconds
THREAD_TIMEOUT = 10 * 60
PROCESS_TIMEOUT = 10 * 60
# Maximum number of processes to use for parallel processing
MAX_PROCESSES = min(multiprocessing.cpu_count(), 8) # Limit to 8 or available CPUs
class LineageSource(QueryParserSource, ABC):
@ -120,94 +118,100 @@ class LineageSource(QueryParserSource, ABC):
yield from self.yield_table_query()
@staticmethod
def generate_lineage_in_thread(
def generate_lineage_with_processes(
producer_fn: Callable[[], Iterable[Any]],
processor_fn: Callable[[Any, Queue], None],
args: Tuple[Any, ...],
chunk_size: int = CHUNK_SIZE,
thread_timeout: int = THREAD_TIMEOUT,
max_threads: int = 10, # Default maximum number of concurrent threads
processor_timeout: int = PROCESS_TIMEOUT,
):
"""
Process data in separate daemon threads with timeout control.
Process data in separate processes with timeout control.
Args:
producer_fn: Function that yields data chunks
processor_fn: Function that processes data and adds results to the queue
chunk_size: Size of chunks to process
thread_timeout: Maximum time in seconds to wait for a processor thread
max_threads: Maximum number of concurrent threads to run
processor_timeout: Maximum time in seconds to wait for a processor process
"""
queue = Queue()
active_threads = []
def process_chunk(chunk):
"""Process a chunk of data in a thread."""
def chunk_generator():
"""Group items from producer into chunks of specified size."""
temp_chunk = []
for item in producer_fn():
temp_chunk.append(item)
if len(temp_chunk) >= chunk_size:
yield temp_chunk
temp_chunk = []
if temp_chunk:
yield temp_chunk
# Use multiprocessing Queue instead of threading Queue
queue = Queue()
active_processes = []
# Dictionary to track process start times
process_start_times = {}
# Start processing each chunk in a separate process
for _, chunk in enumerate(chunk_generator()):
# Start a process for processing
process = Process(
target=_process_chunk_in_subprocess,
args=(chunk, processor_fn, queue, *args),
)
process.daemon = True
process_start_times[
process.name
] = time.time() # Track when the process started
logger.info(
f"Process {process.name} started at {process_start_times[process.name]}"
)
active_processes.append(process)
process.start()
# Process results from the queue and check for timed-out processes
while active_processes or not queue.empty():
# Process any available results
try:
processor_fn(chunk, queue)
except Exception as e:
logger.error(f"Error processing chunk: {e}")
while not queue.empty():
yield queue.get_nowait()
except Exception as exc:
logger.warning(f"Error processing queue: {exc}")
logger.debug(traceback.format_exc())
# Create an iterator for the chunks but don't consume it all at once
chunk_iterator = iter(chunk_generator(producer_fn, chunk_size))
# Process results from the queue and check for timed-out threads
chunk_processed = False # Flag to track if all chunks have been processed
ignored_threads = 0
while True:
# Start new threads until we reach the max_threads limit
while (
len(active_threads) + ignored_threads
) < max_threads and not chunk_processed:
try:
# Only fetch a new chunk when we're ready to create a thread
chunk = next(chunk_iterator)
thread = threading.Thread(target=process_chunk, args=(chunk,))
thread.start_time = time.time() # Track when the thread started
thread.daemon = True
active_threads.append(thread)
thread.start()
except StopIteration:
# No more chunks to process
chunk_processed = True
break
if ignored_threads == max_threads:
logger.warning(f"Max threads reached, skipping remaining threads")
break
# Process any available results
if queue.has_tasks():
yield from queue.process()
# Check for completed or timed-out threads
# Check for completed or timed-out processes
still_active = []
for thread in active_threads:
if thread.is_alive():
# Check if the thread has timed out
if time.time() - thread.start_time > thread_timeout:
for process in active_processes:
if process.is_alive():
# Check if the process has timed out
if (
time.time() - process_start_times[process.name]
> processor_timeout
):
logger.warning(
f"Thread {thread.name} timed out after {thread_timeout}s"
f"Process {process.name} timed out after {processor_timeout}s"
)
ignored_threads += 1
process.terminate() # Force terminate the timed out process
else:
still_active.append(thread)
# If thread is not alive, it has completed normally
still_active.append(process)
else:
# Clean up completed process
process.join()
active_threads = still_active
# Exit conditions: no more active threads and no more chunks to process
if not active_threads and chunk_processed:
break
active_processes = still_active
# Small pause to prevent CPU spinning
if active_threads:
if active_processes:
time.sleep(0.1)
# Final check for any remaining results
while queue.has_tasks():
yield from queue.process()
try:
while not queue.empty():
yield queue.get_nowait()
except Exception as exc:
logger.warning(f"Error processing queue: {exc}")
logger.debug(traceback.format_exc())
def yield_table_query(self) -> Iterator[TableQuery]:
"""
@ -239,57 +243,6 @@ class LineageSource(QueryParserSource, ABC):
f"Error processing query_dict {query_dict}: {exc}"
)
def _query_already_processed(self, table_query: TableQuery) -> bool:
"""
Check if a query has already been processed by validating if exists
in ES with lineageProcessed as True
"""
checksums = self.metadata.es_get_queries_with_lineage(
service_name=table_query.serviceName,
)
return fqn.get_query_checksum(table_query.query) in checksums or {}
def query_lineage_generator(
self, table_queries: List[TableQuery], queue: Queue
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
if self.graph is None and self.source_config.enableTempTableLineage:
import networkx as nx
# Create a directed graph
self.graph = nx.DiGraph()
for table_query in table_queries or []:
if not self._query_already_processed(table_query):
lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
self.metadata,
query=table_query.query,
service_name=table_query.serviceName,
database_name=table_query.databaseName,
schema_name=table_query.databaseSchema,
dialect=self.dialect,
timeout_seconds=self.source_config.parsingTimeoutLimit,
graph=self.graph,
)
for lineage_request in lineages or []:
queue.put(lineage_request)
# If we identified lineage properly, ingest the original query
if lineage_request.right:
queue.put(
Either(
right=CreateQueryRequest(
query=SqlQuery(table_query.query),
query_type=table_query.query_type,
duration=table_query.duration,
processedLineage=True,
service=FullyQualifiedEntityName(
self.config.serviceName
),
)
)
)
def yield_query_lineage(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
@ -301,51 +254,20 @@ class LineageSource(QueryParserSource, ABC):
connection_type = str(self.service_connection.type.value)
self.dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
producer_fn = self.get_table_query
processor_fn = self.query_lineage_generator
yield from self.generate_lineage_in_thread(
processor_fn = query_lineage_generator
args = (
self.metadata,
self.dialect,
self.graph,
self.source_config.parsingTimeoutLimit,
self.config.serviceName,
)
yield from self.generate_lineage_with_processes(
producer_fn,
processor_fn,
max_threads=self.source_config.threads,
args,
)
def view_lineage_generator(
self, views: List[TableView], queue: Queue
) -> Iterable[Either[AddLineageRequest]]:
try:
for view in views:
for lineage in get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.config.serviceName,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.parsingTimeoutLimit,
):
if lineage.right is not None:
view_fqn = fqn.build(
metadata=self.metadata,
entity_type=Table,
service_name=self.service_name,
database_name=view.db_name,
schema_name=view.schema_name,
table_name=view.table_name,
skip_es_search=True,
)
queue.put(
Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideViewLineage,
entity_fqn=view_fqn,
entity=Table,
)
)
)
else:
queue.put(lineage)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing view {view}: {exc}")
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
logger.info("Processing View Lineage")
producer_fn = partial(
@ -353,16 +275,21 @@ class LineageSource(QueryParserSource, ABC):
self.config.serviceName,
self.source_config.incrementalLineageProcessing,
)
processor_fn = self.view_lineage_generator
yield from self.generate_lineage_in_thread(
producer_fn, processor_fn, max_threads=self.source_config.threads
processor_fn = view_lineage_generator
args = (
self.metadata,
self.config.serviceName,
self.service_connection.type.value,
self.source_config.parsingTimeoutLimit,
self.source_config.overrideViewLineage,
)
yield from self.generate_lineage_with_processes(producer_fn, processor_fn, args)
def yield_procedure_lineage(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
"""
By default stored procedure lineage is not supported.
By default stored procedure lineage is not supported.
"""
logger.info(
f"Processing Procedure Lineage not supported for {str(self.service_connection.type.value)}"
@ -389,6 +316,7 @@ class LineageSource(QueryParserSource, ABC):
except Exception as exc:
logger.debug(f"Error to get column lineage: {exc}")
logger.debug(traceback.format_exc())
return []
def get_add_cross_database_lineage_request(
self,
@ -396,6 +324,9 @@ class LineageSource(QueryParserSource, ABC):
to_entity: Table,
column_lineage: List[ColumnLineage] = None,
) -> Optional[Either[AddLineageRequest]]:
"""
Get the add cross database lineage request
"""
if from_entity and to_entity:
return Either(
right=AddLineageRequest(
@ -431,6 +362,9 @@ class LineageSource(QueryParserSource, ABC):
Based on the query logs, prepare the lineage
and send it to the sink
"""
if self.graph is None and self.source_config.enableTempTableLineage:
# Create a directed graph
self.graph = nx.DiGraph()
if self.source_config.processViewLineage:
yield from self.yield_view_lineage() or []
if self.source_config.processStoredProcedureLineage:
@ -450,18 +384,3 @@ class LineageSource(QueryParserSource, ABC):
and self.source_config.crossDatabaseServiceNames
):
yield from self.yield_cross_database_lineage() or []
def chunk_generator(producer_fn, chunk_size):
    """
    Group items from producer into chunks of specified size.

    This is a separate function to allow for better lazy evaluation:
    each chunk is materialized only when the consumer asks for it.
    """
    source = iter(producer_fn())
    while True:
        batch = []
        for element in source:
            batch.append(element)
            if len(batch) >= chunk_size:
                break
        if not batch:
            # Producer exhausted with nothing pending
            return
        yield batch

View File

@ -12,14 +12,11 @@
Mixin class with common Stored Procedures logic aimed at lineage.
"""
import json
import re
import traceback
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Union
from typing import Dict, Iterable, List, Union
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy.engine import Engine
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
@ -31,54 +28,21 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import
from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import (
DatabaseServiceQueryLineagePipeline,
)
from metadata.generated.schema.type.basic import SqlQuery, Timestamp
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.status import Status
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.models.topology import Queue
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_processors import (
ProcedureAndQuery,
QueryByProcedure,
procedure_lineage_processor,
)
from metadata.utils.logger import ingestion_logger
from metadata.utils.stored_procedures import get_procedure_name_from_call
from metadata.utils.time_utils import datetime_to_timestamp
logger = ingestion_logger()
class QueryByProcedure(BaseModel):
    """
    Query(ies) executed by each stored procedure.

    Field aliases match the upper-case column names returned by the
    query-log extraction queries; populate_by_name allows building the
    model from either the alias or the pythonic field name.
    """

    # Fixed: default is None, so the annotation must be Optional[str] to be
    # consistent with the other nullable fields below (pydantic v2 does not
    # silently widen `str` to allow None the way v1 did).
    procedure_name: Optional[str] = Field(None, alias="PROCEDURE_NAME")
    query_type: str = Field(..., alias="QUERY_TYPE")
    query_database_name: Optional[str] = Field(None, alias="QUERY_DATABASE_NAME")
    query_schema_name: Optional[str] = Field(None, alias="QUERY_SCHEMA_NAME")
    procedure_text: str = Field(..., alias="PROCEDURE_TEXT")
    procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME")
    procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME")
    query_start_time: Optional[datetime] = Field(None, alias="QUERY_START_TIME")
    query_duration: Optional[float] = Field(None, alias="QUERY_DURATION")
    query_text: str = Field(..., alias="QUERY_TEXT")
    query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME")

    model_config = ConfigDict(populate_by_name=True)
class ProcedureAndQuery(BaseModel):
    """
    Model to hold the procedure and its queries
    """

    # Stored procedure entity the captured query was executed by
    procedure: StoredProcedure
    # One query-log record associated with that procedure run
    query_by_procedure: QueryByProcedure

    # Accept both field names and their aliases when populating
    model_config = ConfigDict(populate_by_name=True)
class StoredProcedureLineageMixin(ABC):
"""
The full flow is:
@ -138,121 +102,10 @@ class StoredProcedureLineageMixin(ABC):
return queries_dict
@staticmethod
def is_lineage_query(query_type: str, query_text: str) -> bool:
"""Check if it's worth it to parse the query for lineage"""
logger.debug(
f"Validating query lineage for type [{query_type}] and text [{query_text}]"
)
if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"):
return True
if query_type == "INSERT" and re.search(
"^.*insert.*into.*select.*$", query_text.replace("\n", " "), re.IGNORECASE
):
return True
return False
def _yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure, procedure: StoredProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Add procedure lineage from its query"""
self.stored_procedure_query_lineage = False
if self.is_lineage_query(
query_type=query_by_procedure.query_type,
query_text=query_by_procedure.query_text,
):
self.stored_procedure_query_lineage = True
for either_lineage in get_lineage_by_query(
self.metadata,
query=query_by_procedure.query_text,
service_name=self.service_name,
database_name=query_by_procedure.query_database_name,
schema_name=query_by_procedure.query_schema_name,
dialect=ConnectionTypeDialectMapper.dialect_of(
self.service_connection.type.value
),
timeout_seconds=self.source_config.parsingTimeoutLimit,
lineage_source=LineageSource.QueryLineage,
):
if (
either_lineage.left is None
and either_lineage.right.edge.lineageDetails
):
either_lineage.right.edge.lineageDetails.pipeline = EntityReference(
id=procedure.id,
type="storedProcedure",
)
yield either_lineage
def yield_procedure_query(
self, query_by_procedure: QueryByProcedure, procedure: StoredProcedure
) -> Iterable[Either[CreateQueryRequest]]:
"""Check the queries triggered by the procedure and add their lineage, if any"""
yield Either(
right=CreateQueryRequest(
query=SqlQuery(query_by_procedure.query_text),
query_type=query_by_procedure.query_type,
duration=query_by_procedure.query_duration,
queryDate=Timestamp(
root=datetime_to_timestamp(
query_by_procedure.query_start_time, True
)
),
triggeredBy=EntityReference(
id=procedure.id,
type="storedProcedure",
),
processedLineage=bool(self.stored_procedure_query_lineage),
service=self.service_name,
)
)
def procedure_lineage_processor(
self, procedure_and_queries: List[ProcedureAndQuery], queue: Queue
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
for procedure_and_query in procedure_and_queries:
try:
for lineage in self._yield_procedure_lineage(
query_by_procedure=procedure_and_query.query_by_procedure,
procedure=procedure_and_query.procedure,
):
if lineage and lineage.right is not None:
queue.put(
Either(
right=OMetaLineageRequest(
override_lineage=False,
lineage_request=lineage.right,
entity=StoredProcedure,
entity_fqn=procedure_and_query.procedure.fullyQualifiedName.root,
)
)
)
else:
queue.put(lineage)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Could not get lineage for store procedure '{procedure_and_query.procedure.fullyQualifiedName}' due to [{exc}]."
)
try:
for lineage in self.yield_procedure_query(
query_by_procedure=procedure_and_query.query_by_procedure,
procedure=procedure_and_query.procedure,
):
queue.put(lineage)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Could not get query for store procedure '{procedure_and_query.procedure.fullyQualifiedName}' due to [{exc}]."
)
def procedure_lineage_generator(self) -> Iterable[ProcedureAndQuery]:
"""
Generate lineage for a list of stored procedures
"""
query = {
"query": {
"bool": {
@ -303,7 +156,14 @@ class StoredProcedureLineageMixin(ABC):
"""Get all the queries and procedures list and yield them"""
logger.info("Processing Lineage for Stored Procedures")
producer_fn = self.procedure_lineage_generator
processor_fn = self.procedure_lineage_processor
yield from self.generate_lineage_in_thread(
producer_fn, processor_fn, max_threads=self.source_config.threads
processor_fn = procedure_lineage_processor
dialect = ConnectionTypeDialectMapper.dialect_of(
self.service_connection.type.value
)
args = (
self.metadata,
self.service_name,
dialect,
self.source_config.parsingTimeoutLimit,
)
yield from self.generate_lineage_with_processes(producer_fn, processor_fn, args)

View File

@ -12,7 +12,7 @@
"""
Helpers module for db sources
"""
import time
import traceback
from typing import Iterable
@ -32,6 +32,7 @@ from metadata.ingestion.lineage.sql_lineage import (
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.execution_time_tracker import calculate_execution_time_generator
from metadata.utils.logger import utils_logger
logger = utils_logger()
@ -47,6 +48,7 @@ def get_host_from_host_port(uri: str) -> str:
return uri.split(":")[0]
@calculate_execution_time_generator()
def get_view_lineage(
view: TableView,
metadata: OpenMetadata,
@ -81,6 +83,8 @@ def get_view_lineage(
try:
connection_type = str(connection_type)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
start_time = time.time()
logger.debug(f"Processing view lineage for: {table_fqn}")
lineage_parser = LineageParser(
view_definition, dialect, timeout_seconds=timeout_seconds
)
@ -89,6 +93,10 @@ def get_view_lineage(
# For Postgres, if schema is not defined, we need to use the public schema
schema_name = PUBLIC_SCHEMA
end_time = time.time()
logger.debug(
f"Time taken to parse view lineage for: {table_fqn} is {end_time - start_time} seconds"
)
if lineage_parser.source_tables and lineage_parser.target_tables:
yield from get_lineage_by_query(
metadata,
@ -99,6 +107,7 @@ def get_view_lineage(
dialect=dialect,
timeout_seconds=timeout_seconds,
lineage_source=LineageSource.ViewLineage,
lineage_parser=lineage_parser,
) or []
else:
@ -112,6 +121,7 @@ def get_view_lineage(
dialect=dialect,
timeout_seconds=timeout_seconds,
lineage_source=LineageSource.ViewLineage,
lineage_parser=lineage_parser,
) or []
except Exception as exc:
logger.debug(traceback.format_exc())