mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-31 20:51:26 +00:00
This commit is contained in:
parent
05b70a24f3
commit
ee5d8eee8b
@ -29,12 +29,7 @@ from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
|
||||
from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT
|
||||
from metadata.ingestion.models.patch_request import build_patch
|
||||
from metadata.ingestion.ometa.client import REST, APIError
|
||||
from metadata.ingestion.ometa.utils import (
|
||||
clean_lineage_columns,
|
||||
get_entity_type,
|
||||
model_str,
|
||||
quote,
|
||||
)
|
||||
from metadata.ingestion.ometa.utils import get_entity_type, model_str, quote
|
||||
from metadata.utils.logger import ometa_logger
|
||||
from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache
|
||||
|
||||
@ -155,7 +150,6 @@ class OMetaLineageMixin(Generic[T]):
|
||||
original.edge.lineageDetails.columnsLineage = (
|
||||
serialized_col_details_og
|
||||
)
|
||||
clean_lineage_columns(metadata=self, lineage_request=data)
|
||||
|
||||
# Keep the pipeline information from the original
|
||||
# lineage if available
|
||||
@ -171,7 +165,6 @@ class OMetaLineageMixin(Generic[T]):
|
||||
patch_op_success = True
|
||||
|
||||
if patch_op_success is False:
|
||||
clean_lineage_columns(metadata=self, lineage_request=data)
|
||||
self.client.put(
|
||||
self.get_suffix(AddLineageRequest), data=data.model_dump_json()
|
||||
)
|
||||
|
@ -14,39 +14,16 @@ Helper functions to handle OpenMetadata Entities' properties
|
||||
|
||||
import re
|
||||
import string
|
||||
from functools import singledispatch
|
||||
from typing import Any, List, Type, TypeVar, Union
|
||||
from typing import Any, Type, TypeVar, Union
|
||||
|
||||
from pydantic import BaseModel
|
||||
from requests.utils import quote as url_quote
|
||||
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.apiEndpoint import APIEndpoint
|
||||
from metadata.generated.schema.entity.data.container import Container
|
||||
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
||||
from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel
|
||||
from metadata.generated.schema.entity.data.metric import Metric
|
||||
from metadata.generated.schema.entity.data.mlmodel import MlModel
|
||||
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.data.topic import Topic
|
||||
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
|
||||
T = TypeVar("T", bound=BaseModel)
|
||||
|
||||
# Entity types supported when validating column-level lineage.
# Keys are compared with the `type` of the lineage edge entity references;
# each value is a pair of (entity class to fetch, tuple of fields passed as
# `fields` when the entity is fetched by id). Types missing from this map
# are not validated at all.
LINEAGE_ENTITY_CLASS_MAP = {
    "table": (Table, ("columns",)),
    "searchIndex": (SearchIndex, ("fields",)),
    "topic": (Topic, ("messageSchema",)),
    "container": (Container, ("dataModel",)),
    "dashboardDataModel": (DashboardDataModel, ("columns",)),
    "dashboard": (Dashboard, ("charts",)),
    "mlmodel": (MlModel, ("",)),
    "apiEndpoint": (APIEndpoint, ("responseSchema", "requestSchema")),
    "metric": (Metric, ("",)),
}
|
||||
|
||||
|
||||
def format_name(name: str) -> str:
|
||||
"""
|
||||
@ -121,182 +98,3 @@ def build_entity_reference(entity: T) -> EntityReference:
|
||||
description=entity.description,
|
||||
href=entity.href,
|
||||
)
|
||||
|
||||
|
||||
# pylint: disable=unused-argument,import-outside-toplevel,too-many-locals
|
||||
@singledispatch
def column_name_list(entity: T) -> set:
    """
    helper function to get the column names of the entity

    This is the fallback used for every type without a registered handler
    (including plain lists, since dispatch happens on the type of the first
    argument). It reports no columns.

    Returns:
        an empty set
    """
    return set()
|
||||
|
||||
|
||||
def _get_column_names(column, parent_path: str = "") -> set:
|
||||
"""
|
||||
Helper function to recursively get column names with their full path
|
||||
"""
|
||||
result = set()
|
||||
current_path = (
|
||||
f"{parent_path}.{column.name.root}" if parent_path else column.name.root
|
||||
)
|
||||
result.add(current_path)
|
||||
|
||||
if column.children:
|
||||
for child in column.children:
|
||||
result.update(_get_column_names(child, current_path))
|
||||
return result
|
||||
|
||||
|
||||
@column_name_list.register(DashboardDataModel)
@column_name_list.register(Table)
def _(entity: Union[DashboardDataModel, Table]) -> set:
    """
    Get the column names of a table or dashboard data model

    Nested columns are reported with their dotted path (e.g. `parent.child`).
    An entity without columns yields an empty set.
    """
    names = set()
    for column in entity.columns or []:
        names.update(_get_column_names(column))
    return names
|
||||
|
||||
|
||||
@column_name_list.register(Container)
def _(entity: Container) -> set:
    """
    Get the column names of the container data model

    Nested columns are reported with their dotted path. A container without
    a data model, or whose data model has no columns, yields an empty set.
    """
    names = set()
    if entity.dataModel and entity.dataModel.columns:
        for column in entity.dataModel.columns:
            names.update(_get_column_names(column))
    return names
|
||||
|
||||
|
||||
@column_name_list.register(Dashboard)
def _(entity: Dashboard) -> set:
    """
    Get the chart names of the dashboard

    Each chart contributes the last part of its fully qualified name.
    Charts without a fully qualified name are skipped.
    """
    # Imported here rather than at module level, as in the original code
    from metadata.utils.fqn import split

    names = set()
    if entity.charts and entity.charts.root:
        for chart in entity.charts.root:
            if not chart.fullyQualifiedName:
                continue
            fqn_parts = split(chart.fullyQualifiedName)
            if fqn_parts:
                names.add(fqn_parts[-1])
    return names
|
||||
|
||||
|
||||
@column_name_list.register(MlModel)
def _(entity: MlModel) -> set:
    """
    Get the feature names of the ML model, plus the names of their feature sources

    A model without features yields an empty set.
    """
    names = set()
    for feature in entity.mlFeatures or []:
        names.add(feature.name.root)
        # `featureSources` is a list. Passing the list itself to
        # `column_name_list` dispatches to the fallback implementation, which
        # always returns an empty set, so the sources are walked explicitly.
        for source in feature.featureSources or []:
            # NOTE(review): assumes each feature source exposes an optional
            # `name` wrapping the value in `.root` - confirm against the
            # MlModel schema.
            source_name = getattr(source, "name", None)
            if source_name is not None:
                names.add(getattr(source_name, "root", source_name))
    return names
|
||||
|
||||
|
||||
@column_name_list.register(Topic)
def _(entity: Topic) -> set:
    """
    Get the schema field names of the topic message schema

    Nested fields are reported with their dotted path. A topic without a
    message schema, or without schema fields, yields an empty set.
    """
    names = set()
    if entity.messageSchema and entity.messageSchema.schemaFields:
        for field in entity.messageSchema.schemaFields:
            names.update(_get_column_names(field))
    return names
|
||||
|
||||
|
||||
@column_name_list.register(APIEndpoint)
def _(entity: APIEndpoint) -> set:
    """
    Get the field names of the API endpoint request and response schemas

    Nested fields are reported with their dotted path. Missing schemas or
    schemas without fields contribute nothing.
    """
    names = set()
    # Both schemas are always inspected: returning right after the request
    # schema would hide every response field from the validation.
    for schema in (entity.requestSchema, entity.responseSchema):
        # Guard on `schemaFields`, the same attribute that is iterated
        if schema and schema.schemaFields:
            for field in schema.schemaFields:
                # Walk children through the recursive helper; dispatching
                # `column_name_list` on the children list yields nothing.
                names.update(_get_column_names(field))
    return names
|
||||
|
||||
|
||||
def clean_lineage_columns(metadata, lineage_request: AddLineageRequest) -> None:
    """
    Replicate the behavior of validateChildren in the Backend and remove the invalid columns

    The request is updated in place: every column lineage entry referencing a
    column that is not found in the source or target entity is logged and
    dropped. Nothing is changed when the request carries no column lineage,
    when either entity type is not in LINEAGE_ENTITY_CLASS_MAP, or when one
    of the entities cannot be fetched.

    Args:
        metadata: client exposing `get_by_id(entity=..., entity_id=..., fields=...)`
        lineage_request: lineage request whose `columnsLineage` is filtered
    """
    from metadata.utils.fqn import FQN_SEPARATOR
    from metadata.utils.logger import utils_logger

    logger = utils_logger()

    edge = lineage_request.edge
    if not (
        edge
        and edge.lineageDetails
        and edge.lineageDetails.columnsLineage
        and edge.fromEntity
        and edge.toEntity
    ):
        return

    from_class, from_fields = LINEAGE_ENTITY_CLASS_MAP.get(
        edge.fromEntity.type, (None, None)
    )
    to_class, to_fields = LINEAGE_ENTITY_CLASS_MAP.get(
        edge.toEntity.type, (None, None)
    )
    if not from_class or not to_class:
        return

    from_entity = metadata.get_by_id(
        entity=from_class,
        entity_id=edge.fromEntity.id.root,
        fields=from_fields,
    )
    to_entity = metadata.get_by_id(
        entity=to_class,
        entity_id=edge.toEntity.id.root,
        fields=to_fields,
    )
    if not from_entity or not to_entity:
        return

    from_entity_columns = column_name_list(from_entity)
    to_entity_columns = column_name_list(to_entity)

    cleaned_columns_lineage = []

    for column_lineage in edge.lineageDetails.columnsLineage:
        invalid_column = False

        for from_column in column_lineage.fromColumns or []:
            # Column references may be wrapped in a model exposing `.root`
            from_column = getattr(from_column, "root", from_column)
            from_column_name = _strip_entity_prefix(
                from_column, from_entity.fullyQualifiedName.root, FQN_SEPARATOR
            )
            if from_column_name not in from_entity_columns:
                invalid_column = True
                logger.warning(
                    f"Ignoring invalid column {from_column} for lineage from {from_entity.fullyQualifiedName.root} "
                    f"to {to_entity.fullyQualifiedName.root}"
                )

        if column_lineage.toColumn:
            to_column = getattr(column_lineage.toColumn, "root", column_lineage.toColumn)
            to_column_name = _strip_entity_prefix(
                to_column, to_entity.fullyQualifiedName.root, FQN_SEPARATOR
            )
            if to_column_name not in to_entity_columns:
                # Log the plain column string, like the from-column warning
                logger.warning(
                    f"Ignoring invalid column {to_column} for lineage "
                    f"from {from_entity.fullyQualifiedName.root} to {to_entity.fullyQualifiedName.root}"
                )
                invalid_column = True

        if not invalid_column:
            cleaned_columns_lineage.append(column_lineage)

    edge.lineageDetails.columnsLineage = cleaned_columns_lineage


def _strip_entity_prefix(column_fqn: str, entity_fqn: str, separator: str) -> str:
    """
    Return the column path relative to its entity

    Only a leading `<entity_fqn><separator>` is removed. A column that does
    not start with that prefix is returned unchanged, so an entity FQN that
    merely appears in the middle of the string is never cut out.
    """
    prefix = entity_fqn + separator
    if column_fqn.startswith(prefix):
        return column_fqn[len(prefix) :]
    return column_fqn
|
||||
|
@ -21,7 +21,7 @@ from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDa
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.pipeline import Pipeline
|
||||
from metadata.generated.schema.entity.data.table import Column, Table
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.services.dashboardService import DashboardService
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||
from metadata.generated.schema.entity.services.pipelineService import PipelineService
|
||||
@ -109,14 +109,6 @@ class OMetaLineageTest(TestCase):
|
||||
|
||||
cls.table2_entity = cls.metadata.create_or_update(data=cls.table2)
|
||||
|
||||
cls.table3 = get_create_entity(
|
||||
name=generate_name(),
|
||||
entity=Table,
|
||||
reference=cls.create_schema_entity.fullyQualifiedName,
|
||||
)
|
||||
|
||||
cls.table3_entity = cls.metadata.create_or_update(data=cls.table3)
|
||||
|
||||
cls.pipeline = get_create_entity(
|
||||
name=generate_name(),
|
||||
entity=Pipeline,
|
||||
@ -141,15 +133,6 @@ class OMetaLineageTest(TestCase):
|
||||
data=cls.dashboard_datamodel
|
||||
)
|
||||
|
||||
cls.dashboard_datamodel2 = get_create_entity(
|
||||
name=generate_name(),
|
||||
entity=DashboardDataModel,
|
||||
reference=cls.dashboard_service_entity.fullyQualifiedName,
|
||||
)
|
||||
cls.dashboard_datamodel_entity2 = cls.metadata.create_or_update(
|
||||
data=cls.dashboard_datamodel2
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls) -> None:
|
||||
"""
|
||||
@ -275,37 +258,6 @@ class OMetaLineageTest(TestCase):
|
||||
)
|
||||
|
||||
# Add a new column to the lineage edge
|
||||
linage_request_2 = AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
fromEntity=EntityReference(id=self.table1_entity.id, type="table"),
|
||||
toEntity=EntityReference(id=self.table2_entity.id, type="table"),
|
||||
lineageDetails=LineageDetails(
|
||||
description="test lineage",
|
||||
columnsLineage=[
|
||||
ColumnLineage(
|
||||
fromColumns=[
|
||||
f"{self.table1_entity.fullyQualifiedName.root}.another"
|
||||
],
|
||||
toColumn=f"{self.table2_entity.fullyQualifiedName.root}.another",
|
||||
)
|
||||
],
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
res = self.metadata.add_lineage(data=linage_request_2, check_patch=True)
|
||||
|
||||
res["entity"]["id"] = str(res["entity"]["id"])
|
||||
self.assertEqual(len(res["downstreamEdges"]), 1)
|
||||
self.assertEqual(
|
||||
res["downstreamEdges"][0]["lineageDetails"]["pipeline"]["id"],
|
||||
str(self.pipeline_entity.id.root),
|
||||
)
|
||||
self.assertEqual(
|
||||
len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 2
|
||||
)
|
||||
|
||||
# Invalid column test
|
||||
linage_request_2 = AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
fromEntity=EntityReference(id=self.table1_entity.id, type="table"),
|
||||
@ -332,7 +284,6 @@ class OMetaLineageTest(TestCase):
|
||||
res["downstreamEdges"][0]["lineageDetails"]["pipeline"]["id"],
|
||||
str(self.pipeline_entity.id.root),
|
||||
)
|
||||
# col lineage remains unchanged
|
||||
self.assertEqual(
|
||||
len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 2
|
||||
)
|
||||
@ -458,197 +409,3 @@ class OMetaLineageTest(TestCase):
|
||||
entity_lineage.upstreamEdges[0].fromEntity.root
|
||||
== self.table1_entity.id.root
|
||||
)
|
||||
|
||||
def test_clean_lineage_columns(self):
    """Adding lineage between two fresh tables keeps only the valid column mapping"""
    schema_fqn = self.create_schema_entity.fullyQualifiedName

    # Two brand new tables so no previous lineage interferes
    table1_entity = self.metadata.create_or_update(
        data=get_create_entity(
            name=generate_name(), entity=Table, reference=schema_fqn
        )
    )
    table2_entity = self.metadata.create_or_update(
        data=get_create_entity(
            name=generate_name(), entity=Table, reference=schema_fqn
        )
    )

    source_fqn = table1_entity.fullyQualifiedName.root
    target_fqn = table2_entity.fullyQualifiedName.root

    # (from column, to column): one valid pair, one with unknown columns,
    # one whose source column belongs to another table
    column_pairs = [
        (f"{source_fqn}.id", f"{target_fqn}.id"),
        (f"{source_fqn}.invalid_col", f"{target_fqn}.invalid_col"),
        ("wrong_table.id", f"{target_fqn}.id"),
    ]

    lineage_request = AddLineageRequest(
        edge=EntitiesEdge(
            fromEntity=EntityReference(id=table1_entity.id, type="table"),
            toEntity=EntityReference(id=table2_entity.id, type="table"),
            lineageDetails=LineageDetails(
                description="test lineage",
                columnsLineage=[
                    ColumnLineage(fromColumns=[upstream], toColumn=downstream)
                    for upstream, downstream in column_pairs
                ],
            ),
        ),
    )

    self.metadata.add_lineage(data=lineage_request)

    lineage = self.metadata.get_lineage_by_id(
        entity=Table, entity_id=table2_entity.id.root
    )
    kept = lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"]

    # Only the valid pair must survive
    self.assertEqual(len(kept), 1)
    self.assertEqual(kept[0]["fromColumns"][0], f"{source_fqn}.id")
    self.assertEqual(kept[0]["toColumn"], f"{target_fqn}.id")
