MINOR: Postgres Implement schema fallback (#21858)

* MINOR: Postgres Implement schema fallback

* missing sql_lineage file
This commit is contained in:
Mayur Singal 2025-07-29 14:45:21 +05:30 committed by GitHub
parent fc50cce12a
commit cc9506db20
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 694 additions and 0 deletions

View File

@ -364,6 +364,7 @@ def get_table_entities_from_query(
database_name: str, database_name: str,
database_schema: str, database_schema: str,
table_name: str, table_name: str,
schema_fallback: bool = False,
) -> Optional[List[Table]]: ) -> Optional[List[Table]]:
""" """
Fetch data from API and ES with a fallback strategy. Fetch data from API and ES with a fallback strategy.
@ -394,6 +395,17 @@ def get_table_entities_from_query(
if table_entities: if table_entities:
return table_entities return table_entities
if schema_fallback:
table_entities = search_table_entities(
metadata=metadata,
service_name=service_name,
database=database_query if database_query else database_name,
database_schema=None,
table=table,
)
if table_entities:
return table_entities
return None return None
@ -516,6 +528,7 @@ def _create_lineage_by_table_name(
lineage_source: LineageSource = LineageSource.QueryLineage, lineage_source: LineageSource = LineageSource.QueryLineage,
procedure: Optional[EntityReference] = None, procedure: Optional[EntityReference] = None,
graph: DiGraph = None, graph: DiGraph = None,
schema_fallback: bool = False,
) -> Iterable[Either[AddLineageRequest]]: ) -> Iterable[Either[AddLineageRequest]]:
""" """
This method is to create a lineage between two tables This method is to create a lineage between two tables
@ -527,6 +540,7 @@ def _create_lineage_by_table_name(
database_name=database_name, database_name=database_name,
database_schema=schema_name, database_schema=schema_name,
table_name=from_table, table_name=from_table,
schema_fallback=schema_fallback,
) )
to_table_entities = get_table_entities_from_query( to_table_entities = get_table_entities_from_query(
@ -535,6 +549,7 @@ def _create_lineage_by_table_name(
database_name=database_name, database_name=database_name,
database_schema=schema_name, database_schema=schema_name,
table_name=to_table, table_name=to_table,
schema_fallback=schema_fallback,
) )
for table_name, entity in ( for table_name, entity in (
@ -633,6 +648,7 @@ def get_lineage_by_query(
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
lineage_source: LineageSource = LineageSource.QueryLineage, lineage_source: LineageSource = LineageSource.QueryLineage,
graph: DiGraph = None, graph: DiGraph = None,
schema_fallback: bool = False,
) -> Iterable[Either[AddLineageRequest]]: ) -> Iterable[Either[AddLineageRequest]]:
""" """
This method parses the query to get source, target and intermediate table names to create lineage, This method parses the query to get source, target and intermediate table names to create lineage,
@ -673,6 +689,7 @@ def get_lineage_by_query(
lineage_source=lineage_source, lineage_source=lineage_source,
procedure=procedure, procedure=procedure,
graph=graph, graph=graph,
schema_fallback=schema_fallback,
) )
for target_table in lineage_parser.target_tables: for target_table in lineage_parser.target_tables:
yield from _create_lineage_by_table_name( yield from _create_lineage_by_table_name(
@ -685,6 +702,7 @@ def get_lineage_by_query(
masked_query=masked_query, masked_query=masked_query,
column_lineage_map=column_lineage, column_lineage_map=column_lineage,
lineage_source=lineage_source, lineage_source=lineage_source,
schema_fallback=schema_fallback,
) )
if not lineage_parser.intermediate_tables: if not lineage_parser.intermediate_tables:
for target_table in lineage_parser.target_tables: for target_table in lineage_parser.target_tables:
@ -711,6 +729,7 @@ def get_lineage_by_query(
lineage_source=lineage_source, lineage_source=lineage_source,
procedure=procedure, procedure=procedure,
graph=graph, graph=graph,
schema_fallback=schema_fallback,
) )
if not lineage_parser.query_parsing_success: if not lineage_parser.query_parsing_success:
query_parsing_failures.add( query_parsing_failures.add(
@ -740,6 +759,7 @@ def get_lineage_via_table_entity(
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
lineage_source: LineageSource = LineageSource.QueryLineage, lineage_source: LineageSource = LineageSource.QueryLineage,
graph: DiGraph = None, graph: DiGraph = None,
schema_fallback: bool = False,
) -> Iterable[Either[AddLineageRequest]]: ) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage from table entity""" """Get lineage from table entity"""
column_lineage = {} column_lineage = {}
@ -776,6 +796,7 @@ def get_lineage_via_table_entity(
lineage_source=lineage_source, lineage_source=lineage_source,
procedure=procedure, procedure=procedure,
graph=graph, graph=graph,
schema_fallback=schema_fallback,
) or [] ) or []
if not lineage_parser.query_parsing_success: if not lineage_parser.query_parsing_success:
query_parsing_failures.add( query_parsing_failures.add(

View File

@ -60,6 +60,7 @@ def get_view_lineage(
table_name = view.table_name table_name = view.table_name
schema_name = view.schema_name schema_name = view.schema_name
db_name = view.db_name db_name = view.db_name
schema_fallback = False
view_definition = view.view_definition view_definition = view.view_definition
table_fqn = fqn.build( table_fqn = fqn.build(
metadata, metadata,
@ -88,6 +89,7 @@ def get_view_lineage(
if table_entity.serviceType == DatabaseServiceType.Postgres: if table_entity.serviceType == DatabaseServiceType.Postgres:
# For Postgres, if schema is not defined, we need to use the public schema # For Postgres, if schema is not defined, we need to use the public schema
schema_name = PUBLIC_SCHEMA schema_name = PUBLIC_SCHEMA
schema_fallback = True
if lineage_parser.source_tables and lineage_parser.target_tables: if lineage_parser.source_tables and lineage_parser.target_tables:
yield from get_lineage_by_query( yield from get_lineage_by_query(
@ -99,6 +101,7 @@ def get_view_lineage(
dialect=dialect, dialect=dialect,
timeout_seconds=timeout_seconds, timeout_seconds=timeout_seconds,
lineage_source=LineageSource.ViewLineage, lineage_source=LineageSource.ViewLineage,
schema_fallback=schema_fallback,
) or [] ) or []
else: else:
@ -112,6 +115,7 @@ def get_view_lineage(
dialect=dialect, dialect=dialect,
timeout_seconds=timeout_seconds, timeout_seconds=timeout_seconds,
lineage_source=LineageSource.ViewLineage, lineage_source=LineageSource.ViewLineage,
schema_fallback=schema_fallback,
) or [] ) or []
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())

View File

@ -0,0 +1,669 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unit tests for db_utils module
"""
import uuid
from copy import deepcopy
from unittest import TestCase
from unittest.mock import MagicMock, patch
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.type.basic import (
EntityName,
FullyQualifiedEntityName,
Uuid,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.ingestion.source.models import TableView
from metadata.utils.db_utils import get_host_from_host_port, get_view_lineage
# Mock LineageTable class to simulate collate_sqllineage.core.models.Table
class MockLineageTable:
def __init__(self, name):
self.raw_name = name
self.schema = None
self.catalog = None
def __str__(self):
return self.raw_name
class TestDbUtils(TestCase):
"""
Test cases for db_utils module
"""
def setUp(self):
"""Set up test fixtures"""
self.metadata = MagicMock()
self.service_name = "test_service"
self.connection_type = "postgres"
self.timeout_seconds = 30
# Create a mock table entity
self.table_entity = Table(
id=Uuid(root=uuid.uuid4()),
name=EntityName(root="test_view"),
fullyQualifiedName=FullyQualifiedEntityName(
root="test_service.test_db.test_schema.test_view"
),
serviceType=DatabaseServiceType.Postgres,
columns=[], # Add required columns field
)
self.table_entity_non_postgres = deepcopy(self.table_entity)
self.table_entity_non_postgres.serviceType = DatabaseServiceType.Mysql
# Create a mock TableView
self.table_view = TableView(
table_name="test_view",
schema_name="test_schema",
db_name="test_db",
view_definition="create view test_view as SELECT * FROM source_table",
)
def tearDown(self):
"""Clean up after each test"""
# Reset any module-level state if needed
pass
def test_get_host_from_host_port(self):
"""Test get_host_from_host_port function"""
# Test with host:port format
self.assertEqual(get_host_from_host_port("localhost:9000"), "localhost")
self.assertEqual(get_host_from_host_port("127.0.0.1:5432"), "127.0.0.1")
self.assertEqual(get_host_from_host_port("example.com:8080"), "example.com")
# Test with host only (no port)
self.assertEqual(get_host_from_host_port("localhost"), "localhost")
self.assertEqual(get_host_from_host_port("example.com"), "example.com")
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
@patch("metadata.utils.db_utils.LineageParser")
@patch("metadata.utils.db_utils.fqn")
def test_get_view_lineage_success_with_lineage_parser(
self, mock_fqn, mock_lineage_parser_class, mock_dialect_mapper
):
"""Test successful view lineage generation when lineage parser has source and target tables"""
# Setup mocks
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
self.metadata.get_by_name.return_value = self.table_entity
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
mock_lineage_parser = MagicMock()
mock_lineage_parser.source_tables = [MockLineageTable("source_table")]
mock_lineage_parser.target_tables = [MockLineageTable("test_view")]
mock_lineage_parser_class.return_value = mock_lineage_parser
# Mock metadata search methods that get_lineage_by_query will call
# Create a mock source table entity
source_table_entity = Table(
id=Uuid(root=uuid.uuid4()),
name=EntityName(root="source_table"),
fullyQualifiedName=FullyQualifiedEntityName(
root="test_service.test_db.test_schema.source_table"
),
serviceType=DatabaseServiceType.Postgres,
columns=[],
)
# Mock the search methods that get_lineage_by_query uses
# The es_search_from_fqn method will be called with a search string like "test_service.test_db.test_schema.source_table"
def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
if "source_table" in fqn_search_string:
return [source_table_entity]
if "test_view" in fqn_search_string:
return [self.table_entity]
return []
self.metadata.es_search_from_fqn = mock_es_search_from_fqn
# Also mock the get_by_name method that might be called
def mock_get_by_name(entity, fqn=None, **kwargs):
if "test_service.test_db.test_schema.test_view" in fqn:
return self.table_entity
return None
self.metadata.get_by_name = mock_get_by_name
# Execute function
result = list(
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
)
# Assertions - should get lineage results since we have source and target tables
self.assertGreater(len(result), 0)
# Verify mocks were called correctly
mock_fqn.build.assert_called_once()
mock_dialect_mapper.dialect_of.assert_called_once_with(self.connection_type)
mock_lineage_parser_class.assert_called_once()
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
@patch("metadata.utils.db_utils.LineageParser")
@patch("metadata.utils.db_utils.fqn")
def test_get_view_lineage_success_with_fallback(
self, mock_fqn, mock_lineage_parser_class, mock_dialect_mapper
):
"""Test successful view lineage generation when lineage parser has source and target tables"""
# Setup mocks
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
self.metadata.get_by_name.return_value = self.table_entity
search_cache.clear()
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
mock_lineage_parser = MagicMock()
mock_lineage_parser.source_tables = [MockLineageTable("source_table")]
mock_lineage_parser.target_tables = [MockLineageTable("test_view")]
mock_lineage_parser_class.return_value = mock_lineage_parser
# Mock metadata search methods that get_lineage_by_query will call
# Create a mock source table entity
source_table_entity = Table(
id=Uuid(root=uuid.uuid4()),
name=EntityName(root="source_table"),
fullyQualifiedName=FullyQualifiedEntityName(
root="test_service.test_db.test_schema.source_table"
),
serviceType=DatabaseServiceType.Postgres,
columns=[],
)
# Mock the search methods that get_lineage_by_query uses
# The es_search_from_fqn method will be called with a search string like "test_service.test_db.test_schema.source_table"
def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
if "public.source_table" in fqn_search_string:
return []
if "source_table" in fqn_search_string:
return [source_table_entity]
if "test_view" in fqn_search_string:
return [self.table_entity]
return []
self.metadata.es_search_from_fqn = mock_es_search_from_fqn
# Also mock the get_by_name method that might be called
def mock_get_by_name(entity, fqn=None, **kwargs):
if "test_service.test_db.test_schema.test_view" in fqn:
return self.table_entity
return None
self.metadata.get_by_name = mock_get_by_name
# Execute function
result = list(
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
)
# Assertions - should get lineage results since we have source and target tables
self.assertGreater(len(result), 0)
# Verify mocks were called correctly
mock_fqn.build.assert_called_once()
mock_dialect_mapper.dialect_of.assert_called_once_with(self.connection_type)
mock_lineage_parser_class.assert_called_once()
@patch("metadata.utils.db_utils.get_lineage_via_table_entity")
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
@patch("metadata.utils.db_utils.LineageParser")
@patch("metadata.utils.db_utils.fqn")
def test_get_view_lineage_success_with_table_entity(
self,
mock_fqn,
mock_lineage_parser_class,
mock_dialect_mapper,
mock_get_lineage_via_table_entity,
):
"""Test successful view lineage generation when lineage parser has no source/target tables"""
# Setup mocks
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
self.metadata.get_by_name.return_value = self.table_entity
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
mock_lineage_parser = MagicMock()
mock_lineage_parser.source_tables = [] # No source tables
mock_lineage_parser.target_tables = [] # No target tables
mock_lineage_parser_class.return_value = mock_lineage_parser
# Create a valid AddLineageRequest for the mock
source_table_entity = Table(
id=Uuid(root=uuid.uuid4()),
name=EntityName(root="source_table"),
fullyQualifiedName=FullyQualifiedEntityName(
root="test_service.test_db.test_schema.source_table"
),
serviceType=DatabaseServiceType.Postgres,
columns=[],
)
valid_lineage_request = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=source_table_entity.id.root,
type="table",
),
toEntity=EntityReference(
id=self.table_entity.id.root,
type="table",
),
)
)
mock_get_lineage_via_table_entity.return_value = [
Either(right=valid_lineage_request)
]
# Execute function
result = list(
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
)
# Assertions
self.assertEqual(len(result), 1)
self.assertIsInstance(result[0], Either)
self.assertIsNotNone(result[0].right)
# Verify mocks were called correctly
mock_fqn.build.assert_called_once()
self.metadata.get_by_name.assert_called_once()
mock_dialect_mapper.dialect_of.assert_called_once_with(self.connection_type)
mock_lineage_parser_class.assert_called_once()
mock_get_lineage_via_table_entity.assert_called_once()
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
@patch("metadata.utils.db_utils.LineageParser")
@patch("metadata.utils.db_utils.fqn")
def test_get_view_lineage_postgres_schema_fallback(
self, mock_fqn, mock_lineage_parser_class, mock_dialect_mapper
):
"""Test that Postgres views use public schema fallback"""
# Setup mocks
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
# Reset metadata mocks to prevent interference from other tests
self.metadata.reset_mock()
self.metadata.get_by_name.return_value = self.table_entity
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
mock_lineage_parser = MagicMock()
mock_lineage_parser.source_tables = [MockLineageTable("source_table")]
mock_lineage_parser.target_tables = [MockLineageTable("test_view")]
mock_lineage_parser_class.return_value = mock_lineage_parser
# Mock metadata search methods
source_table_entity = Table(
id=Uuid(root=uuid.uuid4()),
name=EntityName(root="source_table"),
fullyQualifiedName=FullyQualifiedEntityName(
root="test_service.test_db.test_schema.source_table"
),
serviceType=DatabaseServiceType.Postgres,
columns=[],
)
def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
if "source_table" in fqn_search_string:
return [source_table_entity]
if "test_view" in fqn_search_string:
return [self.table_entity]
return []
self.metadata.es_search_from_fqn = mock_es_search_from_fqn
def mock_get_by_name(entity, fqn=None, **kwargs):
if "test_service.test_db.test_schema.test_view" in fqn:
return self.table_entity
return None
self.metadata.get_by_name = mock_get_by_name
# Execute function
result = list(
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
)
# Verify that LineageParser was called with the correct parameters
mock_lineage_parser_class.assert_called_once_with(
self.table_view.view_definition,
Dialect.POSTGRES,
timeout_seconds=self.timeout_seconds,
)
@patch("metadata.utils.db_utils.fqn")
def test_get_view_lineage_no_view_definition(self, mock_fqn):
"""Test handling when view definition is not available"""
# Setup mock
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
self.metadata.get_by_name.return_value = self.table_entity
# Create TableView without view definition
table_view_no_definition = TableView(
table_name="test_view",
schema_name="test_schema",
db_name="test_db",
view_definition=None,
)
# Execute function
result = list(
get_view_lineage(
view=table_view_no_definition,
metadata=self.metadata,
service_name=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
)
# Assertions
self.assertEqual(len(result), 0)
@patch("metadata.utils.db_utils.get_lineage_by_query")
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
@patch("metadata.utils.db_utils.LineageParser")
@patch("metadata.utils.db_utils.fqn")
def test_get_view_lineage_exception_handling(
self,
mock_fqn,
mock_lineage_parser_class,
mock_dialect_mapper,
mock_get_lineage_by_query,
):
"""Test exception handling during lineage generation"""
# Setup mocks
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
self.metadata.get_by_name.return_value = self.table_entity
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
# Make LineageParser raise an exception
mock_lineage_parser_class.side_effect = Exception("Test exception")
# Execute function
result = list(
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
)
# Assertions
self.assertEqual(len(result), 0)
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
@patch("metadata.utils.db_utils.LineageParser")
@patch("metadata.utils.db_utils.fqn")
def test_get_view_lineage_non_postgres_service(
self, mock_fqn, mock_lineage_parser_class, mock_dialect_mapper
):
"""Test view lineage for non-Postgres services"""
# Setup mocks
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
# Reset metadata mocks to prevent interference from other tests
metadata = MagicMock()
search_cache.clear()
mock_dialect_mapper.dialect_of.return_value = Dialect.MYSQL
mock_lineage_parser = MagicMock()
mock_lineage_parser.source_tables = [MockLineageTable("source_table")]
mock_lineage_parser.target_tables = [MockLineageTable("test_view")]
mock_lineage_parser_class.return_value = mock_lineage_parser
# Mock metadata search methods that get_lineage_by_query will call
# Create a mock source table entity
source_table_entity = Table(
id=Uuid(root=uuid.uuid4()),
name=EntityName(root="source_table"),
fullyQualifiedName=FullyQualifiedEntityName(
root="test_service.test_db.test_schema.source_table"
),
serviceType=DatabaseServiceType.Mysql,
columns=[],
)
table_view = TableView(
table_name="test_view",
schema_name="test_schema",
db_name="test_db",
view_definition="create view test_view as SELECT * FROM source_table",
)
# Mock the search methods that get_lineage_by_query uses
def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
if "source_table" in fqn_search_string:
return [source_table_entity]
if "test_view" in fqn_search_string:
return [self.table_entity_non_postgres]
return []
metadata.es_search_from_fqn = mock_es_search_from_fqn
def mock_get_by_name(entity, fqn=None, **kwargs):
if "test_service.test_db.test_schema.test_view" in fqn:
return self.table_entity_non_postgres
return None
metadata.get_by_name = mock_get_by_name
# Execute function with mysql connection type
result = list(
get_view_lineage(
view=table_view,
metadata=metadata,
service_name="test_service",
connection_type="mysql",
timeout_seconds=self.timeout_seconds,
)
)
# Assertions - should get lineage results since we have source and target tables
self.assertGreater(len(result), 0)
# Verify that LineageParser was called with MySQL dialect
mock_lineage_parser_class.assert_called_once_with(
self.table_view.view_definition,
Dialect.MYSQL,
timeout_seconds=self.timeout_seconds,
)
@patch("metadata.utils.db_utils.get_lineage_by_query")
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
@patch("metadata.utils.db_utils.LineageParser")
@patch("metadata.utils.db_utils.fqn")
def test_get_view_lineage_empty_lineage_results(
self,
mock_fqn,
mock_lineage_parser_class,
mock_dialect_mapper,
mock_get_lineage_by_query,
):
"""Test handling when lineage functions return empty results"""
# Setup mocks
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
self.metadata.get_by_name.return_value = self.table_entity
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
mock_lineage_parser = MagicMock()
mock_lineage_parser.source_tables = ["source_table"]
mock_lineage_parser.target_tables = ["test_view"]
mock_lineage_parser_class.return_value = mock_lineage_parser
# Return empty results
mock_get_lineage_by_query.return_value = []
# Execute function
result = list(
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
)
# Assertions
self.assertEqual(len(result), 0)
@patch("metadata.utils.db_utils.get_lineage_by_query")
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
@patch("metadata.utils.db_utils.LineageParser")
@patch("metadata.utils.db_utils.fqn")
def test_get_view_lineage_none_lineage_results(
self,
mock_fqn,
mock_lineage_parser_class,
mock_dialect_mapper,
mock_get_lineage_by_query,
):
"""Test handling when lineage functions return None"""
# Setup mocks
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
self.metadata.get_by_name.return_value = self.table_entity
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
mock_lineage_parser = MagicMock()
mock_lineage_parser.source_tables = ["source_table"]
mock_lineage_parser.target_tables = ["test_view"]
mock_lineage_parser_class.return_value = mock_lineage_parser
# Return None
mock_get_lineage_by_query.return_value = None
# Execute function
result = list(
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
connection_type=self.connection_type,
timeout_seconds=self.timeout_seconds,
)
)
# Assertions
self.assertEqual(len(result), 0)
def test_get_view_lineage_invalid_connection_type(self):
"""Test handling of invalid connection type"""
# Execute function with invalid connection type
result = list(
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
connection_type=123, # Invalid type
timeout_seconds=self.timeout_seconds,
)
)
# Should handle gracefully and return empty result
self.assertEqual(len(result), 0)
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
@patch("metadata.utils.db_utils.LineageParser")
@patch("metadata.utils.db_utils.fqn")
def test_get_view_lineage_custom_timeout(
self, mock_fqn, mock_lineage_parser_class, mock_dialect_mapper
):
"""Test that custom timeout is passed correctly"""
# Setup mocks
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
self.metadata.get_by_name.return_value = self.table_entity
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
mock_lineage_parser = MagicMock()
mock_lineage_parser.source_tables = [MockLineageTable("source_table")]
mock_lineage_parser.target_tables = [MockLineageTable("test_view")]
mock_lineage_parser_class.return_value = mock_lineage_parser
# Mock metadata search methods that get_lineage_by_query will call
source_table_entity = Table(
id=Uuid(root=uuid.uuid4()),
name=EntityName(root="source_table"),
fullyQualifiedName=FullyQualifiedEntityName(
root="test_service.test_db.test_schema.source_table"
),
serviceType=DatabaseServiceType.Postgres,
columns=[],
)
def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
if "source_table" in fqn_search_string:
return [source_table_entity]
return []
self.metadata.es_search_from_fqn = mock_es_search_from_fqn
custom_timeout = 60
# Execute function with custom timeout
result = list(
get_view_lineage(
view=self.table_view,
metadata=self.metadata,
service_name=self.service_name,
connection_type=self.connection_type,
timeout_seconds=custom_timeout,
)
)
# Verify that LineageParser was called with custom timeout
mock_lineage_parser_class.assert_called_once_with(
self.table_view.view_definition,
Dialect.POSTGRES,
timeout_seconds=custom_timeout,
)