mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-29 17:49:14 +00:00
feat: Unity Catalog Lineage Enhancement: External Location Support (#23790)
This commit is contained in:
parent
63336dd98c
commit
4708c2b64f
@ -41,7 +41,7 @@ from metadata.ingestion.source.database.unitycatalog.models import (
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
logger = ingestion_logger()
|
||||
TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get"
|
||||
TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage"
|
||||
COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get"
|
||||
TABLES_PATH = "/unity-catalog/tables"
|
||||
|
||||
|
||||
@ -15,6 +15,7 @@ import traceback
|
||||
from typing import Iterable, Optional
|
||||
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.container import ContainerDataModel
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
@ -90,6 +91,41 @@ class UnitycatalogLineageSource(Source):
|
||||
)
|
||||
return cls(config, metadata)
|
||||
|
||||
def _get_data_model_column_fqn(
|
||||
self, data_model_entity: ContainerDataModel, column: str
|
||||
) -> Optional[str]:
|
||||
if not data_model_entity:
|
||||
return None
|
||||
for entity_column in data_model_entity.columns:
|
||||
if entity_column.displayName.lower() == column.lower():
|
||||
return entity_column.fullyQualifiedName.root
|
||||
return None
|
||||
|
||||
def _get_container_column_lineage(
|
||||
self, data_model_entity: ContainerDataModel, table_entity: Table
|
||||
) -> Optional[LineageDetails]:
|
||||
try:
|
||||
column_lineage = []
|
||||
for column in table_entity.columns:
|
||||
from_column = self._get_data_model_column_fqn(
|
||||
data_model_entity=data_model_entity, column=column.name.root
|
||||
)
|
||||
to_column = column.fullyQualifiedName.root
|
||||
if from_column and to_column:
|
||||
column_lineage.append(
|
||||
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
|
||||
)
|
||||
if column_lineage:
|
||||
return LineageDetails(
|
||||
columnsLineage=column_lineage,
|
||||
source=LineageSource.ExternalTableLineage,
|
||||
)
|
||||
return None
|
||||
except Exception as exc:
|
||||
logger.debug(f"Error computing container column lineage: {exc}")
|
||||
logger.debug(traceback.format_exc())
|
||||
return None
|
||||
|
||||
def _get_lineage_details(
|
||||
self, from_table: Table, to_table: Table, databricks_table_fqn: str
|
||||
) -> Optional[LineageDetails]:
|
||||
@ -124,16 +160,87 @@ class UnitycatalogLineageSource(Source):
|
||||
logger.debug(traceback.format_exc())
|
||||
return None
|
||||
|
||||
def _handle_external_location_lineage(
|
||||
self, file_info, table: Table, is_upstream: bool
|
||||
) -> Iterable[Either[AddLineageRequest]]:
|
||||
try:
|
||||
if not file_info.storage_location:
|
||||
logger.debug("No storage location found in fileInfo")
|
||||
return
|
||||
|
||||
location_entity = self.metadata.es_search_container_by_path(
|
||||
full_path=file_info.storage_location, fields="dataModel"
|
||||
)
|
||||
|
||||
if location_entity and location_entity[0]:
|
||||
lineage_details = None
|
||||
if location_entity[0].dataModel:
|
||||
lineage_details = self._get_container_column_lineage(
|
||||
location_entity[0].dataModel, table
|
||||
)
|
||||
|
||||
if is_upstream:
|
||||
yield Either(
|
||||
left=None,
|
||||
right=AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
fromEntity=EntityReference(
|
||||
id=location_entity[0].id,
|
||||
type="container",
|
||||
),
|
||||
toEntity=EntityReference(
|
||||
id=table.id,
|
||||
type="table",
|
||||
),
|
||||
lineageDetails=lineage_details,
|
||||
)
|
||||
),
|
||||
)
|
||||
else:
|
||||
yield Either(
|
||||
left=None,
|
||||
right=AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
fromEntity=EntityReference(
|
||||
id=table.id,
|
||||
type="table",
|
||||
),
|
||||
toEntity=EntityReference(
|
||||
id=location_entity[0].id,
|
||||
type="container",
|
||||
),
|
||||
lineageDetails=lineage_details,
|
||||
)
|
||||
),
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
f"Unable to find container for external location: {file_info.storage_location}"
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug(
|
||||
f"Error while processing external location lineage for {file_info.storage_location}: {exc}"
|
||||
)
|
||||
logger.debug(traceback.format_exc())
|
||||
|
||||
def _handle_upstream_table(
|
||||
self,
|
||||
table_streams: LineageTableStreams,
|
||||
table: Table,
|
||||
databricks_table_fqn: str,
|
||||
) -> Iterable[Either[AddLineageRequest]]:
|
||||
for upstream_table in table_streams.upstream_tables:
|
||||
for upstream_entity in table_streams.upstreams:
|
||||
try:
|
||||
if not upstream_table.name:
|
||||
if upstream_entity.fileInfo:
|
||||
yield from self._handle_external_location_lineage(
|
||||
upstream_entity.fileInfo, table, is_upstream=True
|
||||
)
|
||||
continue
|
||||
|
||||
if not upstream_entity.tableInfo or not upstream_entity.tableInfo.name:
|
||||
continue
|
||||
|
||||
upstream_table = upstream_entity.tableInfo
|
||||
from_entity_fqn = fqn.build(
|
||||
metadata=self.metadata,
|
||||
entity_type=Table,
|
||||
@ -172,9 +279,69 @@ class UnitycatalogLineageSource(Source):
|
||||
)
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"Error while processing lineage for "
|
||||
f"{upstream_table.catalog_name}.{upstream_table.schema_name}.{upstream_table.name}"
|
||||
f" -> {databricks_table_fqn}"
|
||||
"Error while processing upstream lineage for "
|
||||
f"{databricks_table_fqn}"
|
||||
)
|
||||
logger.debug(traceback.format_exc())
|
||||
|
||||
def _handle_downstream_table(
|
||||
self,
|
||||
table_streams: LineageTableStreams,
|
||||
table: Table,
|
||||
databricks_table_fqn: str,
|
||||
) -> Iterable[Either[AddLineageRequest]]:
|
||||
for downstream_entity in table_streams.downstreams:
|
||||
try:
|
||||
if downstream_entity.fileInfo:
|
||||
yield from self._handle_external_location_lineage(
|
||||
downstream_entity.fileInfo, table, is_upstream=False
|
||||
)
|
||||
continue
|
||||
|
||||
if (
|
||||
not downstream_entity.tableInfo
|
||||
or not downstream_entity.tableInfo.name
|
||||
):
|
||||
continue
|
||||
|
||||
downstream_table = downstream_entity.tableInfo
|
||||
to_entity_fqn = fqn.build(
|
||||
metadata=self.metadata,
|
||||
entity_type=Table,
|
||||
database_name=downstream_table.catalog_name,
|
||||
schema_name=downstream_table.schema_name,
|
||||
table_name=downstream_table.name,
|
||||
service_name=self.config.serviceName,
|
||||
)
|
||||
|
||||
to_entity = self.metadata.get_by_name(entity=Table, fqn=to_entity_fqn)
|
||||
if to_entity:
|
||||
downstream_table_fqn = f"{downstream_table.catalog_name}.{downstream_table.schema_name}.{downstream_table.name}"
|
||||
lineage_details = self._get_lineage_details(
|
||||
from_table=table,
|
||||
to_table=to_entity,
|
||||
databricks_table_fqn=downstream_table_fqn,
|
||||
)
|
||||
yield Either(
|
||||
left=None,
|
||||
right=AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
fromEntity=EntityReference(id=table.id, type="table"),
|
||||
toEntity=EntityReference(id=to_entity.id, type="table"),
|
||||
lineageDetails=lineage_details,
|
||||
)
|
||||
),
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
f"Unable to find downstream entity for "
|
||||
f"{databricks_table_fqn} -> "
|
||||
f"{downstream_table.catalog_name}.{downstream_table.schema_name}.{downstream_table.name}"
|
||||
)
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"Error while processing downstream lineage for "
|
||||
f"{databricks_table_fqn}"
|
||||
)
|
||||
logger.debug(traceback.format_exc())
|
||||
|
||||
@ -225,10 +392,17 @@ class UnitycatalogLineageSource(Source):
|
||||
table_streams: LineageTableStreams = self.client.get_table_lineage(
|
||||
databricks_table_fqn
|
||||
)
|
||||
|
||||
# Process upstream lineage
|
||||
yield from self._handle_upstream_table(
|
||||
table_streams, table, databricks_table_fqn
|
||||
)
|
||||
|
||||
# Process downstream lineage
|
||||
yield from self._handle_downstream_table(
|
||||
table_streams, table, databricks_table_fqn
|
||||
)
|
||||
|
||||
def test_connection(self) -> None:
|
||||
test_connection_common(
|
||||
self.metadata, self.connection_obj, self.service_connection
|
||||
|
||||
@ -22,6 +22,8 @@ class DatabricksTable(BaseModel):
|
||||
name: Optional[str] = None
|
||||
catalog_name: Optional[str] = None
|
||||
schema_name: Optional[str] = None
|
||||
table_type: Optional[str] = None
|
||||
lineage_timestamp: Optional[str] = None
|
||||
|
||||
|
||||
class DatabricksColumn(BaseModel):
|
||||
@ -31,9 +33,23 @@ class DatabricksColumn(BaseModel):
|
||||
table_name: Optional[str] = None
|
||||
|
||||
|
||||
class FileInfo(BaseModel):
|
||||
path: Optional[str] = None
|
||||
has_permission: Optional[bool] = None
|
||||
securable_name: Optional[str] = None
|
||||
storage_location: Optional[str] = None
|
||||
securable_type: Optional[str] = None
|
||||
lineage_timestamp: Optional[str] = None
|
||||
|
||||
|
||||
class LineageEntity(BaseModel):
|
||||
tableInfo: Optional[DatabricksTable] = None
|
||||
fileInfo: Optional[FileInfo] = None
|
||||
|
||||
|
||||
class LineageTableStreams(BaseModel):
|
||||
upstream_tables: Optional[List[DatabricksTable]] = []
|
||||
downstream_tables: Optional[List[DatabricksTable]] = []
|
||||
upstreams: Optional[List[LineageEntity]] = []
|
||||
downstreams: Optional[List[LineageEntity]] = []
|
||||
|
||||
|
||||
class LineageColumnStreams(BaseModel):
|
||||
|
||||
@ -356,6 +356,7 @@ class S3Source(StorageServiceSource):
|
||||
if entry
|
||||
and entry.get("Key")
|
||||
and len(entry.get("Key").split("/")) > total_depth
|
||||
and "/_delta_log/" not in entry.get("Key")
|
||||
}
|
||||
for key in candidate_keys:
|
||||
metadata_entry_copy = deepcopy(metadata_entry)
|
||||
@ -469,7 +470,10 @@ class S3Source(StorageServiceSource):
|
||||
candidate_keys = [
|
||||
entry["Key"]
|
||||
for entry in response
|
||||
if entry and entry.get("Key") and not entry.get("Key").endswith("/")
|
||||
if entry
|
||||
and entry.get("Key")
|
||||
and not entry.get("Key").endswith("/")
|
||||
and "/_delta_log/" not in entry.get("Key")
|
||||
]
|
||||
for key in candidate_keys:
|
||||
if self.is_valid_unstructured_file(metadata_entry.unstructuredFormats, key):
|
||||
@ -678,7 +682,10 @@ class S3Source(StorageServiceSource):
|
||||
candidate_keys = [
|
||||
entry["Key"]
|
||||
for entry in response[S3_CLIENT_ROOT_RESPONSE]
|
||||
if entry and entry.get("Key") and not entry.get("Key").endswith("/")
|
||||
if entry
|
||||
and entry.get("Key")
|
||||
and not entry.get("Key").endswith("/")
|
||||
and "/_delta_log/" not in entry.get("Key")
|
||||
]
|
||||
# pick a random key out of the candidates if any were returned
|
||||
if candidate_keys:
|
||||
|
||||
@ -0,0 +1,513 @@
|
||||
# Copyright 2025 Collate
|
||||
# Licensed under the Collate Community License, Version 1.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Test Unity Catalog lineage functionality
|
||||
"""
|
||||
|
||||
from unittest import TestCase
|
||||
from unittest.mock import Mock, patch
|
||||
from uuid import uuid4
|
||||
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.container import (
|
||||
Container,
|
||||
ContainerDataModel,
|
||||
)
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
Column,
|
||||
ColumnName,
|
||||
DataType,
|
||||
Table,
|
||||
)
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
Source as WorkflowSource,
|
||||
)
|
||||
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
|
||||
from metadata.generated.schema.type.entityLineage import Source as LineageSource
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.models import Either
|
||||
from metadata.ingestion.source.database.unitycatalog.lineage import (
|
||||
UnitycatalogLineageSource,
|
||||
)
|
||||
from metadata.ingestion.source.database.unitycatalog.models import (
|
||||
DatabricksTable,
|
||||
FileInfo,
|
||||
LineageEntity,
|
||||
LineageTableStreams,
|
||||
)
|
||||
|
||||
mock_unitycatalog_lineage_config = {
|
||||
"source": {
|
||||
"type": "unitycatalog-lineage",
|
||||
"serviceName": "local_unitycatalog",
|
||||
"serviceConnection": {
|
||||
"config": {
|
||||
"type": "UnityCatalog",
|
||||
"catalog": "demo-test-cat",
|
||||
"databaseSchema": "test-schema",
|
||||
"authType": {"token": "test_token"},
|
||||
"hostPort": "localhost:443",
|
||||
"httpPath": "/sql/1.0/warehouses/test",
|
||||
}
|
||||
},
|
||||
"sourceConfig": {"config": {"type": "DatabaseLineage"}},
|
||||
},
|
||||
"sink": {"type": "metadata-rest", "config": {}},
|
||||
"workflowConfig": {
|
||||
"openMetadataServerConfig": {
|
||||
"hostPort": "http://localhost:8585/api",
|
||||
"authProvider": "openmetadata",
|
||||
"securityConfig": {"jwtToken": "test_token"},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestUnityCatalogLineage(TestCase):
|
||||
"""
|
||||
Unity Catalog lineage unit tests
|
||||
"""
|
||||
|
||||
@patch(
|
||||
"metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
|
||||
)
|
||||
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
|
||||
def setUp(self, mock_metadata, mock_test_connection):
|
||||
mock_test_connection.return_value = None
|
||||
self.mock_metadata = mock_metadata
|
||||
self.config = WorkflowSource.model_validate(
|
||||
mock_unitycatalog_lineage_config["source"]
|
||||
)
|
||||
self.lineage_source = UnitycatalogLineageSource(self.config, self.mock_metadata)
|
||||
|
||||
def test_lineage_table_streams_with_table_info(self):
|
||||
"""Test LineageTableStreams model with tableInfo"""
|
||||
upstream_table = DatabricksTable(
|
||||
name="source_table",
|
||||
catalog_name="demo-test-cat",
|
||||
schema_name="test-schema",
|
||||
table_type="TABLE",
|
||||
lineage_timestamp="2025-10-08 11:16:26.0",
|
||||
)
|
||||
|
||||
downstream_table = DatabricksTable(
|
||||
name="target_table",
|
||||
catalog_name="demo-test-cat",
|
||||
schema_name="test-schema",
|
||||
table_type="TABLE",
|
||||
lineage_timestamp="2025-10-08 11:16:26.0",
|
||||
)
|
||||
|
||||
lineage_streams = LineageTableStreams(
|
||||
upstreams=[LineageEntity(tableInfo=upstream_table)],
|
||||
downstreams=[LineageEntity(tableInfo=downstream_table)],
|
||||
)
|
||||
|
||||
self.assertEqual(len(lineage_streams.upstreams), 1)
|
||||
self.assertEqual(len(lineage_streams.downstreams), 1)
|
||||
self.assertEqual(lineage_streams.upstreams[0].tableInfo.name, "source_table")
|
||||
self.assertEqual(lineage_streams.downstreams[0].tableInfo.name, "target_table")
|
||||
self.assertIsNone(lineage_streams.upstreams[0].fileInfo)
|
||||
self.assertIsNone(lineage_streams.downstreams[0].fileInfo)
|
||||
|
||||
def test_lineage_table_streams_with_file_info(self):
|
||||
"""Test LineageTableStreams model with fileInfo"""
|
||||
file_info = FileInfo(
|
||||
path="s3://bucket/path/file.parquet",
|
||||
has_permission=True,
|
||||
securable_name="test_location",
|
||||
storage_location="s3://bucket/path",
|
||||
securable_type="EXTERNAL_LOCATION",
|
||||
lineage_timestamp="2025-10-08 10:37:42.0",
|
||||
)
|
||||
|
||||
lineage_streams = LineageTableStreams(
|
||||
upstreams=[LineageEntity(fileInfo=file_info)], downstreams=[]
|
||||
)
|
||||
|
||||
self.assertEqual(len(lineage_streams.upstreams), 1)
|
||||
self.assertEqual(
|
||||
lineage_streams.upstreams[0].fileInfo.path, "s3://bucket/path/file.parquet"
|
||||
)
|
||||
self.assertEqual(
|
||||
lineage_streams.upstreams[0].fileInfo.storage_location, "s3://bucket/path"
|
||||
)
|
||||
self.assertIsNone(lineage_streams.upstreams[0].tableInfo)
|
||||
|
||||
def test_lineage_table_streams_mixed(self):
|
||||
"""Test LineageTableStreams with both tableInfo and fileInfo"""
|
||||
table_info = DatabricksTable(
|
||||
name="table1",
|
||||
catalog_name="demo-test-cat",
|
||||
schema_name="test-schema",
|
||||
table_type="TABLE",
|
||||
)
|
||||
|
||||
file_info = FileInfo(
|
||||
path="s3://bucket/path/file.parquet",
|
||||
storage_location="s3://bucket/path",
|
||||
securable_type="EXTERNAL_LOCATION",
|
||||
)
|
||||
|
||||
lineage_streams = LineageTableStreams(
|
||||
upstreams=[
|
||||
LineageEntity(tableInfo=table_info),
|
||||
LineageEntity(fileInfo=file_info),
|
||||
],
|
||||
downstreams=[],
|
||||
)
|
||||
|
||||
self.assertEqual(len(lineage_streams.upstreams), 2)
|
||||
self.assertIsNotNone(lineage_streams.upstreams[0].tableInfo)
|
||||
self.assertIsNone(lineage_streams.upstreams[0].fileInfo)
|
||||
self.assertIsNone(lineage_streams.upstreams[1].tableInfo)
|
||||
self.assertIsNotNone(lineage_streams.upstreams[1].fileInfo)
|
||||
|
||||
@patch(
|
||||
"metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
|
||||
)
|
||||
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
|
||||
def test_get_data_model_column_fqn(self, mock_metadata, mock_test_connection):
|
||||
"""Test _get_data_model_column_fqn method"""
|
||||
mock_test_connection.return_value = None
|
||||
|
||||
data_model = ContainerDataModel(
|
||||
columns=[
|
||||
Column(
|
||||
name=ColumnName(root="id"),
|
||||
displayName="id",
|
||||
dataType=DataType.INT,
|
||||
fullyQualifiedName=FullyQualifiedEntityName(
|
||||
root="service.container.id"
|
||||
),
|
||||
),
|
||||
Column(
|
||||
name=ColumnName(root="name"),
|
||||
displayName="name",
|
||||
dataType=DataType.STRING,
|
||||
fullyQualifiedName=FullyQualifiedEntityName(
|
||||
root="service.container.name"
|
||||
),
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
|
||||
|
||||
result = lineage_source._get_data_model_column_fqn(data_model, "id")
|
||||
self.assertEqual(result, "service.container.id")
|
||||
|
||||
result = lineage_source._get_data_model_column_fqn(data_model, "name")
|
||||
self.assertEqual(result, "service.container.name")
|
||||
|
||||
result = lineage_source._get_data_model_column_fqn(data_model, "nonexistent")
|
||||
self.assertIsNone(result)
|
||||
|
||||
result = lineage_source._get_data_model_column_fqn(None, "id")
|
||||
self.assertIsNone(result)
|
||||
|
||||
@patch(
|
||||
"metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
|
||||
)
|
||||
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
|
||||
def test_get_container_column_lineage(self, mock_metadata, mock_test_connection):
|
||||
"""Test _get_container_column_lineage method"""
|
||||
mock_test_connection.return_value = None
|
||||
|
||||
data_model = ContainerDataModel(
|
||||
columns=[
|
||||
Column(
|
||||
name=ColumnName(root="id"),
|
||||
displayName="id",
|
||||
dataType=DataType.INT,
|
||||
fullyQualifiedName=FullyQualifiedEntityName(
|
||||
root="service.container.id"
|
||||
),
|
||||
),
|
||||
Column(
|
||||
name=ColumnName(root="name"),
|
||||
displayName="name",
|
||||
dataType=DataType.STRING,
|
||||
fullyQualifiedName=FullyQualifiedEntityName(
|
||||
root="service.container.name"
|
||||
),
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
table_entity = Table(
|
||||
id=uuid4(),
|
||||
name=EntityName(root="test_table"),
|
||||
columns=[
|
||||
Column(
|
||||
name=ColumnName(root="id"),
|
||||
dataType=DataType.INT,
|
||||
fullyQualifiedName=FullyQualifiedEntityName(
|
||||
root="service.db.schema.test_table.id"
|
||||
),
|
||||
),
|
||||
Column(
|
||||
name=ColumnName(root="name"),
|
||||
dataType=DataType.STRING,
|
||||
fullyQualifiedName=FullyQualifiedEntityName(
|
||||
root="service.db.schema.test_table.name"
|
||||
),
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
|
||||
|
||||
result = lineage_source._get_container_column_lineage(data_model, table_entity)
|
||||
|
||||
self.assertIsNotNone(result)
|
||||
self.assertEqual(len(result.columnsLineage), 2)
|
||||
self.assertEqual(result.source, LineageSource.ExternalTableLineage)
|
||||
self.assertEqual(
|
||||
result.columnsLineage[0].fromColumns[0].root, "service.container.id"
|
||||
)
|
||||
self.assertEqual(
|
||||
result.columnsLineage[0].toColumn.root, "service.db.schema.test_table.id"
|
||||
)
|
||||
self.assertEqual(
|
||||
result.columnsLineage[1].fromColumns[0].root, "service.container.name"
|
||||
)
|
||||
self.assertEqual(
|
||||
result.columnsLineage[1].toColumn.root, "service.db.schema.test_table.name"
|
||||
)
|
||||
|
||||
@patch(
|
||||
"metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
|
||||
)
|
||||
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
|
||||
def test_handle_external_location_lineage_upstream(
|
||||
self, mock_metadata, mock_test_connection
|
||||
):
|
||||
"""Test _handle_external_location_lineage for upstream"""
|
||||
mock_test_connection.return_value = None
|
||||
|
||||
file_info = FileInfo(
|
||||
path="s3://bucket/path/file.parquet",
|
||||
storage_location="s3://bucket/path",
|
||||
securable_type="EXTERNAL_LOCATION",
|
||||
)
|
||||
|
||||
table_entity = Table(
|
||||
id=uuid4(),
|
||||
name=EntityName(root="test_table"),
|
||||
columns=[],
|
||||
)
|
||||
|
||||
container_entity = Container(
|
||||
id=uuid4(),
|
||||
name=EntityName(root="test_container"),
|
||||
service=EntityReference(id=uuid4(), type="storageService"),
|
||||
)
|
||||
|
||||
mock_metadata.es_search_container_by_path.return_value = [container_entity]
|
||||
|
||||
lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
|
||||
|
||||
results = list(
|
||||
lineage_source._handle_external_location_lineage(
|
||||
file_info, table_entity, is_upstream=True
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertIsInstance(results[0], Either)
|
||||
self.assertIsInstance(results[0].right, AddLineageRequest)
|
||||
self.assertEqual(results[0].right.edge.fromEntity.id, container_entity.id)
|
||||
self.assertEqual(results[0].right.edge.fromEntity.type, "container")
|
||||
self.assertEqual(results[0].right.edge.toEntity.id, table_entity.id)
|
||||
self.assertEqual(results[0].right.edge.toEntity.type, "table")
|
||||
|
||||
mock_metadata.es_search_container_by_path.assert_called_once_with(
|
||||
full_path="s3://bucket/path", fields="dataModel"
|
||||
)
|
||||
|
||||
@patch(
|
||||
"metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
|
||||
)
|
||||
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
|
||||
def test_handle_external_location_lineage_downstream(
|
||||
self, mock_metadata, mock_test_connection
|
||||
):
|
||||
"""Test _handle_external_location_lineage for downstream"""
|
||||
mock_test_connection.return_value = None
|
||||
|
||||
file_info = FileInfo(
|
||||
path="s3://bucket/path/file.parquet",
|
||||
storage_location="s3://bucket/path",
|
||||
securable_type="EXTERNAL_LOCATION",
|
||||
)
|
||||
|
||||
table_entity = Table(
|
||||
id=uuid4(),
|
||||
name=EntityName(root="test_table"),
|
||||
columns=[],
|
||||
)
|
||||
|
||||
container_entity = Container(
|
||||
id=uuid4(),
|
||||
name=EntityName(root="test_container"),
|
||||
service=EntityReference(id=uuid4(), type="storageService"),
|
||||
)
|
||||
|
||||
mock_metadata.es_search_container_by_path.return_value = [container_entity]
|
||||
|
||||
lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
|
||||
|
||||
results = list(
|
||||
lineage_source._handle_external_location_lineage(
|
||||
file_info, table_entity, is_upstream=False
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertIsInstance(results[0], Either)
|
||||
self.assertIsInstance(results[0].right, AddLineageRequest)
|
||||
self.assertEqual(results[0].right.edge.fromEntity.id, table_entity.id)
|
||||
self.assertEqual(results[0].right.edge.fromEntity.type, "table")
|
||||
self.assertEqual(results[0].right.edge.toEntity.id, container_entity.id)
|
||||
self.assertEqual(results[0].right.edge.toEntity.type, "container")
|
||||
|
||||
@patch(
|
||||
"metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
|
||||
)
|
||||
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
|
||||
def test_handle_external_location_lineage_no_container(
|
||||
self, mock_metadata, mock_test_connection
|
||||
):
|
||||
"""Test _handle_external_location_lineage when container is not found"""
|
||||
mock_test_connection.return_value = None
|
||||
|
||||
file_info = FileInfo(
|
||||
path="s3://bucket/path/file.parquet",
|
||||
storage_location="s3://bucket/path",
|
||||
securable_type="EXTERNAL_LOCATION",
|
||||
)
|
||||
|
||||
table_entity = Table(
|
||||
id=uuid4(),
|
||||
name=EntityName(root="test_table"),
|
||||
columns=[],
|
||||
)
|
||||
|
||||
mock_metadata.es_search_container_by_path.return_value = []
|
||||
|
||||
lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
|
||||
|
||||
results = list(
|
||||
lineage_source._handle_external_location_lineage(
|
||||
file_info, table_entity, is_upstream=True
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(len(results), 0)
|
||||
|
||||
@patch(
|
||||
"metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
|
||||
)
|
||||
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
|
||||
def test_handle_upstream_table_with_table_info(
|
||||
self, mock_metadata, mock_test_connection
|
||||
):
|
||||
"""Test _handle_upstream_table with tableInfo"""
|
||||
mock_test_connection.return_value = None
|
||||
|
||||
upstream_table_info = DatabricksTable(
|
||||
name="source_table",
|
||||
catalog_name="demo-test-cat",
|
||||
schema_name="test-schema",
|
||||
table_type="TABLE",
|
||||
)
|
||||
|
||||
table_streams = LineageTableStreams(
|
||||
upstreams=[LineageEntity(tableInfo=upstream_table_info)], downstreams=[]
|
||||
)
|
||||
|
||||
current_table = Table(
|
||||
id=uuid4(),
|
||||
name=EntityName(root="current_table"),
|
||||
columns=[],
|
||||
)
|
||||
|
||||
upstream_table = Table(
|
||||
id=uuid4(),
|
||||
name=EntityName(root="source_table"),
|
||||
columns=[],
|
||||
)
|
||||
|
||||
mock_metadata.get_by_name.return_value = upstream_table
|
||||
|
||||
lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
|
||||
lineage_source.client = Mock()
|
||||
lineage_source.client.get_column_lineage.return_value = Mock(upstream_cols=[])
|
||||
|
||||
results = list(
|
||||
lineage_source._handle_upstream_table(
|
||||
table_streams, current_table, "demo-test-cat.test-schema.current_table"
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertIsInstance(results[0], Either)
|
||||
self.assertEqual(results[0].right.edge.fromEntity.id, upstream_table.id)
|
||||
self.assertEqual(results[0].right.edge.toEntity.id, current_table.id)
|
||||
|
||||
@patch(
|
||||
"metadata.ingestion.source.database.unitycatalog.lineage.UnitycatalogLineageSource.test_connection"
|
||||
)
|
||||
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
|
||||
def test_handle_upstream_table_with_file_info(
|
||||
self, mock_metadata, mock_test_connection
|
||||
):
|
||||
"""Test _handle_upstream_table with fileInfo"""
|
||||
mock_test_connection.return_value = None
|
||||
|
||||
file_info = FileInfo(
|
||||
path="s3://bucket/path/file.parquet",
|
||||
storage_location="s3://bucket/path",
|
||||
securable_type="EXTERNAL_LOCATION",
|
||||
)
|
||||
|
||||
table_streams = LineageTableStreams(
|
||||
upstreams=[LineageEntity(fileInfo=file_info)], downstreams=[]
|
||||
)
|
||||
|
||||
current_table = Table(
|
||||
id=uuid4(),
|
||||
name=EntityName(root="current_table"),
|
||||
columns=[],
|
||||
)
|
||||
|
||||
container_entity = Container(
|
||||
id=uuid4(),
|
||||
name=EntityName(root="test_container"),
|
||||
service=EntityReference(id=uuid4(), type="storageService"),
|
||||
)
|
||||
|
||||
mock_metadata.es_search_container_by_path.return_value = [container_entity]
|
||||
|
||||
lineage_source = UnitycatalogLineageSource(self.config, mock_metadata)
|
||||
|
||||
results = list(
|
||||
lineage_source._handle_upstream_table(
|
||||
table_streams, current_table, "demo-test-cat.test-schema.current_table"
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertIsInstance(results[0], Either)
|
||||
self.assertEqual(results[0].right.edge.fromEntity.id, container_entity.id)
|
||||
self.assertEqual(results[0].right.edge.toEntity.id, current_table.id)
|
||||
Loading…
x
Reference in New Issue
Block a user