|
||||
|
||||
def test_clean_lineage_columns_table_datamodel(self):
    """Table to dashboard data model lineage keeps only the valid column mapping"""
    table_fqn = self.table3_entity.fullyQualifiedName.root
    datamodel_fqn = self.dashboard_datamodel_entity2.fullyQualifiedName.root

    # (from column, to column): one valid pair, one with unknown columns,
    # one whose source column belongs to another table
    column_pairs = [
        (f"{table_fqn}.id", f"{datamodel_fqn}.id"),
        (f"{table_fqn}.invalid_col", f"{datamodel_fqn}.invalid_col"),
        ("wrong_table.id", f"{datamodel_fqn}.id"),
    ]

    lineage_request = AddLineageRequest(
        edge=EntitiesEdge(
            fromEntity=EntityReference(id=self.table3_entity.id, type="table"),
            toEntity=EntityReference(
                id=self.dashboard_datamodel_entity2.id, type="dashboardDataModel"
            ),
            lineageDetails=LineageDetails(
                description="test lineage",
                columnsLineage=[
                    ColumnLineage(fromColumns=[upstream], toColumn=downstream)
                    for upstream, downstream in column_pairs
                ],
            ),
        ),
    )

    self.metadata.add_lineage(data=lineage_request)

    lineage = self.metadata.get_lineage_by_name(
        entity=DashboardDataModel,
        fqn=self.dashboard_datamodel_entity2.fullyQualifiedName.root,
    )
    kept = lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"]

    # Only the valid pair must survive
    self.assertEqual(len(kept), 1)
    self.assertEqual(kept[0]["fromColumns"][0], f"{table_fqn}.id")
    self.assertEqual(kept[0]["toColumn"], f"{datamodel_fqn}.id")
