MINOR: Improve memory usage in temp table lineage (#20041)

This commit is contained in:
Mayur Singal 2025-03-04 15:24:47 +05:30 committed by GitHub
parent 67a0795f7b
commit 2af5e30f4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 105 additions and 57 deletions

View File

@ -47,6 +47,7 @@ from metadata.ingestion.lineage.models import (
from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT, LineageParser from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT, LineageParser
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn from metadata.utils import fqn
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.logger import utils_logger from metadata.utils.logger import utils_logger
from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache
@ -516,7 +517,6 @@ def _create_lineage_by_table_name(
""" """
This method is to create a lineage between two tables This method is to create a lineage between two tables
""" """
try: try:
from_table_entities = get_table_entities_from_query( from_table_entities = get_table_entities_from_query(
metadata=metadata, metadata=metadata,
@ -543,9 +543,20 @@ def _create_lineage_by_table_name(
f"WARNING: Table entity [{table_name}] not found in OpenMetadata" f"WARNING: Table entity [{table_name}] not found in OpenMetadata"
) )
if graph is not None and (not from_table_entities or not to_table_entities): if graph is not None and (not from_table_entities or not to_table_entities):
graph.add_node(from_table, entity=from_table_entities) # Add nodes and edges with minimal data
graph.add_node(to_table, entity=to_table_entities) graph.add_node(
graph.add_edge(from_table, to_table, query=masked_query) from_table,
fqns=[table.fullyQualifiedName.root for table in from_table_entities]
if from_table_entities
else [],
)
graph.add_node(
to_table,
fqns=[table.fullyQualifiedName.root for table in to_table_entities]
if to_table_entities
else [],
)
graph.add_edge(from_table, to_table)
return return
for from_entity, to_entity in itertools.product( for from_entity, to_entity in itertools.product(
@ -776,57 +787,107 @@ def get_lineage_via_table_entity(
) )
def _get_lineage_for_path(
    from_fqn: str,
    to_fqn: str,
    from_node: Any,
    current_node: Any,
    table_chain: List[str],
    metadata: OpenMetadata,
) -> Optional[Either[AddLineageRequest]]:
    """
    Build the lineage request for a single (source, target) FQN pair of a path.

    Both tables are resolved through the Elasticsearch FQN search of the
    OpenMetadata client; lineage is only built when both lookups return an
    entity.

    Args:
        from_fqn: FQN search string of the upstream table.
        to_fqn: FQN search string of the downstream table.
        from_node: upstream node; only its ``str()`` form is used, as the raw
            source table name.
        current_node: downstream node; only its ``str()`` form is used, as the
            raw target table name.
        table_chain: table names collected along the path, joined with " > "
            and passed in place of the masked query.
        metadata: OpenMetadata client used to run the searches.

    Returns:
        Whatever ``_build_table_lineage`` returns, or None when either table
        is not found or any exception is raised (errors are logged, never
        propagated).
    """
    try:
        # Resolve the target first and the source second, one search each.
        to_entity, from_entity = [
            get_entity_from_es_result(
                entity_list=metadata.es_search_from_fqn(
                    entity_type=Table,
                    fqn_search_string=search_string,
                ),
            )
            for search_string in (to_fqn, from_fqn)
        ]

        if not (to_entity and from_entity):
            return None

        # The chain of table names stands in for the query text of the edge.
        table_relationship = "--- TEMPT TABLE LINEAGE \n--- " + " > ".join(
            table_chain
        )
        return _build_table_lineage(
            to_entity=to_entity,
            from_entity=from_entity,
            to_table_raw_name=str(current_node),
            from_table_raw_name=str(from_node),
            masked_query=table_relationship,
            column_lineage_map={},
            lineage_source=LineageSource.QueryLineage,
            procedure=None,
        )
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.error(f"Error fetching table entities [{from_fqn} -> {to_fqn}]: {exc}")
        return None
def _process_sequence( def _process_sequence(
sequence: List[Any], graph: DiGraph sequence: List[Any], graph: DiGraph, metadata: OpenMetadata
) -> Iterable[Either[AddLineageRequest]]: ) -> Iterable[Either[AddLineageRequest]]:
""" """
Process a sequence of nodes to generate lineage information. Process a sequence of nodes to generate lineage information.
""" """
from_node = None from_node = None
queries = set() table_chain = []
clean_queries = False
previous_node = None
for node in sequence: for node in sequence:
try: try:
if clean_queries:
queries.clear()
clean_queries = False
current_node = graph.nodes[node] current_node = graph.nodes[node]
current_entity = current_node.get("entity") current_fqns = current_node.get("fqns", [])
if ( # Add the current node name to the chain
previous_node is not None table_chain.append(str(node).replace(f"{DEFAULT_SCHEMA_NAME}.", ""))
and graph.edges[(previous_node, node)].get("query") is not None
):
queries.add(graph.edges[(previous_node, node)].get("query"))
if current_entity and from_node is not None: if current_fqns and from_node is not None:
for from_entity, to_entity in itertools.product( from_fqns = from_node.get("fqns", [])
from_node.get("entity") or [], current_entity or [] for from_fqn, to_fqn in itertools.product(from_fqns, current_fqns):
): lineage = _get_lineage_for_path(
if to_entity and from_entity: from_fqn=from_fqn,
yield _build_table_lineage( to_fqn=to_fqn,
to_entity=to_entity, from_node=from_node,
from_entity=from_entity, current_node=node,
to_table_raw_name=str(node), table_chain=table_chain,
from_table_raw_name=str(from_node), metadata=metadata,
masked_query="\n--------\n".join(queries),
column_lineage_map={},
lineage_source=LineageSource.QueryLineage,
procedure=None,
) )
clean_queries = True if lineage:
yield lineage
if current_entity: if current_fqns:
from_node = graph.nodes[node] from_node = graph.nodes[node]
previous_node = node
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.error(f"Error creating lineage for node [{node}]: {exc}") logger.error(f"Error creating lineage for node [{node}]: {exc}")
def _get_paths_from_subtree(subtree: DiGraph) -> List[List[Any]]:
    """
    Collect every simple path that runs from a root node to a leaf node.

    A root is a node with no incoming edges and a leaf is a node with no
    outgoing edges. Paths are gathered root by root and, for each root,
    leaf by leaf.

    Args:
        subtree: directed graph to walk.

    Returns:
        A list of paths, each path being the list of nodes visited in order.
        The list is empty when the graph has no root or no leaf.
    """
    sources = [candidate for candidate in subtree if subtree.in_degree(candidate) == 0]
    sinks = [candidate for candidate in subtree if subtree.out_degree(candidate) == 0]

    return [
        path
        for source, sink in itertools.product(sources, sinks)
        for path in nx.all_simple_paths(subtree, source, sink)
    ]
def get_lineage_by_graph( def get_lineage_by_graph(
graph: DiGraph, graph: DiGraph,
metadata: OpenMetadata,
) -> Iterable[Either[AddLineageRequest]]: ) -> Iterable[Either[AddLineageRequest]]:
""" """
Generate lineage information from a directed graph. Generate lineage information from a directed graph.
@ -835,6 +896,7 @@ def get_lineage_by_graph(
It then yields lineage information for each sequence. It then yields lineage information for each sequence.
Args: Args:
graph (DiGraph): A directed graph representing the lineage. graph (DiGraph): A directed graph representing the lineage.
metadata (OpenMetadata): OpenMetadata client instance to fetch table entities
Raises: Raises:
Exception: If an error occurs during the lineage creation process, it logs the error. Exception: If an error occurs during the lineage creation process, it logs the error.
""" """
@ -844,24 +906,8 @@ def get_lineage_by_graph(
# Get all weakly connected components # Get all weakly connected components
components = list(nx.weakly_connected_components(graph)) components = list(nx.weakly_connected_components(graph))
# Extract each component as an independent subgraph # Extract each component as an independent subgraph and process paths
independent_subtrees = [ for component in components:
graph.subgraph(component).copy() for component in components subtree = graph.subgraph(component).copy()
] for path in _get_paths_from_subtree(subtree):
yield from _process_sequence(path, subtree, metadata)
# Print results in the desired format
for subtree in independent_subtrees:
# Find a root node (node with no incoming edges)
root = [node for node in subtree if subtree.in_degree(node) == 0][0]
# Traverse from the root to get the sequence of nodes
current = root
sequence = [current]
while True:
successors = list(subtree.successors(current))
if not successors:
break
current = successors[0]
sequence.append(current)
yield from _process_sequence(sequence, subtree)

View File

@ -378,7 +378,9 @@ class LineageSource(QueryParserSource, ABC):
if self.source_config.processQueryLineage: if self.source_config.processQueryLineage:
if hasattr(self.service_connection, "supportsLineageExtraction"): if hasattr(self.service_connection, "supportsLineageExtraction"):
yield from self.yield_query_lineage() or [] yield from self.yield_query_lineage() or []
yield from get_lineage_by_graph(graph=self.graph) yield from get_lineage_by_graph(
graph=self.graph, metadata=self.metadata
)
else: else:
logger.warning( logger.warning(
f"Lineage extraction is not supported for {str(self.service_connection.type.value)} connection" f"Lineage extraction is not supported for {str(self.service_connection.type.value)} connection"