diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/client.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/client.py
index e96ceafab8f..c4d45825dae 100644
--- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/client.py
+++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/client.py
@@ -41,7 +41,7 @@ from metadata.ingestion.source.database.unitycatalog.models import (
 from metadata.utils.logger import ingestion_logger
 
 logger = ingestion_logger()
-TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get"
+TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage"
 COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get"
 TABLES_PATH = "/unity-catalog/tables"
 
diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py
index 36fca426a84..5b9caa22bff 100644
--- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py
+++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py
@@ -15,6 +15,7 @@ import traceback
 from typing import Iterable, Optional
 
 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
+from metadata.generated.schema.entity.data.container import ContainerDataModel
 from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
 from metadata.generated.schema.entity.data.table import Table
@@ -90,6 +91,71 @@ class UnitycatalogLineageSource(Source):
             )
         return cls(config, metadata)
 
+    def _get_data_model_column_fqn(
+        self, data_model_entity: ContainerDataModel, column: str
+    ) -> Optional[str]:
+        """
+        Return the FQN of the container data model column matching
+        `column` (case-insensitive), or None when there is no match.
+
+        displayName is optional on a column, so the column name is used
+        as the fallback instead of failing on a missing displayName.
+        """
+        if not data_model_entity:
+            return None
+        target = column.lower()
+        for entity_column in data_model_entity.columns or []:
+            label = entity_column.displayName or entity_column.name.root
+            if label.lower() == target and entity_column.fullyQualifiedName:
+                return entity_column.fullyQualifiedName.root
+        return None
+
+    def _get_container_column_lineage(
+        self,
+        data_model_entity: ContainerDataModel,
+        table_entity: Table,
+        is_upstream: bool = True,
+    ) -> Optional[LineageDetails]:
+        """
+        Build column-level lineage between a container data model and a
+        table by pairing columns whose names match (case-insensitive).
+
+        Columns flow container -> table when `is_upstream` is True and
+        table -> container otherwise, so they follow the edge direction.
+        Returns None when no column matches or when the lookup fails.
+        """
+        try:
+            column_lineage = []
+            for column in table_entity.columns:
+                # A column without an FQN cannot be referenced in lineage;
+                # skip it rather than abort the remaining columns.
+                if not column.fullyQualifiedName:
+                    continue
+                table_column = column.fullyQualifiedName.root
+                container_column = self._get_data_model_column_fqn(
+                    data_model_entity=data_model_entity, column=column.name.root
+                )
+                if not container_column:
+                    continue
+                from_column, to_column = (
+                    (container_column, table_column)
+                    if is_upstream
+                    else (table_column, container_column)
+                )
+                column_lineage.append(
+                    ColumnLineage(fromColumns=[from_column], toColumn=to_column)
+                )
+            if column_lineage:
+                return LineageDetails(
+                    columnsLineage=column_lineage,
+                    source=LineageSource.ExternalTableLineage,
+                )
+            return None
+        except Exception as exc:
+            logger.debug(f"Error computing container column lineage: {exc}")
+            logger.debug(traceback.format_exc())
+            return None
+
     def _get_lineage_details(
         self, from_table: Table, to_table: Table, databricks_table_fqn: str
     ) -> Optional[LineageDetails]:
@@ -124,16 +190,82 @@ class UnitycatalogLineageSource(Source):
             logger.debug(traceback.format_exc())
             return None
 
