2022-03-07 13:14:29 -08:00
|
|
|
from typing import Iterable
|
|
|
|
|
|
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
2023-05-17 00:01:57 -04:00
|
|
|
from datahub.ingestion.api.common import PipelineContext
|
2022-03-07 13:14:29 -08:00
|
|
|
from datahub.ingestion.api.source import Source, SourceReport
|
2023-05-17 00:01:57 -04:00
|
|
|
from datahub.ingestion.api.workunit import MetadataWorkUnit
|
2025-07-03 17:07:39 +05:30
|
|
|
from datahub.metadata.schema_classes import (
|
|
|
|
StatusClass,
|
|
|
|
SubTypesClass,
|
|
|
|
)
|
2022-03-07 13:14:29 -08:00
|
|
|
from datahub.utilities.urns.dataset_urn import DatasetUrn
|
|
|
|
|
|
|
|
|
2025-07-03 17:07:39 +05:30
|
|
|
def _get_urn() -> str:
    """Return the string form of the fixed dataset URN used by this module.

    The URN is built from platform ``elasticsearch``, table name ``fooIndex``
    and environment ``PROD``.
    """
    dataset_urn = DatasetUrn.create_from_ids(
        platform_id="elasticsearch",
        table_name="fooIndex",
        env="PROD",
    )
    return str(dataset_urn)
|
|
|
|
|
|
|
|
|
2022-05-26 12:39:40 -04:00
|
|
|
class FakeSource(Source):
    """Minimal ``Source`` implementation that emits one fixed work unit.

    The instance owns a fresh ``SourceReport`` in ``source_report``, which
    ``get_report`` hands back unchanged.
    """

    def __init__(self, ctx: PipelineContext):
        """Initialise the base source with *ctx* and attach an empty report."""
        super().__init__(ctx)
        self.source_report = SourceReport()

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "FakeSource":
        """Build a ``FakeSource`` for *ctx*; *config_dict* is not used."""
        return FakeSource(ctx)

    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        """Return a one-element list holding a status work unit for ``_get_urn()``."""
        status_mcp = MetadataChangeProposalWrapper(
            entityUrn=_get_urn(),
            aspect=StatusClass(removed=False),
        )
        work_unit = MetadataWorkUnit(id="test-workunit", mcp=status_mcp)
        return [work_unit]

    def get_report(self) -> SourceReport:
        """Return the report object created in ``__init__``."""
        return self.source_report

    def close(self) -> None:
        """Delegate to the base class ``close``."""
        return super().close()
|
2025-07-03 17:07:39 +05:30
|
|
|
|
|
|
|
|
|
|
|
def test_aspects_by_subtypes():
    """Check how reported aspects are grouped by subtype in the source report.

    With only a status aspect reported, the dataset's aspects are expected
    under the ``"unknown"`` subtype. After a ``subTypes`` aspect naming
    ``"Table"`` is reported for the same URN, both aspects are expected under
    ``"Table"``.
    """
    source = FakeSource(PipelineContext(run_id="test_aspects_by_subtypes"))
    report = source.source_report

    for work_unit in source.get_workunits_internal():
        report.report_workunit(work_unit)
    report.compute_stats()

    expected_before_subtype = {
        "dataset": {
            "unknown": {"status": 1},
        }
    }
    assert report.get_aspects_by_subtypes_dict() == expected_before_subtype

    # Report a subTypes aspect for the same entity, then recompute the stats.
    subtype_mcp = MetadataChangeProposalWrapper(
        entityUrn=_get_urn(),
        aspect=SubTypesClass(typeNames=["Table"]),
    )
    report.report_workunit(subtype_mcp.as_workunit())
    report.compute_stats()

    expected_after_subtype = {
        "dataset": {
            "Table": {"status": 1, "subTypes": 1},
        }
    }
    assert report.get_aspects_by_subtypes_dict() == expected_after_subtype
|
|
|
|
|
|
|
|
|
|
|
|
def test_discretize_dict_values():
    """Exercise ``SourceReport._discretize_dict_values`` on a nested count dict.

    In this fixture every expected count is the input rounded down to a power
    of two (5 -> 4, 12 -> 8, 3 -> 2), while 8 and 1 stay as they are.
    """
    raw_counts = {
        "dataset": {"schemaMetadata": 5, "status": 12, "ownership": 3},
        "chart": {"status": 8, "ownership": 1},
    }
    expected_counts = {
        "dataset": {"schemaMetadata": 4, "status": 8, "ownership": 2},
        "chart": {"status": 8, "ownership": 1},
    }

    discretized = SourceReport._discretize_dict_values(raw_counts)

    assert discretized == expected_counts
|