|
||||
|
||||
def test_clean_lineage_columns_nested_columns(self):
    """Lineage towards a nested column is kept only when the nested path exists"""
    nested_table = get_create_entity(
        name=generate_name(),
        entity=Table,
        reference=self.create_schema_entity.fullyQualifiedName,
    )
    child_columns = [
        Column(name="child1", dataType="STRING"),
        Column(name="child2", dataType="STRING"),
    ]
    nested_table.columns = [
        Column(name="parent_col", dataType="STRING"),
        Column(name="nested_col", dataType="STRING", children=child_columns),
    ]
    nested_table_entity = self.metadata.create_or_update(data=nested_table)

    source_column = f"{self.table3_entity.fullyQualifiedName.root}.id"
    nested_fqn = nested_table_entity.fullyQualifiedName.root

    # First target exists as a child column, the second one does not
    target_columns = [
        f"{nested_fqn}.nested_col.child1",
        f"{nested_fqn}.nested_col.invalid_child",
    ]

    lineage_request = AddLineageRequest(
        edge=EntitiesEdge(
            fromEntity=EntityReference(id=self.table3_entity.id, type="table"),
            toEntity=EntityReference(id=nested_table_entity.id, type="table"),
            lineageDetails=LineageDetails(
                description="test lineage",
                columnsLineage=[
                    ColumnLineage(fromColumns=[source_column], toColumn=target)
                    for target in target_columns
                ],
            ),
        ),
    )

    self.metadata.add_lineage(data=lineage_request, check_patch=True)

    lineage = self.metadata.get_lineage_by_id(
        entity=Table, entity_id=nested_table_entity.id.root
    )
    kept = lineage["upstreamEdges"][0]["lineageDetails"]["columnsLineage"]

    # Only the mapping to the existing nested column must survive
    self.assertEqual(len(kept), 1)
    self.assertEqual(kept[0]["toColumn"], f"{nested_fqn}.nested_col.child1")