+    def _handle_external_location_lineage(
+        self, file_info, table: Table, is_upstream: bool
+    ) -> Iterable[Either[AddLineageRequest]]:
+        """
+        Yield a lineage edge between `table` and the container registered
+        for the external location in `file_info.storage_location`.
+
+        The container is the source of the edge when `is_upstream` is True
+        and the target otherwise. Nothing is yielded when the location is
+        empty or no container is found for it.
+        """
+        try:
+            if not file_info.storage_location:
+                logger.debug("No storage location found in fileInfo")
+                return
+
+            location_entity = self.metadata.es_search_container_by_path(
+                full_path=file_info.storage_location, fields="dataModel"
+            )
+
+            if not (location_entity and location_entity[0]):
+                logger.debug(
+                    f"Unable to find container for external location: {file_info.storage_location}"
+                )
+                return
+
+            container = location_entity[0]
+            lineage_details = None
+            if container.dataModel:
+                lineage_details = self._get_container_column_lineage(
+                    container.dataModel, table, is_upstream=is_upstream
+                )
+
+            container_ref = EntityReference(id=container.id, type="container")
+            table_ref = EntityReference(id=table.id, type="table")
+            # Only the direction of the edge depends on is_upstream.
+            from_ref, to_ref = (
+                (container_ref, table_ref)
+                if is_upstream
+                else (table_ref, container_ref)
+            )
+            yield Either(
+                left=None,
+                right=AddLineageRequest(
+                    edge=EntitiesEdge(
+                        fromEntity=from_ref,
+                        toEntity=to_ref,
+                        lineageDetails=lineage_details,
+                    )
+                ),
+            )
+        except Exception as exc:
+            logger.debug(
+                f"Error while processing external location lineage for {file_info.storage_location}: {exc}"
+            )
+            logger.debug(traceback.format_exc())
+
     def _handle_upstream_table(
         self,
         table_streams: LineageTableStreams,
         table: Table,
         databricks_table_fqn: str,
     ) -> Iterable[Either[AddLineageRequest]]:
-        for upstream_table in table_streams.upstream_tables:
+        # `upstreams` is Optional on the model, so guard against None.
+        for upstream_entity in table_streams.upstreams or []:
             try:
-                if not upstream_table.name:
+                if upstream_entity.fileInfo:
+                    yield from self._handle_external_location_lineage(
+                        upstream_entity.fileInfo, table, is_upstream=True
+                    )
                     continue
+
+                if not upstream_entity.tableInfo or not upstream_entity.tableInfo.name:
+                    continue
+
+                upstream_table = upstream_entity.tableInfo
                 from_entity_fqn = fqn.build(
                     metadata=self.metadata,
                     entity_type=Table,
@@ -172,9 +304,74 @@ class UnitycatalogLineageSource(Source):
                     )
             except Exception:
                 logger.debug(
-                    "Error while processing lineage for "
-                    f"{upstream_table.catalog_name}.{upstream_table.schema_name}.{upstream_table.name}"
-                    f" -> {databricks_table_fqn}"
+                    "Error while processing upstream lineage for "
+                    f"{databricks_table_fqn}"
+                )
+                logger.debug(traceback.format_exc())
+
+    def _handle_downstream_table(
+        self,
+        table_streams: LineageTableStreams,
+        table: Table,
+        databricks_table_fqn: str,
+    ) -> Iterable[Either[AddLineageRequest]]:
+        """
+        Yield lineage edges from `table` to every downstream table or
+        external location listed in `table_streams.downstreams`.
+        """
+        # `downstreams` is Optional on the model, so guard against None.
+        for downstream_entity in table_streams.downstreams or []:
+            try:
+                if downstream_entity.fileInfo:
+                    yield from self._handle_external_location_lineage(
+                        downstream_entity.fileInfo, table, is_upstream=False
+                    )
+                    continue
+
+                if (
+                    not downstream_entity.tableInfo
+                    or not downstream_entity.tableInfo.name
+                ):
+                    continue
+
+                downstream_table = downstream_entity.tableInfo
+                to_entity_fqn = fqn.build(
+                    metadata=self.metadata,
+                    entity_type=Table,
+                    database_name=downstream_table.catalog_name,
+                    schema_name=downstream_table.schema_name,
+                    table_name=downstream_table.name,
+                    service_name=self.config.serviceName,
+                )
+
+                to_entity = self.metadata.get_by_name(entity=Table, fqn=to_entity_fqn)
+                if to_entity:
+                    downstream_table_fqn = f"{downstream_table.catalog_name}.{downstream_table.schema_name}.{downstream_table.name}"
+                    lineage_details = self._get_lineage_details(
+                        from_table=table,
+                        to_table=to_entity,
+                        databricks_table_fqn=downstream_table_fqn,
+                    )
+                    yield Either(
+                        left=None,
+                        right=AddLineageRequest(
+                            edge=EntitiesEdge(
+                                fromEntity=EntityReference(id=table.id, type="table"),
+                                toEntity=EntityReference(id=to_entity.id, type="table"),
+                                lineageDetails=lineage_details,
+                            )
+                        ),
+                    )
+                else:
+                    logger.debug(
+                        f"Unable to find downstream entity for "
+                        f"{databricks_table_fqn} -> "
+                        f"{downstream_table.catalog_name}.{downstream_table.schema_name}.{downstream_table.name}"
+                    )
+            except Exception:
+                logger.debug(
+                    "Error while processing downstream lineage for "
+                    f"{databricks_table_fqn}"
                 )
                 logger.debug(traceback.format_exc())
 
