diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py index 00804af500b..1e488843a3a 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py @@ -29,12 +29,7 @@ from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT from metadata.ingestion.models.patch_request import build_patch from metadata.ingestion.ometa.client import REST, APIError -from metadata.ingestion.ometa.utils import ( - clean_lineage_columns, - get_entity_type, - model_str, - quote, -) +from metadata.ingestion.ometa.utils import get_entity_type, model_str, quote from metadata.utils.logger import ometa_logger from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache @@ -155,7 +150,6 @@ class OMetaLineageMixin(Generic[T]): original.edge.lineageDetails.columnsLineage = ( serialized_col_details_og ) - clean_lineage_columns(metadata=self, lineage_request=data) # Keep the pipeline information from the original # lineage if available @@ -171,7 +165,6 @@ class OMetaLineageMixin(Generic[T]): patch_op_success = True if patch_op_success is False: - clean_lineage_columns(metadata=self, lineage_request=data) self.client.put( self.get_suffix(AddLineageRequest), data=data.model_dump_json() ) diff --git a/ingestion/src/metadata/ingestion/ometa/utils.py b/ingestion/src/metadata/ingestion/ometa/utils.py index 8d28ce496d7..a378bbf9561 100644 --- a/ingestion/src/metadata/ingestion/ometa/utils.py +++ b/ingestion/src/metadata/ingestion/ometa/utils.py @@ -14,39 +14,16 @@ Helper functions to handle OpenMetadata Entities' properties import re import string -from functools import singledispatch -from typing import Any, List, Type, TypeVar, Union +from typing import Any, Type, TypeVar, Union from pydantic import BaseModel from requests.utils import quote as url_quote -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.data.apiEndpoint import APIEndpoint -from metadata.generated.schema.entity.data.container import Container -from metadata.generated.schema.entity.data.dashboard import Dashboard -from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel -from metadata.generated.schema.entity.data.metric import Metric -from metadata.generated.schema.entity.data.mlmodel import MlModel -from metadata.generated.schema.entity.data.searchIndex import SearchIndex -from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.type.basic import FullyQualifiedEntityName from metadata.generated.schema.type.entityReference import EntityReference T = TypeVar("T", bound=BaseModel) -LINEAGE_ENTITY_CLASS_MAP = { - "table": (Table, ("columns",)), - "searchIndex": (SearchIndex, ("fields",)), - "topic": (Topic, ("messageSchema",)), - "container": (Container, ("dataModel",)), - "dashboardDataModel": (DashboardDataModel, ("columns",)), - "dashboard": (Dashboard, ("charts",)), - "mlmodel": (MlModel, ("",)), - "apiEndpoint": (APIEndpoint, ("responseSchema", "requestSchema")), - "metric": (Metric, ("",)), -} - def format_name(name: str) -> str: """ @@ -121,182 +98,3 @@ def build_entity_reference(entity: T) -> EntityReference: description=entity.description, href=entity.href, ) - - -# pylint: disable=unused-argument,import-outside-toplevel,too-many-locals -@singledispatch -def column_name_list(entity: T) -> List[str]: - """ - helper function to get the column names of the entity - """ - return set() - - -def _get_column_names(column, parent_path: str = "") -> set: - """ - Helper function to recursively get column names with their full path - """ - result = set() - current_path = ( - f"{parent_path}.{column.name.root}" if parent_path else column.name.root - ) - result.add(current_path) - - if column.children: - for child in column.children: - result.update(_get_column_names(child, current_path)) - return result - - -@column_name_list.register(DashboardDataModel) -@column_name_list.register(Table) -def _(entity: Union[DashboardDataModel, Table]) -> List[str]: - """Get the column names of the table""" - result = set() - for column in entity.columns or []: - result.update(_get_column_names(column)) - return result - - -@column_name_list.register(Container) -def _(entity: Container) -> List[str]: - """Get the column names of the table""" - result = set() - if entity.dataModel and entity.dataModel.columns: - for column in entity.dataModel.columns: - result.update(_get_column_names(column)) - return result - - -@column_name_list.register(Dashboard) -def _(entity: Dashboard) -> List[str]: - """Get the column names of the table""" - from metadata.utils.fqn import split - - result = set() - if entity.charts and entity.charts.root: - for chart in entity.charts.root: - if chart.fullyQualifiedName: - split_fqn = split(chart.fullyQualifiedName) - if split_fqn: - result.add(split_fqn[-1]) - return result - - -@column_name_list.register(MlModel) -def _(entity: MlModel) -> List[str]: - """Get the column names of the table""" - result = set() - for feature in entity.mlFeatures or []: - result.add(feature.name.root) - if feature.featureSources: - result.update(column_name_list(feature.featureSources)) - return result - - -@column_name_list.register(Topic) -def _(entity: Topic) -> List[str]: - """Get the column names of the table""" - result = set() - if entity.messageSchema and entity.messageSchema.schemaFields: - for field in entity.messageSchema.schemaFields: - result.update(_get_column_names(field)) - return result - - -@column_name_list.register(APIEndpoint) -def _(entity: APIEndpoint) -> List[str]: - """Get the column names of the table""" - result = set() - if entity.requestSchema and entity.requestSchema.fields: - for field in entity.requestSchema.schemaFields: - result.add(field.name.root) - if field.children: - result.update(column_name_list(field.children)) - return result - if entity.responseSchema and entity.responseSchema.fields: - for field in entity.responseSchema.schemaFields: - result.add(field.name.root) - if field.children: - result.update(column_name_list(field.children)) - return result - return result - - -def clean_lineage_columns(metadata, lineage_request: AddLineageRequest) -> None: - """ - Replicate the behavior of validateChildren in the Backend and remove the invalid columns - """ - from metadata.utils.fqn import FQN_SEPARATOR - from metadata.utils.logger import utils_logger - - logger = utils_logger() - - if ( - lineage_request.edge - and lineage_request.edge.lineageDetails - and lineage_request.edge.lineageDetails.columnsLineage - and lineage_request.edge.fromEntity - and lineage_request.edge.toEntity - ): - from_class, from_fields = LINEAGE_ENTITY_CLASS_MAP.get( - lineage_request.edge.fromEntity.type, (None, None) - ) - to_class, to_fields = LINEAGE_ENTITY_CLASS_MAP.get( - lineage_request.edge.toEntity.type, (None, None) - ) - if not from_class or not to_class: - return - - from_entity = metadata.get_by_id( - entity=from_class, - entity_id=lineage_request.edge.fromEntity.id.root, - fields=from_fields, - ) - to_entity = metadata.get_by_id( - entity=to_class, - entity_id=lineage_request.edge.toEntity.id.root, - fields=to_fields, - ) - - if not from_entity or not to_entity: - return - - from_entity_columns = column_name_list(from_entity) - to_entity_columns = column_name_list(to_entity) - - cleaned_columns_lineage = [] - - for column_lineage in lineage_request.edge.lineageDetails.columnsLineage: - invalid_column = False - for from_column in column_lineage.fromColumns or []: - if hasattr(from_column, "root"): - from_column = from_column.root - from_column_name = from_column.replace( - from_entity.fullyQualifiedName.root + FQN_SEPARATOR, "" - ) - if from_column_name not in from_entity_columns: - invalid_column = True - logger.warning( - f"Ignoring invalid column {from_column} for lineage from {from_entity.fullyQualifiedName.root} " - f"to {to_entity.fullyQualifiedName.root}" - ) - - if column_lineage.toColumn: - to_column = column_lineage.toColumn - if hasattr(to_column, "root"): - to_column = to_column.root - to_column_name = to_column.replace( - to_entity.fullyQualifiedName.root + FQN_SEPARATOR, "" - ) - if to_column_name not in to_entity_columns: - logger.warning( - f"Ignoring invalid column {column_lineage.toColumn} for lineage " - f"from {from_entity.fullyQualifiedName.root} to {to_entity.fullyQualifiedName.root}" - ) - invalid_column = True - - if not invalid_column: - cleaned_columns_lineage.append(column_lineage) - - lineage_request.edge.lineageDetails.columnsLineage = cleaned_columns_lineage diff --git a/ingestion/tests/integration/ometa/test_ometa_lineage_api.py b/ingestion/tests/integration/ometa/test_ometa_lineage_api.py index 193cdc6b145..f0024617554 100644 --- a/ingestion/tests/integration/ometa/test_ometa_lineage_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_lineage_api.py @@ -21,7 +21,7 @@ from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDa from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.pipeline import Pipeline -from metadata.generated.schema.entity.data.table import Column, Table +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.pipelineService import PipelineService @@ -109,14 +109,6 @@ class OMetaLineageTest(TestCase): cls.table2_entity = cls.metadata.create_or_update(data=cls.table2) - cls.table3 = get_create_entity( - name=generate_name(), - entity=Table, - reference=cls.create_schema_entity.fullyQualifiedName, - ) - - cls.table3_entity = cls.metadata.create_or_update(data=cls.table3) - cls.pipeline = get_create_entity( name=generate_name(), entity=Pipeline, @@ -141,15 +133,6 @@ class OMetaLineageTest(TestCase): data=cls.dashboard_datamodel ) - cls.dashboard_datamodel2 = get_create_entity( - name=generate_name(), - entity=DashboardDataModel, - reference=cls.dashboard_service_entity.fullyQualifiedName, - ) - cls.dashboard_datamodel_entity2 = cls.metadata.create_or_update( - data=cls.dashboard_datamodel2 - ) - @classmethod def tearDownClass(cls) -> None: """ @@ -275,37 +258,6 @@ class OMetaLineageTest(TestCase): ) # Add a new column to the lineage edge - linage_request_2 = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference(id=self.table1_entity.id, type="table"), - toEntity=EntityReference(id=self.table2_entity.id, type="table"), - lineageDetails=LineageDetails( - description="test lineage", - columnsLineage=[ - ColumnLineage( - fromColumns=[ - f"{self.table1_entity.fullyQualifiedName.root}.another" - ], - toColumn=f"{self.table2_entity.fullyQualifiedName.root}.another", - ) - ], - ), - ), - ) - - res = self.metadata.add_lineage(data=linage_request_2, check_patch=True) - - res["entity"]["id"] = str(res["entity"]["id"]) - self.assertEqual(len(res["downstreamEdges"]), 1) - self.assertEqual( - res["downstreamEdges"][0]["lineageDetails"]["pipeline"]["id"], - str(self.pipeline_entity.id.root), - ) - self.assertEqual( - len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 2 - ) - - # Invalid column test linage_request_2 = AddLineageRequest( edge=EntitiesEdge( fromEntity=EntityReference(id=self.table1_entity.id, type="table"), @@ -332,7 +284,6 @@ class OMetaLineageTest(TestCase): res["downstreamEdges"][0]["lineageDetails"]["pipeline"]["id"], str(self.pipeline_entity.id.root), ) - # col lineage remains unchanged self.assertEqual( len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 2 ) @@ -458,197 +409,3 @@ class OMetaLineageTest(TestCase): entity_lineage.upstreamEdges[0].fromEntity.root == self.table1_entity.id.root ) - - def test_clean_lineage_columns(self): - """Test that clean_lineage_columns works""" - # Create a lineage request with both valid and invalid columns - table1 = get_create_entity( - name=generate_name(), - entity=Table, - reference=self.create_schema_entity.fullyQualifiedName, - ) - - table1_entity = self.metadata.create_or_update(data=table1) - table2 = get_create_entity( - name=generate_name(), - entity=Table, - reference=self.create_schema_entity.fullyQualifiedName, - ) - - table2_entity = self.metadata.create_or_update(data=table2) - lineage_request = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference(id=table1_entity.id, type="table"), - toEntity=EntityReference(id=table2_entity.id, type="table"), - lineageDetails=LineageDetails( - description="test lineage", - columnsLineage=[ - # Valid column lineage - ColumnLineage( - fromColumns=[f"{table1_entity.fullyQualifiedName.root}.id"], - toColumn=f"{table2_entity.fullyQualifiedName.root}.id", - ), - # Invalid column lineage - non-existent column - ColumnLineage( - fromColumns=[ - f"{table1_entity.fullyQualifiedName.root}.invalid_col" - ], - toColumn=f"{table2_entity.fullyQualifiedName.root}.invalid_col", - ), - # Invalid column lineage - wrong table - ColumnLineage( - fromColumns=["wrong_table.id"], - toColumn=f"{table2_entity.fullyQualifiedName.root}.id", - ), - ], - ), - ), - ) - - # Add the lineage with invalid columns - self.metadata.add_lineage(data=lineage_request) - - # Verify that only valid columns remain in the lineage - lineage = self.metadata.get_lineage_by_id( - entity=Table, entity_id=table2_entity.id.root - ) - self.assertEqual( - len(lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 1 - ) - self.assertEqual( - lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"][0][ - "fromColumns" - ][0], - f"{table1_entity.fullyQualifiedName.root}.id", - ) - self.assertEqual( - lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"][0][ - "toColumn" - ], - f"{table2_entity.fullyQualifiedName.root}.id", - ) - - def test_clean_lineage_columns_table_datamodel(self): - """Test clean_lineage_columns for table to dashboard datamodel lineage""" - # Create a lineage request with both valid and invalid columns - lineage_request = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference(id=self.table3_entity.id, type="table"), - toEntity=EntityReference( - id=self.dashboard_datamodel_entity2.id, type="dashboardDataModel" - ), - lineageDetails=LineageDetails( - description="test lineage", - columnsLineage=[ - # Valid column lineage - ColumnLineage( - fromColumns=[ - f"{self.table3_entity.fullyQualifiedName.root}.id" - ], - toColumn=f"{self.dashboard_datamodel_entity2.fullyQualifiedName.root}.id", - ), - # Invalid column lineage - non-existent column - ColumnLineage( - fromColumns=[ - f"{self.table3_entity.fullyQualifiedName.root}.invalid_col" - ], - toColumn=f"{self.dashboard_datamodel_entity2.fullyQualifiedName.root}.invalid_col", - ), - # Invalid column lineage - wrong table - ColumnLineage( - fromColumns=["wrong_table.id"], - toColumn=f"{self.dashboard_datamodel_entity2.fullyQualifiedName.root}.id", - ), - ], - ), - ), - ) - - # Add the lineage with invalid columns - self.metadata.add_lineage(data=lineage_request) - - # Verify that only valid columns remain in the lineage - lineage = self.metadata.get_lineage_by_name( - entity=DashboardDataModel, - fqn=self.dashboard_datamodel_entity2.fullyQualifiedName.root, - ) - self.assertEqual( - len(lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 1 - ) - self.assertEqual( - lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"][0][ - "fromColumns" - ][0], - f"{self.table3_entity.fullyQualifiedName.root}.id", - ) - self.assertEqual( - lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"][0][ - "toColumn" - ], - f"{self.dashboard_datamodel_entity2.fullyQualifiedName.root}.id", - ) - - def test_clean_lineage_columns_nested_columns(self): - """Test clean_lineage_columns with nested columns""" - # Create a table with nested columns - nested_table = get_create_entity( - name=generate_name(), - entity=Table, - reference=self.create_schema_entity.fullyQualifiedName, - ) - nested_table.columns = [ - Column(name="parent_col", dataType="STRING"), - Column( - name="nested_col", - dataType="STRING", - children=[ - Column(name="child1", dataType="STRING"), - Column(name="child2", dataType="STRING"), - ], - ), - ] - nested_table_entity = self.metadata.create_or_update(data=nested_table) - - # Create a lineage request with nested column references - lineage_request = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference(id=self.table3_entity.id, type="table"), - toEntity=EntityReference(id=nested_table_entity.id, type="table"), - lineageDetails=LineageDetails( - description="test lineage", - columnsLineage=[ - # Valid nested column lineage - ColumnLineage( - fromColumns=[ - f"{self.table3_entity.fullyQualifiedName.root}.id" - ], - toColumn=f"{nested_table_entity.fullyQualifiedName.root}.nested_col.child1", - ), - # Invalid nested column lineage - ColumnLineage( - fromColumns=[ - f"{self.table3_entity.fullyQualifiedName.root}.id" - ], - toColumn=f"{nested_table_entity.fullyQualifiedName.root}.nested_col.invalid_child", - ), - ], - ), - ), - ) - - # Add the lineage with invalid columns - self.metadata.add_lineage(data=lineage_request, check_patch=True) - - # Verify that only valid columns remain in the lineage - lineage = self.metadata.get_lineage_by_id( - entity=Table, entity_id=nested_table_entity.id.root - ) - self.assertEqual( - len(lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 1 - ) - self.assertEqual( - lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"][0][ - "toColumn" - ], - f"{nested_table_entity.fullyQualifiedName.root}.nested_col.child1", - ) diff --git a/ingestion/tests/unit/test_lineage_utils.py b/ingestion/tests/unit/test_lineage_utils.py deleted file mode 100644 index 3685263b0cc..00000000000 --- a/ingestion/tests/unit/test_lineage_utils.py +++ /dev/null @@ -1,438 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Lineage utils unit tests -""" - -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.data.container import ( - Container, - ContainerDataModel, -) -from metadata.generated.schema.entity.data.dashboard import Dashboard -from metadata.generated.schema.entity.data.dashboardDataModel import ( - DashboardDataModel, - DataModelType, -) -from metadata.generated.schema.entity.data.mlmodel import MlFeature, MlModel -from metadata.generated.schema.entity.data.table import Column, Table -from metadata.generated.schema.entity.data.topic import Topic -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( - OpenMetadataJWTClientConfig, -) -from metadata.generated.schema.type.basic import UUID, FullyQualifiedEntityName -from metadata.generated.schema.type.entityLineage import ( - ColumnLineage, - EntitiesEdge, - LineageDetails, -) -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.schema import DataTypeTopic, FieldModel -from metadata.generated.schema.type.schema import Topic as TopicSchema -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.ometa.utils import clean_lineage_columns, column_name_list - - -def test_column_name_list_table(): - """Test column_name_list for Table entity""" - # Create a table with nested columns - table = Table( - id=UUID("12345678-1234-5678-1234-567812345678"), - name="test_table", - columns=[ - Column(name="col1", dataType="STRING"), - Column( - name="nested_col", - dataType="STRING", - children=[ - Column(name="child1", dataType="STRING"), - Column(name="child2", dataType="STRING"), - ], - ), - ], - ) - - expected_columns = {"col1", "nested_col", "nested_col.child1", "nested_col.child2"} - result = column_name_list(table) - assert result == expected_columns - - -def test_column_name_list_table_deep_nesting(): - """Test column_name_list for Table entity with deep nesting""" - table = Table( - id=UUID("12345678-1234-5678-1234-567812345678"), - name="test_table", - columns=[ - Column( - name="parent_col", - dataType="STRING", - children=[ - Column( - name="child_col", - dataType="STRING", - children=[ - Column(name="grandchild_col", dataType="STRING"), - Column( - name="grandchild_col2", - dataType="STRING", - children=[ - Column( - name="great_grandchild_col", dataType="STRING" - ) - ], - ), - ], - ) - ], - ) - ], - ) - - expected_columns = { - "parent_col", - "parent_col.child_col", - "parent_col.child_col.grandchild_col", - "parent_col.child_col.grandchild_col2", - "parent_col.child_col.grandchild_col2.great_grandchild_col", - } - result = column_name_list(table) - assert result == expected_columns - - -def test_column_name_list_dashboard(): - """Test column_name_list for Dashboard entity""" - dashboard = Dashboard( - id=UUID("12345678-1234-5678-1234-567812345678"), - service=EntityReference( - id=UUID("12345678-1234-5678-1234-567812345678"), - type="service", - name="random_service", - fullyQualifiedName="random_service", - ), - name="test_dashboard", - charts=[ - EntityReference( - id=UUID("12345678-1234-5678-1234-567812345678"), - type="chart", - name="chart1", - fullyQualifiedName="dashboard.chart1", - ) - ], - ) - - expected_columns = {"chart1"} - result = column_name_list(dashboard) - assert result == expected_columns - - -def test_column_name_list_container(): - """Test column_name_list for Container entity""" - container = Container( - id=UUID("12345678-1234-5678-1234-567812345678"), - name="test_container", - service=EntityReference( - id=UUID("12345678-1234-5678-1234-567812345678"), - type="service", - name="random_service", - fullyQualifiedName="random_service", - ), - dataModel=ContainerDataModel( - columns=[ - Column(name="col1", dataType="STRING"), - Column(name="col2", dataType="STRING"), - ] - ), - ) - - expected_columns = {"col1", "col2"} - result = column_name_list(container) - assert result == expected_columns - - -def test_column_name_list_container_nested(): - """Test column_name_list for Container entity with nested columns""" - container = Container( - id=UUID("12345678-1234-5678-1234-567812345678"), - name="test_container", - service=EntityReference( - id=UUID("12345678-1234-5678-1234-567812345678"), - type="service", - name="random_service", - fullyQualifiedName="random_service", - ), - dataModel=ContainerDataModel( - columns=[ - Column( - name="parent_col", - dataType="STRING", - children=[ - Column( - name="child_col", - dataType="STRING", - children=[Column(name="grandchild_col", dataType="STRING")], - ) - ], - ) - ] - ), - ) - - expected_columns = { - "parent_col", - "parent_col.child_col", - "parent_col.child_col.grandchild_col", - } - result = column_name_list(container) - assert result == expected_columns - - -def test_column_name_list_mlmodel(): - """Test column_name_list for MlModel entity""" - mlmodel = MlModel( - id=UUID("12345678-1234-5678-1234-567812345678"), - name="test_model", - mlFeatures=[MlFeature(name="feature1")], - algorithm="test_algorithm", - service=EntityReference( - id=UUID("12345678-1234-5678-1234-567812345678"), - type="service", - name="random_service", - fullyQualifiedName="random_service", - ), - ) - - expected_columns = {"feature1"} - result = column_name_list(mlmodel) - assert result == expected_columns - - -def test_clean_lineage_columns(): - """Test clean_lineage_columns function""" - # Create test entities - source_table = Table( - id=UUID("12345678-1234-5678-1234-567812345678"), - name="source_table", - fullyQualifiedName="database.schema.source_table", - columns=[ - Column(name="col1", dataType="STRING"), - Column(name="col2", dataType="STRING"), - ], - ) - - target_table = Table( - id=UUID("87654321-4321-8765-4321-876543210987"), - name="target_table", - fullyQualifiedName="database.schema.target_table", - columns=[ - Column(name="col1", dataType="STRING"), - Column(name="col2", dataType="STRING"), - ], - ) - - # Create lineage request with valid and invalid columns - lineage_request = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=UUID("12345678-1234-5678-1234-567812345678"), - type="table", - name="source_table", - fullyQualifiedName="database.schema.source_table", - ), - toEntity=EntityReference( - id=UUID("87654321-4321-8765-4321-876543210987"), - type="table", - name="target_table", - fullyQualifiedName="database.schema.target_table", - ), - lineageDetails=LineageDetails( - columnsLineage=[ - ColumnLineage( - fromColumns=["database.schema.source_table.col1"], - toColumn="database.schema.target_table.col1", - ), - ColumnLineage( - fromColumns=["database.schema.source_table.invalid_col"], - toColumn="database.schema.target_table.invalid_col", - ), - ColumnLineage( - fromColumns=["database.schema.invalid_table.col1"], - toColumn="database.schema.target_table.col1", - ), - ] - ), - ) - ) - - # Create metadata instance with test entities - server_config = OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider="openmetadata", - securityConfig=OpenMetadataJWTClientConfig(jwtToken=""), - ) - metadata = OpenMetadata(server_config) - - metadata.get_by_id = ( - lambda entity, entity_id, fields: source_table - if entity_id == UUID("12345678-1234-5678-1234-567812345678") - else target_table - ) - - # Clean lineage columns - clean_lineage_columns(metadata, lineage_request) - - # Verify that only valid columns remain - assert len(lineage_request.edge.lineageDetails.columnsLineage) == 1 - assert lineage_request.edge.lineageDetails.columnsLineage[0].fromColumns == [ - FullyQualifiedEntityName("database.schema.source_table.col1") - ] - assert lineage_request.edge.lineageDetails.columnsLineage[ - 0 - ].toColumn == FullyQualifiedEntityName("database.schema.target_table.col1") - - -def test_column_name_list_dashboard_data_model(): - """Test column_name_list for DashboardDataModel entity""" - dashboard_data_model = DashboardDataModel( - id=UUID("12345678-1234-5678-1234-567812345678"), - name="test_dashboard_data_model", - service=EntityReference( - id=UUID("12345678-1234-5678-1234-567812345678"), - type="service", - name="random_service", - fullyQualifiedName="random_service", - ), - dataModelType=DataModelType.TableauDataModel, - columns=[ - Column( - name="parent_col", - dataType="STRING", - children=[ - Column( - name="child_col", - dataType="STRING", - children=[Column(name="grandchild_col", dataType="STRING")], - ) - ], - ) - ], - ) - - expected_columns = { - "parent_col", - "parent_col.child_col", - "parent_col.child_col.grandchild_col", - } - result = column_name_list(dashboard_data_model) - assert result == expected_columns - - -def test_clean_lineage_columns_topic_container(): - """Test clean_lineage_columns function for topic to container lineage""" - # Create test entities - source_topic = Topic( - partitions=1, - id=UUID("12345678-1234-5678-1234-567812345678"), - name="source_topic", - fullyQualifiedName="service.source_topic", - service=EntityReference( - id=UUID("12345678-1234-5678-1234-567812345678"), - type="service", - name="random_service", - fullyQualifiedName="random_service", - ), - messageSchema=TopicSchema( - schemaFields=[ - FieldModel(name="field1", dataType=DataTypeTopic.STRING), - FieldModel(name="field2", dataType=DataTypeTopic.STRING), - ] - ), - ) - - target_container = Container( - name="target_container", - id=UUID("87654321-4321-8765-4321-876543210987"), - service=EntityReference( - id=UUID("12345678-1234-5678-1234-567812345678"), - type="service", - name="random_service", - fullyQualifiedName="random_service", - ), - fullyQualifiedName="service.target_container", - dataModel=ContainerDataModel( - columns=[ - Column(name="col1", dataType="STRING"), - Column(name="col2", dataType="STRING"), - ] - ), - ) - - # Create lineage request with valid and invalid columns - lineage_request = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=UUID("12345678-1234-5678-1234-567812345678"), - type="topic", - name="source_topic", - fullyQualifiedName="service.source_topic", - ), - toEntity=EntityReference( - id=UUID("87654321-4321-8765-4321-876543210987"), - type="container", - name="target_container", - fullyQualifiedName="service.target_container", - ), - lineageDetails=LineageDetails( - columnsLineage=[ - ColumnLineage( - fromColumns=["service.source_topic.field1"], - toColumn="service.target_container.col1", - ), - ColumnLineage( - fromColumns=["service.source_topic.invalid_field"], - toColumn="service.target_container.invalid_col", - ), - ColumnLineage( - fromColumns=["service.invalid_topic.field1"], - toColumn="service.target_container.col1", - ), - ] - ), - ) - ) - - # Create metadata instance with test entities - server_config = OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider="openmetadata", - securityConfig=OpenMetadataJWTClientConfig(jwtToken=""), - ) - metadata = OpenMetadata(server_config) - metadata.get_by_id = ( - lambda entity, entity_id, fields: source_topic - if entity_id == UUID("12345678-1234-5678-1234-567812345678") - else target_container - ) - - # Clean lineage columns - clean_lineage_columns(metadata, lineage_request) - - # Verify that only valid columns remain - assert len(lineage_request.edge.lineageDetails.columnsLineage) == 1 - assert lineage_request.edge.lineageDetails.columnsLineage[0].fromColumns == [ - FullyQualifiedEntityName("service.source_topic.field1") - ] - assert lineage_request.edge.lineageDetails.columnsLineage[ - 0 - ].toColumn == FullyQualifiedEntityName("service.target_container.col1")