From dc4c75ded47c07848e2b003204e244cf53a2a54e Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Tue, 12 Aug 2025 16:54:40 +0530 Subject: [PATCH] FIX #21180: Implement Cross Service Lineage (#22665) --- .../metadata/ingestion/lineage/sql_lineage.py | 144 ++-- .../ingestion/ometa/mixins/lineage_mixin.py | 2 +- .../ingestion/source/database/dbt/metadata.py | 2 +- .../source/database/lineage_source.py | 20 +- .../database/stored_procedures_mixin.py | 10 +- ingestion/src/metadata/utils/db_utils.py | 13 +- .../unit/test_cross_database_lineage_sql.py | 644 ++++++++++++++++++ ingestion/tests/unit/test_db_utils.py | 22 +- .../unit/test_lineage_for_stored_procedure.py | 1 + 9 files changed, 771 insertions(+), 87 deletions(-) create mode 100644 ingestion/tests/unit/test_cross_database_lineage_sql.py diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index 9d88367a119..c9975873a26 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -75,70 +75,71 @@ search_cache = LRUCache(LRU_CACHE_SIZE) def search_table_entities( metadata: OpenMetadata, - service_name: str, + service_names: Union[str, List[str]], database: Optional[str], database_schema: Optional[str], table: str, ) -> Optional[List[Table]]: """ Method to get table entity from database, database_schema & table name. - - It will try to search first in ES and doing an extra call to get Table entities - with the needed fields like columns for column lineage. - - If the ES result is empty, it will try by running - a request against the API to find the Entity. + Now supports searching across multiple services (cross-database lineage). Args: metadata: OMeta client - service_name: service name + service_names: service name or list of service names (current + cross db) database: database name database_schema: schema name table: table name Returns: - A list of Table entities, otherwise, None + A list of Table entities from the first service where found, otherwise None """ - search_tuple = (service_name, database, database_schema, table) - if search_tuple in search_cache: - return search_cache.get(search_tuple) - try: - table_entities: Optional[List[Table]] = [] - # search on ES first - fqn_search_string = build_es_fqn_search_string( - database, database_schema, service_name, table - ) - es_result_entities = metadata.es_search_from_fqn( - entity_type=Table, - fqn_search_string=fqn_search_string, - ) - if es_result_entities: - table_entities = es_result_entities - else: - # build FQNs and search with the API in case ES response is empty - table_fqns = fqn.build( - metadata, - entity_type=Table, - service_name=service_name, - database_name=database, - schema_name=database_schema, - table_name=table, - fetch_multiple_entities=True, - skip_es_search=True, + if isinstance(service_names, str): + service_names = [service_names] + for service_name in service_names: + search_tuple = (service_name, database, database_schema, table) + if search_tuple in search_cache: + result = search_cache.get(search_tuple) + if result: + return result + try: + table_entities: Optional[List[Table]] = [] + # search on ES first + fqn_search_string = build_es_fqn_search_string( + database, database_schema, service_name, table ) - for table_fqn in table_fqns or []: - table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn) - if table_entity: - table_entities.append(table_entity) - # added the search tuple to the cache - search_cache.put(search_tuple, table_entities) - return table_entities - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Error searching for table entities for service [{service_name}]: {exc}" - ) - return None + es_result_entities = metadata.es_search_from_fqn( + entity_type=Table, + fqn_search_string=fqn_search_string, + ) + if es_result_entities: + table_entities = es_result_entities + else: + # build FQNs and search with the API in case ES response is empty + table_fqns = fqn.build( + metadata, + entity_type=Table, + service_name=service_name, + database_name=database, + schema_name=database_schema, + table_name=table, + fetch_multiple_entities=True, + skip_es_search=True, + ) + for table_fqn in table_fqns or []: + table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn) + if table_entity: + table_entities.append(table_entity) + # added the search tuple to the cache + search_cache.put(search_tuple, table_entities) + if table_entities: + return table_entities + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Error searching for table entities for service [{service_name}]: {exc}" + ) + return None def get_table_fqn_from_query_name( @@ -360,7 +361,7 @@ def get_source_table_names( def get_table_entities_from_query( metadata: OpenMetadata, - service_name: str, + service_names: Union[str, List[str]], database_name: str, database_schema: str, table_name: str, @@ -371,8 +372,7 @@ def get_table_entities_from_query( If the sys data is incorrect, use the table name ingredients. - :param metadata: OpenMetadata client - :param service_name: Service being ingested. + :param service_names: Service(s) being ingested (current + cross db) :param database_name: Name of the database informed on db sys results :param database_schema: Name of the schema informed on db sys results :param table_name: Table name extracted from query. Can be `table`, `schema.table` or `db.schema.table` @@ -386,7 +386,7 @@ def get_table_entities_from_query( table_entities = search_table_entities( metadata=metadata, - service_name=service_name, + service_names=service_names, database=database_query if database_query else database_name, database_schema=schema_query if schema_query else database_schema, table=table, @@ -536,7 +536,7 @@ def _create_lineage_by_table_name( try: from_table_entities = get_table_entities_from_query( metadata=metadata, - service_name=service_name, + service_names=service_name, database_name=database_name, database_schema=schema_name, table_name=from_table, @@ -545,7 +545,7 @@ def _create_lineage_by_table_name( to_table_entities = get_table_entities_from_query( metadata=metadata, - service_name=service_name, + service_names=service_name, database_name=database_name, database_schema=schema_name, table_name=to_table, @@ -640,7 +640,7 @@ def populate_column_lineage_map(raw_column_lineage): # pylint: disable=too-many-locals def get_lineage_by_query( metadata: OpenMetadata, - service_name: str, + service_names: Union[str, List[str]], database_name: Optional[str], schema_name: Optional[str], query: str, @@ -649,14 +649,23 @@ def get_lineage_by_query( lineage_source: LineageSource = LineageSource.QueryLineage, graph: DiGraph = None, schema_fallback: bool = False, + service_name: Optional[str] = None, # backward compatibility for python sdk ) -> Iterable[Either[AddLineageRequest]]: """ This method parses the query to get source, target and intermediate table names to create lineage, and returns True if target table is found to create lineage otherwise returns False. + + Now supports cross-database lineage by accepting a list of service names. """ column_lineage = {} query_parsing_failures = QueryParsingFailures() - + if service_name and isinstance(service_name, str): + service_names = [service_name] + logger.warning( + "Deprecated: service_name is deprecated, use service_names instead" + ) + if isinstance(service_names, str): + service_names = [service_names] try: lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds) masked_query = lineage_parser.masked_query @@ -673,7 +682,7 @@ def get_lineage_by_query( source_table=source_table, database_name=database_name, schema_name=schema_name, - service_name=service_name, + service_name=service_names, timeout_seconds=timeout_seconds, column_lineage=column_lineage, ): @@ -681,7 +690,7 @@ def get_lineage_by_query( metadata, from_table=str(from_table_name), to_table=str(intermediate_table), - service_name=service_name, + service_name=service_names, database_name=database_name, schema_name=schema_name, masked_query=masked_query, @@ -696,13 +705,14 @@ def get_lineage_by_query( metadata, from_table=str(intermediate_table), to_table=str(target_table), - service_name=service_name, + service_name=service_names, database_name=database_name, schema_name=schema_name, masked_query=masked_query, column_lineage_map=column_lineage, lineage_source=lineage_source, schema_fallback=schema_fallback, + graph=graph, ) if not lineage_parser.intermediate_tables: for target_table in lineage_parser.target_tables: @@ -713,7 +723,7 @@ def get_lineage_by_query( source_table=source_table, database_name=database_name, schema_name=schema_name, - service_name=service_name, + service_name=service_names, timeout_seconds=timeout_seconds, column_lineage=column_lineage, ): @@ -721,7 +731,7 @@ def get_lineage_by_query( metadata, from_table=str(from_table_name), to_table=str(target_table), - service_name=service_name, + service_name=service_names, database_name=database_name, schema_name=schema_name, masked_query=masked_query, @@ -742,7 +752,7 @@ def get_lineage_by_query( yield Either( left=StackTraceError( name="Lineage", - error=f"Ingesting lineage failed for service [{service_name}]: {exc}", + error=f"Ingesting lineage failed for service(s) [{service_names}]: {exc}", stackTrace=traceback.format_exc(), ) ) @@ -753,7 +763,7 @@ def get_lineage_via_table_entity( table_entity: Table, database_name: str, schema_name: str, - service_name: str, + service_names: Union[str, List[str]], query: str, dialect: Dialect, timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, @@ -765,6 +775,8 @@ def get_lineage_via_table_entity( column_lineage = {} query_parsing_failures = QueryParsingFailures() + if isinstance(service_names, str): + service_names = [service_names] try: lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds) masked_query = lineage_parser.masked_query @@ -780,7 +792,7 @@ def get_lineage_via_table_entity( source_table=from_table_name, database_name=database_name, schema_name=schema_name, - service_name=service_name, + service_name=service_names, timeout_seconds=timeout_seconds, column_lineage=column_lineage, ): @@ -788,7 +800,7 @@ def get_lineage_via_table_entity( metadata, from_table=str(source_table), to_table=f"{schema_name}.{to_table_name}", - service_name=service_name, + service_name=service_names, database_name=database_name, schema_name=schema_name, masked_query=masked_query, @@ -809,7 +821,7 @@ def get_lineage_via_table_entity( Either( left=StackTraceError( name="Lineage", - error=f"Failed to create view lineage for database [{database_name}] and table [{table_entity}]: {exc}", + error=f"Failed to create view lineage for database [{database_name}] and table [{table_entity}] with service(s) [{service_names}]: {exc}", stackTrace=traceback.format_exc(), ) ) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py index b3b7c3bece2..f5063f637c2 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py @@ -383,7 +383,7 @@ class OMetaLineageMixin(Generic[T]): connection_type = database_service.serviceType.value add_lineage_request = get_lineage_by_query( metadata=self, - service_name=database_service.name.root, + service_names=database_service.name.root, dialect=ConnectionTypeDialectMapper.dialect_of(connection_type), query=sql, database_name=database_name, diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index baeb9dc40d5..a8b5030683c 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -924,7 +924,7 @@ class DbtSource(DbtServiceSource): lineages = get_lineage_by_query( self.metadata, query=query, - service_name=source_elements[0], + service_names=source_elements[0], database_name=source_elements[1], schema_name=source_elements[2], dialect=dialect, diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index f83b5961f84..05107f98573 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -262,10 +262,18 @@ class LineageSource(QueryParserSource, ABC): for table_query in table_queries or []: if not self._query_already_processed(table_query): + # Prepare service names for lineage processing + service_names = [table_query.serviceName] + if ( + self.source_config.processCrossDatabaseLineage + and self.source_config.crossDatabaseServiceNames + ): + service_names.extend(self.source_config.crossDatabaseServiceNames) + lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query( self.metadata, query=table_query.query, - service_name=table_query.serviceName, + service_names=service_names, database_name=table_query.databaseName, schema_name=table_query.databaseSchema, dialect=self.dialect, @@ -334,10 +342,18 @@ class LineageSource(QueryParserSource, ABC): "View Filtered Out", ) continue + # Prepare service names for view lineage processing + service_names = [self.config.serviceName] + if ( + self.source_config.processCrossDatabaseLineage + and self.source_config.crossDatabaseServiceNames + ): + service_names.extend(self.source_config.crossDatabaseServiceNames) + for lineage in get_view_lineage( view=view, metadata=self.metadata, - service_name=self.config.serviceName, + service_names=service_names, connection_type=self.service_connection.type.value, timeout_seconds=self.source_config.parsingTimeoutLimit, ): diff --git a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py index a99d62b2e3d..c369edbaa8b 100644 --- a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py +++ b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py @@ -207,10 +207,18 @@ class StoredProcedureLineageMixin(ABC): ).graph self.stored_procedure_query_lineage = True + # Prepare service names for lineage processing + service_names = [self.service_name] + if ( + self.source_config.processCrossDatabaseLineage + and self.source_config.crossDatabaseServiceNames + ): + service_names.extend(self.source_config.crossDatabaseServiceNames) + for either_lineage in get_lineage_by_query( self.metadata, query=query_by_procedure.query_text, - service_name=self.service_name, + service_names=service_names, database_name=query_by_procedure.query_database_name, schema_name=query_by_procedure.query_schema_name, dialect=ConnectionTypeDialectMapper.dialect_of( diff --git a/ingestion/src/metadata/utils/db_utils.py b/ingestion/src/metadata/utils/db_utils.py index 2569497f83a..b7d8ce9bc8e 100644 --- a/ingestion/src/metadata/utils/db_utils.py +++ b/ingestion/src/metadata/utils/db_utils.py @@ -14,7 +14,7 @@ Helpers module for db sources """ import traceback -from typing import Iterable +from typing import Iterable, List, Union from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import Table @@ -50,13 +50,16 @@ def get_host_from_host_port(uri: str) -> str: def get_view_lineage( view: TableView, metadata: OpenMetadata, - service_name: str, + service_names: Union[str, List[str]], connection_type: str, timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, ) -> Iterable[Either[AddLineageRequest]]: """ Method to generate view lineage + Now supports cross-database lineage by accepting a list of service names. """ + if isinstance(service_names, str): + service_names = [service_names] table_name = view.table_name schema_name = view.schema_name db_name = view.db_name @@ -65,7 +68,7 @@ def get_view_lineage( table_fqn = fqn.build( metadata, entity_type=Table, - service_name=service_name, + service_name=service_names[0], # Use first service for table entity lookup database_name=db_name, schema_name=schema_name, table_name=table_name, @@ -95,7 +98,7 @@ def get_view_lineage( yield from get_lineage_by_query( metadata, query=view_definition, - service_name=service_name, + service_names=service_names, database_name=db_name, schema_name=schema_name, dialect=dialect, @@ -108,7 +111,7 @@ def get_view_lineage( yield from get_lineage_via_table_entity( metadata, table_entity=table_entity, - service_name=service_name, + service_names=service_names, database_name=db_name, schema_name=schema_name, query=view_definition, diff --git a/ingestion/tests/unit/test_cross_database_lineage_sql.py b/ingestion/tests/unit/test_cross_database_lineage_sql.py new file mode 100644 index 00000000000..c5da6e5aeb5 --- /dev/null +++ b/ingestion/tests/unit/test_cross_database_lineage_sql.py @@ -0,0 +1,644 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test cross database lineage functionality in SQL lineage module +""" +import uuid +from datetime import datetime +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from metadata.generated.schema.entity.data.storedProcedure import ( + StoredProcedure, + StoredProcedureCode, +) +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import ( + DatabaseServiceQueryLineagePipeline, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.lineage.models import Dialect +from metadata.ingestion.lineage.sql_lineage import ( + get_lineage_by_query, + get_lineage_via_table_entity, + get_table_entities_from_query, + search_table_entities, +) +from metadata.ingestion.source.database.stored_procedures_mixin import ( + QueryByProcedure, + StoredProcedureLineageMixin, +) +from metadata.utils.db_utils import get_view_lineage + + +class CrossDatabaseLineageSQLTest(TestCase): + """ + Test cross database lineage functionality in SQL lineage module + """ + + def setUp(self): + """Set up test fixtures""" + self.mock_metadata = MagicMock() + + # Create mock tables + self.mock_table1 = Table( + id=uuid.uuid4(), + name="test_table", + fullyQualifiedName="service1.db1.schema1.test_table", + columns=[ + { + "name": "id", + "dataType": "NUMBER", + "fullyQualifiedName": "service1.db1.schema1.test_table.id", + }, + { + "name": "name", + "dataType": "VARCHAR", + "fullyQualifiedName": "service1.db1.schema1.test_table.name", + }, + ], + ) + + self.mock_table2 = Table( + id=uuid.uuid4(), + name="test_table", + fullyQualifiedName="service2.db2.schema2.test_table", + columns=[ + { + "name": "id", + "dataType": "NUMBER", + "fullyQualifiedName": "service2.db2.schema2.test_table.id", + }, + { + "name": "name", + "dataType": "VARCHAR", + "fullyQualifiedName": "service2.db2.schema2.test_table.name", + }, + ], + ) + + def test_search_table_entities_single_service(self): + """Test search_table_entities with single service (backward compatibility)""" + # Mock the metadata methods + self.mock_metadata.es_search_from_fqn.return_value = [self.mock_table1] + + # Test with single service name (string) + result = search_table_entities( + metadata=self.mock_metadata, + service_names="service1", + database="db1", + database_schema="schema1", + table="test_table", + ) + + # Verify the result + self.assertEqual(result, [self.mock_table1]) + + # Verify the method was called correctly + self.mock_metadata.es_search_from_fqn.assert_called() + + def test_search_table_entities_multiple_services(self): + """Test search_table_entities with multiple services (cross-database)""" + # Mock the metadata methods - first service returns None for both ES and API, second returns table + self.mock_metadata.es_search_from_fqn.side_effect = [None, [self.mock_table2]] + + # Mock fqn.build to return empty list for first service, list with FQN for second service + with patch( + "metadata.ingestion.lineage.sql_lineage.fqn.build" + ) as mock_fqn_build: + mock_fqn_build.side_effect = [[], ["service2.db2.schema2.test_table"]] + + # Mock metadata.get_by_name to return the table for second service + self.mock_metadata.get_by_name.return_value = self.mock_table2 + + # Test with multiple service names + result = search_table_entities( + metadata=self.mock_metadata, + service_names=["service1", "service2"], + database="db2", + database_schema="schema2", + table="test_table", + ) + + # Verify the result - should return table from second service + self.assertEqual(result, [self.mock_table2]) + + # Verify the method was called for both services + self.assertEqual(self.mock_metadata.es_search_from_fqn.call_count, 2) + + def test_search_table_entities_no_results(self): + """Test search_table_entities when no tables are found in any service""" + # Mock the metadata methods to return None for all services + self.mock_metadata.es_search_from_fqn.return_value = None + + # Mock fqn.build to return empty list + with patch( + "metadata.ingestion.lineage.sql_lineage.fqn.build" + ) as mock_fqn_build: + mock_fqn_build.return_value = [] + + # Test with multiple service names + result = search_table_entities( + metadata=self.mock_metadata, + service_names=["service1", "service2"], + database="db1", + database_schema="schema1", + table="nonexistent_table", + ) + + # Verify the result is None + self.assertIsNone(result) + + def test_get_table_entities_from_query_single_service(self): + """Test get_table_entities_from_query with single service (backward compatibility)""" + # Mock search_table_entities to return a table + with patch( + "metadata.ingestion.lineage.sql_lineage.search_table_entities" + ) as mock_search: + mock_search.return_value = [self.mock_table1] + + result = get_table_entities_from_query( + metadata=self.mock_metadata, + service_names="service1", + database_name="db1", + database_schema="schema1", + table_name="test_table", + ) + + # Verify the result + self.assertEqual(result, [self.mock_table1]) + + # Verify search_table_entities was called correctly + mock_search.assert_called_with( + metadata=self.mock_metadata, + service_names="service1", + database="db1", + database_schema="schema1", + table="test_table", + ) + + def test_get_table_entities_from_query_multiple_services(self): + """Test get_table_entities_from_query with multiple services (cross-database)""" + # Mock search_table_entities to return a table from second service + with patch( + "metadata.ingestion.lineage.sql_lineage.search_table_entities" + ) as mock_search: + mock_search.return_value = [self.mock_table2] + + result = get_table_entities_from_query( + metadata=self.mock_metadata, + service_names=["service1", "service2"], + database_name="db2", + database_schema="schema2", + table_name="test_table", + ) + + # Verify the result + self.assertEqual(result, [self.mock_table2]) + + # Verify search_table_entities was called correctly + mock_search.assert_called_with( + metadata=self.mock_metadata, + service_names=["service1", "service2"], + database="db2", + database_schema="schema2", + table="test_table", + ) + + def test_get_lineage_by_query_single_service(self): + """Test get_lineage_by_query with single service (backward compatibility)""" + # Mock the lineage parser and other dependencies + with patch( + "metadata.ingestion.lineage.sql_lineage.LineageParser" + ) as mock_parser: + mock_parser_instance = MagicMock() + mock_parser_instance.masked_query = "SELECT * FROM test" + mock_parser_instance.column_lineage = [] + mock_parser_instance.intermediate_tables = [] + mock_parser_instance.source_tables = [] + mock_parser_instance.target_tables = [] + mock_parser_instance.query_parsing_success = True + mock_parser.return_value = mock_parser_instance + + # Mock get_source_table_names to return empty + with patch( + "metadata.ingestion.lineage.sql_lineage.get_source_table_names" + ) as mock_source: + mock_source.return_value = [] + + result = list( + get_lineage_by_query( + metadata=self.mock_metadata, + service_names="service1", + database_name="db1", + schema_name="schema1", + query="SELECT * FROM test", + dialect=Dialect.ANSI, + ) + ) + + # Verify no lineage is generated (empty source tables) + self.assertEqual(len(result), 0) + + def test_get_lineage_by_query_multiple_services(self): + """Test get_lineage_by_query with multiple services (cross-database)""" + # Mock the lineage parser and other dependencies + with patch( + "metadata.ingestion.lineage.sql_lineage.LineageParser" + ) as mock_parser: + mock_parser_instance = MagicMock() + mock_parser_instance.masked_query = "SELECT * FROM test" + mock_parser_instance.column_lineage = [] + mock_parser_instance.intermediate_tables = [] + mock_parser_instance.source_tables = [] + mock_parser_instance.target_tables = [] + mock_parser_instance.query_parsing_success = True + mock_parser.return_value = mock_parser_instance + + # Mock get_source_table_names to return empty + with patch( + "metadata.ingestion.lineage.sql_lineage.get_source_table_names" + ) as mock_source: + mock_source.return_value = [] + + result = list( + get_lineage_by_query( + metadata=self.mock_metadata, + service_names=["service1", "service2"], + database_name="db1", + schema_name="schema1", + query="SELECT * FROM test", + dialect=Dialect.ANSI, + ) + ) + + # Verify no lineage is generated (empty source tables) + self.assertEqual(len(result), 0) + + def test_get_lineage_by_query_with_source_tables(self): + """Test get_lineage_by_query with actual source tables (query lineage)""" + # Mock the lineage parser with source and target tables + with patch( + "metadata.ingestion.lineage.sql_lineage.LineageParser" + ) as mock_parser: + mock_parser_instance = MagicMock() + mock_parser_instance.masked_query = ( + "CREATE TABLE target AS SELECT * FROM source" + ) + mock_parser_instance.column_lineage = [] + mock_parser_instance.intermediate_tables = [] + mock_parser_instance.source_tables = ["source"] + mock_parser_instance.target_tables = ["target"] + mock_parser_instance.query_parsing_success = True + mock_parser.return_value = mock_parser_instance + + # Mock get_source_table_names to return a source table + with patch( + "metadata.ingestion.lineage.sql_lineage.get_source_table_names" + ) as mock_source: + mock_source.return_value = [("", "source_table")] + + # Mock search_table_entities to return a table + with patch( + "metadata.ingestion.lineage.sql_lineage.search_table_entities" + ) as mock_search: + mock_search.return_value = [self.mock_table1] + + result = list( + get_lineage_by_query( + metadata=self.mock_metadata, + service_names=["service1", "service2"], + database_name="db1", + schema_name="schema1", + query="CREATE TABLE target AS SELECT * FROM source", + dialect=Dialect.ANSI, + ) + ) + + # Verify that lineage was attempted + self.assertIsInstance(result, list) + mock_search.assert_called() + + def test_get_lineage_via_table_entity_single_service(self): + """Test get_lineage_via_table_entity with single service (backward compatibility)""" + # Mock the lineage parser + with patch( + "metadata.ingestion.lineage.sql_lineage.LineageParser" + ) as mock_parser: + mock_parser_instance = MagicMock() + mock_parser_instance.masked_query = "SELECT * FROM source" + mock_parser_instance.column_lineage = [] + mock_parser_instance.source_tables = ["source"] + mock_parser_instance.query_parsing_success = True + mock_parser.return_value = mock_parser_instance + + # Mock get_source_table_names to return empty + with patch( + "metadata.ingestion.lineage.sql_lineage.get_source_table_names" + ) as mock_source: + mock_source.return_value = [] + + result = list( + get_lineage_via_table_entity( + metadata=self.mock_metadata, + table_entity=self.mock_table1, + service_names="service1", + database_name="db1", + schema_name="schema1", + query="SELECT * FROM source", + dialect=Dialect.ANSI, + ) + ) + + # Verify the method executes without errors + self.assertIsInstance(result, list) + + def test_get_lineage_via_table_entity_multiple_services(self): + """Test get_lineage_via_table_entity with multiple services (cross-database)""" + # Mock the lineage parser + with patch( + "metadata.ingestion.lineage.sql_lineage.LineageParser" + ) as mock_parser: + mock_parser_instance = MagicMock() + mock_parser_instance.masked_query = "SELECT * FROM source" + mock_parser_instance.column_lineage = [] + mock_parser_instance.source_tables = ["source"] + mock_parser_instance.query_parsing_success = True + mock_parser.return_value = mock_parser_instance + + # Mock get_source_table_names to return a source table + with patch( + "metadata.ingestion.lineage.sql_lineage.get_source_table_names" + ) as mock_source: + mock_source.return_value = [("", "source_table")] + + # Mock search_table_entities to return a table from second service + with patch( + "metadata.ingestion.lineage.sql_lineage.search_table_entities" + ) as mock_search: + mock_search.return_value = [self.mock_table2] + + result = list( + get_lineage_via_table_entity( + metadata=self.mock_metadata, + table_entity=self.mock_table1, + service_names=["service1", "service2"], + database_name="db1", + schema_name="schema1", + query="SELECT * FROM source", + dialect=Dialect.ANSI, + ) + ) + + # Verify that lineage was attempted with multiple services + self.assertIsInstance(result, list) + # Verify that search_table_entities was called (the exact parameters may vary) + mock_search.assert_called() + + def test_get_view_lineage_single_service(self): + """Test get_view_lineage with single service (backward compatibility)""" + # Create a mock TableView + mock_view = MagicMock() + mock_view.table_name = "test_view" + mock_view.schema_name = "schema1" + mock_view.db_name = "db1" + mock_view.view_definition = ( + "CREATE VIEW test_view AS SELECT * FROM source_table" + ) + + # Mock the metadata methods + self.mock_metadata.get_by_name.return_value = self.mock_table1 + + # Mock fqn.build to return a valid FQN + with patch("metadata.utils.db_utils.fqn.build") as mock_fqn_build: + mock_fqn_build.return_value = "service1.db1.schema1.test_view" + + # Mock the lineage parser + with patch("metadata.utils.db_utils.LineageParser") as mock_parser: + mock_parser_instance = MagicMock() + mock_parser_instance.masked_query = ( + "CREATE VIEW test_view AS SELECT * FROM source_table" + ) + mock_parser_instance.column_lineage = [] + mock_parser_instance.source_tables = ["source_table"] + mock_parser_instance.target_tables = ["test_view"] + mock_parser_instance.query_parsing_success = True + mock_parser.return_value = mock_parser_instance + + # Mock get_source_table_names to return empty (from sql_lineage module) + with patch( + "metadata.ingestion.lineage.sql_lineage.get_source_table_names" + ) as mock_source: + mock_source.return_value = [] + + result = list( + get_view_lineage( + view=mock_view, + metadata=self.mock_metadata, + service_names="service1", + connection_type="snowflake", + ) + ) + + # Verify the method executes without errors + self.assertIsInstance(result, list) + + def test_get_view_lineage_multiple_services(self): + """Test get_view_lineage with multiple services (cross-database view lineage)""" + # Create a mock TableView + mock_view = MagicMock() + mock_view.table_name = "test_view" + mock_view.schema_name = "schema1" + mock_view.db_name = "db1" + mock_view.view_definition = ( + "CREATE VIEW test_view AS SELECT * FROM source_table" + ) + + # Mock the metadata methods + self.mock_metadata.get_by_name.return_value = self.mock_table1 + + # Mock fqn.build to return a valid FQN + with patch("metadata.utils.db_utils.fqn.build") as mock_fqn_build: + mock_fqn_build.return_value = "service1.db1.schema1.test_view" + + # Mock the lineage parser + with patch("metadata.utils.db_utils.LineageParser") as mock_parser: + mock_parser_instance = MagicMock() + mock_parser_instance.masked_query = ( + "CREATE VIEW test_view AS SELECT * FROM source_table" + ) + mock_parser_instance.column_lineage = [] + mock_parser_instance.source_tables = ["source_table"] + mock_parser_instance.target_tables = ["test_view"] + mock_parser_instance.query_parsing_success = True + mock_parser.return_value = mock_parser_instance + + # Mock get_source_table_names to return a source table (from sql_lineage module) + with patch( + "metadata.ingestion.lineage.sql_lineage.get_source_table_names" + ) as mock_source: + mock_source.return_value = [("", "source_table")] + + # Mock search_table_entities to return a table from second service + with patch( + "metadata.ingestion.lineage.sql_lineage.search_table_entities" + ) as mock_search: + mock_search.return_value = [self.mock_table2] + + result = list( + get_view_lineage( + view=mock_view, + metadata=self.mock_metadata, + service_names=["service1", "service2"], + connection_type="snowflake", + ) + ) + + # Verify that view lineage was attempted with multiple services + self.assertIsInstance(result, list) + mock_search.assert_called() + + def test_get_view_lineage_with_postgres_schema_fallback(self): + """Test get_view_lineage with Postgres schema fallback""" + # Create a mock TableView for Postgres + mock_view = MagicMock() + mock_view.table_name = "test_view" + mock_view.schema_name = None # No schema specified + mock_view.db_name = "db1" + mock_view.view_definition = ( + "CREATE VIEW test_view AS SELECT * FROM source_table" + ) + + # Mock the metadata methods + self.mock_metadata.get_by_name.return_value = self.mock_table1 + + # Mock fqn.build to return a valid FQN + with patch("metadata.utils.db_utils.fqn.build") as mock_fqn_build: + mock_fqn_build.return_value = "service1.db1.public.test_view" + + # Mock the lineage parser + with patch("metadata.utils.db_utils.LineageParser") as mock_parser: + mock_parser_instance = MagicMock() + mock_parser_instance.masked_query = ( + "CREATE VIEW test_view AS SELECT * FROM source_table" + ) + mock_parser_instance.column_lineage = [] + mock_parser_instance.source_tables = ["source_table"] + mock_parser_instance.target_tables = ["test_view"] + mock_parser_instance.query_parsing_success = True + mock_parser.return_value = mock_parser_instance + + # Mock get_source_table_names to return a source table (from sql_lineage module) + with patch( + "metadata.ingestion.lineage.sql_lineage.get_source_table_names" + ) as mock_source: + mock_source.return_value = [("", "source_table")] + + # Mock search_table_entities to return a table + with patch( + "metadata.ingestion.lineage.sql_lineage.search_table_entities" + ) as mock_search: + mock_search.return_value = [self.mock_table1] + + result = list( + get_view_lineage( + view=mock_view, + metadata=self.mock_metadata, + service_names=["service1", "service2"], + connection_type="postgres", + ) + ) + + # Verify that view lineage was attempted with schema fallback + self.assertIsInstance(result, list) + # Should be called with schema_fallback=True for Postgres + mock_search.assert_called() + + def test_stored_procedure_lineage_cross_database(self): + """Test stored procedure lineage with cross-database support""" + # Create a mock stored procedure + mock_procedure = StoredProcedure( + id=uuid.uuid4(), + name="test_procedure", + fullyQualifiedName="service1.db1.schema1.test_procedure", + storedProcedureCode=StoredProcedureCode( + code="CREATE PROCEDURE test_procedure() BEGIN SELECT * FROM source; END", + language="SQL", + ), + database=EntityReference(id=uuid.uuid4(), type="database"), + databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"), + service=EntityReference(id=uuid.uuid4(), type="databaseService"), + ) + + # Create a mock query by procedure + mock_query = QueryByProcedure( + procedure_name="test_procedure", + query_type="CREATE_TABLE_AS_SELECT", + query_database_name="db1", + query_schema_name="schema1", + procedure_text="CALL test_procedure()", + procedure_start_time=datetime.now(), + procedure_end_time=datetime.now(), + query_text="CREATE TABLE target AS SELECT * FROM source", + ) + + # Create a mock mixin class + class MockStoredProcedureMixin(StoredProcedureLineageMixin): + def __init__(self, mock_metadata): + self.metadata = mock_metadata + self.service_name = "service1" + self.source_config = DatabaseServiceQueryLineagePipeline( + processCrossDatabaseLineage=True, + crossDatabaseServiceNames=["service2"], + ) + self.service_connection = MagicMock() + self.service_connection.type.value = "mysql" + self.stored_procedure_query_lineage = False + self.procedure_graph_map = {} + self.status = MagicMock() + + def get_stored_procedure_queries_dict(self): + """Mock implementation of abstract method""" + return {} + + mixin = MockStoredProcedureMixin(self.mock_metadata) + + # Mock the lineage parser and other dependencies + with patch( + "metadata.ingestion.lineage.sql_lineage.LineageParser" + ) as mock_parser: + mock_parser_instance = MagicMock() + mock_parser_instance.masked_query = ( + "CREATE TABLE target AS SELECT * FROM source" + ) + mock_parser_instance.column_lineage = [] + mock_parser_instance.intermediate_tables = [] + mock_parser_instance.source_tables = ["source"] + mock_parser_instance.target_tables = ["target"] + mock_parser_instance.query_parsing_success = True + mock_parser.return_value = mock_parser_instance + + # Mock get_source_table_names to return empty + with patch( + "metadata.ingestion.lineage.sql_lineage.get_source_table_names" + ) as mock_source: + mock_source.return_value = [] + + # Test the _yield_procedure_lineage method + result = list( + mixin._yield_procedure_lineage(mock_query, mock_procedure) + ) + + # Verify that the method was called with the correct service names + # The actual lineage generation depends on the mocked dependencies + # but we can verify that the method executes without errors + self.assertIsInstance(result, list) diff --git a/ingestion/tests/unit/test_db_utils.py b/ingestion/tests/unit/test_db_utils.py index f8a4105a2bb..0f4bc757ecc 100644 --- a/ingestion/tests/unit/test_db_utils.py +++ b/ingestion/tests/unit/test_db_utils.py @@ -151,7 +151,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=self.table_view, metadata=self.metadata, - service_name=self.service_name, + service_names=self.service_name, connection_type=self.connection_type, timeout_seconds=self.timeout_seconds, ) @@ -221,7 +221,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=self.table_view, metadata=self.metadata, - service_name=self.service_name, + service_names=self.service_name, connection_type=self.connection_type, timeout_seconds=self.timeout_seconds, ) @@ -291,7 +291,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=self.table_view, metadata=self.metadata, - service_name=self.service_name, + service_names=self.service_name, connection_type=self.connection_type, timeout_seconds=self.timeout_seconds, ) @@ -362,7 +362,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=self.table_view, metadata=self.metadata, - service_name=self.service_name, + service_names=self.service_name, connection_type=self.connection_type, timeout_seconds=self.timeout_seconds, ) @@ -395,7 +395,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=table_view_no_definition, metadata=self.metadata, - service_name=self.service_name, + service_names=self.service_name, connection_type=self.connection_type, timeout_seconds=self.timeout_seconds, ) @@ -430,7 +430,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=self.table_view, metadata=self.metadata, - service_name=self.service_name, + service_names=self.service_name, connection_type=self.connection_type, timeout_seconds=self.timeout_seconds, ) @@ -500,7 +500,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=table_view, metadata=metadata, - service_name="test_service", + service_names="test_service", connection_type="mysql", timeout_seconds=self.timeout_seconds, ) @@ -547,7 +547,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=self.table_view, metadata=self.metadata, - service_name=self.service_name, + service_names=self.service_name, connection_type=self.connection_type, timeout_seconds=self.timeout_seconds, ) @@ -587,7 +587,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=self.table_view, metadata=self.metadata, - service_name=self.service_name, + service_names=self.service_name, connection_type=self.connection_type, timeout_seconds=self.timeout_seconds, ) @@ -603,7 +603,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=self.table_view, metadata=self.metadata, - service_name=self.service_name, + service_names=self.service_name, connection_type=123, # Invalid type timeout_seconds=self.timeout_seconds, ) @@ -655,7 +655,7 @@ class TestDbUtils(TestCase): get_view_lineage( view=self.table_view, metadata=self.metadata, - service_name=self.service_name, + service_names=self.service_name, connection_type=self.connection_type, timeout_seconds=custom_timeout, ) diff --git a/ingestion/tests/unit/test_lineage_for_stored_procedure.py b/ingestion/tests/unit/test_lineage_for_stored_procedure.py index c20f8b8332a..9a7d9fe8b16 100644 --- a/ingestion/tests/unit/test_lineage_for_stored_procedure.py +++ b/ingestion/tests/unit/test_lineage_for_stored_procedure.py @@ -59,6 +59,7 @@ class MockStoredProcedureSource(StoredProcedureLineageMixin): self.source_config.schemaFilterPattern = None self.source_config.storedProcedureFilterPattern = None self.source_config.incrementalLineageProcessing = False + self.source_config.processCrossDatabaseLineage = False self.source_config.parsingTimeoutLimit = 30 self.metadata = Mock()