mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-23 23:04:23 +00:00

* MINOR: Improve UDF Lineage Processing & Better Logging Time & MultiProcessing (#20848) * Fix multiprocessing with better memory management and Airflow 2+ compatibility * Add support for both multiprocessing and multithreading for relevant platforms * Handle conflicting cross-db lineage changes of service_name parameter change * Handle stored proc queries without caching all and increase the thread timeout times to cover 100% lineage * Fix `get_table_query` inheritance and pylint * Remove mocks from db_utils tests * Better db_utils test and fix the service_names parameter in case of schema_fallback --------- Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
669 lines
26 KiB
Python
669 lines
26 KiB
Python
# Copyright 2025 Collate
|
|
# Licensed under the Collate Community License, Version 1.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Unit tests for db_utils module
|
|
"""
|
|
import uuid
|
|
from copy import deepcopy
|
|
from unittest import TestCase
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
|
from metadata.generated.schema.entity.data.table import Table
|
|
from metadata.generated.schema.entity.services.databaseService import (
|
|
DatabaseServiceType,
|
|
)
|
|
from metadata.generated.schema.type.basic import (
|
|
EntityName,
|
|
FullyQualifiedEntityName,
|
|
Uuid,
|
|
)
|
|
from metadata.generated.schema.type.entityLineage import EntitiesEdge
|
|
from metadata.generated.schema.type.entityReference import EntityReference
|
|
from metadata.ingestion.api.models import Either
|
|
from metadata.ingestion.lineage.models import Dialect
|
|
from metadata.ingestion.lineage.sql_lineage import search_cache
|
|
from metadata.ingestion.source.models import TableView
|
|
from metadata.utils.db_utils import get_host_from_host_port, get_view_lineage
|
|
|
|
|
|
# Mock LineageTable class to simulate collate_sqllineage.core.models.Table
|
|
class MockLineageTable:
|
|
def __init__(self, name):
|
|
self.raw_name = name
|
|
self.schema = None
|
|
self.catalog = None
|
|
|
|
def __str__(self):
|
|
return self.raw_name
|
|
|
|
|
|
class TestDbUtils(TestCase):
|
|
"""
|
|
Test cases for db_utils module
|
|
"""
|
|
|
|
def setUp(self):
|
|
"""Set up test fixtures"""
|
|
# Clear the search cache to ensure test isolation
|
|
search_cache.clear()
|
|
|
|
self.metadata = MagicMock()
|
|
self.service_name = "test_service"
|
|
self.connection_type = "postgres"
|
|
self.timeout_seconds = 30
|
|
|
|
# Create a mock table entity
|
|
self.table_entity = Table(
|
|
id=Uuid(root=uuid.uuid4()),
|
|
name=EntityName(root="test_view"),
|
|
fullyQualifiedName=FullyQualifiedEntityName(
|
|
root="test_service.test_db.test_schema.test_view"
|
|
),
|
|
serviceType=DatabaseServiceType.Postgres,
|
|
columns=[], # Add required columns field
|
|
)
|
|
|
|
self.table_entity_non_postgres = deepcopy(self.table_entity)
|
|
self.table_entity_non_postgres.serviceType = DatabaseServiceType.Mysql
|
|
|
|
# Create a mock source table entity for lineage tests
|
|
self.source_table_entity = Table(
|
|
id=Uuid(root=uuid.uuid4()),
|
|
name=EntityName(root="source_table"),
|
|
fullyQualifiedName=FullyQualifiedEntityName(
|
|
root="test_service.test_db.test_schema.source_table"
|
|
),
|
|
serviceType=DatabaseServiceType.Postgres,
|
|
columns=[],
|
|
)
|
|
|
|
# Non-postgres version of source table
|
|
self.source_table_entity_non_postgres = deepcopy(self.source_table_entity)
|
|
self.source_table_entity_non_postgres.serviceType = DatabaseServiceType.Mysql
|
|
|
|
# Create a mock TableView
|
|
self.table_view = TableView(
|
|
table_name="test_view",
|
|
schema_name="test_schema",
|
|
db_name="test_db",
|
|
view_definition="create view test_view as SELECT * FROM source_table",
|
|
)
|
|
|
|
def tearDown(self):
|
|
"""Clean up after each test"""
|
|
# Reset any module-level state if needed
|
|
pass
|
|
|
|
def test_get_host_from_host_port(self):
|
|
"""Test get_host_from_host_port function"""
|
|
# Test with host:port format
|
|
self.assertEqual(get_host_from_host_port("localhost:9000"), "localhost")
|
|
self.assertEqual(get_host_from_host_port("127.0.0.1:5432"), "127.0.0.1")
|
|
self.assertEqual(get_host_from_host_port("example.com:8080"), "example.com")
|
|
|
|
# Test with host only (no port)
|
|
self.assertEqual(get_host_from_host_port("localhost"), "localhost")
|
|
self.assertEqual(get_host_from_host_port("example.com"), "example.com")
|
|
|
|
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
|
|
@patch("metadata.utils.db_utils.fqn")
|
|
def test_get_view_lineage_success_with_lineage_parser(
|
|
self, mock_fqn, mock_dialect_mapper
|
|
):
|
|
"""Test successful view lineage generation when lineage parser has source and target tables"""
|
|
# Setup mocks
|
|
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
|
|
self.metadata.get_by_name.return_value = self.table_entity
|
|
|
|
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
|
|
|
|
# Mock the search methods that get_lineage_by_query uses
|
|
# The es_search_from_fqn method will be called with a search string like "test_service.test_db.test_schema.source_table"
|
|
def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
|
|
if "source_table" in fqn_search_string:
|
|
return [self.source_table_entity]
|
|
if "test_view" in fqn_search_string:
|
|
return [self.table_entity]
|
|
return []
|
|
|
|
self.metadata.es_search_from_fqn = mock_es_search_from_fqn
|
|
|
|
# Also mock the get_by_name method that might be called
|
|
def mock_get_by_name(entity, fqn=None, **kwargs):
|
|
if "test_service.test_db.test_schema.test_view" in fqn:
|
|
return self.table_entity
|
|
return None
|
|
|
|
self.metadata.get_by_name = mock_get_by_name
|
|
|
|
# Execute function
|
|
result = list(
|
|
get_view_lineage(
|
|
view=self.table_view,
|
|
metadata=self.metadata,
|
|
service_names=self.service_name,
|
|
connection_type=self.connection_type,
|
|
timeout_seconds=self.timeout_seconds,
|
|
)
|
|
)
|
|
|
|
# Assertions - check the actual lineage results
|
|
self.assertGreater(len(result), 0)
|
|
|
|
# Check that we have Either objects with Right values (successful results)
|
|
successful_results = [r.right for r in result if r.right]
|
|
self.assertGreater(len(successful_results), 0)
|
|
|
|
# Verify the lineage has correct source and target
|
|
for lineage_request in successful_results:
|
|
# Check the from and to entities exist
|
|
self.assertIsNotNone(lineage_request.edge.fromEntity)
|
|
self.assertIsNotNone(lineage_request.edge.toEntity)
|
|
|
|
# Check that the IDs match our expected entities
|
|
self.assertEqual(
|
|
lineage_request.edge.fromEntity.id.root,
|
|
self.source_table_entity.id.root,
|
|
)
|
|
self.assertEqual(
|
|
lineage_request.edge.toEntity.id.root, self.table_entity.id.root
|
|
)
|
|
|
|
# Verify mocks were called correctly
|
|
mock_fqn.build.assert_called_once()
|
|
mock_dialect_mapper.dialect_of.assert_called_once_with(self.connection_type)
|
|
|
|
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
|
|
@patch("metadata.utils.db_utils.fqn")
|
|
def test_get_view_lineage_success_with_fallback(
|
|
self, mock_fqn, mock_dialect_mapper
|
|
):
|
|
"""Test successful view lineage generation when lineage parser has source and target tables"""
|
|
# Setup mocks
|
|
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
|
|
self.metadata.get_by_name.return_value = self.table_entity
|
|
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
|
|
|
|
# Mock the search methods that get_lineage_by_query uses
|
|
# The es_search_from_fqn method will be called with a search string like "test_service.test_db.test_schema.source_table"
|
|
def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
|
|
if "public.source_table" in fqn_search_string:
|
|
return []
|
|
if "source_table" in fqn_search_string:
|
|
return [self.source_table_entity]
|
|
if "test_view" in fqn_search_string:
|
|
return [self.table_entity]
|
|
return []
|
|
|
|
self.metadata.es_search_from_fqn = mock_es_search_from_fqn
|
|
|
|
# Also mock the get_by_name method that might be called
|
|
def mock_get_by_name(entity, fqn=None, **kwargs):
|
|
if "test_service.test_db.test_schema.test_view" in fqn:
|
|
return self.table_entity
|
|
return None
|
|
|
|
self.metadata.get_by_name = mock_get_by_name
|
|
|
|
# Execute function
|
|
result = list(
|
|
get_view_lineage(
|
|
view=self.table_view,
|
|
metadata=self.metadata,
|
|
service_names=self.service_name,
|
|
connection_type=self.connection_type,
|
|
timeout_seconds=self.timeout_seconds,
|
|
)
|
|
)
|
|
|
|
# Assertions - should get lineage results since we have source and target tables
|
|
self.assertGreater(len(result), 0)
|
|
|
|
# Check that we have Either objects with Right values (successful results)
|
|
successful_results = [r.right for r in result if r.right]
|
|
self.assertGreater(len(successful_results), 0)
|
|
|
|
# Verify the lineage has correct source and target
|
|
for lineage_request in successful_results:
|
|
# Check the from and to entities exist
|
|
self.assertIsNotNone(lineage_request.edge.fromEntity)
|
|
self.assertIsNotNone(lineage_request.edge.toEntity)
|
|
|
|
# Check that the IDs match our expected entities
|
|
self.assertEqual(
|
|
lineage_request.edge.fromEntity.id.root,
|
|
self.source_table_entity.id.root,
|
|
)
|
|
self.assertEqual(
|
|
lineage_request.edge.toEntity.id.root, self.table_entity.id.root
|
|
)
|
|
|
|
# Verify mocks were called correctly
|
|
mock_fqn.build.assert_called_once()
|
|
mock_dialect_mapper.dialect_of.assert_called_once_with(self.connection_type)
|
|
|
|
@patch("metadata.utils.db_utils.get_lineage_via_table_entity")
|
|
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
|
|
@patch("metadata.utils.db_utils.LineageParser")
|
|
@patch("metadata.utils.db_utils.fqn")
|
|
def test_get_view_lineage_success_with_table_entity(
|
|
self,
|
|
mock_fqn,
|
|
mock_lineage_parser_class,
|
|
mock_dialect_mapper,
|
|
mock_get_lineage_via_table_entity,
|
|
):
|
|
"""Test successful view lineage generation when lineage parser has no source/target tables"""
|
|
# Setup mocks
|
|
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
|
|
self.metadata.get_by_name.return_value = self.table_entity
|
|
|
|
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
|
|
|
|
mock_lineage_parser = MagicMock()
|
|
mock_lineage_parser.source_tables = [] # No source tables
|
|
mock_lineage_parser.target_tables = [] # No target tables
|
|
mock_lineage_parser_class.return_value = mock_lineage_parser
|
|
|
|
# Create a valid AddLineageRequest for the mock
|
|
|
|
valid_lineage_request = AddLineageRequest(
|
|
edge=EntitiesEdge(
|
|
fromEntity=EntityReference(
|
|
id=self.source_table_entity.id.root,
|
|
type="table",
|
|
),
|
|
toEntity=EntityReference(
|
|
id=self.table_entity.id.root,
|
|
type="table",
|
|
),
|
|
)
|
|
)
|
|
|
|
mock_get_lineage_via_table_entity.return_value = [
|
|
Either(right=valid_lineage_request)
|
|
]
|
|
|
|
# Execute function
|
|
result = list(
|
|
get_view_lineage(
|
|
view=self.table_view,
|
|
metadata=self.metadata,
|
|
service_names=self.service_name,
|
|
connection_type=self.connection_type,
|
|
timeout_seconds=self.timeout_seconds,
|
|
)
|
|
)
|
|
|
|
# Assertions
|
|
self.assertEqual(len(result), 1)
|
|
self.assertIsInstance(result[0], Either)
|
|
self.assertIsNotNone(result[0].right)
|
|
|
|
# Verify mocks were called correctly
|
|
mock_fqn.build.assert_called_once()
|
|
self.metadata.get_by_name.assert_called_once()
|
|
mock_dialect_mapper.dialect_of.assert_called_once_with(self.connection_type)
|
|
mock_lineage_parser_class.assert_called_once()
|
|
mock_get_lineage_via_table_entity.assert_called_once()
|
|
|
|
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
|
|
@patch("metadata.utils.db_utils.fqn")
|
|
def test_get_view_lineage_postgres_schema_fallback(
|
|
self, mock_fqn, mock_dialect_mapper
|
|
):
|
|
"""Test that Postgres views use public schema fallback"""
|
|
# Setup mocks
|
|
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
|
|
|
|
# Reset metadata mocks to prevent interference from other tests
|
|
self.metadata.reset_mock()
|
|
self.metadata.get_by_name.return_value = self.table_entity
|
|
|
|
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
|
|
|
|
# Mock metadata search methods
|
|
def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
|
|
if "source_table" in fqn_search_string:
|
|
return [self.source_table_entity]
|
|
if "test_view" in fqn_search_string:
|
|
return [self.table_entity]
|
|
return []
|
|
|
|
self.metadata.es_search_from_fqn = mock_es_search_from_fqn
|
|
|
|
def mock_get_by_name(entity, fqn=None, **kwargs):
|
|
if "test_service.test_db.test_schema.test_view" in fqn:
|
|
return self.table_entity
|
|
return None
|
|
|
|
self.metadata.get_by_name = mock_get_by_name
|
|
|
|
# Execute function
|
|
result = list(
|
|
get_view_lineage(
|
|
view=self.table_view,
|
|
metadata=self.metadata,
|
|
service_names=self.service_name,
|
|
connection_type=self.connection_type,
|
|
timeout_seconds=self.timeout_seconds,
|
|
)
|
|
)
|
|
|
|
# Test passes if we get lineage results back
|
|
# The actual lineage processing happens internally with LineageParser
|
|
self.assertGreater(len(result), 0)
|
|
|
|
# Check that we have Either objects with Right values (successful results)
|
|
successful_results = [r.right for r in result if r.right]
|
|
self.assertGreater(len(successful_results), 0)
|
|
|
|
# Verify the lineage has correct source and target
|
|
for lineage_request in successful_results:
|
|
# Check the from and to entities exist
|
|
self.assertIsNotNone(lineage_request.edge.fromEntity)
|
|
self.assertIsNotNone(lineage_request.edge.toEntity)
|
|
|
|
# Check that the IDs match our expected entities
|
|
self.assertEqual(
|
|
lineage_request.edge.fromEntity.id.root,
|
|
self.source_table_entity.id.root,
|
|
)
|
|
self.assertEqual(
|
|
lineage_request.edge.toEntity.id.root, self.table_entity.id.root
|
|
)
|
|
|
|
@patch("metadata.utils.db_utils.fqn")
|
|
def test_get_view_lineage_no_view_definition(self, mock_fqn):
|
|
"""Test handling when view definition is not available"""
|
|
# Setup mock
|
|
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
|
|
self.metadata.get_by_name.return_value = self.table_entity
|
|
|
|
# Create TableView without view definition
|
|
table_view_no_definition = TableView(
|
|
table_name="test_view",
|
|
schema_name="test_schema",
|
|
db_name="test_db",
|
|
view_definition=None,
|
|
)
|
|
|
|
# Execute function
|
|
result = list(
|
|
get_view_lineage(
|
|
view=table_view_no_definition,
|
|
metadata=self.metadata,
|
|
service_names=self.service_name,
|
|
connection_type=self.connection_type,
|
|
timeout_seconds=self.timeout_seconds,
|
|
)
|
|
)
|
|
|
|
# Assertions
|
|
self.assertEqual(len(result), 0)
|
|
|
|
@patch("metadata.utils.db_utils.get_lineage_by_query")
|
|
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
|
|
@patch("metadata.utils.db_utils.LineageParser")
|
|
@patch("metadata.utils.db_utils.fqn")
|
|
def test_get_view_lineage_exception_handling(
|
|
self,
|
|
mock_fqn,
|
|
mock_lineage_parser_class,
|
|
mock_dialect_mapper,
|
|
mock_get_lineage_by_query,
|
|
):
|
|
"""Test exception handling during lineage generation"""
|
|
# Setup mocks
|
|
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
|
|
self.metadata.get_by_name.return_value = self.table_entity
|
|
|
|
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
|
|
|
|
# Make LineageParser raise an exception
|
|
mock_lineage_parser_class.side_effect = Exception("Test exception")
|
|
|
|
# Execute function
|
|
result = list(
|
|
get_view_lineage(
|
|
view=self.table_view,
|
|
metadata=self.metadata,
|
|
service_names=self.service_name,
|
|
connection_type=self.connection_type,
|
|
timeout_seconds=self.timeout_seconds,
|
|
)
|
|
)
|
|
|
|
# Assertions
|
|
self.assertEqual(len(result), 0)
|
|
|
|
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
|
|
@patch("metadata.utils.db_utils.fqn")
|
|
def test_get_view_lineage_non_postgres_service(self, mock_fqn, mock_dialect_mapper):
|
|
"""Test view lineage for non-Postgres services"""
|
|
# Setup mocks
|
|
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
|
|
|
|
# Reset metadata mocks to prevent interference from other tests
|
|
metadata = MagicMock()
|
|
|
|
mock_dialect_mapper.dialect_of.return_value = Dialect.MYSQL
|
|
|
|
# Mock metadata search methods that get_lineage_by_query will call
|
|
table_view = TableView(
|
|
table_name="test_view",
|
|
schema_name="test_schema",
|
|
db_name="test_db",
|
|
view_definition="create view test_view as SELECT * FROM source_table",
|
|
)
|
|
|
|
# Mock the search methods that get_lineage_by_query uses
|
|
def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
|
|
if "source_table" in fqn_search_string:
|
|
return [self.source_table_entity_non_postgres]
|
|
if "test_view" in fqn_search_string:
|
|
return [self.table_entity_non_postgres]
|
|
return []
|
|
|
|
metadata.es_search_from_fqn = mock_es_search_from_fqn
|
|
|
|
def mock_get_by_name(entity, fqn=None, **kwargs):
|
|
if "test_service.test_db.test_schema.test_view" in fqn:
|
|
return self.table_entity_non_postgres
|
|
return None
|
|
|
|
metadata.get_by_name = mock_get_by_name
|
|
|
|
# Execute function with mysql connection type
|
|
result = list(
|
|
get_view_lineage(
|
|
view=table_view,
|
|
metadata=metadata,
|
|
service_names="test_service",
|
|
connection_type="mysql",
|
|
timeout_seconds=self.timeout_seconds,
|
|
)
|
|
)
|
|
|
|
# Assertions - should get lineage results since we have source and target tables
|
|
self.assertGreater(len(result), 0)
|
|
|
|
# Check that we have Either objects with Right values (successful results)
|
|
successful_results = [r.right for r in result if r.right]
|
|
self.assertGreater(len(successful_results), 0)
|
|
|
|
# Verify the lineage has correct source and target
|
|
for lineage_request in successful_results:
|
|
# Check the from and to entities exist
|
|
self.assertIsNotNone(lineage_request.edge.fromEntity)
|
|
self.assertIsNotNone(lineage_request.edge.toEntity)
|
|
# Check that the IDs match our expected entities
|
|
self.assertEqual(
|
|
lineage_request.edge.fromEntity.id.root,
|
|
self.source_table_entity_non_postgres.id.root,
|
|
)
|
|
self.assertEqual(
|
|
lineage_request.edge.toEntity.id.root,
|
|
self.table_entity_non_postgres.id.root,
|
|
)
|
|
|
|
@patch("metadata.utils.db_utils.get_lineage_by_query")
|
|
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
|
|
@patch("metadata.utils.db_utils.LineageParser")
|
|
@patch("metadata.utils.db_utils.fqn")
|
|
def test_get_view_lineage_empty_lineage_results(
|
|
self,
|
|
mock_fqn,
|
|
mock_lineage_parser_class,
|
|
mock_dialect_mapper,
|
|
mock_get_lineage_by_query,
|
|
):
|
|
"""Test handling when lineage functions return empty results"""
|
|
# Setup mocks
|
|
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
|
|
self.metadata.get_by_name.return_value = self.table_entity
|
|
|
|
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
|
|
|
|
mock_lineage_parser = MagicMock()
|
|
mock_lineage_parser.source_tables = ["source_table"]
|
|
mock_lineage_parser.target_tables = ["test_view"]
|
|
mock_lineage_parser_class.return_value = mock_lineage_parser
|
|
|
|
# Return empty results
|
|
mock_get_lineage_by_query.return_value = []
|
|
|
|
# Execute function
|
|
result = list(
|
|
get_view_lineage(
|
|
view=self.table_view,
|
|
metadata=self.metadata,
|
|
service_names=self.service_name,
|
|
connection_type=self.connection_type,
|
|
timeout_seconds=self.timeout_seconds,
|
|
)
|
|
)
|
|
|
|
# Assertions
|
|
self.assertEqual(len(result), 0)
|
|
|
|
@patch("metadata.utils.db_utils.get_lineage_by_query")
|
|
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
|
|
@patch("metadata.utils.db_utils.LineageParser")
|
|
@patch("metadata.utils.db_utils.fqn")
|
|
def test_get_view_lineage_none_lineage_results(
|
|
self,
|
|
mock_fqn,
|
|
mock_lineage_parser_class,
|
|
mock_dialect_mapper,
|
|
mock_get_lineage_by_query,
|
|
):
|
|
"""Test handling when lineage functions return None"""
|
|
# Setup mocks
|
|
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
|
|
self.metadata.get_by_name.return_value = self.table_entity
|
|
|
|
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
|
|
|
|
mock_lineage_parser = MagicMock()
|
|
mock_lineage_parser.source_tables = ["source_table"]
|
|
mock_lineage_parser.target_tables = ["test_view"]
|
|
mock_lineage_parser_class.return_value = mock_lineage_parser
|
|
|
|
# Return None
|
|
mock_get_lineage_by_query.return_value = None
|
|
|
|
# Execute function
|
|
result = list(
|
|
get_view_lineage(
|
|
view=self.table_view,
|
|
metadata=self.metadata,
|
|
service_names=self.service_name,
|
|
connection_type=self.connection_type,
|
|
timeout_seconds=self.timeout_seconds,
|
|
)
|
|
)
|
|
|
|
# Assertions
|
|
self.assertEqual(len(result), 0)
|
|
|
|
def test_get_view_lineage_invalid_connection_type(self):
|
|
"""Test handling of invalid connection type"""
|
|
# Execute function with invalid connection type
|
|
result = list(
|
|
get_view_lineage(
|
|
view=self.table_view,
|
|
metadata=self.metadata,
|
|
service_names=self.service_name,
|
|
connection_type=123, # Invalid type
|
|
timeout_seconds=self.timeout_seconds,
|
|
)
|
|
)
|
|
|
|
# Should handle gracefully and return empty result
|
|
self.assertEqual(len(result), 0)
|
|
|
|
@patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
|
|
@patch("metadata.utils.db_utils.fqn")
|
|
def test_get_view_lineage_custom_timeout(self, mock_fqn, mock_dialect_mapper):
|
|
"""Test that custom timeout is passed correctly"""
|
|
# Setup mocks
|
|
mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
|
|
self.metadata.get_by_name.return_value = self.table_entity
|
|
|
|
mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES
|
|
|
|
# Mock metadata search methods that get_lineage_by_query will call
|
|
def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
|
|
if "source_table" in fqn_search_string:
|
|
return [self.source_table_entity]
|
|
return []
|
|
|
|
self.metadata.es_search_from_fqn = mock_es_search_from_fqn
|
|
|
|
custom_timeout = 60
|
|
|
|
# Execute function with custom timeout
|
|
result = list(
|
|
get_view_lineage(
|
|
view=self.table_view,
|
|
metadata=self.metadata,
|
|
service_names=self.service_name,
|
|
connection_type=self.connection_type,
|
|
timeout_seconds=custom_timeout,
|
|
)
|
|
)
|
|
|
|
# Test passes if we get lineage results back
|
|
# Custom timeout is used internally by LineageParser
|
|
self.assertGreater(len(result), 0)
|
|
|
|
# Check that we have Either objects with Right values (successful results)
|
|
successful_results = [r.right for r in result if r.right]
|
|
self.assertGreater(len(successful_results), 0)
|
|
|
|
# Verify the lineage has correct source and target
|
|
for lineage_request in successful_results:
|
|
# Check the from and to entities exist
|
|
self.assertIsNotNone(lineage_request.edge.fromEntity)
|
|
self.assertIsNotNone(lineage_request.edge.toEntity)
|
|
# Check that the IDs match our expected entities
|
|
self.assertEqual(
|
|
lineage_request.edge.fromEntity.id.root,
|
|
self.source_table_entity.id.root,
|
|
)
|
|
self.assertEqual(
|
|
lineage_request.edge.toEntity.id.root,
|
|
self.table_entity.id.root,
|
|
)
|