diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index 577e6a87498..bf9ec0a856b 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -47,6 +47,7 @@ from metadata.ingestion.lineage.models import ( from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT, LineageParser from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn +from metadata.utils.elasticsearch import get_entity_from_es_result from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.logger import utils_logger from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache @@ -516,7 +517,6 @@ def _create_lineage_by_table_name( """ This method is to create a lineage between two tables """ - try: from_table_entities = get_table_entities_from_query( metadata=metadata, @@ -543,9 +543,20 @@ def _create_lineage_by_table_name( f"WARNING: Table entity [{table_name}] not found in OpenMetadata" ) if graph is not None and (not from_table_entities or not to_table_entities): - graph.add_node(from_table, entity=from_table_entities) - graph.add_node(to_table, entity=to_table_entities) - graph.add_edge(from_table, to_table, query=masked_query) + # Add nodes and edges with minimal data + graph.add_node( + from_table, + fqns=[table.fullyQualifiedName.root for table in from_table_entities] + if from_table_entities + else [], + ) + graph.add_node( + to_table, + fqns=[table.fullyQualifiedName.root for table in to_table_entities] + if to_table_entities + else [], + ) + graph.add_edge(from_table, to_table) return for from_entity, to_entity in itertools.product( @@ -776,57 +787,107 @@ def get_lineage_via_table_entity( ) +def _get_lineage_for_path( + from_fqn: str, + to_fqn: str, + from_node: Any, + current_node: Any, + table_chain: List[str], + metadata: OpenMetadata, +) -> Optional[Either[AddLineageRequest]]: + """ + Get lineage for a pair of FQNs in the path + """ + try: + to_entity = get_entity_from_es_result( + entity_list=metadata.es_search_from_fqn( + entity_type=Table, + fqn_search_string=to_fqn, + ), + ) + from_entity = get_entity_from_es_result( + entity_list=metadata.es_search_from_fqn( + entity_type=Table, + fqn_search_string=from_fqn, + ), + ) + if to_entity and from_entity: + # Create the table chain string + table_relationship = "--- TEMPT TABLE LINEAGE \n--- " + table_relationship += " > ".join(table_chain) + return _build_table_lineage( + to_entity=to_entity, + from_entity=from_entity, + to_table_raw_name=str(current_node), + from_table_raw_name=str(from_node), + masked_query=table_relationship, # Using table chain as the query + column_lineage_map={}, + lineage_source=LineageSource.QueryLineage, + procedure=None, + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error(f"Error fetching table entities [{from_fqn} -> {to_fqn}]: {exc}") + return None + + def _process_sequence( - sequence: List[Any], graph: DiGraph + sequence: List[Any], graph: DiGraph, metadata: OpenMetadata ) -> Iterable[Either[AddLineageRequest]]: """ Process a sequence of nodes to generate lineage information. """ from_node = None - queries = set() - clean_queries = False - previous_node = None + table_chain = [] for node in sequence: try: - if clean_queries: - queries.clear() - clean_queries = False current_node = graph.nodes[node] - current_entity = current_node.get("entity") + current_fqns = current_node.get("fqns", []) - if ( - previous_node is not None - and graph.edges[(previous_node, node)].get("query") is not None - ): - queries.add(graph.edges[(previous_node, node)].get("query")) + # Add the current node name to the chain + table_chain.append(str(node).replace(f"{DEFAULT_SCHEMA_NAME}.", "")) - if current_entity and from_node is not None: - for from_entity, to_entity in itertools.product( - from_node.get("entity") or [], current_entity or [] - ): - if to_entity and from_entity: - yield _build_table_lineage( - to_entity=to_entity, - from_entity=from_entity, - to_table_raw_name=str(node), - from_table_raw_name=str(from_node), - masked_query="\n--------\n".join(queries), - column_lineage_map={}, - lineage_source=LineageSource.QueryLineage, - procedure=None, - ) - clean_queries = True + if current_fqns and from_node is not None: + from_fqns = from_node.get("fqns", []) + for from_fqn, to_fqn in itertools.product(from_fqns, current_fqns): + lineage = _get_lineage_for_path( + from_fqn=from_fqn, + to_fqn=to_fqn, + from_node=from_node, + current_node=node, + table_chain=table_chain, + metadata=metadata, + ) + if lineage: + yield lineage - if current_entity: + if current_fqns: from_node = graph.nodes[node] - previous_node = node except Exception as exc: logger.debug(traceback.format_exc()) logger.error(f"Error creating lineage for node [{node}]: {exc}") +def _get_paths_from_subtree(subtree: DiGraph) -> List[List[Any]]: + """ + Get all paths from root nodes to leaf nodes in a subtree + """ + paths = [] + # Find all root nodes (nodes with no incoming edges) + root_nodes = [node for node in subtree if subtree.in_degree(node) == 0] + # Find all leaf nodes (nodes with no outgoing edges) + leaf_nodes = [node for node in subtree if subtree.out_degree(node) == 0] + + # Find all simple paths from each root to each leaf + for root in root_nodes: + for leaf in leaf_nodes: + paths.extend(nx.all_simple_paths(subtree, root, leaf)) + return paths + + def get_lineage_by_graph( graph: DiGraph, + metadata: OpenMetadata, ) -> Iterable[Either[AddLineageRequest]]: """ Generate lineage information from a directed graph. @@ -835,6 +896,7 @@ def get_lineage_by_graph( It then yields lineage information for each sequence. Args: graph (DiGraph): A directed graph representing the lineage. + metadata (OpenMetadata): OpenMetadata client instance to fetch table entities Raises: Exception: If an error occurs during the lineage creation process, it logs the error. """ @@ -844,24 +906,8 @@ def get_lineage_by_graph( # Get all weakly connected components components = list(nx.weakly_connected_components(graph)) - # Extract each component as an independent subgraph - independent_subtrees = [ - graph.subgraph(component).copy() for component in components - ] - - # Print results in the desired format - for subtree in independent_subtrees: - # Find a root node (node with no incoming edges) - root = [node for node in subtree if subtree.in_degree(node) == 0][0] - - # Traverse from the root to get the sequence of nodes - current = root - sequence = [current] - while True: - successors = list(subtree.successors(current)) - if not successors: - break - current = successors[0] - sequence.append(current) - - yield from _process_sequence(sequence, subtree) + # Extract each component as an independent subgraph and process paths + for component in components: + subtree = graph.subgraph(component).copy() + for path in _get_paths_from_subtree(subtree): + yield from _process_sequence(path, subtree, metadata) diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index 3918eabc4df..d220564e4fb 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -378,7 +378,9 @@ class LineageSource(QueryParserSource, ABC): if self.source_config.processQueryLineage: if hasattr(self.service_connection, "supportsLineageExtraction"): yield from self.yield_query_lineage() or [] - yield from get_lineage_by_graph(graph=self.graph) + yield from get_lineage_by_graph( + graph=self.graph, metadata=self.metadata + ) else: logger.warning( f"Lineage extraction is not supported for {str(self.service_connection.type.value)} connection"