From 636a83514d173727e9c64bf482a5a60d83af48fb Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Tue, 4 Feb 2025 11:40:10 +0530 Subject: [PATCH] Fix #12094: Add support for temp table lineage (#19654) --- .../metadata/ingestion/lineage/sql_lineage.py | 104 ++++++++++++++++++ .../source/database/lineage_source.py | 14 ++- .../source/database/query_parser_source.py | 1 + .../source/database/snowflake/lineage.py | 2 + .../databaseServiceQueryLineagePipeline.json | 6 + .../databaseServiceQueryLineagePipeline.ts | 6 +- 6 files changed, 131 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index f2876197ac9..f7bd21265fb 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -16,8 +16,10 @@ import traceback from collections import defaultdict from typing import Any, Iterable, List, Optional, Tuple, Union +import networkx as nx from collate_sqllineage.core.models import Column, DataFunction from collate_sqllineage.core.models import Table as LineageTable +from networkx import DiGraph from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.storedProcedure import ( @@ -509,6 +511,7 @@ def _create_lineage_by_table_name( column_lineage_map: dict, lineage_source: LineageSource = LineageSource.QueryLineage, procedure: Optional[EntityReference] = None, + graph: DiGraph = None, ) -> Iterable[Either[AddLineageRequest]]: """ This method is to create a lineage between two tables @@ -539,6 +542,11 @@ def _create_lineage_by_table_name( logger.debug( f"WARNING: Table entity [{table_name}] not found in OpenMetadata" ) + if graph is not None and (not from_table_entities or not to_table_entities): + graph.add_node(from_table, entity=from_table_entities) + graph.add_node(to_table, entity=to_table_entities) + graph.add_edge(from_table, to_table, query=masked_query) + return for from_entity, to_entity in itertools.product( from_table_entities or [], to_table_entities or [] @@ -606,6 +614,7 @@ def get_lineage_by_query( dialect: Dialect, timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, lineage_source: LineageSource = LineageSource.QueryLineage, + graph: DiGraph = None, ) -> Iterable[Either[AddLineageRequest]]: """ This method parses the query to get source, target and intermediate table names to create lineage, @@ -645,6 +654,7 @@ def get_lineage_by_query( column_lineage_map=column_lineage, lineage_source=lineage_source, procedure=procedure, + graph=graph, ) for target_table in lineage_parser.target_tables: yield from _create_lineage_by_table_name( @@ -682,6 +692,7 @@ def get_lineage_by_query( column_lineage_map=column_lineage, lineage_source=lineage_source, procedure=procedure, + graph=graph, ) if not lineage_parser.query_parsing_success: query_parsing_failures.add( @@ -710,6 +721,7 @@ def get_lineage_via_table_entity( dialect: Dialect, timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, lineage_source: LineageSource = LineageSource.QueryLineage, + graph: DiGraph = None, ) -> Iterable[Either[AddLineageRequest]]: """Get lineage from table entity""" column_lineage = {} @@ -743,6 +755,7 @@ def get_lineage_via_table_entity( column_lineage_map=column_lineage, lineage_source=lineage_source, procedure=procedure, + graph=graph, ) or [] if not lineage_parser.query_parsing_success: query_parsing_failures.add( @@ -759,3 +772,94 @@ def get_lineage_via_table_entity( stackTrace=traceback.format_exc(), ) ) + + +def _process_sequence( + sequence: List[Any], graph: DiGraph +) -> Iterable[Either[AddLineageRequest]]: + """ + Process a sequence of nodes to generate lineage information. + """ + from_node = None + queries = set() + clean_queries = False + previous_node = None + for node in sequence: + try: + if clean_queries: + queries.clear() + clean_queries = False + current_node = graph.nodes[node] + current_entity = current_node.get("entity") + + if ( + previous_node is not None + and graph.edges[(previous_node, node)].get("query") is not None + ): + queries.add(graph.edges[(previous_node, node)].get("query")) + + if current_entity and from_node is not None: + for from_entity, to_entity in itertools.product( + from_node.get("entity") or [], current_entity or [] + ): + if to_entity and from_entity: + yield _build_table_lineage( + to_entity=to_entity, + from_entity=from_entity, + to_table_raw_name=str(node), + from_table_raw_name=str(from_node), + masked_query="\n--------\n".join(queries), + column_lineage_map={}, + lineage_source=LineageSource.QueryLineage, + procedure=None, + ) + clean_queries = True + + if current_entity: + from_node = graph.nodes[node] + previous_node = node + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error(f"Error creating lineage for node [{node}]: {exc}") + + +def get_lineage_by_graph( + graph: DiGraph, +) -> Iterable[Either[AddLineageRequest]]: + """ + Generate lineage information from a directed graph. + This method processes a directed graph to extract lineage information by identifying + weakly connected components and traversing each component to generate sequences of nodes. + It then yields lineage information for each sequence. + Args: + graph (DiGraph): A directed graph representing the lineage. + Raises: + Exception: If an error occurs during the lineage creation process, it logs the error. + """ + if graph is None: + return + + # Get all weakly connected components + components = list(nx.weakly_connected_components(graph)) + + # Extract each component as an independent subgraph + independent_subtrees = [ + graph.subgraph(component).copy() for component in components + ] + + # Print results in the desired format + for subtree in independent_subtrees: + # Find a root node (node with no incoming edges) + root = [node for node in subtree if subtree.in_degree(node) == 0][0] + + # Traverse from the root to get the sequence of nodes + current = root + sequence = [current] + while True: + successors = list(subtree.successors(current)) + if not successors: + break + current = successors[0] + sequence.append(current) + + yield from _process_sequence(sequence, subtree) diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index 5b13a651601..983690fede4 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -38,7 +38,11 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.api.models import Either from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect -from metadata.ingestion.lineage.sql_lineage import get_column_fqn, get_lineage_by_query +from metadata.ingestion.lineage.sql_lineage import ( + get_column_fqn, + get_lineage_by_graph, + get_lineage_by_query, +) from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest from metadata.ingestion.models.topology import Queue from metadata.ingestion.source.database.query_parser_source import QueryParserSource @@ -207,6 +211,12 @@ class LineageSource(QueryParserSource, ABC): def query_lineage_generator( self, table_queries: List[TableQuery], queue: Queue ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: + if self.graph is None and self.source_config.enableTempTableLineage: + import networkx as nx + + # Create a directed graph + self.graph = nx.DiGraph() + for table_query in table_queries or []: if not self._query_already_processed(table_query): lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query( @@ -217,6 +227,7 @@ class LineageSource(QueryParserSource, ABC): schema_name=table_query.databaseSchema, dialect=self.dialect, timeout_seconds=self.source_config.parsingTimeoutLimit, + graph=self.graph, ) for lineage_request in lineages or []: @@ -366,6 +377,7 @@ class LineageSource(QueryParserSource, ABC): if self.source_config.processQueryLineage: if hasattr(self.service_connection, "supportsLineageExtraction"): yield from self.yield_query_lineage() or [] + yield from get_lineage_by_graph(graph=self.graph) else: logger.warning( f"Lineage extraction is not supported for {str(self.service_connection.type.value)} connection" diff --git a/ingestion/src/metadata/ingestion/source/database/query_parser_source.py b/ingestion/src/metadata/ingestion/source/database/query_parser_source.py index e24396099c3..f7fe9def6c1 100644 --- a/ingestion/src/metadata/ingestion/source/database/query_parser_source.py +++ b/ingestion/src/metadata/ingestion/source/database/query_parser_source.py @@ -65,6 +65,7 @@ class QueryParserSource(Source, ABC): self.dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) self.source_config = self.config.sourceConfig.config self.start, self.end = get_start_and_end(self.source_config.queryLogDuration) + self.graph = None self.engine = None if get_engine: diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py b/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py index 2d85ed8772e..3ef0232b790 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py @@ -47,6 +47,8 @@ class SnowflakeLineageSource( AND ( QUERY_TYPE IN ('MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT') OR (QUERY_TYPE = 'INSERT' and query_text ILIKE '%%insert%%into%%select%%') + OR (QUERY_TYPE = 'ALTER' and query_text ILIKE '%%alter%%table%%swap%%') + OR (QUERY_TYPE = 'CREATE_TABLE' and query_text ILIKE '%%clone%%') ) """ diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json index 13ba6e286df..d4102765ff9 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json @@ -105,6 +105,12 @@ "items": { "type": "string" } + }, + "enableTempTableLineage": { + "title": "Enable Temp Table Lineage", + "description": "Handle Lineage for Snowflake Temporary and Transient Tables. ", + "type": "boolean", + "default": false } }, "additionalProperties": false diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceQueryLineagePipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceQueryLineagePipeline.ts index eb9fa3aff5d..e255c008af5 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceQueryLineagePipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceQueryLineagePipeline.ts @@ -1,5 +1,5 @@ /* - * Copyright 2024 Collate. + * Copyright 2025 Collate. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -22,6 +22,10 @@ export interface DatabaseServiceQueryLineagePipeline { * Regex to only fetch databases that matches the pattern. */ databaseFilterPattern?: FilterPattern; + /** + * Handle Lineage for Snowflake Temporary and Transient Tables. + */ + enableTempTableLineage?: boolean; /** * Configuration the condition to filter the query history. */