datahub/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py

import json
import logging
import sys
import tempfile
from json import JSONDecodeError
from typing import Any, Dict, List, Optional

import datahub.emitter.mce_builder as builder
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata.schema_classes import DatasetProfileClass

from tests.aspect_generators.timeseries.dataset_profile_gen import gen_dataset_profiles
from tests.utils import (
    get_strftime_from_timestamp_millis,
    run_datahub_cmd,
    wait_for_writes_to_sync,
)

logger = logging.getLogger(__name__)

test_aspect_name: str = "datasetProfile"
test_dataset_urn: str = builder.make_dataset_urn_with_platform_instance(
    "test_platform",
    "test_dataset",
    "test_platform_instance",
    "TEST",
)


def sync_elastic() -> None:
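    """Block until pending metadata writes have become visible in elasticsearch."""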
    wait_for_writes_to_sync()


def datahub_put_profile(auth_session, dataset_profile: DatasetProfileClass) -> None:
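    """Serialize the given datasetProfile aspect to a temp JSON file, ingest it
    against test_dataset_urn via `datahub put`, and assert the CLI succeeds."""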
    with tempfile.NamedTemporaryFile("w+t", suffix=".json") as aspect_file:
        aspect_text: str = json.dumps(pre_json_transform(dataset_profile.to_obj()))
        aspect_file.write(aspect_text)
        aspect_file.seek(0)
        put_args: List[str] = [
            "put",
            "--urn",
            test_dataset_urn,
            "-a",
            test_aspect_name,
            "-d",
            aspect_file.name,
        ]
        put_result = run_datahub_cmd(
            put_args,
            env={
                "DATAHUB_GMS_URL": auth_session.gms_url(),
                "DATAHUB_GMS_TOKEN": auth_session.gms_token(),
            },
        )
        assert put_result.exit_code == 0


def datahub_get_and_verify_profile(
    auth_session,
    expected_profile: Optional[DatasetProfileClass],
) -> None:
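    """Fetch the datasetProfile aspect via `datahub get` and compare it against
    `expected_profile`. Passing None asserts the aspect no longer exists, i.e.
    the get fails with a "not found" error."""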
    # Wait for writes to stabilize in elastic
    sync_elastic()
    get_args: List[str] = ["get", "--urn", test_dataset_urn, "-a", test_aspect_name]
    get_result = run_datahub_cmd(
        get_args,
        env={
            "DATAHUB_GMS_URL": auth_session.gms_url(),
            "DATAHUB_GMS_TOKEN": auth_session.gms_token(),
        },
    )
    if expected_profile is None:
        assert get_result.exit_code != 0
        assert (
            test_dataset_urn in get_result.stderr and "not found" in get_result.stderr
        ), f"Got stderr of {get_result.stderr} in get_and_verify_profile"
    else:
        assert get_result.exit_code == 0
        try:
            get_result_output_obj: Dict = json.loads(get_result.stdout)
        except JSONDecodeError as e:
            print("Failed to decode: " + get_result.stdout, file=sys.stderr)
            raise e
        profile_from_get = DatasetProfileClass.from_obj(
            get_result_output_obj["datasetProfile"]
        )
        assert profile_from_get == expected_profile


def datahub_delete(auth_session, params: List[str]) -> None:
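    """Run a hard `datahub delete` with the given params, auto-confirming the
    interactive prompts, and assert the CLI exits cleanly."""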
    sync_elastic()
    args: List[str] = ["delete"]
    args.extend(params)
    args.append("--hard")
    logger.info(f"Running delete command with args: {args}")
    delete_result = run_datahub_cmd(
        args,
        input="y\ny\n",
        env={
            "DATAHUB_GMS_URL": auth_session.gms_url(),
            "DATAHUB_GMS_TOKEN": auth_session.gms_token(),
        },
    )
    logger.info(delete_result.stdout)
    if delete_result.stderr:
        logger.error(delete_result.stderr)
    assert delete_result.exit_code == 0


def test_timeseries_delete(auth_session: Any) -> None:
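    """End-to-end check of timeseries aspect deletion: ingest a series of
    datasetProfile aspects, hard-delete a trailing time range, then delete
    everything for the platform and verify no profiles remain."""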
    num_test_profiles: int = 10
    verification_batch_size: int = int(num_test_profiles / 2)
    num_latest_profiles_to_delete: int = 2
    expected_profile_after_latest_deletion: DatasetProfileClass
    delete_ts_start: str
    delete_ts_end: str
    # 1. Ingest `num_test_profiles` datasetProfile aspects against the test_dataset_urn
    #    via put and validate using get.
    for i, dataset_profile in enumerate(gen_dataset_profiles(num_test_profiles)):
        # Use put command to ingest the aspect value.
        datahub_put_profile(auth_session, dataset_profile)
        # Validate the most recently ingested value once every verification_batch_size
        # iterations to reduce overall test time. Since we ingest the aspects in
        # ascending order of timestampMillis, get should return the one just put.
        if (i % verification_batch_size) == 0:
            datahub_get_and_verify_profile(auth_session, dataset_profile)
        # Init the params for time-range based deletion.
        if i == (num_test_profiles - num_latest_profiles_to_delete - 1):
            expected_profile_after_latest_deletion = dataset_profile
        elif i == (num_test_profiles - num_latest_profiles_to_delete):
            delete_ts_start = get_strftime_from_timestamp_millis(
                dataset_profile.timestampMillis - 100
            )
        elif i == (num_test_profiles - 1):
            delete_ts_end = get_strftime_from_timestamp_millis(
                dataset_profile.timestampMillis + 100
            )
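    # The window [delete_ts_start, delete_ts_end] brackets the newest
    # `num_latest_profiles_to_delete` profiles (the +/-100ms padding assumes
    # consecutive generated profiles are spaced further apart than that), so
    # the time-range delete below should leave the profile captured in
    # `expected_profile_after_latest_deletion` as the latest survivor.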
    # 2. Verify time-range based deletion.
    datahub_delete(
        auth_session,
        [
            "--urn",
            test_dataset_urn,
            "-a",
            test_aspect_name,
            "--start-time",
            delete_ts_start,
            "--end-time",
            delete_ts_end,
        ],
    )
    assert expected_profile_after_latest_deletion is not None
    datahub_get_and_verify_profile(auth_session, expected_profile_after_latest_deletion)

    # 3. Delete everything via the delete command & validate that we don't get any profiles back.
    datahub_delete(auth_session, ["-p", "test_platform"])
    datahub_get_and_verify_profile(auth_session, None)