@@ -225,10 +422,17 @@ class UnitycatalogLineageSource(Source):
                     table_streams: LineageTableStreams = self.client.get_table_lineage(
                         databricks_table_fqn
                     )
+
+                    # Process upstream lineage
                     yield from self._handle_upstream_table(
                         table_streams, table, databricks_table_fqn
                     )
 
+                    # Process downstream lineage
+                    yield from self._handle_downstream_table(
+                        table_streams, table, databricks_table_fqn
+                    )
+
     def test_connection(self) -> None:
         test_connection_common(
             self.metadata, self.connection_obj, self.service_connection
diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/models.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/models.py
index 442923c71aa..53be90ac596 100644
--- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/models.py
+++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/models.py
@@ -22,6 +22,8 @@ class DatabricksTable(BaseModel):
     name: Optional[str] = None
     catalog_name: Optional[str] = None
     schema_name: Optional[str] = None
+    table_type: Optional[str] = None
+    lineage_timestamp: Optional[str] = None
 
 
 class DatabricksColumn(BaseModel):
@@ -31,9 +33,25 @@ class DatabricksColumn(BaseModel):
     table_name: Optional[str] = None
 
 
+class FileInfo(BaseModel):
+    """External file entry of a table lineage response."""
+    path: Optional[str] = None
+    has_permission: Optional[bool] = None
+    securable_name: Optional[str] = None
+    storage_location: Optional[str] = None
+    securable_type: Optional[str] = None
+    lineage_timestamp: Optional[str] = None
+
+
+class LineageEntity(BaseModel):
+    """One lineage entry: a table (tableInfo) and/or a file (fileInfo)."""
+    tableInfo: Optional[DatabricksTable] = None
+    fileInfo: Optional[FileInfo] = None
+
+
 class LineageTableStreams(BaseModel):
-    upstream_tables: Optional[List[DatabricksTable]] = []
-    downstream_tables: Optional[List[DatabricksTable]] = []
+    upstreams: Optional[List[LineageEntity]] = []
+    downstreams: Optional[List[LineageEntity]] = []
 
 
 class LineageColumnStreams(BaseModel):
diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
index 5d79e3e6e2a..b286ff2f59c 100644
--- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
@@ -356,6 +356,7 @@ class S3Source(StorageServiceSource):
                 if entry
                 and entry.get("Key")
                 and len(entry.get("Key").split("/")) > total_depth
+                and "/_delta_log/" not in entry.get("Key")
             }
             for key in candidate_keys:
                 metadata_entry_copy = deepcopy(metadata_entry)
@@ -469,7 +470,10 @@ class S3Source(StorageServiceSource):
         candidate_keys = [
             entry["Key"]
             for entry in response
-            if entry and entry.get("Key") and not entry.get("Key").endswith("/")
+            if entry
+            and entry.get("Key")
+            and not entry.get("Key").endswith("/")
+            and "/_delta_log/" not in entry.get("Key")
         ]
         for key in candidate_keys:
             if self.is_valid_unstructured_file(metadata_entry.unstructuredFormats, key):
@@ -678,7 +682,10 @@ class S3Source(StorageServiceSource):
             candidate_keys = [
                 entry["Key"]
                 for entry in response[S3_CLIENT_ROOT_RESPONSE]
-                if entry and entry.get("Key") and not entry.get("Key").endswith("/")
+                if entry
+                and entry.get("Key")
+                and not entry.get("Key").endswith("/")
+                and "/_delta_log/" not in entry.get("Key")
             ]
             # pick a random key out of the candidates if any were returned
             if candidate_keys:
