584 lines
21 KiB
Python
Raw Normal View History

import logging
from datetime import datetime
from typing import Any, Dict, Iterable, List, Union
from unittest.mock import patch
import pytest
from freezegun import freeze_time
import datahub.metadata.schema_classes as models
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mce_builder import (
make_container_urn,
make_dataplatform_instance_urn,
make_dataset_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_empty_dataset_usage_statistics,
auto_lowercase_urns,
auto_status_aspect,
auto_workunit,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
_base_metadata: List[
Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]
] = [
MetadataChangeProposalWrapper(
entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a",
aspect=models.ContainerPropertiesClass(
name="test",
),
),
MetadataChangeProposalWrapper(
entityUrn="urn:li:container:108e111aa1d250dd52e0fd5d4b307b12",
aspect=models.StatusClass(removed=True),
),
models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.staffing,PROD)",
aspects=[
models.DatasetPropertiesClass(
customProperties={
"key": "value",
},
),
],
),
),
models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.hospital_beds,PROD)",
aspects=[
models.StatusClass(removed=True),
],
),
),
]
def test_auto_workunit():
wu = list(auto_workunit(_base_metadata))
assert all(isinstance(w, MetadataWorkUnit) for w in wu)
ids = [w.id for w in wu]
assert ids == [
"urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a-containerProperties",
"urn:li:container:108e111aa1d250dd52e0fd5d4b307b12-status",
"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.staffing,PROD)/mce",
"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.hospital_beds,PROD)/mce",
]
def test_auto_status_aspect():
initial_wu = list(auto_workunit(_base_metadata))
expected = [
*initial_wu,
*list(
auto_workunit(
[
MetadataChangeProposalWrapper(
entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a",
aspect=models.StatusClass(removed=False),
),
MetadataChangeProposalWrapper(
entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.staffing,PROD)",
aspect=models.StatusClass(removed=False),
),
]
)
),
]
assert list(auto_status_aspect(initial_wu)) == expected
def _create_container_aspects(
d: Dict[str, Any],
other_aspects: Dict[str, List[models._Aspect]] = {},
root: bool = True,
) -> Iterable[MetadataWorkUnit]:
for k, v in d.items():
urn = make_container_urn(k)
yield MetadataChangeProposalWrapper(
entityUrn=urn, aspect=models.StatusClass(removed=False)
).as_workunit()
for aspect in other_aspects.pop(k, []):
yield MetadataChangeProposalWrapper(
entityUrn=urn, aspect=aspect
).as_workunit()
for child in list(v):
yield MetadataChangeProposalWrapper(
entityUrn=make_container_urn(child),
aspect=models.ContainerClass(container=urn),
).as_workunit()
if isinstance(v, dict):
yield from _create_container_aspects(
v, other_aspects=other_aspects, root=False
)
if root:
for k, v in other_aspects.items():
for aspect in v:
yield MetadataChangeProposalWrapper(
entityUrn=make_container_urn(k), aspect=aspect
).as_workunit()
def _make_container_browse_path_entries(
path: List[str],
) -> List[models.BrowsePathEntryClass]:
return [
models.BrowsePathEntryClass(id=make_container_urn(s), urn=make_container_urn(s))
for s in path
]
def _make_browse_path_entries(path: List[str]) -> List[models.BrowsePathEntryClass]:
return [models.BrowsePathEntryClass(id=s, urn=None) for s in path]
def _get_browse_paths_from_wu(
stream: Iterable[MetadataWorkUnit],
) -> Dict[str, List[models.BrowsePathEntryClass]]:
paths = {}
for wu in stream:
browse_path_v2 = wu.get_aspect_of_type(models.BrowsePathsV2Class)
if browse_path_v2:
name = wu.get_urn().split(":")[-1]
paths[name] = browse_path_v2.path
return paths
@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_by_container_hierarchy(telemetry_ping_mock):
structure = {
"one": {
"a": {"i": ["1", "2", "3"], "ii": ["4"]},
"b": {"iii": ["5", "6"]},
},
"two": {
"c": {"iv": [], "v": ["7", "8"]},
},
"three": {"d": {}},
"four": {},
}
wus = list(auto_status_aspect(_create_container_aspects(structure)))
assert ( # Sanity check
sum(bool(wu.get_aspect_of_type(models.StatusClass)) for wu in wus) == 21
)
new_wus = list(auto_browse_path_v2(wus))
assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list
assert (
sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus)
== 21
)
paths = _get_browse_paths_from_wu(new_wus)
assert paths["one"] == []
assert (
paths["7"]
== paths["8"]
== _make_container_browse_path_entries(["two", "c", "v"])
)
assert paths["d"] == _make_container_browse_path_entries(["three"])
assert paths["i"] == _make_container_browse_path_entries(["one", "a"])
# Check urns emitted on demand -- not all at end
for urn in set(wu.get_urn() for wu in new_wus):
try:
idx = next(
i
for i, wu in enumerate(new_wus)
if wu.get_aspect_of_type(models.ContainerClass) and wu.get_urn() == urn
)
except StopIteration:
idx = next(
i
for i, wu in enumerate(new_wus)
if wu.get_aspect_of_type(models.StatusClass) and wu.get_urn() == urn
)
assert new_wus[idx + 1].get_aspect_of_type(
models.BrowsePathsV2Class
) or new_wus[idx + 2].get_aspect_of_type(models.BrowsePathsV2Class)
@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_ignores_urns_already_with(telemetry_ping_mock):
structure = {"a": {"b": {"c": {"d": ["e"]}}}}
wus = [
*auto_status_aspect(
_create_container_aspects(
structure,
other_aspects={
"f": [
models.BrowsePathsClass(paths=["/one/two"]),
models.BrowsePathsV2Class(
path=_make_browse_path_entries(["my", "path"])
),
],
"c": [
models.BrowsePathsV2Class(
path=_make_container_browse_path_entries(["custom", "path"])
)
],
},
),
)
]
new_wus = list(auto_browse_path_v2(wus))
assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list
assert (
sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus)
== 6
)
paths = _get_browse_paths_from_wu(new_wus)
assert paths["a"] == []
assert paths["c"] == _make_container_browse_path_entries(["custom", "path"])
assert paths["e"] == _make_container_browse_path_entries(["a", "b", "c", "d"])
assert paths["f"] == _make_browse_path_entries(["my", "path"])
@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock):
platform = "platform"
env = "PROD"
wus = [
MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(platform, "dataset-1", env),
aspect=models.BrowsePathsClass(["/one/two"]),
).as_workunit(),
MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(platform, "dataset-2", env),
aspect=models.BrowsePathsClass([f"/{platform}/{env}/something"]),
).as_workunit(),
MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(platform, "dataset-3", env),
aspect=models.BrowsePathsClass([f"/{platform}/one/two"]),
).as_workunit(),
]
new_wus = list(auto_browse_path_v2(wus, drop_dirs=["platform", "PROD", "unused"]))
assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list
assert len(new_wus) == 6
paths = _get_browse_paths_from_wu(new_wus)
assert (
paths["platform,dataset-1,PROD)"]
== paths["platform,dataset-3,PROD)"]
== _make_browse_path_entries(["one", "two"])
)
assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"])
def test_auto_lowercase_aspects():
mcws = auto_workunit(
[
MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(
"bigquery", "myProject.mySchema.myTable", "PROD"
),
aspect=models.DatasetKeyClass(
"urn:li:dataPlatform:bigquery", "myProject.mySchema.myTable", "PROD"
),
),
MetadataChangeProposalWrapper(
entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a",
aspect=models.ContainerPropertiesClass(
name="test",
),
),
models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-Public-Data.Covid19_Aha.staffing,PROD)",
aspects=[
models.DatasetPropertiesClass(
customProperties={
"key": "value",
},
),
],
),
),
]
)
expected = [
*list(
auto_workunit(
[
MetadataChangeProposalWrapper(
entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.myschema.mytable,PROD)",
aspect=models.DatasetKeyClass(
"urn:li:dataPlatform:bigquery",
"myProject.mySchema.myTable",
"PROD",
),
),
MetadataChangeProposalWrapper(
entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a",
aspect=models.ContainerPropertiesClass(
name="test",
),
),
models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.staffing,PROD)",
aspects=[
models.DatasetPropertiesClass(
customProperties={
"key": "value",
},
),
],
),
),
]
)
),
]
assert list(auto_lowercase_urns(mcws)) == expected
@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock):
structure = {"a": {"b": ["c"]}}
wus = list(
auto_status_aspect(
_create_container_aspects(
structure,
other_aspects={"b": [models.BrowsePathsClass(paths=["/one/two"])]},
),
)
)
new_wus = list(auto_browse_path_v2(wus))
assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list
assert (
sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus)
== 3
)
paths = _get_browse_paths_from_wu(new_wus)
assert paths["a"] == []
assert paths["b"] == _make_container_browse_path_entries(["a"])
assert paths["c"] == _make_container_browse_path_entries(["a", "b"])
@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_with_platform_instsance(telemetry_ping_mock):
platform = "my_platform"
platform_instance = "my_instance"
platform_instance_urn = make_dataplatform_instance_urn(platform, platform_instance)
platform_instance_entry = models.BrowsePathEntryClass(
platform_instance_urn, platform_instance_urn
)
structure = {"a": {"b": ["c"]}}
wus = list(auto_status_aspect(_create_container_aspects(structure)))
new_wus = list(
auto_browse_path_v2(
wus,
platform=platform,
platform_instance=platform_instance,
)
)
assert telemetry_ping_mock.call_count == 0
assert (
sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus)
== 3
)
paths = _get_browse_paths_from_wu(new_wus)
assert paths["a"] == [platform_instance_entry]
assert paths["b"] == [
platform_instance_entry,
*_make_container_browse_path_entries(["a"]),
]
assert paths["c"] == [
platform_instance_entry,
*_make_container_browse_path_entries(["a", "b"]),
]
@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_invalid_batch_telemetry(telemetry_ping_mock):
structure = {"a": {"b": ["c"]}}
b_urn = make_container_urn("b")
wus = [
*_create_container_aspects(structure),
MetadataChangeProposalWrapper( # Browse path for b separate from its Container aspect
entityUrn=b_urn,
aspect=models.BrowsePathsClass(paths=["/one/two"]),
).as_workunit(),
]
wus = list(auto_status_aspect(wus))
assert telemetry_ping_mock.call_count == 0
_ = list(auto_browse_path_v2(wus))
assert telemetry_ping_mock.call_count == 1
assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2"
assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 0
assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 1
@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_no_invalid_batch_telemetry_for_unrelated_aspects(
telemetry_ping_mock,
):
structure = {"a": {"b": ["c"]}}
b_urn = make_container_urn("b")
wus = [
*_create_container_aspects(structure),
MetadataChangeProposalWrapper( # Browse path for b separate from its Container aspect
entityUrn=b_urn,
aspect=models.ContainerPropertiesClass("container name"),
).as_workunit(),
]
wus = list(auto_status_aspect(wus))
assert telemetry_ping_mock.call_count == 0
_ = list(auto_browse_path_v2(wus))
assert telemetry_ping_mock.call_count == 0
@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_invalid_order_telemetry(telemetry_ping_mock):
structure = {"a": {"b": ["c"]}}
wus = list(reversed(list(_create_container_aspects(structure))))
wus = list(auto_status_aspect(wus))
assert telemetry_ping_mock.call_count == 0
new_wus = list(auto_browse_path_v2(wus))
assert (
sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus)
> 0
)
assert telemetry_ping_mock.call_count == 1
assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2"
assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 1
assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 0
@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_dry_run(telemetry_ping_mock):
structure = {"a": {"b": ["c"]}}
wus = list(reversed(list(_create_container_aspects(structure))))
wus = list(auto_status_aspect(wus))
assert telemetry_ping_mock.call_count == 0
new_wus = list(auto_browse_path_v2(wus, dry_run=True))
assert wus == new_wus
assert (
sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus)
== 0
)
assert telemetry_ping_mock.call_count == 1
@freeze_time("2023-01-02 00:00:00")
def test_auto_empty_dataset_usage_statistics(caplog: pytest.LogCaptureFixture) -> None:
has_urn = make_dataset_urn("my_platform", "has_aspect")
empty_urn = make_dataset_urn("my_platform", "no_aspect")
config = BaseTimeWindowConfig()
wus = [
MetadataChangeProposalWrapper(
entityUrn=has_urn,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(config.start_time.timestamp() * 1000),
eventGranularity=models.TimeWindowSizeClass(
models.CalendarIntervalClass.DAY
),
uniqueUserCount=1,
totalSqlQueries=1,
),
).as_workunit()
]
caplog.clear()
with caplog.at_level(logging.WARNING):
new_wus = list(
auto_empty_dataset_usage_statistics(
wus,
dataset_urns={has_urn, empty_urn},
config=config,
all_buckets=False,
)
)
assert not caplog.records
assert new_wus == [
*wus,
MetadataChangeProposalWrapper(
entityUrn=empty_urn,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(datetime(2023, 1, 1).timestamp() * 1000),
eventGranularity=models.TimeWindowSizeClass(
models.CalendarIntervalClass.DAY
),
uniqueUserCount=0,
totalSqlQueries=0,
topSqlQueries=[],
userCounts=[],
fieldCounts=[],
),
).as_workunit(),
]
@freeze_time("2023-01-02 00:00:00")
def test_auto_empty_dataset_usage_statistics_invalid_timestamp(
caplog: pytest.LogCaptureFixture,
) -> None:
urn = make_dataset_urn("my_platform", "my_dataset")
config = BaseTimeWindowConfig()
wus = [
MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=0,
eventGranularity=models.TimeWindowSizeClass(
models.CalendarIntervalClass.DAY
),
uniqueUserCount=1,
totalSqlQueries=1,
),
).as_workunit()
]
caplog.clear()
with caplog.at_level(logging.WARNING):
new_wus = list(
auto_empty_dataset_usage_statistics(
wus,
dataset_urns={urn},
config=config,
all_buckets=True,
)
)
assert len(caplog.records) == 1
assert "1970-01-01 00:00:00+00:00" in caplog.records[0].msg
assert new_wus == [
*wus,
MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(config.start_time.timestamp() * 1000),
eventGranularity=models.TimeWindowSizeClass(
models.CalendarIntervalClass.DAY
),
uniqueUserCount=0,
totalSqlQueries=0,
topSqlQueries=[],
userCounts=[],
fieldCounts=[],
),
changeType=models.ChangeTypeClass.CREATE,
).as_workunit(),
]