FIX #21180: Implement Cross Service Lineage (#22665)

This commit is contained in:
Mayur Singal 2025-08-12 16:54:40 +05:30 committed by GitHub
parent 0c9f5a950b
commit dc4c75ded4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 771 additions and 87 deletions

View File

@ -75,33 +75,33 @@ search_cache = LRUCache(LRU_CACHE_SIZE)
def search_table_entities(
metadata: OpenMetadata,
service_name: str,
service_names: Union[str, List[str]],
database: Optional[str],
database_schema: Optional[str],
table: str,
) -> Optional[List[Table]]:
"""
Method to get table entity from database, database_schema & table name.
It will try to search first in ES and doing an extra call to get Table entities
with the needed fields like columns for column lineage.
If the ES result is empty, it will try by running
a request against the API to find the Entity.
Now supports searching across multiple services (cross-database lineage).
Args:
metadata: OMeta client
service_name: service name
service_names: service name or list of service names (current + cross db)
database: database name
database_schema: schema name
table: table name
Returns:
A list of Table entities, otherwise, None
A list of Table entities from the first service where found, otherwise None
"""
if isinstance(service_names, str):
service_names = [service_names]
for service_name in service_names:
search_tuple = (service_name, database, database_schema, table)
if search_tuple in search_cache:
return search_cache.get(search_tuple)
result = search_cache.get(search_tuple)
if result:
return result
try:
table_entities: Optional[List[Table]] = []
# search on ES first
@ -132,6 +132,7 @@ def search_table_entities(
table_entities.append(table_entity)
# added the search tuple to the cache
search_cache.put(search_tuple, table_entities)
if table_entities:
return table_entities
except Exception as exc:
logger.debug(traceback.format_exc())
@ -360,7 +361,7 @@ def get_source_table_names(
def get_table_entities_from_query(
metadata: OpenMetadata,
service_name: str,
service_names: Union[str, List[str]],
database_name: str,
database_schema: str,
table_name: str,
@ -371,8 +372,7 @@ def get_table_entities_from_query(
If the sys data is incorrect, use the table name ingredients.
:param metadata: OpenMetadata client
:param service_name: Service being ingested.
:param service_names: Service(s) being ingested (current + cross db)
:param database_name: Name of the database informed on db sys results
:param database_schema: Name of the schema informed on db sys results
:param table_name: Table name extracted from query. Can be `table`, `schema.table` or `db.schema.table`
@ -386,7 +386,7 @@ def get_table_entities_from_query(
table_entities = search_table_entities(
metadata=metadata,
service_name=service_name,
service_names=service_names,
database=database_query if database_query else database_name,
database_schema=schema_query if schema_query else database_schema,
table=table,
@ -536,7 +536,7 @@ def _create_lineage_by_table_name(
try:
from_table_entities = get_table_entities_from_query(
metadata=metadata,
service_name=service_name,
service_names=service_name,
database_name=database_name,
database_schema=schema_name,
table_name=from_table,
@ -545,7 +545,7 @@ def _create_lineage_by_table_name(
to_table_entities = get_table_entities_from_query(
metadata=metadata,
service_name=service_name,
service_names=service_name,
database_name=database_name,
database_schema=schema_name,
table_name=to_table,
@ -640,7 +640,7 @@ def populate_column_lineage_map(raw_column_lineage):
# pylint: disable=too-many-locals
def get_lineage_by_query(
metadata: OpenMetadata,
service_name: str,
service_names: Union[str, List[str]],
database_name: Optional[str],
schema_name: Optional[str],
query: str,
@ -649,14 +649,23 @@ def get_lineage_by_query(
lineage_source: LineageSource = LineageSource.QueryLineage,
graph: DiGraph = None,
schema_fallback: bool = False,
service_name: Optional[str] = None, # backward compatibility for python sdk
) -> Iterable[Either[AddLineageRequest]]:
"""
This method parses the query to get source, target and intermediate table names to create lineage,
and returns True if target table is found to create lineage otherwise returns False.
Now supports cross-database lineage by accepting a list of service names.
"""
column_lineage = {}
query_parsing_failures = QueryParsingFailures()
if service_name and isinstance(service_name, str):
service_names = [service_name]
logger.warning(
"Deprecated: service_name is deprecated, use service_names instead"
)
if isinstance(service_names, str):
service_names = [service_names]
try:
lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds)
masked_query = lineage_parser.masked_query
@ -673,7 +682,7 @@ def get_lineage_by_query(
source_table=source_table,
database_name=database_name,
schema_name=schema_name,
service_name=service_name,
service_name=service_names,
timeout_seconds=timeout_seconds,
column_lineage=column_lineage,
):
@ -681,7 +690,7 @@ def get_lineage_by_query(
metadata,
from_table=str(from_table_name),
to_table=str(intermediate_table),
service_name=service_name,
service_name=service_names,
database_name=database_name,
schema_name=schema_name,
masked_query=masked_query,
@ -696,13 +705,14 @@ def get_lineage_by_query(
metadata,
from_table=str(intermediate_table),
to_table=str(target_table),
service_name=service_name,
service_name=service_names,
database_name=database_name,
schema_name=schema_name,
masked_query=masked_query,
column_lineage_map=column_lineage,
lineage_source=lineage_source,
schema_fallback=schema_fallback,
graph=graph,
)
if not lineage_parser.intermediate_tables:
for target_table in lineage_parser.target_tables:
@ -713,7 +723,7 @@ def get_lineage_by_query(
source_table=source_table,
database_name=database_name,
schema_name=schema_name,
service_name=service_name,
service_name=service_names,
timeout_seconds=timeout_seconds,
column_lineage=column_lineage,
):
@ -721,7 +731,7 @@ def get_lineage_by_query(
metadata,
from_table=str(from_table_name),
to_table=str(target_table),
service_name=service_name,
service_name=service_names,
database_name=database_name,
schema_name=schema_name,
masked_query=masked_query,
@ -742,7 +752,7 @@ def get_lineage_by_query(
yield Either(
left=StackTraceError(
name="Lineage",
error=f"Ingesting lineage failed for service [{service_name}]: {exc}",
error=f"Ingesting lineage failed for service(s) [{service_names}]: {exc}",
stackTrace=traceback.format_exc(),
)
)
@ -753,7 +763,7 @@ def get_lineage_via_table_entity(
table_entity: Table,
database_name: str,
schema_name: str,
service_name: str,
service_names: Union[str, List[str]],
query: str,
dialect: Dialect,
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
@ -765,6 +775,8 @@ def get_lineage_via_table_entity(
column_lineage = {}
query_parsing_failures = QueryParsingFailures()
if isinstance(service_names, str):
service_names = [service_names]
try:
lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds)
masked_query = lineage_parser.masked_query
@ -780,7 +792,7 @@ def get_lineage_via_table_entity(
source_table=from_table_name,
database_name=database_name,
schema_name=schema_name,
service_name=service_name,
service_name=service_names,
timeout_seconds=timeout_seconds,
column_lineage=column_lineage,
):
@ -788,7 +800,7 @@ def get_lineage_via_table_entity(
metadata,
from_table=str(source_table),
to_table=f"{schema_name}.{to_table_name}",
service_name=service_name,
service_name=service_names,
database_name=database_name,
schema_name=schema_name,
masked_query=masked_query,
@ -809,7 +821,7 @@ def get_lineage_via_table_entity(
Either(
left=StackTraceError(
name="Lineage",
error=f"Failed to create view lineage for database [{database_name}] and table [{table_entity}]: {exc}",
error=f"Failed to create view lineage for database [{database_name}] and table [{table_entity}] with service(s) [{service_names}]: {exc}",
stackTrace=traceback.format_exc(),
)
)

View File

@ -383,7 +383,7 @@ class OMetaLineageMixin(Generic[T]):
connection_type = database_service.serviceType.value
add_lineage_request = get_lineage_by_query(
metadata=self,
service_name=database_service.name.root,
service_names=database_service.name.root,
dialect=ConnectionTypeDialectMapper.dialect_of(connection_type),
query=sql,
database_name=database_name,

View File

@ -924,7 +924,7 @@ class DbtSource(DbtServiceSource):
lineages = get_lineage_by_query(
self.metadata,
query=query,
service_name=source_elements[0],
service_names=source_elements[0],
database_name=source_elements[1],
schema_name=source_elements[2],
dialect=dialect,

View File

@ -262,10 +262,18 @@ class LineageSource(QueryParserSource, ABC):
for table_query in table_queries or []:
if not self._query_already_processed(table_query):
# Prepare service names for lineage processing
service_names = [table_query.serviceName]
if (
self.source_config.processCrossDatabaseLineage
and self.source_config.crossDatabaseServiceNames
):
service_names.extend(self.source_config.crossDatabaseServiceNames)
lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
self.metadata,
query=table_query.query,
service_name=table_query.serviceName,
service_names=service_names,
database_name=table_query.databaseName,
schema_name=table_query.databaseSchema,
dialect=self.dialect,
@ -334,10 +342,18 @@ class LineageSource(QueryParserSource, ABC):
"View Filtered Out",
)
continue
# Prepare service names for view lineage processing
service_names = [self.config.serviceName]
if (
self.source_config.processCrossDatabaseLineage
and self.source_config.crossDatabaseServiceNames
):
service_names.extend(self.source_config.crossDatabaseServiceNames)
for lineage in get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.config.serviceName,
service_names=service_names,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.parsingTimeoutLimit,
):

View File

@ -207,10 +207,18 @@ class StoredProcedureLineageMixin(ABC):
).graph
self.stored_procedure_query_lineage = True
# Prepare service names for lineage processing
service_names = [self.service_name]
if (
self.source_config.processCrossDatabaseLineage
and self.source_config.crossDatabaseServiceNames
):
service_names.extend(self.source_config.crossDatabaseServiceNames)
for either_lineage in get_lineage_by_query(
self.metadata,
query=query_by_procedure.query_text,
service_name=self.service_name,
service_names=service_names,
database_name=query_by_procedure.query_database_name,
schema_name=query_by_procedure.query_schema_name,
dialect=ConnectionTypeDialectMapper.dialect_of(

View File

@ -14,7 +14,7 @@ Helpers module for db sources
"""
import traceback
from typing import Iterable
from typing import Iterable, List, Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
@ -50,13 +50,16 @@ def get_host_from_host_port(uri: str) -> str:
def get_view_lineage(
view: TableView,
metadata: OpenMetadata,
service_name: str,
service_names: Union[str, List[str]],
connection_type: str,
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
) -> Iterable[Either[AddLineageRequest]]:
"""
Method to generate view lineage
Now supports cross-database lineage by accepting a list of service names.
"""
if isinstance(service_names, str):
service_names = [service_names]
table_name = view.table_name
schema_name = view.schema_name
db_name = view.db_name
@ -65,7 +68,7 @@ def get_view_lineage(
table_fqn = fqn.build(
metadata,
entity_type=Table,
service_name=service_name,
service_name=service_names[0], # Use first service for table entity lookup
database_name=db_name,
schema_name=schema_name,
table_name=table_name,
@ -95,7 +98,7 @@ def get_view_lineage(
yield from get_lineage_by_query(
metadata,
query=view_definition,
service_name=service_name,
service_names=service_names,
database_name=db_name,
schema_name=schema_name,
dialect=dialect,
@ -108,7 +111,7 @@ def get_view_lineage(
yield from get_lineage_via_table_entity(
metadata,
table_entity=table_entity,
service_name=service_name,
service_names=service_names,
database_name=db_name,
schema_name=schema_name,
query=view_definition,

View File

@ -0,0 +1,644 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test cross database lineage functionality in SQL lineage module
"""
import uuid
from datetime import datetime
from unittest import TestCase
from unittest.mock import MagicMock, patch
from metadata.generated.schema.entity.data.storedProcedure import (
StoredProcedure,
StoredProcedureCode,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import (
DatabaseServiceQueryLineagePipeline,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.sql_lineage import (
get_lineage_by_query,
get_lineage_via_table_entity,
get_table_entities_from_query,
search_table_entities,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureLineageMixin,
)
from metadata.utils.db_utils import get_view_lineage
class CrossDatabaseLineageSQLTest(TestCase):
"""
Test cross database lineage functionality in SQL lineage module
"""
def setUp(self):
"""Set up test fixtures"""
self.mock_metadata = MagicMock()
# Create mock tables
self.mock_table1 = Table(
id=uuid.uuid4(),
name="test_table",
fullyQualifiedName="service1.db1.schema1.test_table",
columns=[
{
"name": "id",
"dataType": "NUMBER",
"fullyQualifiedName": "service1.db1.schema1.test_table.id",
},
{
"name": "name",
"dataType": "VARCHAR",
"fullyQualifiedName": "service1.db1.schema1.test_table.name",
},
],
)
self.mock_table2 = Table(
id=uuid.uuid4(),
name="test_table",
fullyQualifiedName="service2.db2.schema2.test_table",
columns=[
{
"name": "id",
"dataType": "NUMBER",
"fullyQualifiedName": "service2.db2.schema2.test_table.id",
},
{
"name": "name",
"dataType": "VARCHAR",
"fullyQualifiedName": "service2.db2.schema2.test_table.name",
},
],
)
def test_search_table_entities_single_service(self):
"""Test search_table_entities with single service (backward compatibility)"""
# Mock the metadata methods
self.mock_metadata.es_search_from_fqn.return_value = [self.mock_table1]
# Test with single service name (string)
result = search_table_entities(
metadata=self.mock_metadata,
service_names="service1",
database="db1",
database_schema="schema1",
table="test_table",
)
# Verify the result
self.assertEqual(result, [self.mock_table1])
# Verify the method was called correctly
self.mock_metadata.es_search_from_fqn.assert_called()
def test_search_table_entities_multiple_services(self):
"""Test search_table_entities with multiple services (cross-database)"""
# Mock the metadata methods - first service returns None for both ES and API, second returns table
self.mock_metadata.es_search_from_fqn.side_effect = [None, [self.mock_table2]]
# Mock fqn.build to return empty list for first service, list with FQN for second service
with patch(
"metadata.ingestion.lineage.sql_lineage.fqn.build"
) as mock_fqn_build:
mock_fqn_build.side_effect = [[], ["service2.db2.schema2.test_table"]]
# Mock metadata.get_by_name to return the table for second service
self.mock_metadata.get_by_name.return_value = self.mock_table2
# Test with multiple service names
result = search_table_entities(
metadata=self.mock_metadata,
service_names=["service1", "service2"],
database="db2",
database_schema="schema2",
table="test_table",
)
# Verify the result - should return table from second service
self.assertEqual(result, [self.mock_table2])
# Verify the method was called for both services
self.assertEqual(self.mock_metadata.es_search_from_fqn.call_count, 2)
def test_search_table_entities_no_results(self):
"""Test search_table_entities when no tables are found in any service"""
# Mock the metadata methods to return None for all services
self.mock_metadata.es_search_from_fqn.return_value = None
# Mock fqn.build to return empty list
with patch(
"metadata.ingestion.lineage.sql_lineage.fqn.build"
) as mock_fqn_build:
mock_fqn_build.return_value = []
# Test with multiple service names
result = search_table_entities(
metadata=self.mock_metadata,
service_names=["service1", "service2"],
database="db1",
database_schema="schema1",
table="nonexistent_table",
)
# Verify the result is None
self.assertIsNone(result)
def test_get_table_entities_from_query_single_service(self):
"""Test get_table_entities_from_query with single service (backward compatibility)"""
# Mock search_table_entities to return a table
with patch(
"metadata.ingestion.lineage.sql_lineage.search_table_entities"
) as mock_search:
mock_search.return_value = [self.mock_table1]
result = get_table_entities_from_query(
metadata=self.mock_metadata,
service_names="service1",
database_name="db1",
database_schema="schema1",
table_name="test_table",
)
# Verify the result
self.assertEqual(result, [self.mock_table1])
# Verify search_table_entities was called correctly
mock_search.assert_called_with(
metadata=self.mock_metadata,
service_names="service1",
database="db1",
database_schema="schema1",
table="test_table",
)
def test_get_table_entities_from_query_multiple_services(self):
"""Test get_table_entities_from_query with multiple services (cross-database)"""
# Mock search_table_entities to return a table from second service
with patch(
"metadata.ingestion.lineage.sql_lineage.search_table_entities"
) as mock_search:
mock_search.return_value = [self.mock_table2]
result = get_table_entities_from_query(
metadata=self.mock_metadata,
service_names=["service1", "service2"],
database_name="db2",
database_schema="schema2",
table_name="test_table",
)
# Verify the result
self.assertEqual(result, [self.mock_table2])
# Verify search_table_entities was called correctly
mock_search.assert_called_with(
metadata=self.mock_metadata,
service_names=["service1", "service2"],
database="db2",
database_schema="schema2",
table="test_table",
)
def test_get_lineage_by_query_single_service(self):
"""Test get_lineage_by_query with single service (backward compatibility)"""
# Mock the lineage parser and other dependencies
with patch(
"metadata.ingestion.lineage.sql_lineage.LineageParser"
) as mock_parser:
mock_parser_instance = MagicMock()
mock_parser_instance.masked_query = "SELECT * FROM test"
mock_parser_instance.column_lineage = []
mock_parser_instance.intermediate_tables = []
mock_parser_instance.source_tables = []
mock_parser_instance.target_tables = []
mock_parser_instance.query_parsing_success = True
mock_parser.return_value = mock_parser_instance
# Mock get_source_table_names to return empty
with patch(
"metadata.ingestion.lineage.sql_lineage.get_source_table_names"
) as mock_source:
mock_source.return_value = []
result = list(
get_lineage_by_query(
metadata=self.mock_metadata,
service_names="service1",
database_name="db1",
schema_name="schema1",
query="SELECT * FROM test",
dialect=Dialect.ANSI,
)
)
# Verify no lineage is generated (empty source tables)
self.assertEqual(len(result), 0)
def test_get_lineage_by_query_multiple_services(self):
"""Test get_lineage_by_query with multiple services (cross-database)"""
# Mock the lineage parser and other dependencies
with patch(
"metadata.ingestion.lineage.sql_lineage.LineageParser"
) as mock_parser:
mock_parser_instance = MagicMock()
mock_parser_instance.masked_query = "SELECT * FROM test"
mock_parser_instance.column_lineage = []
mock_parser_instance.intermediate_tables = []
mock_parser_instance.source_tables = []
mock_parser_instance.target_tables = []
mock_parser_instance.query_parsing_success = True
mock_parser.return_value = mock_parser_instance
# Mock get_source_table_names to return empty
with patch(
"metadata.ingestion.lineage.sql_lineage.get_source_table_names"
) as mock_source:
mock_source.return_value = []
result = list(
get_lineage_by_query(
metadata=self.mock_metadata,
service_names=["service1", "service2"],
database_name="db1",
schema_name="schema1",
query="SELECT * FROM test",
dialect=Dialect.ANSI,
)
)
# Verify no lineage is generated (empty source tables)
self.assertEqual(len(result), 0)
def test_get_lineage_by_query_with_source_tables(self):
"""Test get_lineage_by_query with actual source tables (query lineage)"""
# Mock the lineage parser with source and target tables
with patch(
"metadata.ingestion.lineage.sql_lineage.LineageParser"
) as mock_parser:
mock_parser_instance = MagicMock()
mock_parser_instance.masked_query = (
"CREATE TABLE target AS SELECT * FROM source"
)
mock_parser_instance.column_lineage = []
mock_parser_instance.intermediate_tables = []
mock_parser_instance.source_tables = ["source"]
mock_parser_instance.target_tables = ["target"]
mock_parser_instance.query_parsing_success = True
mock_parser.return_value = mock_parser_instance
# Mock get_source_table_names to return a source table
with patch(
"metadata.ingestion.lineage.sql_lineage.get_source_table_names"
) as mock_source:
mock_source.return_value = [("", "source_table")]
# Mock search_table_entities to return a table
with patch(
"metadata.ingestion.lineage.sql_lineage.search_table_entities"
) as mock_search:
mock_search.return_value = [self.mock_table1]
result = list(
get_lineage_by_query(
metadata=self.mock_metadata,
service_names=["service1", "service2"],
database_name="db1",
schema_name="schema1",
query="CREATE TABLE target AS SELECT * FROM source",
dialect=Dialect.ANSI,
)
)
# Verify that lineage was attempted
self.assertIsInstance(result, list)
mock_search.assert_called()
def test_get_lineage_via_table_entity_single_service(self):
"""Test get_lineage_via_table_entity with single service (backward compatibility)"""
# Mock the lineage parser
with patch(
"metadata.ingestion.lineage.sql_lineage.LineageParser"
) as mock_parser:
mock_parser_instance = MagicMock()
mock_parser_instance.masked_query = "SELECT * FROM source"
mock_parser_instance.column_lineage = []
mock_parser_instance.source_tables = ["source"]
mock_parser_instance.query_parsing_success = True
mock_parser.return_value = mock_parser_instance
# Mock get_source_table_names to return empty
with patch(
"metadata.ingestion.lineage.sql_lineage.get_source_table_names"
) as mock_source:
mock_source.return_value = []
result = list(
get_lineage_via_table_entity(
metadata=self.mock_metadata,
table_entity=self.mock_table1,
service_names="service1",
database_name="db1",
schema_name="schema1",
query="SELECT * FROM source",
dialect=Dialect.ANSI,
)
)
# Verify the method executes without errors
self.assertIsInstance(result, list)
def test_get_lineage_via_table_entity_multiple_services(self):
"""Test get_lineage_via_table_entity with multiple services (cross-database)"""
# Mock the lineage parser
with patch(
"metadata.ingestion.lineage.sql_lineage.LineageParser"
) as mock_parser:
mock_parser_instance = MagicMock()
mock_parser_instance.masked_query = "SELECT * FROM source"
mock_parser_instance.column_lineage = []
mock_parser_instance.source_tables = ["source"]
mock_parser_instance.query_parsing_success = True
mock_parser.return_value = mock_parser_instance
# Mock get_source_table_names to return a source table
with patch(
"metadata.ingestion.lineage.sql_lineage.get_source_table_names"
) as mock_source:
mock_source.return_value = [("", "source_table")]
# Mock search_table_entities to return a table from second service
with patch(
"metadata.ingestion.lineage.sql_lineage.search_table_entities"
) as mock_search:
mock_search.return_value = [self.mock_table2]
result = list(
get_lineage_via_table_entity(
metadata=self.mock_metadata,
table_entity=self.mock_table1,
service_names=["service1", "service2"],
database_name="db1",
schema_name="schema1",
query="SELECT * FROM source",
dialect=Dialect.ANSI,
)
)
# Verify that lineage was attempted with multiple services
self.assertIsInstance(result, list)
# Verify that search_table_entities was called (the exact parameters may vary)
mock_search.assert_called()
def test_get_view_lineage_single_service(self):
"""Test get_view_lineage with single service (backward compatibility)"""
# Create a mock TableView
mock_view = MagicMock()
mock_view.table_name = "test_view"
mock_view.schema_name = "schema1"
mock_view.db_name = "db1"
mock_view.view_definition = (
"CREATE VIEW test_view AS SELECT * FROM source_table"
)
# Mock the metadata methods
self.mock_metadata.get_by_name.return_value = self.mock_table1
# Mock fqn.build to return a valid FQN
with patch("metadata.utils.db_utils.fqn.build") as mock_fqn_build:
mock_fqn_build.return_value = "service1.db1.schema1.test_view"
# Mock the lineage parser
with patch("metadata.utils.db_utils.LineageParser") as mock_parser:
mock_parser_instance = MagicMock()
mock_parser_instance.masked_query = (
"CREATE VIEW test_view AS SELECT * FROM source_table"
)
mock_parser_instance.column_lineage = []
mock_parser_instance.source_tables = ["source_table"]
mock_parser_instance.target_tables = ["test_view"]
mock_parser_instance.query_parsing_success = True
mock_parser.return_value = mock_parser_instance
# Mock get_source_table_names to return empty (from sql_lineage module)
with patch(
"metadata.ingestion.lineage.sql_lineage.get_source_table_names"
) as mock_source:
mock_source.return_value = []
result = list(
get_view_lineage(
view=mock_view,
metadata=self.mock_metadata,
service_names="service1",
connection_type="snowflake",
)
)
# Verify the method executes without errors
self.assertIsInstance(result, list)
def test_get_view_lineage_multiple_services(self):
"""Test get_view_lineage with multiple services (cross-database view lineage)"""
# Create a mock TableView
mock_view = MagicMock()
mock_view.table_name = "test_view"
mock_view.schema_name = "schema1"
mock_view.db_name = "db1"
mock_view.view_definition = (
"CREATE VIEW test_view AS SELECT * FROM source_table"
)
# Mock the metadata methods
self.mock_metadata.get_by_name.return_value = self.mock_table1
# Mock fqn.build to return a valid FQN
with patch("metadata.utils.db_utils.fqn.build") as mock_fqn_build:
mock_fqn_build.return_value = "service1.db1.schema1.test_view"
# Mock the lineage parser
with patch("metadata.utils.db_utils.LineageParser") as mock_parser:
mock_parser_instance = MagicMock()
mock_parser_instance.masked_query = (
"CREATE VIEW test_view AS SELECT * FROM source_table"
)
mock_parser_instance.column_lineage = []
mock_parser_instance.source_tables = ["source_table"]
mock_parser_instance.target_tables = ["test_view"]
mock_parser_instance.query_parsing_success = True
mock_parser.return_value = mock_parser_instance
# Mock get_source_table_names to return a source table (from sql_lineage module)
with patch(
"metadata.ingestion.lineage.sql_lineage.get_source_table_names"
) as mock_source:
mock_source.return_value = [("", "source_table")]
# Mock search_table_entities to return a table from second service
with patch(
"metadata.ingestion.lineage.sql_lineage.search_table_entities"
) as mock_search:
mock_search.return_value = [self.mock_table2]
result = list(
get_view_lineage(
view=mock_view,
metadata=self.mock_metadata,
service_names=["service1", "service2"],
connection_type="snowflake",
)
)
# Verify that view lineage was attempted with multiple services
self.assertIsInstance(result, list)
mock_search.assert_called()
def test_get_view_lineage_with_postgres_schema_fallback(self):
"""Test get_view_lineage with Postgres schema fallback"""
# Create a mock TableView for Postgres
mock_view = MagicMock()
mock_view.table_name = "test_view"
mock_view.schema_name = None # No schema specified
mock_view.db_name = "db1"
mock_view.view_definition = (
"CREATE VIEW test_view AS SELECT * FROM source_table"
)
# Mock the metadata methods
self.mock_metadata.get_by_name.return_value = self.mock_table1
# Mock fqn.build to return a valid FQN
with patch("metadata.utils.db_utils.fqn.build") as mock_fqn_build:
mock_fqn_build.return_value = "service1.db1.public.test_view"
# Mock the lineage parser
with patch("metadata.utils.db_utils.LineageParser") as mock_parser:
mock_parser_instance = MagicMock()
mock_parser_instance.masked_query = (
"CREATE VIEW test_view AS SELECT * FROM source_table"
)
mock_parser_instance.column_lineage = []
mock_parser_instance.source_tables = ["source_table"]
mock_parser_instance.target_tables = ["test_view"]
mock_parser_instance.query_parsing_success = True
mock_parser.return_value = mock_parser_instance
# Mock get_source_table_names to return a source table (from sql_lineage module)
with patch(
"metadata.ingestion.lineage.sql_lineage.get_source_table_names"
) as mock_source:
mock_source.return_value = [("", "source_table")]
# Mock search_table_entities to return a table
with patch(
"metadata.ingestion.lineage.sql_lineage.search_table_entities"
) as mock_search:
mock_search.return_value = [self.mock_table1]
result = list(
get_view_lineage(
view=mock_view,
metadata=self.mock_metadata,
service_names=["service1", "service2"],
connection_type="postgres",
)
)
# Verify that view lineage was attempted with schema fallback
self.assertIsInstance(result, list)
# Should be called with schema_fallback=True for Postgres
mock_search.assert_called()
def test_stored_procedure_lineage_cross_database(self):
"""Test stored procedure lineage with cross-database support"""
# Create a mock stored procedure
mock_procedure = StoredProcedure(
id=uuid.uuid4(),
name="test_procedure",
fullyQualifiedName="service1.db1.schema1.test_procedure",
storedProcedureCode=StoredProcedureCode(
code="CREATE PROCEDURE test_procedure() BEGIN SELECT * FROM source; END",
language="SQL",
),
database=EntityReference(id=uuid.uuid4(), type="database"),
databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"),
service=EntityReference(id=uuid.uuid4(), type="databaseService"),
)
# Create a mock query by procedure
mock_query = QueryByProcedure(
procedure_name="test_procedure",
query_type="CREATE_TABLE_AS_SELECT",
query_database_name="db1",
query_schema_name="schema1",
procedure_text="CALL test_procedure()",
procedure_start_time=datetime.now(),
procedure_end_time=datetime.now(),
query_text="CREATE TABLE target AS SELECT * FROM source",
)
# Create a mock mixin class
class MockStoredProcedureMixin(StoredProcedureLineageMixin):
def __init__(self, mock_metadata):
self.metadata = mock_metadata
self.service_name = "service1"
self.source_config = DatabaseServiceQueryLineagePipeline(
processCrossDatabaseLineage=True,
crossDatabaseServiceNames=["service2"],
)
self.service_connection = MagicMock()
self.service_connection.type.value = "mysql"
self.stored_procedure_query_lineage = False
self.procedure_graph_map = {}
self.status = MagicMock()
def get_stored_procedure_queries_dict(self):
"""Mock implementation of abstract method"""
return {}
mixin = MockStoredProcedureMixin(self.mock_metadata)
# Mock the lineage parser and other dependencies
with patch(
"metadata.ingestion.lineage.sql_lineage.LineageParser"
) as mock_parser:
mock_parser_instance = MagicMock()
mock_parser_instance.masked_query = (
"CREATE TABLE target AS SELECT * FROM source"
)
mock_parser_instance.column_lineage = []
mock_parser_instance.intermediate_tables = []
mock_parser_instance.source_tables = ["source"]
mock_parser_instance.target_tables = ["target"]
mock_parser_instance.query_parsing_success = True
mock_parser.return_value = mock_parser_instance
# Mock get_source_table_names to return empty
with patch(
"metadata.ingestion.lineage.sql_lineage.get_source_table_names"
) as mock_source:
mock_source.return_value = []
# Test the _yield_procedure_lineage method
result = list(
mixin._yield_procedure_lineage(mock_query, mock_procedure)
)
# Verify that the method was called with the correct service names
# The actual lineage generation depends on the mocked dependencies
# but we can verify that the method executes without errors
self.assertIsInstance(result, list)

View File

@ -151,7 +151,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
service_names=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
@ -221,7 +221,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
service_names=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
@ -291,7 +291,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
service_names=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
@ -362,7 +362,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
service_names=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
@ -395,7 +395,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=table_view_no_definition,
metadata=self.metadata,
service_name=self.service_name,
service_names=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
@ -430,7 +430,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
service_names=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
@ -500,7 +500,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=table_view,
metadata=metadata,
service_name="test_service",
service_names="test_service",
connection_type="mysql",
timeout_seconds=self.timeout_seconds,
)
@ -547,7 +547,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
service_names=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
@ -587,7 +587,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
service_names=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
@ -603,7 +603,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
service_names=self.service_name,
connection_type=123, # Invalid type
timeout_seconds=self.timeout_seconds,
)
@ -655,7 +655,7 @@ class TestDbUtils(TestCase):
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
service_names=self.service_name,
connection_type=self.connection_type,
timeout_seconds=custom_timeout,
)

View File

@ -59,6 +59,7 @@ class MockStoredProcedureSource(StoredProcedureLineageMixin):
self.source_config.schemaFilterPattern = None
self.source_config.storedProcedureFilterPattern = None
self.source_config.incrementalLineageProcessing = False
self.source_config.processCrossDatabaseLineage = False
self.source_config.parsingTimeoutLimit = 30
self.metadata = Mock()