|
||||
|
@ -1,438 +0,0 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Lineage utils unit tests
|
||||
"""
|
||||
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.container import (
|
||||
Container,
|
||||
ContainerDataModel,
|
||||
)
|
||||
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
||||
from metadata.generated.schema.entity.data.dashboardDataModel import (
|
||||
DashboardDataModel,
|
||||
DataModelType,
|
||||
)
|
||||
from metadata.generated.schema.entity.data.mlmodel import MlFeature, MlModel
|
||||
from metadata.generated.schema.entity.data.table import Column, Table
|
||||
from metadata.generated.schema.entity.data.topic import Topic
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
|
||||
OpenMetadataJWTClientConfig,
|
||||
)
|
||||
from metadata.generated.schema.type.basic import UUID, FullyQualifiedEntityName
|
||||
from metadata.generated.schema.type.entityLineage import (
|
||||
ColumnLineage,
|
||||
EntitiesEdge,
|
||||
LineageDetails,
|
||||
)
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.generated.schema.type.schema import DataTypeTopic, FieldModel
|
||||
from metadata.generated.schema.type.schema import Topic as TopicSchema
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.ometa.utils import clean_lineage_columns, column_name_list
|
||||
|
||||
|
||||
def test_column_name_list_table():
    """column_name_list on a Table returns top-level names and dotted nested paths"""
    nested_children = [
        Column(name="child1", dataType="STRING"),
        Column(name="child2", dataType="STRING"),
    ]
    columns = [
        Column(name="col1", dataType="STRING"),
        Column(name="nested_col", dataType="STRING", children=nested_children),
    ]
    table = Table(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        name="test_table",
        columns=columns,
    )

    assert column_name_list(table) == {
        "col1",
        "nested_col",
        "nested_col.child1",
        "nested_col.child2",
    }
|
||||
|
||||
|
||||
def test_column_name_list_table_deep_nesting():
    """column_name_list on a Table follows children over several nesting levels"""
    # Build the hierarchy from the innermost column outwards
    great_grandchild = Column(name="great_grandchild_col", dataType="STRING")
    grandchildren = [
        Column(name="grandchild_col", dataType="STRING"),
        Column(
            name="grandchild_col2",
            dataType="STRING",
            children=[great_grandchild],
        ),
    ]
    child = Column(name="child_col", dataType="STRING", children=grandchildren)
    parent = Column(name="parent_col", dataType="STRING", children=[child])

    table = Table(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        name="test_table",
        columns=[parent],
    )

    assert column_name_list(table) == {
        "parent_col",
        "parent_col.child_col",
        "parent_col.child_col.grandchild_col",
        "parent_col.child_col.grandchild_col2",
        "parent_col.child_col.grandchild_col2.great_grandchild_col",
    }
|
||||
|
||||
|
||||
def test_column_name_list_dashboard():
    """column_name_list on a Dashboard returns the last FQN part of each chart"""
    service_ref = EntityReference(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        type="service",
        name="random_service",
        fullyQualifiedName="random_service",
    )
    chart_ref = EntityReference(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        type="chart",
        name="chart1",
        fullyQualifiedName="dashboard.chart1",
    )
    dashboard = Dashboard(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        service=service_ref,
        name="test_dashboard",
        charts=[chart_ref],
    )

    assert column_name_list(dashboard) == {"chart1"}
|
||||
|
||||
|
||||
def test_column_name_list_container():
    """column_name_list on a Container returns the data model column names"""
    service_ref = EntityReference(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        type="service",
        name="random_service",
        fullyQualifiedName="random_service",
    )
    data_model = ContainerDataModel(
        columns=[
            Column(name="col1", dataType="STRING"),
            Column(name="col2", dataType="STRING"),
        ]
    )
    container = Container(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        name="test_container",
        service=service_ref,
        dataModel=data_model,
    )

    assert column_name_list(container) == {"col1", "col2"}
|
||||
|
||||
|
||||
def test_column_name_list_container_nested():
    """column_name_list on a Container follows nested data model columns"""
    # Build the hierarchy from the innermost column outwards
    grandchild = Column(name="grandchild_col", dataType="STRING")
    child = Column(name="child_col", dataType="STRING", children=[grandchild])
    parent = Column(name="parent_col", dataType="STRING", children=[child])

    service_ref = EntityReference(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        type="service",
        name="random_service",
        fullyQualifiedName="random_service",
    )
    container = Container(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        name="test_container",
        service=service_ref,
        dataModel=ContainerDataModel(columns=[parent]),
    )

    assert column_name_list(container) == {
        "parent_col",
        "parent_col.child_col",
        "parent_col.child_col.grandchild_col",
    }
|
||||
|
||||
|
||||
def test_column_name_list_mlmodel():
    """column_name_list on an MlModel returns the feature names"""
    service_ref = EntityReference(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        type="service",
        name="random_service",
        fullyQualifiedName="random_service",
    )
    features = [MlFeature(name="feature1")]
    mlmodel = MlModel(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        name="test_model",
        mlFeatures=features,
        algorithm="test_algorithm",
        service=service_ref,
    )

    assert column_name_list(mlmodel) == {"feature1"}
|
||||
|
||||
|
||||
def test_clean_lineage_columns():
    """clean_lineage_columns drops table to table mappings that reference unknown columns"""
    source_id = UUID("12345678-1234-5678-1234-567812345678")
    target_id = UUID("87654321-4321-8765-4321-876543210987")

    source_table = Table(
        id=source_id,
        name="source_table",
        fullyQualifiedName="database.schema.source_table",
        columns=[
            Column(name="col1", dataType="STRING"),
            Column(name="col2", dataType="STRING"),
        ],
    )
    target_table = Table(
        id=target_id,
        name="target_table",
        fullyQualifiedName="database.schema.target_table",
        columns=[
            Column(name="col1", dataType="STRING"),
            Column(name="col2", dataType="STRING"),
        ],
    )

    # One valid mapping, one with unknown columns, one from an unknown table
    columns_lineage = [
        ColumnLineage(
            fromColumns=["database.schema.source_table.col1"],
            toColumn="database.schema.target_table.col1",
        ),
        ColumnLineage(
            fromColumns=["database.schema.source_table.invalid_col"],
            toColumn="database.schema.target_table.invalid_col",
        ),
        ColumnLineage(
            fromColumns=["database.schema.invalid_table.col1"],
            toColumn="database.schema.target_table.col1",
        ),
    ]
    source_ref = EntityReference(
        id=source_id,
        type="table",
        name="source_table",
        fullyQualifiedName="database.schema.source_table",
    )
    target_ref = EntityReference(
        id=target_id,
        type="table",
        name="target_table",
        fullyQualifiedName="database.schema.target_table",
    )
    lineage_request = AddLineageRequest(
        edge=EntitiesEdge(
            fromEntity=source_ref,
            toEntity=target_ref,
            lineageDetails=LineageDetails(columnsLineage=columns_lineage),
        )
    )

    server_config = OpenMetadataConnection(
        hostPort="http://localhost:8585/api",
        authProvider="openmetadata",
        securityConfig=OpenMetadataJWTClientConfig(jwtToken="<token>"),
    )
    metadata = OpenMetadata(server_config)

    def fake_get_by_id(entity, entity_id, fields):
        # Serve the in-memory entities instead of calling the server
        if entity_id == UUID("12345678-1234-5678-1234-567812345678"):
            return source_table
        return target_table

    metadata.get_by_id = fake_get_by_id

    clean_lineage_columns(metadata, lineage_request)

    remaining = lineage_request.edge.lineageDetails.columnsLineage
    assert len(remaining) == 1
    assert remaining[0].fromColumns == [
        FullyQualifiedEntityName("database.schema.source_table.col1")
    ]
    assert remaining[0].toColumn == FullyQualifiedEntityName(
        "database.schema.target_table.col1"
    )
|
||||
|
||||
|
||||
def test_column_name_list_dashboard_data_model():
    """column_name_list on a DashboardDataModel follows nested columns"""
    # Build the hierarchy from the innermost column outwards
    grandchild = Column(name="grandchild_col", dataType="STRING")
    child = Column(name="child_col", dataType="STRING", children=[grandchild])
    parent = Column(name="parent_col", dataType="STRING", children=[child])

    service_ref = EntityReference(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        type="service",
        name="random_service",
        fullyQualifiedName="random_service",
    )
    dashboard_data_model = DashboardDataModel(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        name="test_dashboard_data_model",
        service=service_ref,
        dataModelType=DataModelType.TableauDataModel,
        columns=[parent],
    )

    assert column_name_list(dashboard_data_model) == {
        "parent_col",
        "parent_col.child_col",
        "parent_col.child_col.grandchild_col",
    }
|
||||
|
||||
|
||||
def test_clean_lineage_columns_topic_container():
    """clean_lineage_columns drops topic to container mappings that reference unknown fields"""
    topic_id = UUID("12345678-1234-5678-1234-567812345678")
    container_id = UUID("87654321-4321-8765-4321-876543210987")

    def service_reference():
        # Same service reference used by both entities
        return EntityReference(
            id=UUID("12345678-1234-5678-1234-567812345678"),
            type="service",
            name="random_service",
            fullyQualifiedName="random_service",
        )

    source_topic = Topic(
        partitions=1,
        id=topic_id,
        name="source_topic",
        fullyQualifiedName="service.source_topic",
        service=service_reference(),
        messageSchema=TopicSchema(
            schemaFields=[
                FieldModel(name="field1", dataType=DataTypeTopic.STRING),
                FieldModel(name="field2", dataType=DataTypeTopic.STRING),
            ]
        ),
    )
    target_container = Container(
        name="target_container",
        id=container_id,
        service=service_reference(),
        fullyQualifiedName="service.target_container",
        dataModel=ContainerDataModel(
            columns=[
                Column(name="col1", dataType="STRING"),
                Column(name="col2", dataType="STRING"),
            ]
        ),
    )

    # One valid mapping, one with unknown fields, one from an unknown topic
    columns_lineage = [
        ColumnLineage(
            fromColumns=["service.source_topic.field1"],
            toColumn="service.target_container.col1",
        ),
        ColumnLineage(
            fromColumns=["service.source_topic.invalid_field"],
            toColumn="service.target_container.invalid_col",
        ),
        ColumnLineage(
            fromColumns=["service.invalid_topic.field1"],
            toColumn="service.target_container.col1",
        ),
    ]
    topic_ref = EntityReference(
        id=topic_id,
        type="topic",
        name="source_topic",
        fullyQualifiedName="service.source_topic",
    )
    container_ref = EntityReference(
        id=container_id,
        type="container",
        name="target_container",
        fullyQualifiedName="service.target_container",
    )
    lineage_request = AddLineageRequest(
        edge=EntitiesEdge(
            fromEntity=topic_ref,
            toEntity=container_ref,
            lineageDetails=LineageDetails(columnsLineage=columns_lineage),
        )
    )

    server_config = OpenMetadataConnection(
        hostPort="http://localhost:8585/api",
        authProvider="openmetadata",
        securityConfig=OpenMetadataJWTClientConfig(jwtToken="<token>"),
    )
    metadata = OpenMetadata(server_config)

    def fake_get_by_id(entity, entity_id, fields):
        # Serve the in-memory entities instead of calling the server
        if entity_id == UUID("12345678-1234-5678-1234-567812345678"):
            return source_topic
        return target_container

    metadata.get_by_id = fake_get_by_id

    clean_lineage_columns(metadata, lineage_request)

    remaining = lineage_request.edge.lineageDetails.columnsLineage
    assert len(remaining) == 1
    assert remaining[0].fromColumns == [
        FullyQualifiedEntityName("service.source_topic.field1")
    ]
    assert remaining[0].toColumn == FullyQualifiedEntityName(
        "service.target_container.col1"
    )
|
Loading…
x
Reference in New Issue
Block a user