887 lines
31 KiB
Python
Raw Permalink Normal View History

import unittest
from datetime import datetime
from datahub.emitter.mce_builder import make_tag_urn, make_ts_millis, make_user_urn
from datahub.emitter.mcp import (
MetadataChangeProposalWrapper,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import BIAssetSubTypes
from datahub.ingestion.source.hex.constants import HEX_PLATFORM_NAME
from datahub.ingestion.source.hex.mapper import Mapper
from datahub.ingestion.source.hex.model import (
Analytics,
Category,
Collection,
Component,
Owner,
Project,
Status,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStampClass,
ChangeAuditStampsClass,
OwnershipType,
)
from datahub.metadata.com.linkedin.pegasus2avro.dashboard import (
DashboardUsageStatistics,
)
from datahub.metadata.schema_classes import (
CalendarIntervalClass,
ContainerClass,
ContainerPropertiesClass,
DashboardInfoClass,
DataPlatformInstanceClass,
GlobalTagsClass,
MetadataChangeProposalClass,
OwnershipClass,
SubTypesClass,
TimeWindowSizeClass,
)
from datahub.metadata.urns import DashboardUrn
class TestMapper(unittest.TestCase):
workspace_name = "test-workspace"
created_at = datetime(2022, 1, 1, 0, 0, 0)
last_edited_at = datetime(2022, 1, 2, 0, 0, 0)
last_modified = ChangeAuditStampsClass(
created=AuditStampClass(
time=make_ts_millis(datetime(2022, 1, 1)),
actor="urn:li:corpuser:_ingestion",
),
lastModified=AuditStampClass(
time=make_ts_millis(datetime(2022, 1, 2)),
actor="urn:li:corpuser:_ingestion",
),
)
def test_map_workspace(self):
mapper = Mapper(
workspace_name=self.workspace_name,
)
work_units = list(mapper.map_workspace())
assert len(work_units) == 1
assert isinstance(work_units[0], MetadataWorkUnit) and isinstance(
work_units[0].metadata, MetadataChangeProposalWrapper
)
assert (
work_units[0].metadata.entityUrn
== "urn:li:container:635fbdd141a7b358624369c6060847c3"
)
aspect = work_units[0].get_aspect_of_type(ContainerPropertiesClass)
assert aspect and aspect.name == self.workspace_name and not aspect.env
mapper = Mapper(
workspace_name=self.workspace_name,
platform_instance="test-platform",
)
work_units = list(mapper.map_workspace())
assert len(work_units) == 1
assert isinstance(work_units[0], MetadataWorkUnit) and isinstance(
work_units[0].metadata, MetadataChangeProposalWrapper
)
assert (
work_units[0].metadata.entityUrn
== "urn:li:container:635fbdd141a7b358624369c6060847c3"
)
aspect = work_units[0].get_aspect_of_type(ContainerPropertiesClass)
assert aspect and aspect.name == self.workspace_name and not aspect.env
mapper = Mapper(
workspace_name=self.workspace_name,
env="test-env",
platform_instance="test-platform",
)
work_units = list(mapper.map_workspace())
assert len(work_units) == 1
assert isinstance(work_units[0], MetadataWorkUnit) and isinstance(
work_units[0].metadata, MetadataChangeProposalWrapper
)
# guid here is the same as before because by default env is ignored in the key
assert (
work_units[0].metadata.entityUrn
== "urn:li:container:635fbdd141a7b358624369c6060847c3"
)
aspect = work_units[0].get_aspect_of_type(ContainerPropertiesClass)
assert (
aspect and aspect.name == self.workspace_name and aspect.env == "test-env"
)
def test_map_project(self):
mapper = Mapper(
workspace_name=self.workspace_name,
patch_metadata=False,
)
project = Project(
id="uuid1",
title="Test Project",
description="A test project",
created_at=self.created_at,
last_edited_at=self.last_edited_at,
status=Status(name="Published"),
categories=[Category(name="Category1"), Category(name="Category2")],
collections=[Collection(name="Collection1")],
creator=Owner(email="creator@example.com"),
owner=Owner(email="owner@example.com"),
analytics=Analytics(
appviews_all_time=100,
appviews_last_7_days=10,
appviews_last_14_days=20,
appviews_last_30_days=30,
last_viewed_at=datetime(2022, 1, 1, 0, 0, 0),
),
)
# check URNs
work_units = list(mapper.map_project(project))
assert len(work_units) == 8
assert all(
isinstance(wu.metadata, MetadataChangeProposalWrapper)
and wu.metadata.entityUrn == "urn:li:dashboard:(hex,uuid1)"
for wu in work_units
)
# check DashboardInfoClass
dashboard_info_wus = [
wu for wu in work_units if wu.get_aspect_of_type(DashboardInfoClass)
]
assert len(dashboard_info_wus) == 1
assert isinstance(
dashboard_info_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(dashboard_info_wus[0].metadata.aspect, DashboardInfoClass)
assert dashboard_info_wus[0].metadata.aspect.title == "Test Project"
assert dashboard_info_wus[0].metadata.aspect.description == "A test project"
assert (
dashboard_info_wus[0].metadata.aspect.externalUrl
== "https://app.hex.tech/test-workspace/hex/uuid1"
)
assert dashboard_info_wus[0].metadata.aspect.customProperties == {
"id": "uuid1",
}
# check SubTypesClass
subtypes_wus = [wu for wu in work_units if wu.get_aspect_of_type(SubTypesClass)]
assert len(subtypes_wus) == 1
assert isinstance(
subtypes_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(subtypes_wus[0].metadata.aspect, SubTypesClass)
assert subtypes_wus[0].metadata.aspect.typeNames == [
BIAssetSubTypes.HEX_PROJECT
]
# check DataPlatformInstanceClass
platform_instance_wus = [
wu for wu in work_units if wu.get_aspect_of_type(DataPlatformInstanceClass)
]
assert len(platform_instance_wus) == 1
assert isinstance(
platform_instance_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(
platform_instance_wus[0].metadata.aspect, DataPlatformInstanceClass
)
assert (
platform_instance_wus[0].metadata.aspect.platform
== "urn:li:dataPlatform:hex"
)
assert platform_instance_wus[0].metadata.aspect.instance is None
# check ContainerClass
container_wus = [
wu for wu in work_units if wu.get_aspect_of_type(ContainerClass)
]
assert len(container_wus) == 1
assert isinstance(
container_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(container_wus[0].metadata.aspect, ContainerClass)
assert (
container_wus[0].metadata.aspect.container
== "urn:li:container:635fbdd141a7b358624369c6060847c3"
)
# check GlobalTagsClass
tags_wus = [wu for wu in work_units if wu.get_aspect_of_type(GlobalTagsClass)]
assert len(tags_wus) == 1
assert isinstance(
tags_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(tags_wus[0].metadata.aspect, GlobalTagsClass)
assert len(tags_wus[0].metadata.aspect.tags) == 4
tag_urns = {tag.tag for tag in tags_wus[0].metadata.aspect.tags}
assert tag_urns == {
"urn:li:tag:hex:status:Published",
"urn:li:tag:hex:category:Category1",
"urn:li:tag:hex:category:Category2",
"urn:li:tag:hex:collection:Collection1",
}
# check OwnershipClass
ownership_wus = [
wu for wu in work_units if wu.get_aspect_of_type(OwnershipClass)
]
assert len(ownership_wus) == 1
assert isinstance(
ownership_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(ownership_wus[0].metadata.aspect, OwnershipClass)
assert len(ownership_wus[0].metadata.aspect.owners) == 2
owner_urns = {owner.owner for owner in ownership_wus[0].metadata.aspect.owners}
assert owner_urns == {
"urn:li:corpuser:creator@example.com",
"urn:li:corpuser:owner@example.com",
}
assert all(
[
owner.type == OwnershipType.TECHNICAL_OWNER
for owner in ownership_wus[0].metadata.aspect.owners
]
)
# check DashboardUsageStatistics
dashboard_usage_statistics_wus = [
wu for wu in work_units if wu.get_aspect_of_type(DashboardUsageStatistics)
]
assert len(dashboard_usage_statistics_wus) == 2
usage_stats_all_time_wu = dashboard_usage_statistics_wus[0]
usage_stats_last_7_days_wu = dashboard_usage_statistics_wus[1]
assert (
isinstance(usage_stats_all_time_wu.metadata, MetadataChangeProposalWrapper)
and isinstance(
usage_stats_all_time_wu.metadata.aspect, DashboardUsageStatistics
)
and isinstance(
usage_stats_last_7_days_wu.metadata, MetadataChangeProposalWrapper
)
and isinstance(
usage_stats_last_7_days_wu.metadata.aspect, DashboardUsageStatistics
)
)
assert (
usage_stats_all_time_wu.metadata.aspect.viewsCount == 100
and usage_stats_last_7_days_wu.metadata.aspect.viewsCount == 10
)
assert (
not usage_stats_all_time_wu.metadata.aspect.eventGranularity
and usage_stats_last_7_days_wu.metadata.aspect.eventGranularity
== TimeWindowSizeClass(unit=CalendarIntervalClass.WEEK, multiple=1)
)
assert (
usage_stats_all_time_wu.metadata.aspect.lastViewedAt
== usage_stats_last_7_days_wu.metadata.aspect.lastViewedAt
== make_ts_millis(datetime(2022, 1, 1))
)
# what if we set patch_metadata to True
mapper = Mapper(
workspace_name=self.workspace_name,
patch_metadata=True,
)
# mostly the same
work_units = list(mapper.map_project(project))
assert len(work_units) == 8
assert all(
isinstance(
wu.metadata,
(MetadataChangeProposalWrapper, MetadataChangeProposalClass),
)
and wu.metadata.entityUrn == "urn:li:dashboard:(hex,uuid1)"
for wu in work_units
)
# but DashboardInfo patch
patche_wus = [
wu
for wu in work_units
if isinstance(wu.metadata, MetadataChangeProposalClass)
and wu.metadata.changeType == "PATCH"
]
assert len(patche_wus) == 1
assert isinstance(patche_wus[0].metadata, MetadataChangeProposalClass)
assert patche_wus[0].metadata.aspectName == "dashboardInfo"
# what if we set platform_instance
mapper = Mapper(
workspace_name=self.workspace_name,
platform_instance="test-platform",
)
# mostly the same but additional instance DataPlatformInstanceClass
work_units = list(mapper.map_project(project))
assert len(work_units) == 8
platform_instance_wus = [
wu for wu in work_units if wu.get_aspect_of_type(DataPlatformInstanceClass)
]
assert len(platform_instance_wus) == 1
assert isinstance(
platform_instance_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(
platform_instance_wus[0].metadata.aspect, DataPlatformInstanceClass
)
assert (
platform_instance_wus[0].metadata.aspect.platform
== "urn:li:dataPlatform:hex"
)
assert (
platform_instance_wus[0].metadata.aspect.instance
== "urn:li:dataPlatformInstance:(urn:li:dataPlatform:hex,test-platform)"
)
def test_map_component(self):
mapper = Mapper(
workspace_name=self.workspace_name,
patch_metadata=False,
)
component = Component(
id="uuid1",
title="Test Component",
description="A test component",
created_at=self.created_at,
last_edited_at=self.last_edited_at,
status=Status(name="Draft"),
categories=[Category(name="Category3")],
collections=[Collection(name="Collection2")],
creator=Owner(email="creator@example.com"),
owner=Owner(email="owner@example.com"),
analytics=Analytics(
appviews_all_time=100,
appviews_last_7_days=10,
appviews_last_14_days=20,
appviews_last_30_days=30,
last_viewed_at=datetime(2022, 1, 1, 0, 0, 0),
),
)
# check URNs
work_units = list(mapper.map_component(component))
assert len(work_units) == 8
assert all(
isinstance(wu.metadata, MetadataChangeProposalWrapper)
and wu.metadata.entityUrn == "urn:li:dashboard:(hex,uuid1)"
for wu in work_units
)
# check DashboardInfoClass
dashboard_info_wus = [
wu for wu in work_units if wu.get_aspect_of_type(DashboardInfoClass)
]
assert len(dashboard_info_wus) == 1
assert isinstance(
dashboard_info_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(dashboard_info_wus[0].metadata.aspect, DashboardInfoClass)
assert dashboard_info_wus[0].metadata.aspect.title == "Test Component"
assert dashboard_info_wus[0].metadata.aspect.description == "A test component"
assert (
dashboard_info_wus[0].metadata.aspect.externalUrl
== "https://app.hex.tech/test-workspace/hex/uuid1"
)
assert dashboard_info_wus[0].metadata.aspect.customProperties == {"id": "uuid1"}
# check SubTypesClass
subtypes_wus = [wu for wu in work_units if wu.get_aspect_of_type(SubTypesClass)]
assert len(subtypes_wus) == 1
assert isinstance(
subtypes_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(subtypes_wus[0].metadata.aspect, SubTypesClass)
assert subtypes_wus[0].metadata.aspect.typeNames == [
BIAssetSubTypes.HEX_COMPONENT
]
# check DataPlatformInstanceClass
platform_instance_wus = [
wu for wu in work_units if wu.get_aspect_of_type(DataPlatformInstanceClass)
]
assert len(platform_instance_wus) == 1
assert isinstance(
platform_instance_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(
platform_instance_wus[0].metadata.aspect, DataPlatformInstanceClass
)
assert (
platform_instance_wus[0].metadata.aspect.platform
== "urn:li:dataPlatform:hex"
)
assert platform_instance_wus[0].metadata.aspect.instance is None
# check ContainerClass
container_wus = [
wu for wu in work_units if wu.get_aspect_of_type(ContainerClass)
]
assert len(container_wus) == 1
assert isinstance(
container_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(container_wus[0].metadata.aspect, ContainerClass)
assert (
container_wus[0].metadata.aspect.container
== "urn:li:container:635fbdd141a7b358624369c6060847c3"
)
# check GlobalTagsClass
tags_wus = [wu for wu in work_units if wu.get_aspect_of_type(GlobalTagsClass)]
assert len(tags_wus) == 1
assert isinstance(
tags_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(tags_wus[0].metadata.aspect, GlobalTagsClass)
assert len(tags_wus[0].metadata.aspect.tags) == 3
tag_urns = {tag.tag for tag in tags_wus[0].metadata.aspect.tags}
assert tag_urns == {
"urn:li:tag:hex:status:Draft",
"urn:li:tag:hex:category:Category3",
"urn:li:tag:hex:collection:Collection2",
}
# check OwnershipClass
ownership_wus = [
wu for wu in work_units if wu.get_aspect_of_type(OwnershipClass)
]
assert len(ownership_wus) == 1
assert isinstance(
ownership_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(ownership_wus[0].metadata.aspect, OwnershipClass)
assert len(ownership_wus[0].metadata.aspect.owners) == 2
owner_urns = {owner.owner for owner in ownership_wus[0].metadata.aspect.owners}
assert owner_urns == {
"urn:li:corpuser:creator@example.com",
"urn:li:corpuser:owner@example.com",
}
assert all(
[
owner.type == OwnershipType.TECHNICAL_OWNER
for owner in ownership_wus[0].metadata.aspect.owners
]
)
# check DashboardUsageStatistics
dashboard_usage_statistics_wus = [
wu for wu in work_units if wu.get_aspect_of_type(DashboardUsageStatistics)
]
assert len(dashboard_usage_statistics_wus) == 2
usage_stats_all_time_wu = dashboard_usage_statistics_wus[0]
usage_stats_last_7_days_wu = dashboard_usage_statistics_wus[1]
assert (
isinstance(usage_stats_all_time_wu.metadata, MetadataChangeProposalWrapper)
and isinstance(
usage_stats_all_time_wu.metadata.aspect, DashboardUsageStatistics
)
and isinstance(
usage_stats_last_7_days_wu.metadata, MetadataChangeProposalWrapper
)
and isinstance(
usage_stats_last_7_days_wu.metadata.aspect, DashboardUsageStatistics
)
)
assert (
usage_stats_all_time_wu.metadata.aspect.viewsCount == 100
and usage_stats_last_7_days_wu.metadata.aspect.viewsCount == 10
)
assert (
not usage_stats_all_time_wu.metadata.aspect.eventGranularity
and usage_stats_last_7_days_wu.metadata.aspect.eventGranularity
== TimeWindowSizeClass(unit=CalendarIntervalClass.WEEK, multiple=1)
)
assert (
usage_stats_all_time_wu.metadata.aspect.lastViewedAt
== usage_stats_last_7_days_wu.metadata.aspect.lastViewedAt
== make_ts_millis(datetime(2022, 1, 1))
)
# what if we set patch_metadata to True
mapper = Mapper(
workspace_name=self.workspace_name,
patch_metadata=True,
)
# mostly the same
work_units = list(mapper.map_component(component))
assert len(work_units) == 8
assert all(
isinstance(
wu.metadata,
(MetadataChangeProposalWrapper, MetadataChangeProposalClass),
)
and wu.metadata.entityUrn == "urn:li:dashboard:(hex,uuid1)"
for wu in work_units
)
# but DashboardInfo patch
patche_wus = [
wu
for wu in work_units
if isinstance(
wu.metadata,
(MetadataChangeProposalWrapper, MetadataChangeProposalClass),
)
and wu.metadata.changeType == "PATCH"
]
assert len(patche_wus) == 1
assert isinstance(patche_wus[0].metadata, MetadataChangeProposalClass)
assert patche_wus[0].metadata.aspectName == "dashboardInfo"
# what if we set platform_instance
mapper = Mapper(
workspace_name=self.workspace_name,
platform_instance="test-platform",
)
# mostly the same but additional DataPlatformInstanceClass
work_units = list(mapper.map_component(component))
assert len(work_units) == 8
platform_instance_wus = [
wu for wu in work_units if wu.get_aspect_of_type(DataPlatformInstanceClass)
]
assert len(platform_instance_wus) == 1
assert isinstance(
platform_instance_wus[0].metadata, MetadataChangeProposalWrapper
) and isinstance(
platform_instance_wus[0].metadata.aspect, DataPlatformInstanceClass
)
assert (
platform_instance_wus[0].metadata.aspect.platform
== "urn:li:dataPlatform:hex"
)
assert (
platform_instance_wus[0].metadata.aspect.instance
== "urn:li:dataPlatformInstance:(urn:li:dataPlatform:hex,test-platform)"
)
def test_global_tags_status(self):
status = Status(name="Published")
mapper = Mapper(
workspace_name=self.workspace_name,
)
tags = mapper._global_tags(status, None, None)
assert tags is not None
assert len(tags.tags) == 1
assert tags.tags[0].tag == make_tag_urn("hex:status:Published")
mapper = Mapper(
workspace_name=self.workspace_name,
status_as_tag=False,
)
tags = mapper._global_tags(status, None, None)
assert tags is None
def test_global_tags_categories(self):
categories = [Category(name="Category1"), Category(name="Category2")]
mapper = Mapper(
workspace_name=self.workspace_name,
)
tags = mapper._global_tags(None, categories, None)
assert tags is not None
assert len(tags.tags) == 2
assert tags.tags[0].tag == make_tag_urn("hex:category:Category1")
assert tags.tags[1].tag == make_tag_urn("hex:category:Category2")
mapper = Mapper(
workspace_name=self.workspace_name,
categories_as_tags=False,
)
tags = mapper._global_tags(None, categories, None)
assert tags is None
def test_global_tags_collections(self):
collections = [Collection(name="Collection1")]
mapper = Mapper(
workspace_name=self.workspace_name,
)
tags = mapper._global_tags(None, None, collections)
assert tags is not None
assert len(tags.tags) == 1
assert tags.tags[0].tag == make_tag_urn("hex:collection:Collection1")
mapper = Mapper(
workspace_name=self.workspace_name,
collections_as_tags=False,
)
tags = mapper._global_tags(None, None, collections)
assert tags is None
def test_global_tags_all(self):
status = Status(name="Published")
categories = [Category(name="Category1"), Category(name="Category2")]
collections = [Collection(name="Collection1")]
mapper = Mapper(
workspace_name=self.workspace_name,
)
tags = mapper._global_tags(status, categories, collections)
assert tags is not None
assert len(tags.tags) == 4
def test_ownership(self):
mapper = Mapper(
workspace_name=self.workspace_name,
)
creator = Owner(email="creator@example.com")
owner = Owner(email="owner@example.com")
ownership = mapper._ownership(creator, owner)
assert ownership is not None
assert len(ownership.owners) == 2
creator_owner = next(
(
o
for o in ownership.owners
if o.owner == make_user_urn("creator@example.com")
),
None,
)
assert creator_owner is not None
assert creator_owner.type == OwnershipType.TECHNICAL_OWNER
primary_owner = next(
(
o
for o in ownership.owners
if o.owner == make_user_urn("owner@example.com")
),
None,
)
assert primary_owner is not None
assert primary_owner.type == OwnershipType.TECHNICAL_OWNER
ownership = mapper._ownership(creator, creator)
assert ownership is not None
assert len(ownership.owners) == 1
ownership = mapper._ownership(None, None)
assert ownership is None
mapper = Mapper(
workspace_name=self.workspace_name,
set_ownership_from_email=False,
)
ownership = mapper._ownership(creator, owner)
assert ownership is None
def test_dashboard_usage_statistics(self):
mapper = Mapper(
workspace_name=self.workspace_name,
)
analytics = Analytics(
appviews_all_time=100,
appviews_last_7_days=10,
appviews_last_14_days=20,
appviews_last_30_days=30,
last_viewed_at=datetime(2022, 1, 1, 0, 0, 0),
)
usage_stats_all_time, usage_stats_last_7_days = (
mapper._dashboard_usage_statistics(analytics)
)
assert usage_stats_all_time and usage_stats_last_7_days
assert (
usage_stats_all_time.viewsCount == 100
and usage_stats_last_7_days.viewsCount == 10
)
assert (
not usage_stats_all_time.eventGranularity
and usage_stats_last_7_days.eventGranularity
== TimeWindowSizeClass(unit=CalendarIntervalClass.WEEK, multiple=1)
)
assert (
usage_stats_all_time.lastViewedAt
== usage_stats_last_7_days.lastViewedAt
== make_ts_millis(datetime(2022, 1, 1))
)
analytics = Analytics(
appviews_all_time=None,
appviews_last_7_days=None,
appviews_last_14_days=None,
appviews_last_30_days=None,
last_viewed_at=None,
)
usage_stats_all_time, usage_stats_last_7_days = (
mapper._dashboard_usage_statistics(analytics)
)
assert not usage_stats_all_time and not usage_stats_last_7_days
analytics = Analytics(
appviews_all_time=None,
appviews_last_7_days=10,
appviews_last_14_days=None,
appviews_last_30_days=None,
last_viewed_at=None,
)
usage_stats_all_time, usage_stats_last_7_days = (
mapper._dashboard_usage_statistics(analytics)
)
assert not usage_stats_all_time and usage_stats_last_7_days
assert usage_stats_last_7_days.viewsCount == 10
assert usage_stats_last_7_days.eventGranularity == TimeWindowSizeClass(
unit=CalendarIntervalClass.WEEK, multiple=1
)
assert usage_stats_last_7_days.lastViewedAt is None
def test_platform_instance_aspect(self):
mapper = Mapper(
workspace_name=self.workspace_name,
)
platform_instance = mapper._platform_instance_aspect()
assert platform_instance
assert platform_instance.platform == "urn:li:dataPlatform:hex"
assert platform_instance.instance is None
mapper = Mapper(
workspace_name=self.workspace_name,
platform_instance="test-platform",
)
platform_instance = mapper._platform_instance_aspect()
assert platform_instance
assert platform_instance.platform == "urn:li:dataPlatform:hex"
assert (
platform_instance.instance
== "urn:li:dataPlatformInstance:(urn:li:dataPlatform:hex,test-platform)"
)
def test_get_dashboard_urn(self):
mapper = Mapper(
workspace_name=self.workspace_name,
)
dashboard_urn = mapper._get_dashboard_urn("dashboard_name")
assert dashboard_urn == DashboardUrn(
dashboard_id="dashboard_name", dashboard_tool=HEX_PLATFORM_NAME
)
assert dashboard_urn.urn() == "urn:li:dashboard:(hex,dashboard_name)"
mapper = Mapper(
workspace_name=self.workspace_name,
platform_instance="test-platform",
)
dashboard_urn = mapper._get_dashboard_urn("dashboard_name")
assert dashboard_urn == DashboardUrn(
dashboard_id="test-platform.dashboard_name",
dashboard_tool=HEX_PLATFORM_NAME,
)
assert (
dashboard_urn.urn() == "urn:li:dashboard:(hex,test-platform.dashboard_name)"
)
def test_dataset_edges(self):
from datahub.metadata.schema_classes import EdgeClass
from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn
mapper = Mapper(
workspace_name=self.workspace_name,
)
# Test with empty list
edges = mapper._dataset_edges([])
assert not edges
# Test with only DatasetUrns
dataset_urn1 = DatasetUrn(
platform="snowflake",
name="test-dataset-1",
)
dataset_urn2 = DatasetUrn(
platform="bigquery",
name="test-dataset-2",
)
edges = mapper._dataset_edges([dataset_urn1, dataset_urn2])
assert edges and len(edges) == 2
assert all(isinstance(edge, EdgeClass) for edge in edges)
assert edges[0].destinationUrn == dataset_urn1.urn()
assert edges[1].destinationUrn == dataset_urn2.urn()
# Test with mixed DatasetUrns and SchemaFieldUrns - should filter out SchemaFieldUrns
schema_field_urn = SchemaFieldUrn(
parent=dataset_urn1,
field_path="test.field.path",
)
edges = mapper._dataset_edges([dataset_urn1, schema_field_urn, dataset_urn2])
assert edges and len(edges) == 2 # SchemaFieldUrn should be filtered out
assert edges[0].destinationUrn == dataset_urn1.urn()
assert edges[1].destinationUrn == dataset_urn2.urn()
def test_map_project_with_upstream_datasets(self):
from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn
# Create a project with upstream datasets
dataset_urn1 = DatasetUrn(
platform="snowflake",
name="test-dataset-1",
)
dataset_urn2 = DatasetUrn(
platform="bigquery",
name="test-dataset-2",
)
schema_field_urn = SchemaFieldUrn(
parent=dataset_urn1,
field_path="test.field.path",
)
project = Project(
id="uuid1",
title="Test Project With Lineage",
description="A test project with upstream datasets",
created_at=self.created_at,
last_edited_at=self.last_edited_at,
status=Status(name="Published"),
creator=Owner(email="creator@example.com"),
owner=Owner(email="owner@example.com"),
upstream_datasets=[dataset_urn1, schema_field_urn, dataset_urn2],
)
mapper = Mapper(
workspace_name=self.workspace_name,
patch_metadata=False,
)
work_units = list(mapper.map_project(project))
dashboard_info_wus = [
wu for wu in work_units if wu.get_aspect_of_type(DashboardInfoClass)
]
assert len(dashboard_info_wus) == 1
dashboard_info = dashboard_info_wus[0].get_aspect_of_type(DashboardInfoClass)
# Verify dataset edges
assert (
dashboard_info
and dashboard_info.datasetEdges
and len(dashboard_info.datasetEdges) == 2
)
edge_urns = [edge.destinationUrn for edge in dashboard_info.datasetEdges]
assert dataset_urn1.urn() in edge_urns
assert dataset_urn2.urn() in edge_urns
assert schema_field_urn.urn() not in edge_urns # Should be filtered out