# datahub/metadata-ingestion/tests/unit/test_file_lineage_source.py
# Commit: Harshal Sheth 25d9d6656c
# feat(ingest): fix validators (#10115)
# Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
# 2024-03-27 15:20:55 -07:00
#
# 221 lines
# 6.3 KiB
# Python

import logging
from typing import List
import pytest
import yaml
from pydantic import ValidationError
from datahub.ingestion.source.metadata.lineage import LineageConfig, _get_lineage_mcp
from datahub.metadata.schema_classes import FineGrainedLineageClass, UpstreamClass
# Module-level logger for this test module; it is not referenced anywhere in
# the visible portion of the file.
logger = logging.getLogger(__name__)
@pytest.fixture
def basic_mcp():
    """
    Build the lineage MCP for a small Kafka lineage with two upstreams.

    The mcp represents a lineage that looks like this::

        topic1 -
                ->topic3
        topic2 -

    plus one fine-grained (field-level) edge topic1.user_id -> topic3.user_id.

    :return: the MCP produced by ``_get_lineage_mcp`` for the ``topic3`` node,
        called with ``False`` as its second argument.
    """
    # The YAML nesting is significant: ``upstream`` and ``fineGrainedLineages``
    # belong to the topic3 list item, alongside its ``entity`` mapping. The
    # whole document is indented uniformly, which YAML accepts.
    # NOTE(review): the field-level urns use env PROD while the entities are
    # DEV; the assertions in this module expect these strings verbatim.
    sample_lineage = """
        lineage:
          - entity:
              name: topic3
              type: dataset
              env: DEV
              platform: kafka
            upstream:
              - entity:
                  name: topic1
                  type: dataset
                  env: DEV
                  platform: kafka
              - entity:
                  name: topic2
                  type: dataset
                  env: DEV
                  platform: kafka
            fineGrainedLineages:
              - upstreamType: FIELD_SET
                upstreams:
                  - urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,PROD),user_id)
                downstreamType: FIELD_SET
                downstreams:
                  - urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,topic3,PROD),user_id)
                confidenceScore: 0.9
                transformOperation: func1
    """
    config = yaml.safe_load(sample_lineage)
    lineage_config: LineageConfig = LineageConfig.parse_obj(config)
    mcp = _get_lineage_mcp(lineage_config.lineage[0], False)
    # Fail once, here, with a clear message rather than with an
    # AttributeError inside every test that consumes this fixture.
    assert mcp is not None, "expected an MCP for the topic3 lineage node"
    return mcp
def unsupported_entity_type_mcp():
    """
    Parse a lineage config whose root entity has an unsupported ``type``.

    Despite the ``_mcp`` suffix, no MCP is built: this returns the result of
    ``LineageConfig.parse_obj``. The caller expects that call to raise
    ``ValidationError`` instead of returning.

    A second, fully valid entry (topic6 <- topic4, topic5) is included so that a
    valid sibling entry does not mask the error on the first one.
    """
    # Keep the YAML nesting intact. A flattened document would fail (if at
    # all) for structural reasons, not because of the ``type`` under test.
    sample_lineage = """
        lineage:
          - entity:
              name: topic3
              type: NotSupported!
              env: DEV
              platform: kafka
            upstream:
              - entity:
                  name: topic1
                  type: dataset
                  env: DEV
                  platform: kafka
              - entity:
                  name: topic2
                  type: dataset
                  env: DEV
                  platform: kafka
          - entity:
              name: topic6
              type: dataset
              env: DEV
              platform: kafka
            upstream:
              - entity:
                  name: topic4
                  type: dataset
                  env: DEV
                  platform: kafka
              - entity:
                  name: topic5
                  type: dataset
                  env: DEV
                  platform: kafka
    """
    config = yaml.safe_load(sample_lineage)
    return LineageConfig.parse_obj(config)
def unsupported_upstream_entity_type_mcp():
    """
    Parse a lineage config in which one *upstream* entity has an unsupported
    ``type``, while the root entity is valid.

    Despite the ``_mcp`` suffix, no MCP is built: this returns the result of
    ``LineageConfig.parse_obj``. The caller expects that call to raise
    ``ValidationError`` instead of returning.
    """
    # Keep the YAML nesting intact. The invalid ``type`` must sit on an entity
    # inside the ``upstream`` list, or the test would not exercise upstream
    # validation at all.
    sample_lineage = """
        lineage:
          - entity:
              name: topic3
              type: dataset
              env: DEV
              platform: kafka
            upstream:
              - entity:
                  name: topic1
                  type: NotSupported
                  env: DEV
                  platform: kafka
              - entity:
                  name: topic2
                  type: dataset
                  env: DEV
                  platform: kafka
    """
    config = yaml.safe_load(sample_lineage)
    return LineageConfig.parse_obj(config)