diff --git a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py
new file mode 100644
index 00000000000..940653fac19
--- /dev/null
+++ b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py
@@ -0,0 +1,513 @@
+# Copyright 2025 Collate
+# Licensed under the Collate Community License, Version 1.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Unit tests for the Unity Catalog lineage models and lineage source helpers
+"""
+
+from unittest import TestCase
+from unittest.mock import Mock, patch
+from uuid import uuid4
+
+from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
+from metadata.generated.schema.entity.data.container import (
+    Container,
+    ContainerDataModel,
+)
+from metadata.generated.schema.entity.data.table import (
+    Column,
+    ColumnName,
+    DataType,
+    Table,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
+from metadata.generated.schema.type.entityLineage import Source as LineageSource
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.api.models import Either
+from metadata.ingestion.source.database.unitycatalog.lineage import (
+    UnitycatalogLineageSource,
+)
+from metadata.ingestion.source.database.unitycatalog.models import (
+    DatabricksTable,
+    FileInfo,
+    LineageEntity,
+    LineageTableStreams,
+)
+
+mock_unitycatalog_lineage_config = {
+    "source": {
+        "type": "unitycatalog-lineage",
+        "serviceName": "local_unitycatalog",
+        "serviceConnection": {
+            "config": {
+                "type": "UnityCatalog",
+                "catalog": "demo-test-cat",
+                "databaseSchema": "test-schema",
+                "authType": {"token": "test_token"},
+                "hostPort": "localhost:443",
+                "httpPath": "/sql/1.0/warehouses/test",
+            }
+        },
+        "sourceConfig": {"config": {"type": "DatabaseLineage"}},
+    },
+    "sink": {"type": "metadata-rest", "config": {}},
+    "workflowConfig": {
+        "openMetadataServerConfig": {
+            "hostPort": "http://localhost:8585/api",
+            "authProvider": "openmetadata",
+            "securityConfig": {"jwtToken": "test_token"},
+        }
+    },
+}
+
+
+class TestUnityCatalogLineage(TestCase):
+    """
+    Unit tests for UnitycatalogLineageSource and the lineage response models
+    """
+
+    @patch(
+        "metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
+    )
+    @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
+    def setUp(self, mock_metadata, mock_test_connection):
+        mock_test_connection.return_value = None
+        self.mock_metadata = mock_metadata
+        self.config = WorkflowSource.model_validate(
+            mock_unitycatalog_lineage_config["source"]
+        )
+        self.lineage_source = UnitycatalogLineageSource(self.config, self.mock_metadata)
+
+    def test_lineage_table_streams_with_table_info(self):
+        """LineageTableStreams keeps tableInfo entries and leaves fileInfo unset"""
+        upstream_table = DatabricksTable(
+            name="source_table",
+            catalog_name="demo-test-cat",
+            schema_name="test-schema",
+            table_type="TABLE",
+            lineage_timestamp="2025-10-08 11:16:26.0",
+        )
+
+        downstream_table = DatabricksTable(
+            name="target_table",
+            catalog_name="demo-test-cat",
+            schema_name="test-schema",
+            table_type="TABLE",
+            lineage_timestamp="2025-10-08 11:16:26.0",
+        )
+
+        lineage_streams = LineageTableStreams(
+            upstreams=[LineageEntity(tableInfo=upstream_table)],
+            downstreams=[LineageEntity(tableInfo=downstream_table)],
+        )
+
+        self.assertEqual(len(lineage_streams.upstreams), 1)
+        self.assertEqual(len(lineage_streams.downstreams), 1)
+        self.assertEqual(lineage_streams.upstreams[0].tableInfo.name, "source_table")
+        self.assertEqual(lineage_streams.downstreams[0].tableInfo.name, "target_table")
+        self.assertIsNone(lineage_streams.upstreams[0].fileInfo)
+        self.assertIsNone(lineage_streams.downstreams[0].fileInfo)
+
+    def test_lineage_table_streams_with_file_info(self):
+        """LineageTableStreams keeps fileInfo entries and leaves tableInfo unset"""
+        file_info = FileInfo(
+            path="s3://bucket/path/file.parquet",
+            has_permission=True,
+            securable_name="test_location",
+            storage_location="s3://bucket/path",
+            securable_type="EXTERNAL_LOCATION",
+            lineage_timestamp="2025-10-08 10:37:42.0",
+        )
+
+        lineage_streams = LineageTableStreams(
+            upstreams=[LineageEntity(fileInfo=file_info)], downstreams=[]
+        )
+
+        self.assertEqual(len(lineage_streams.upstreams), 1)
+        self.assertEqual(
+            lineage_streams.upstreams[0].fileInfo.path, "s3://bucket/path/file.parquet"
+        )
+        self.assertEqual(
+            lineage_streams.upstreams[0].fileInfo.storage_location, "s3://bucket/path"
+        )
+        self.assertIsNone(lineage_streams.upstreams[0].tableInfo)
+
+    def test_lineage_table_streams_mixed(self):
+        """LineageTableStreams accepts tableInfo and fileInfo entries side by side"""
+        table_info = DatabricksTable(
+            name="table1",
+            catalog_name="demo-test-cat",
+            schema_name="test-schema",
+            table_type="TABLE",
+        )
+
+        file_info = FileInfo(
+            path="s3://bucket/path/file.parquet",
+            storage_location="s3://bucket/path",
+            securable_type="EXTERNAL_LOCATION",
+        )
+
+        lineage_streams = LineageTableStreams(
+            upstreams=[
+                LineageEntity(tableInfo=table_info),
+                LineageEntity(fileInfo=file_info),
+            ],
+            downstreams=[],
+        )
+
+        self.assertEqual(len(lineage_streams.upstreams), 2)
+        self.assertIsNotNone(lineage_streams.upstreams[0].tableInfo)
+        self.assertIsNone(lineage_streams.upstreams[0].fileInfo)
+        self.assertIsNone(lineage_streams.upstreams[1].tableInfo)
+        self.assertIsNotNone(lineage_streams.upstreams[1].fileInfo)
+
+    @patch(
+        "metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
+    )
+    @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
+    def test_get_data_model_column_fqn(self, mock_metadata, mock_test_connection):
+        """_get_data_model_column_fqn returns the matching column FQN, else None"""
+        mock_test_connection.return_value = None
+
+        data_model = ContainerDataModel(
+            columns=[
+                Column(
+                    name=ColumnName(root="id"),
+                    displayName="id",
+                    dataType=DataType.INT,
+                    fullyQualifiedName=FullyQualifiedEntityName(
+                        root="service.container.id"
+                    ),
+                ),
+                Column(
+                    name=ColumnName(root="name"),
+                    displayName="name",
+                    dataType=DataType.STRING,
+                    fullyQualifiedName=FullyQualifiedEntityName(
+                        root="service.container.name"
+                    ),
+                ),
+            ]
+        )
+
+        lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
+
+        result = lineage_source._get_data_model_column_fqn(data_model, "id")
+        self.assertEqual(result, "service.container.id")
+
+        result = lineage_source._get_data_model_column_fqn(data_model, "name")
+        self.assertEqual(result, "service.container.name")
+
+        result = lineage_source._get_data_model_column_fqn(data_model, "nonexistent")
+        self.assertIsNone(result)
+
+        result = lineage_source._get_data_model_column_fqn(None, "id")
+        self.assertIsNone(result)
+
+    @patch(
+        "metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
+    )
+    @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
+    def test_get_container_column_lineage(self, mock_metadata, mock_test_connection):
+        """_get_container_column_lineage pairs container and table columns by name"""
+        mock_test_connection.return_value = None
+
+        data_model = ContainerDataModel(
+            columns=[
+                Column(
+                    name=ColumnName(root="id"),
+                    displayName="id",
+                    dataType=DataType.INT,
+                    fullyQualifiedName=FullyQualifiedEntityName(
+                        root="service.container.id"
+                    ),
+                ),
+                Column(
+                    name=ColumnName(root="name"),
+                    displayName="name",
+                    dataType=DataType.STRING,
+                    fullyQualifiedName=FullyQualifiedEntityName(
+                        root="service.container.name"
+                    ),
+                ),
+            ]
+        )
+
+        table_entity = Table(
+            id=uuid4(),
+            name=EntityName(root="test_table"),
+            columns=[
+                Column(
+                    name=ColumnName(root="id"),
+                    dataType=DataType.INT,
+                    fullyQualifiedName=FullyQualifiedEntityName(
+                        root="service.db.schema.test_table.id"
+                    ),
+                ),
+                Column(
+                    name=ColumnName(root="name"),
+                    dataType=DataType.STRING,
+                    fullyQualifiedName=FullyQualifiedEntityName(
+                        root="service.db.schema.test_table.name"
+                    ),
+                ),
+            ],
+        )
+
+        lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
+
+        result = lineage_source._get_container_column_lineage(data_model, table_entity)
+
+        self.assertIsNotNone(result)
+        self.assertEqual(len(result.columnsLineage), 2)
+        self.assertEqual(result.source, LineageSource.ExternalTableLineage)
+        self.assertEqual(
+            result.columnsLineage[0].fromColumns[0].root, "service.container.id"
+        )
+        self.assertEqual(
+            result.columnsLineage[0].toColumn.root, "service.db.schema.test_table.id"
+        )
+        self.assertEqual(
+            result.columnsLineage[1].fromColumns[0].root, "service.container.name"
+        )
+        self.assertEqual(
+            result.columnsLineage[1].toColumn.root, "service.db.schema.test_table.name"
+        )
+
+    @patch(
+        "metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
+    )
+    @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
+    def test_handle_external_location_lineage_upstream(
+        self, mock_metadata, mock_test_connection
+    ):
+        """An upstream external location yields a container -> table edge"""
+        mock_test_connection.return_value = None
+
+        file_info = FileInfo(
+            path="s3://bucket/path/file.parquet",
+            storage_location="s3://bucket/path",
+            securable_type="EXTERNAL_LOCATION",
+        )
+
+        table_entity = Table(
+            id=uuid4(),
+            name=EntityName(root="test_table"),
+            columns=[],
+        )
+
+        container_entity = Container(
+            id=uuid4(),
+            name=EntityName(root="test_container"),
+            service=EntityReference(id=uuid4(), type="storageService"),
+        )
+
+        mock_metadata.es_search_container_by_path.return_value = [container_entity]
+
+        lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
+
+        results = list(
+            lineage_source._handle_external_location_lineage(
+                file_info, table_entity, is_upstream=True
+            )
+        )
+
+        self.assertEqual(len(results), 1)
+        self.assertIsInstance(results[0], Either)
+        self.assertIsInstance(results[0].right, AddLineageRequest)
+        self.assertEqual(results[0].right.edge.fromEntity.id, container_entity.id)
+        self.assertEqual(results[0].right.edge.fromEntity.type, "container")
+        self.assertEqual(results[0].right.edge.toEntity.id, table_entity.id)
+        self.assertEqual(results[0].right.edge.toEntity.type, "table")
+
+        mock_metadata.es_search_container_by_path.assert_called_once_with(
+            full_path="s3://bucket/path", fields="dataModel"
+        )
+
+    @patch(
+        "metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
+    )
+    @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
+    def test_handle_external_location_lineage_downstream(
+        self, mock_metadata, mock_test_connection
+    ):
+        """A downstream external location yields a table -> container edge"""
+        mock_test_connection.return_value = None
+
+        file_info = FileInfo(
+            path="s3://bucket/path/file.parquet",
+            storage_location="s3://bucket/path",
+            securable_type="EXTERNAL_LOCATION",
+        )
+
+        table_entity = Table(
+            id=uuid4(),
+            name=EntityName(root="test_table"),
+            columns=[],
+        )
+
+        container_entity = Container(
+            id=uuid4(),
+            name=EntityName(root="test_container"),
+            service=EntityReference(id=uuid4(), type="storageService"),
+        )
+
+        mock_metadata.es_search_container_by_path.return_value = [container_entity]
+
+        lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
+
+        results = list(
+            lineage_source._handle_external_location_lineage(
+                file_info, table_entity, is_upstream=False
+            )
+        )
+
+        self.assertEqual(len(results), 1)
+        self.assertIsInstance(results[0], Either)
+        self.assertIsInstance(results[0].right, AddLineageRequest)
+        self.assertEqual(results[0].right.edge.fromEntity.id, table_entity.id)
+        self.assertEqual(results[0].right.edge.fromEntity.type, "table")
+        self.assertEqual(results[0].right.edge.toEntity.id, container_entity.id)
+        self.assertEqual(results[0].right.edge.toEntity.type, "container")
+
+    @patch(
+        "metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
+    )
+    @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
+    def test_handle_external_location_lineage_no_container(
+        self, mock_metadata, mock_test_connection
+    ):
+        """No edge is yielded when no container matches the storage location"""
+        mock_test_connection.return_value = None
+
+        file_info = FileInfo(
+            path="s3://bucket/path/file.parquet",
+            storage_location="s3://bucket/path",
+            securable_type="EXTERNAL_LOCATION",
+        )
+
+        table_entity = Table(
+            id=uuid4(),
+            name=EntityName(root="test_table"),
+            columns=[],
+        )
+
+        mock_metadata.es_search_container_by_path.return_value = []
+
+        lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
+
+        results = list(
+            lineage_source._handle_external_location_lineage(
+                file_info, table_entity, is_upstream=True
+            )
+        )
+
+        self.assertEqual(len(results), 0)
+
+    @patch(
+        "metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
+    )
+    @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
+    def test_handle_upstream_table_with_table_info(
+        self, mock_metadata, mock_test_connection
+    ):
+        """An upstream tableInfo entry yields a table -> table edge"""
+        mock_test_connection.return_value = None
+
+        upstream_table_info = DatabricksTable(
+            name="source_table",
+            catalog_name="demo-test-cat",
+            schema_name="test-schema",
+            table_type="TABLE",
+        )
+
+        table_streams = LineageTableStreams(
+            upstreams=[LineageEntity(tableInfo=upstream_table_info)], downstreams=[]
+        )
+
+        current_table = Table(
+            id=uuid4(),
+            name=EntityName(root="current_table"),
+            columns=[],
+        )
+
+        upstream_table = Table(
+            id=uuid4(),
+            name=EntityName(root="source_table"),
+            columns=[],
+        )
+
+        mock_metadata.get_by_name.return_value = upstream_table
+
+        lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
+        lineage_source.client = Mock()
+        lineage_source.client.get_column_lineage.return_value = Mock(upstream_cols=[])
+
+        results = list(
+            lineage_source._handle_upstream_table(
+                table_streams, current_table, "demo-test-cat.test-schema.current_table"
+            )
+        )
+
+        self.assertEqual(len(results), 1)
+        self.assertIsInstance(results[0], Either)
+        self.assertEqual(results[0].right.edge.fromEntity.id, upstream_table.id)
+        self.assertEqual(results[0].right.edge.toEntity.id, current_table.id)
+
+    @patch(
+        "metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
+    )
+    @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
+    def test_handle_upstream_table_with_file_info(
+        self, mock_metadata, mock_test_connection
+    ):
+        """An upstream fileInfo entry yields a container -> table edge"""
+        mock_test_connection.return_value = None
+
+        file_info = FileInfo(
+            path="s3://bucket/path/file.parquet",
+            storage_location="s3://bucket/path",
+            securable_type="EXTERNAL_LOCATION",
+        )
+
+        table_streams = LineageTableStreams(
+            upstreams=[LineageEntity(fileInfo=file_info)], downstreams=[]
+        )
+
+        current_table = Table(
+            id=uuid4(),
+            name=EntityName(root="current_table"),
+            columns=[],
+        )
+
+        container_entity = Container(
+            id=uuid4(),
+            name=EntityName(root="test_container"),
+            service=EntityReference(id=uuid4(), type="storageService"),
+        )
+
+        mock_metadata.es_search_container_by_path.return_value = [container_entity]
+
+        lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
+
+        results = list(
+            lineage_source._handle_upstream_table(
+                table_streams, current_table, "demo-test-cat.test-schema.current_table"
+            )
+        )
+
+        self.assertEqual(len(results), 1)
+        self.assertIsInstance(results[0], Either)
+        self.assertEqual(results[0].right.edge.fromEntity.id, container_entity.id)
+        self.assertEqual(results[0].right.edge.toEntity.id, current_table.id)