Fix #12094: Add support for temp table lineage (#19654)

This commit is contained in:
Mayur Singal 2025-02-04 11:40:10 +05:30 committed by GitHub
parent 2308d4a4f3
commit 636a83514d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 131 additions and 2 deletions

View File

@ -16,8 +16,10 @@ import traceback
from collections import defaultdict from collections import defaultdict
from typing import Any, Iterable, List, Optional, Tuple, Union from typing import Any, Iterable, List, Optional, Tuple, Union
import networkx as nx
from collate_sqllineage.core.models import Column, DataFunction from collate_sqllineage.core.models import Column, DataFunction
from collate_sqllineage.core.models import Table as LineageTable from collate_sqllineage.core.models import Table as LineageTable
from networkx import DiGraph
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.storedProcedure import ( from metadata.generated.schema.entity.data.storedProcedure import (
@ -509,6 +511,7 @@ def _create_lineage_by_table_name(
column_lineage_map: dict, column_lineage_map: dict,
lineage_source: LineageSource = LineageSource.QueryLineage, lineage_source: LineageSource = LineageSource.QueryLineage,
procedure: Optional[EntityReference] = None, procedure: Optional[EntityReference] = None,
graph: DiGraph = None,
) -> Iterable[Either[AddLineageRequest]]: ) -> Iterable[Either[AddLineageRequest]]:
""" """
This method is to create a lineage between two tables This method is to create a lineage between two tables
@ -539,6 +542,11 @@ def _create_lineage_by_table_name(
logger.debug( logger.debug(
f"WARNING: Table entity [{table_name}] not found in OpenMetadata" f"WARNING: Table entity [{table_name}] not found in OpenMetadata"
) )
if graph is not None and (not from_table_entities or not to_table_entities):
graph.add_node(from_table, entity=from_table_entities)
graph.add_node(to_table, entity=to_table_entities)
graph.add_edge(from_table, to_table, query=masked_query)
return
for from_entity, to_entity in itertools.product( for from_entity, to_entity in itertools.product(
from_table_entities or [], to_table_entities or [] from_table_entities or [], to_table_entities or []
@ -606,6 +614,7 @@ def get_lineage_by_query(
dialect: Dialect, dialect: Dialect,
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
lineage_source: LineageSource = LineageSource.QueryLineage, lineage_source: LineageSource = LineageSource.QueryLineage,
graph: DiGraph = None,
) -> Iterable[Either[AddLineageRequest]]: ) -> Iterable[Either[AddLineageRequest]]:
""" """
This method parses the query to get source, target and intermediate table names to create lineage, This method parses the query to get source, target and intermediate table names to create lineage,
@ -645,6 +654,7 @@ def get_lineage_by_query(
column_lineage_map=column_lineage, column_lineage_map=column_lineage,
lineage_source=lineage_source, lineage_source=lineage_source,
procedure=procedure, procedure=procedure,
graph=graph,
) )
for target_table in lineage_parser.target_tables: for target_table in lineage_parser.target_tables:
yield from _create_lineage_by_table_name( yield from _create_lineage_by_table_name(
@ -682,6 +692,7 @@ def get_lineage_by_query(
column_lineage_map=column_lineage, column_lineage_map=column_lineage,
lineage_source=lineage_source, lineage_source=lineage_source,
procedure=procedure, procedure=procedure,
graph=graph,
) )
if not lineage_parser.query_parsing_success: if not lineage_parser.query_parsing_success:
query_parsing_failures.add( query_parsing_failures.add(
@ -710,6 +721,7 @@ def get_lineage_via_table_entity(
dialect: Dialect, dialect: Dialect,
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
lineage_source: LineageSource = LineageSource.QueryLineage, lineage_source: LineageSource = LineageSource.QueryLineage,
graph: DiGraph = None,
) -> Iterable[Either[AddLineageRequest]]: ) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage from table entity""" """Get lineage from table entity"""
column_lineage = {} column_lineage = {}
@ -743,6 +755,7 @@ def get_lineage_via_table_entity(
column_lineage_map=column_lineage, column_lineage_map=column_lineage,
lineage_source=lineage_source, lineage_source=lineage_source,
procedure=procedure, procedure=procedure,
graph=graph,
) or [] ) or []
if not lineage_parser.query_parsing_success: if not lineage_parser.query_parsing_success:
query_parsing_failures.add( query_parsing_failures.add(
@ -759,3 +772,94 @@ def get_lineage_via_table_entity(
stackTrace=traceback.format_exc(), stackTrace=traceback.format_exc(),
) )
) )
def _process_sequence(
sequence: List[Any], graph: DiGraph
) -> Iterable[Either[AddLineageRequest]]:
"""
Process a sequence of nodes to generate lineage information.
"""
from_node = None
queries = set()
clean_queries = False
previous_node = None
for node in sequence:
try:
if clean_queries:
queries.clear()
clean_queries = False
current_node = graph.nodes[node]
current_entity = current_node.get("entity")
if (
previous_node is not None
and graph.edges[(previous_node, node)].get("query") is not None
):
queries.add(graph.edges[(previous_node, node)].get("query"))
if current_entity and from_node is not None:
for from_entity, to_entity in itertools.product(
from_node.get("entity") or [], current_entity or []
):
if to_entity and from_entity:
yield _build_table_lineage(
to_entity=to_entity,
from_entity=from_entity,
to_table_raw_name=str(node),
from_table_raw_name=str(from_node),
masked_query="\n--------\n".join(queries),
column_lineage_map={},
lineage_source=LineageSource.QueryLineage,
procedure=None,
)
clean_queries = True
if current_entity:
from_node = graph.nodes[node]
previous_node = node
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Error creating lineage for node [{node}]: {exc}")
def get_lineage_by_graph(
graph: DiGraph,
) -> Iterable[Either[AddLineageRequest]]:
"""
Generate lineage information from a directed graph.
This method processes a directed graph to extract lineage information by identifying
weakly connected components and traversing each component to generate sequences of nodes.
It then yields lineage information for each sequence.
Args:
graph (DiGraph): A directed graph representing the lineage.
Raises:
Exception: If an error occurs during the lineage creation process, it logs the error.
"""
if graph is None:
return
# Get all weakly connected components
components = list(nx.weakly_connected_components(graph))
# Extract each component as an independent subgraph
independent_subtrees = [
graph.subgraph(component).copy() for component in components
]
# Print results in the desired format
for subtree in independent_subtrees:
# Find a root node (node with no incoming edges)
root = [node for node in subtree if subtree.in_degree(node) == 0][0]
# Traverse from the root to get the sequence of nodes
current = root
sequence = [current]
while True:
successors = list(subtree.successors(current))
if not successors:
break
current = successors[0]
sequence.append(current)
yield from _process_sequence(sequence, subtree)

View File

@ -38,7 +38,11 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tableQuery import TableQuery from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.models import Either from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.sql_lineage import get_column_fqn, get_lineage_by_query from metadata.ingestion.lineage.sql_lineage import (
get_column_fqn,
get_lineage_by_graph,
get_lineage_by_query,
)
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.models.topology import Queue from metadata.ingestion.models.topology import Queue
from metadata.ingestion.source.database.query_parser_source import QueryParserSource from metadata.ingestion.source.database.query_parser_source import QueryParserSource
@ -207,6 +211,12 @@ class LineageSource(QueryParserSource, ABC):
def query_lineage_generator( def query_lineage_generator(
self, table_queries: List[TableQuery], queue: Queue self, table_queries: List[TableQuery], queue: Queue
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
if self.graph is None and self.source_config.enableTempTableLineage:
import networkx as nx
# Create a directed graph
self.graph = nx.DiGraph()
for table_query in table_queries or []: for table_query in table_queries or []:
if not self._query_already_processed(table_query): if not self._query_already_processed(table_query):
lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query( lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
@ -217,6 +227,7 @@ class LineageSource(QueryParserSource, ABC):
schema_name=table_query.databaseSchema, schema_name=table_query.databaseSchema,
dialect=self.dialect, dialect=self.dialect,
timeout_seconds=self.source_config.parsingTimeoutLimit, timeout_seconds=self.source_config.parsingTimeoutLimit,
graph=self.graph,
) )
for lineage_request in lineages or []: for lineage_request in lineages or []:
@ -366,6 +377,7 @@ class LineageSource(QueryParserSource, ABC):
if self.source_config.processQueryLineage: if self.source_config.processQueryLineage:
if hasattr(self.service_connection, "supportsLineageExtraction"): if hasattr(self.service_connection, "supportsLineageExtraction"):
yield from self.yield_query_lineage() or [] yield from self.yield_query_lineage() or []
yield from get_lineage_by_graph(graph=self.graph)
else: else:
logger.warning( logger.warning(
f"Lineage extraction is not supported for {str(self.service_connection.type.value)} connection" f"Lineage extraction is not supported for {str(self.service_connection.type.value)} connection"

View File

@ -65,6 +65,7 @@ class QueryParserSource(Source, ABC):
self.dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) self.dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
self.source_config = self.config.sourceConfig.config self.source_config = self.config.sourceConfig.config
self.start, self.end = get_start_and_end(self.source_config.queryLogDuration) self.start, self.end = get_start_and_end(self.source_config.queryLogDuration)
self.graph = None
self.engine = None self.engine = None
if get_engine: if get_engine:

View File

@ -47,6 +47,8 @@ class SnowflakeLineageSource(
AND ( AND (
QUERY_TYPE IN ('MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT') QUERY_TYPE IN ('MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT')
OR (QUERY_TYPE = 'INSERT' and query_text ILIKE '%%insert%%into%%select%%') OR (QUERY_TYPE = 'INSERT' and query_text ILIKE '%%insert%%into%%select%%')
OR (QUERY_TYPE = 'ALTER' and query_text ILIKE '%%alter%%table%%swap%%')
OR (QUERY_TYPE = 'CREATE_TABLE' and query_text ILIKE '%%clone%%')
) )
""" """

View File

@ -105,6 +105,12 @@
"items": { "items": {
"type": "string" "type": "string"
} }
},
"enableTempTableLineage": {
"title": "Enable Temp Table Lineage",
"description": "Handle Lineage for Snowflake Temporary and Transient Tables. ",
"type": "boolean",
"default": false
} }
}, },
"additionalProperties": false "additionalProperties": false

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2024 Collate. * Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
@ -22,6 +22,10 @@ export interface DatabaseServiceQueryLineagePipeline {
* Regex to only fetch databases that matches the pattern. * Regex to only fetch databases that matches the pattern.
*/ */
databaseFilterPattern?: FilterPattern; databaseFilterPattern?: FilterPattern;
/**
* Handle Lineage for Snowflake Temporary and Transient Tables.
*/
enableTempTableLineage?: boolean;
/** /**
* Configuration the condition to filter the query history. * Configuration the condition to filter the query history.
*/ */