def unsupported_entity_env_mcp():
    """
    Parse a lineage config whose root entity has an unsupported ``env``.

    Despite the ``_mcp`` suffix, no MCP is built: this returns the result of
    ``LineageConfig.parse_obj``. The caller expects that call to raise
    ``ValidationError`` instead of returning.

    A second, fully valid entry (topic6 <- topic4, topic5) is included so that a
    valid sibling entry does not mask the error on the first one.
    """
    # Keep the YAML nesting intact. A flattened document would fail (if at
    # all) for structural reasons, not because of the ``env`` under test.
    sample_lineage = """
        lineage:
          - entity:
              name: topic3
              type: dataset
              env: NotSupported!
              platform: kafka
            upstream:
              - entity:
                  name: topic1
                  type: dataset
                  env: DEV
                  platform: kafka
              - entity:
                  name: topic2
                  type: dataset
                  env: DEV
                  platform: kafka
          - entity:
              name: topic6
              type: dataset
              env: DEV
              platform: kafka
            upstream:
              - entity:
                  name: topic4
                  type: dataset
                  env: DEV
                  platform: kafka
              - entity:
                  name: topic5
                  type: dataset
                  env: DEV
                  platform: kafka
    """
    config = yaml.safe_load(sample_lineage)
    return LineageConfig.parse_obj(config)
def test_basic_lineage_entity_root_node_urn(basic_mcp):
    """
    The MCP's entityUrn must identify the root (downstream) node, topic3,
    as a DEV Kafka dataset.
    """
    expected_root_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,topic3,DEV)"
    actual_root_urn = basic_mcp.entityUrn
    assert actual_root_urn == expected_root_urn
def test_basic_lineage_upstream_urns(basic_mcp):
    """
    Check that the upstream dataset urns for the basic_mcp example are exactly
    topic1 and topic2, in declaration order, in the DEV environment.
    """
    basic_mcp_upstreams: List[UpstreamClass] = basic_mcp.aspect.upstreams
    # Compare the whole list in a single assertion. This also pins the number
    # of upstreams, and unlike `a == x and b == y`, pytest's failure diff shows
    # exactly which entry differs.
    assert [upstream.dataset for upstream in basic_mcp_upstreams] == [
        "urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,DEV)",
        "urn:li:dataset:(urn:li:dataPlatform:kafka,topic2,DEV)",
    ]
def test_basic_lineage_finegrained_upstream_urns(basic_mcp):
    """
    Check the fine-grained (field-level) lineage for the basic_mcp example.

    Covers the field urns on both sides, the upstream/downstream types, the
    confidence score and the transform operation.
    """
    fine_grained_lineages: List[FineGrainedLineageClass] = (
        basic_mcp.aspect.fineGrainedLineages
    )
    # The fixture declares exactly one fine-grained edge.
    assert len(fine_grained_lineages) == 1
    fine_grained_lineage = fine_grained_lineages[0]

    # The urns themselves are what this test is named after; check them
    # explicitly rather than only the surrounding metadata.
    assert fine_grained_lineage.upstreams == [
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,PROD),user_id)"
    ]
    assert fine_grained_lineage.downstreams == [
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,topic3,PROD),user_id)"
    ]
    assert fine_grained_lineage.upstreamType == "FIELD_SET"
    assert fine_grained_lineage.downstreamType == "FIELD_SET"
    assert fine_grained_lineage.confidenceScore == 0.9
    assert fine_grained_lineage.transformOperation == "func1"
def test_unsupported_entity_type():
    """
    An unsupported entity ``type`` on the root node must be rejected:
    building the config has to raise ValidationError.
    """
    # Callable form of pytest.raises; equivalent to the `with` form.
    pytest.raises(ValidationError, unsupported_entity_type_mcp)
def test_unsupported_upstream_entity_type():
    """
    An unsupported entity ``type`` on an upstream node must be rejected:
    building the config has to raise ValidationError.
    """
    # Callable form of pytest.raises; equivalent to the `with` form.
    pytest.raises(ValidationError, unsupported_upstream_entity_type_mcp)
def test_unsupported_entity_env():
    """
    An unsupported ``env`` value must be rejected: building the config has
    to raise ValidationError.
    """
    # Callable form of pytest.raises; equivalent to the `with` form.
    pytest.raises(ValidationError, unsupported_entity_env_mcp)