diff --git a/docs/how/delete-metadata.md b/docs/how/delete-metadata.md index 8ac054193a..7c5009ecf9 100644 --- a/docs/how/delete-metadata.md +++ b/docs/how/delete-metadata.md @@ -33,9 +33,28 @@ This physically deletes all rows for all aspects of the entity. This action cann datahub delete --urn "" --hard ``` -As of datahub v.0.8.35 doing a hard delete by urn will also provide you with a way to remove references to the urn being deleted across the metadata graph. This is important to use if you don't want to have ghost references in your metadata model and want to save space in the graph database. +As of datahub v0.8.35, doing a hard delete by urn also provides a way to remove references to the urn being deleted across the metadata graph. This is important if you don't want ghost references in your metadata model and want to save space in the graph database. For now, this behaviour is opt-in: a prompt will appear for you to manually accept or deny. +Starting with v0.8.44.2, this also supports deletion of a specific `timeseries` aspect associated with the entity, optionally for a specific time range. + +_Note: Deletion by a specific aspect and time range is currently supported only for timeseries aspects._ + +```bash +# Delete all of the aspect values for a given entity and a timeseries aspect. +datahub delete --urn "" -a "" --hard +# Eg: datahub delete --urn "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_dataset,TEST)" -a "datasetProfile" --hard + +# Delete all of the aspect values for a given platform and a timeseries aspect. +datahub delete -p "" -a "" --hard +# Eg: datahub delete -p "snowflake" -a "datasetProfile" --hard + +# Delete the aspect values for a given platform and a timeseries aspect corresponding to a specific time range. +datahub delete -p "" -a "" --start-time '' --end-time '' --hard +# Eg: datahub delete -p "snowflake" -a "datasetProfile" --start-time '2022-05-29 00:00:00' --end-time '2022-05-31 00:00:00' --hard +``` + + You can optionally add `-n` or `--dry-run` to execute a dry run before issuing the final delete command. You can optionally add `-f` or `--force` to skip confirmations. You can optionally add `--only-soft-deleted` flag to remove soft-deleted items only. @@ -119,6 +138,7 @@ datahub ingest rollback --run-id ``` to roll back all aspects added with this run and all entities created by this run. +This deletes both the versioned and the timeseries aspects associated with these entities.
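For readers scripting against GMS directly: under the hood, the CLI hard delete described above issues a single POST to the `/entities?action=delete` Rest.li action, carrying the optional `aspectName`/`startTimeMillis`/`endTimeMillis` fields introduced in this change. A minimal sketch of that call, assuming an unauthenticated GMS at `localhost:8080` (adjust the host and add your auth token for a real deployment):

```python
import json

import requests

GMS = "http://localhost:8080"  # assumed local GMS; adjust for your deployment

# Mirrors the payload built by _delete_one_urn in delete_cli.py.
payload = {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_dataset,TEST)",
    "aspectName": "datasetProfile",    # optional; timeseries aspects only
    "startTimeMillis": 1653782400000,  # optional; 2022-05-29 00:00:00 UTC
    "endTimeMillis": 1653955200000,    # optional; 2022-05-31 00:00:00 UTC
}
resp = requests.post(
    f"{GMS}/entities?action=delete",
    data=json.dumps(payload),
    headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
# Rest.li wraps action results in a top-level "value" object; "timeseriesRows"
# is the new optional field added to DeleteEntityResponse by this change.
summary = resp.json().get("value", {})
print(summary.get("urn"), summary.get("rows"), summary.get("timeseriesRows"))
```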
### Unsafe Entities and Rollback diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/EntitySpecUtils.java b/entity-registry/src/main/java/com/linkedin/metadata/models/EntitySpecUtils.java new file mode 100644 index 0000000000..a25bf1c2de --- /dev/null +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/EntitySpecUtils.java @@ -0,0 +1,23 @@ +package com.linkedin.metadata.models; + +import com.linkedin.metadata.models.registry.EntityRegistry; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; + + +public class EntitySpecUtils { + private EntitySpecUtils() { + } + + public static List<String> getEntityTimeseriesAspectNames(@Nonnull EntityRegistry entityRegistry, + @Nonnull String entityName) { + final EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + final List<String> timeseriesAspectNames = entitySpec.getAspectSpecs() + .stream() + .filter(x -> x.isTimeseries()) + .map(x -> x.getName()) + .collect(Collectors.toList()); + return timeseriesAspectNames; + } +} diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index 34b0362896..bfb79fae1c 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -303,7 +303,7 @@ def post_delete_endpoint( payload_obj: dict, path: str, cached_session_host: Optional[Tuple[Session, str]] = None, -) -> typing.Tuple[str, int]: +) -> typing.Tuple[str, int, int]: session, gms_host = cached_session_host or get_session_and_host() url = gms_host + path @@ -314,16 +314,17 @@ def post_delete_endpoint_with_session_and_url( session: Session, url: str, payload_obj: dict, -) -> typing.Tuple[str, int]: +) -> typing.Tuple[str, int, int]: payload = json.dumps(payload_obj) response = session.post(url, payload) summary = parse_run_restli_response(response) - urn = summary.get("urn", "") - rows_affected = summary.get("rows", 0) + urn: str = summary.get("urn", "") + rows_affected: int = summary.get("rows", 0) + timeseries_rows_affected: int = summary.get("timeseriesRows", 0) - return urn, rows_affected + return urn, rows_affected, timeseries_rows_affected def get_urns_by_filter( @@ -624,7 +625,7 @@ def get_aspects_for_entity( # Process timeseries aspects & append to aspect_list timeseries_aspects: List[str] = [a for a in aspects if a in TIMESERIES_ASPECT_MAP] for timeseries_aspect in timeseries_aspects: - timeseries_response = get_latest_timeseries_aspect_values( + timeseries_response: Dict = get_latest_timeseries_aspect_values( entity_urn, timeseries_aspect, cached_session_host ) values: List[Dict] = timeseries_response.get("value", {}).get("values", []) @@ -633,18 +634,13 @@ def get_aspects_for_entity( timeseries_aspect ) if aspect_cls is not None: - aspect_value = values[0] + ts_aspect = values[0]["aspect"] # Decode the json-encoded generic aspect value.
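# (GMS returns each timeseries value as a generic aspect whose "value" field is a JSON-encoded string, so it is parsed back into a dict before being returned.)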
- aspect_value["aspect"]["value"] = json.loads( - aspect_value["aspect"]["value"] - ) - aspect_list[ - aspect_cls.RECORD_SCHEMA.fullname.replace("pegasus2avro.", "") - ] = aspect_value + ts_aspect["value"] = json.loads(ts_aspect["value"]) + aspect_list[timeseries_aspect] = ts_aspect aspect_map: Dict[str, Union[dict, _Aspect]] = {} - for a in aspect_list.values(): - aspect_name = a["name"] + for aspect_name, a in aspect_list.items(): aspect_py_class: Optional[Type[Any]] = _get_pydantic_class_from_aspect_name( aspect_name ) diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index 32fb531fa2..d835eafd2d 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -1,8 +1,9 @@ import logging import time from dataclasses import dataclass +from datetime import datetime from random import choices -from typing import Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple import click import progressbar @@ -30,25 +31,27 @@ UNKNOWN_NUM_RECORDS = -1 @dataclass class DeletionResult: - start_time_millis: int = int(time.time() * 1000.0) - end_time_millis: int = 0 + start_time: int = int(time.time() * 1000.0) + end_time: int = 0 num_records: int = 0 + num_timeseries_records: int = 0 num_entities: int = 0 sample_records: Optional[List[List[str]]] = None def start(self) -> None: - self.start_time_millis = int(time.time() * 1000.0) + self.start_time = int(time.time() * 1000.0) def end(self) -> None: - self.end_time_millis = int(time.time() * 1000.0) + self.end_time = int(time.time() * 1000.0) def merge(self, another_result: "DeletionResult") -> None: - self.end_time_millis = another_result.end_time_millis + self.end_time = another_result.end_time self.num_records = ( self.num_records + another_result.num_records if another_result.num_records != UNKNOWN_NUM_RECORDS else UNKNOWN_NUM_RECORDS ) + self.num_timeseries_records += another_result.num_timeseries_records self.num_entities += another_result.num_entities if another_result.sample_records: if not self.sample_records: @@ -82,13 +85,50 @@ def delete_for_registry( @click.command() -@click.option("--urn", required=False, type=str) -@click.option("-f", "--force", required=False, is_flag=True) -@click.option("--soft/--hard", required=False, is_flag=True, default=True) -@click.option("-e", "--env", required=False, type=str) -@click.option("-p", "--platform", required=False, type=str) -@click.option("--entity_type", required=False, type=str, default="dataset") +@click.option("--urn", required=False, type=str, help="the urn of the entity") +@click.option( + "-a", + "--aspect_name", + required=False, + type=str, + help="the aspect name associated with the entity(only for timeseries aspects)", +) +@click.option( + "-f", "--force", required=False, is_flag=True, help="force the delete if set" +) +@click.option( + "--soft/--hard", + required=False, + is_flag=True, + default=True, + help="specifies soft/hard deletion", +) +@click.option( + "-e", "--env", required=False, type=str, help="the environment of the entity" +) +@click.option( + "-p", "--platform", required=False, type=str, help="the platform of the entity" +) +@click.option( + "--entity_type", + required=False, + type=str, + default="dataset", + help="the entity_type of the entity", +) @click.option("--query", required=False, type=str) +@click.option( + "--start-time", + required=False, + type=click.DateTime(), + help="the start time(only for timeseries aspects)", +) 
+@click.option( + "--end-time", + required=False, + type=click.DateTime(), + help="the end time (only for timeseries aspects)", +) @click.option("--registry-id", required=False, type=str) @click.option("-n", "--dry-run", required=False, is_flag=True) @click.option("--only-soft-deleted", required=False, is_flag=True, default=False) @@ -96,12 +136,15 @@ def delete_for_registry( @telemetry.with_telemetry def delete( urn: str, + aspect_name: Optional[str], force: bool, soft: bool, env: str, platform: str, entity_type: str, query: str, + start_time: Optional[datetime], + end_time: Optional[datetime], registry_id: str, dry_run: bool, only_soft_deleted: bool, @@ -161,9 +204,12 @@ def delete( deletion_result: DeletionResult = delete_one_urn_cmd( urn, + aspect_name=aspect_name, soft=soft, dry_run=dry_run, entity_type=entity_type, + start_time=start_time, + end_time=end_time, cached_session_host=(session, host), ) @@ -201,11 +247,14 @@ def delete( if not dry_run: message = "soft delete" if soft else "hard delete" click.echo( - f"Took {(deletion_result.end_time_millis-deletion_result.start_time_millis)/1000.0} seconds to {message} {deletion_result.num_records} rows for {deletion_result.num_entities} entities" + f"Took {(deletion_result.end_time-deletion_result.start_time)/1000.0} seconds to {message}" + f" {deletion_result.num_records} versioned rows" + f" and {deletion_result.num_timeseries_records} timeseries aspect rows" + f" for {deletion_result.num_entities} entities." ) else: click.echo( - f"{deletion_result.num_entities} entities with {deletion_result.num_records if deletion_result.num_records != UNKNOWN_NUM_RECORDS else 'unknown'} rows will be affected. Took {(deletion_result.end_time_millis-deletion_result.start_time_millis)/1000.0} seconds to evaluate." + f"{deletion_result.num_entities} entities with {deletion_result.num_records if deletion_result.num_records != UNKNOWN_NUM_RECORDS else 'unknown'} rows will be affected. Took {(deletion_result.end_time-deletion_result.start_time)/1000.0} seconds to evaluate." ) if deletion_result.sample_records: click.echo( @@ -276,7 +325,7 @@ def delete_with_filters( click.echo( f"No urns to delete. Maybe you want to change entity_type={entity_type} or platform={platform} to be something different?"
) - return DeletionResult(end_time_millis=int(time.time() * 1000.0)) + return DeletionResult(end_time=int(time.time() * 1000.0)) if not force and not dry_run: type_delete = "soft" if soft else "permanently" @@ -320,6 +369,9 @@ def _delete_one_urn( soft: bool = False, dry_run: bool = False, entity_type: str = "dataset", + aspect_name: Optional[str] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, cached_session_host: Optional[Tuple[sessions.Session, str]] = None, cached_emitter: Optional[rest_emitter.DatahubRestEmitter] = None, run_id: str = "delete-run-id", @@ -359,13 +411,22 @@ def _delete_one_urn( else: logger.info(f"[Dry-run] Would soft-delete {urn}") elif not dry_run: - payload_obj = {"urn": urn} - urn, rows_affected = cli_utils.post_delete_endpoint( + payload_obj: Dict[str, Any] = {"urn": urn} + if aspect_name: + payload_obj["aspectName"] = aspect_name + if start_time: + payload_obj["startTimeMillis"] = int(round(start_time.timestamp() * 1000)) + if end_time: + payload_obj["endTimeMillis"] = int(round(end_time.timestamp() * 1000)) + rows_affected: int + ts_rows_affected: int + urn, rows_affected, ts_rows_affected = cli_utils.post_delete_endpoint( payload_obj, "/entities?action=delete", cached_session_host=cached_session_host, ) deletion_result.num_records = rows_affected + deletion_result.num_timeseries_records = ts_rows_affected else: logger.info(f"[Dry-run] Would hard-delete {urn} {soft_delete_msg}") deletion_result.num_records = ( @@ -379,9 +440,12 @@ def _delete_one_urn( @telemetry.with_telemetry def delete_one_urn_cmd( urn: str, + aspect_name: Optional[str] = None, soft: bool = False, dry_run: bool = False, entity_type: str = "dataset", + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, cached_session_host: Optional[Tuple[sessions.Session, str]] = None, cached_emitter: Optional[rest_emitter.DatahubRestEmitter] = None, ) -> DeletionResult: @@ -396,6 +460,9 @@ def delete_one_urn_cmd( soft, dry_run, entity_type, + aspect_name, + start_time, + end_time, cached_session_host, cached_emitter, ) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java index 78c28290c5..c45b4aaf06 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java @@ -5,6 +5,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.metadata.aspect.EnvelopedAspect; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.timeseries.AggregationSpec; +import com.linkedin.timeseries.DeleteAspectValuesResult; import com.linkedin.timeseries.GenericTable; import com.linkedin.timeseries.GroupingBucket; import java.util.List; @@ -29,4 +30,23 @@ public interface TimeseriesAspectService { @Nonnull GenericTable getAggregatedStats(@Nonnull String entityName, @Nonnull String aspectName, @Nonnull AggregationSpec[] aggregationSpecs, @Nullable Filter filter, @Nullable GroupingBucket[] groupingBuckets); + + /** + * Generic filter-based deletion for timeseries aspects. + * @param entityName - The name of the entity. + * @param aspectName - The name of the aspect. + * @param filter - The filter to be used for deletion of the documents on the index. + * @return - number of documents deleted.
+ */ + @Nonnull + DeleteAspectValuesResult deleteAspectValues(@Nonnull String entityName, @Nonnull String aspectName, + @Nonnull Filter filter); + + /** + * Roll back the timeseries aspects associated with a runId. + * @param runId The runId that needs to be rolled back. + * @return - the result containing the number of documents deleted. + */ + @Nonnull + DeleteAspectValuesResult rollbackTimeseriesAspects(@Nonnull String runId); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java index 852fa8e405..978ab524b0 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java @@ -9,11 +9,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.common.urn.Urn; import com.linkedin.data.ByteString; import com.linkedin.metadata.aspect.EnvelopedAspect; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.Criterion; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.search.utils.ESUtils; +import com.linkedin.metadata.search.utils.QueryUtils; import com.linkedin.metadata.timeseries.TimeseriesAspectService; import com.linkedin.metadata.timeseries.elastic.indexbuilder.MappingsBuilder; import com.linkedin.metadata.timeseries.elastic.indexbuilder.TimeseriesAspectIndexBuilders; @@ -23,8 +26,10 @@ import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.SystemMetadata; import com.linkedin.timeseries.AggregationSpec; +import com.linkedin.timeseries.DeleteAspectValuesResult; import com.linkedin.timeseries.GenericTable; import com.linkedin.timeseries.GroupingBucket; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; @@ -40,9 +45,12 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -62,6 +70,7 @@ public class ElasticSearchTimeseriesAspectService implements TimeseriesAspectSer private final TimeseriesAspectIndexBuilders _indexBuilders; private final RestHighLevelClient _searchClient; private final ESAggregatedStatsDAO _esAggregatedStatsDAO; + private final EntityRegistry _entityRegistry; public ElasticSearchTimeseriesAspectService(@Nonnull RestHighLevelClient searchClient, @Nonnull IndexConvention indexConvention, @Nonnull TimeseriesAspectIndexBuilders indexBuilders, @@ -70,6 +79,7 @@ public class ElasticSearchTimeseriesAspectSer _indexBuilders = indexBuilders; _searchClient =
searchClient; _bulkProcessor = bulkProcessor; + _entityRegistry = entityRegistry; _esAggregatedStatsDAO = new ESAggregatedStatsDAO(indexConvention, searchClient, entityRegistry); } @@ -160,7 +170,7 @@ final SearchResponse searchResponse = _searchClient.search(searchRequest, RequestOptions.DEFAULT); hits = searchResponse.getHits(); } catch (Exception e) { - log.error("Search query failed:" + e.getMessage()); + log.error("Search query failed:", e); throw new ESQueryException("Search query failed:", e); } return Arrays.stream(hits.getHits()) @@ -175,4 +185,55 @@ @Nullable GroupingBucket[] groupingBuckets) { return _esAggregatedStatsDAO.getAggregatedStats(entityName, aspectName, aggregationSpecs, filter, groupingBuckets); } + + /** + * A generic delete-by-filter API which uses Elasticsearch's deleteByQuery. + * NOTE: There is no need for the client to explicitly walk each scroll page with this approach. Elastic will synchronously + * delete all of the documents matching the query that is specified by the filter, and internally handles the batching logic + * by the scroll page size specified (i.e. the DEFAULT_LIMIT value of 10,000). + * @param entityName the name of the entity. + * @param aspectName the name of the aspect. + * @param filter the filter to be used for deletion of the documents on the index. + * @return the number of documents deleted. + */ + @Nonnull + @Override + public DeleteAspectValuesResult deleteAspectValues(@Nonnull String entityName, @Nonnull String aspectName, + @Nonnull Filter filter) { + final String indexName = _indexConvention.getTimeseriesAspectIndexName(entityName, aspectName); + final BoolQueryBuilder filterQueryBuilder = ESUtils.buildFilterQuery(filter); + final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName).setQuery(filterQueryBuilder) + .setBatchSize(DEFAULT_LIMIT) + .setRefresh(true) + .setTimeout(TimeValue.timeValueMinutes(10)); + try { + final BulkByScrollResponse response = _searchClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); + return new DeleteAspectValuesResult().setNumDocsDeleted(response.getDeleted()); + } catch (IOException e) { + log.error("Delete query failed:", e); + throw new ESQueryException("Delete query failed:", e); + } + } + + @Nonnull + @Override + public DeleteAspectValuesResult rollbackTimeseriesAspects(@Nonnull String runId) { + DeleteAspectValuesResult rollbackResult = new DeleteAspectValuesResult(); + // Construct the runId filter for deletion. + Filter filter = QueryUtils.newFilter("runId", runId); + + // Delete the timeseries aspects across all entities with the runId.
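+ // Each (entity, timeseries aspect) pair is backed by its own index, so the runId filter is issued once per index and the per-index deletion counts are accumulated into the result.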
+ for (Map.Entry<String, EntitySpec> entry : _entityRegistry.getEntitySpecs().entrySet()) { + for (AspectSpec aspectSpec : entry.getValue().getAspectSpecs()) { + if (aspectSpec.isTimeseries()) { + DeleteAspectValuesResult result = this.deleteAspectValues(entry.getKey(), aspectSpec.getName(), filter); + rollbackResult.setNumDocsDeleted(rollbackResult.getNumDocsDeleted() + result.getNumDocsDeleted()); + log.info("Number of timeseries docs deleted for entity:{}, aspect:{}, runId:{}={}", entry.getKey(), + aspectSpec.getName(), runId, result.getNumDocsDeleted()); + } + } + } + + return rollbackResult; + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/indexbuilder/MappingsBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/indexbuilder/MappingsBuilder.java index f585f3dae0..37a5dc304c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/indexbuilder/MappingsBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/indexbuilder/MappingsBuilder.java @@ -22,6 +22,7 @@ public class MappingsBuilder { public static final String PARTITION_SPEC = "partitionSpec"; public static final String PARTITION_SPEC_PARTITION = "partition"; public static final String PARTITION_SPEC_TIME_PARTITION = "timePartition"; + public static final String RUN_ID_FIELD = "runId"; private MappingsBuilder() { } @@ -34,6 +35,7 @@ Map<String, Object> mappings = new HashMap<>(); + mappings.put(RUN_ID_FIELD, ImmutableMap.of("type", "keyword")); mappings.put(URN_FIELD, ImmutableMap.of("type", "keyword")); mappings.put(MESSAGE_ID_FIELD, ImmutableMap.of("type", "keyword")); mappings.put(TIMESTAMP_FIELD, ImmutableMap.of("type", "date")); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/transformer/TimeseriesAspectTransformer.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/transformer/TimeseriesAspectTransformer.java index 723be5e0b9..96b2a6237a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/transformer/TimeseriesAspectTransformer.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/transformer/TimeseriesAspectTransformer.java @@ -12,10 +12,10 @@ import com.linkedin.data.DataMap; import com.linkedin.data.schema.ArrayDataSchema; import com.linkedin.data.schema.DataSchema; import com.linkedin.data.template.RecordTemplate; -import com.linkedin.metadata.models.extractor.FieldExtractor; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.TimeseriesFieldCollectionSpec; import com.linkedin.metadata.models.TimeseriesFieldSpec; +import com.linkedin.metadata.models.extractor.FieldExtractor; import com.linkedin.metadata.timeseries.elastic.indexbuilder.MappingsBuilder; import com.linkedin.mxe.SystemMetadata; import com.linkedin.util.Pair; @@ -80,6 +80,10 @@ public class TimeseriesAspectTransformer { (Long) timeseriesAspect.data().get(MappingsBuilder.TIMESTAMP_MILLIS_FIELD)); document.put(MappingsBuilder.TIMESTAMP_MILLIS_FIELD, (Long) timeseriesAspect.data().get(MappingsBuilder.TIMESTAMP_MILLIS_FIELD)); + if (systemMetadata != null && systemMetadata.getRunId() != null) { + // We need this as part of the common document for rollback support.
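+ // (runId is mapped as a keyword in MappingsBuilder, so the rollback filter can match it with an exact term query.)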
+ document.put(MappingsBuilder.RUN_ID_FIELD, systemMetadata.getRunId()); + } Object eventGranularity = timeseriesAspect.data().get(MappingsBuilder.EVENT_GRANULARITY); if (eventGranularity != null) { try { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java index a1d7a06235..3c175834c9 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java @@ -36,6 +36,7 @@ import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; import com.linkedin.timeseries.AggregationSpec; import com.linkedin.timeseries.AggregationType; import com.linkedin.timeseries.CalendarInterval; +import com.linkedin.timeseries.DeleteAspectValuesResult; import com.linkedin.timeseries.GenericTable; import com.linkedin.timeseries.GroupingBucket; import com.linkedin.timeseries.GroupingBucketType; @@ -758,4 +759,35 @@ public class ElasticSearchTimeseriesAspectServiceTest { assertEquals(resultTable.getRows(), new StringArrayArray(new StringArray("col1", "3264"), new StringArray("col2", "3288"))); } + + @Test(groups = {"deleteAspectValues1"}, dependsOnGroups = {"getAggregatedStats", "getAspectValues"}) + public void testDeleteAspectValuesByUrnAndTimeRangeDay1() { + Criterion hasUrnCriterion = + new Criterion().setField("urn").setCondition(Condition.EQUAL).setValue(TEST_URN.toString()); + Criterion startTimeCriterion = new Criterion().setField(ES_FILED_TIMESTAMP) + .setCondition(Condition.GREATER_THAN_OR_EQUAL_TO) + .setValue(_startTime.toString()); + Criterion endTimeCriterion = new Criterion().setField(ES_FILED_TIMESTAMP) + .setCondition(Condition.LESS_THAN_OR_EQUAL_TO) + .setValue(String.valueOf(_startTime + 23 * TIME_INCREMENT)); + + Filter filter = + QueryUtils.getFilterFromCriteria(ImmutableList.of(hasUrnCriterion, startTimeCriterion, endTimeCriterion)); + DeleteAspectValuesResult result = + _elasticSearchTimeseriesAspectService.deleteAspectValues(ENTITY_NAME, ASPECT_NAME, filter); + // For day1, we expect 24 (number of hours) * 3 (each testEntityProfile aspect expands to 3 elastic docs: + // 1 original + 2 for componentProfiles) = 72 total. + assertEquals(result.getNumDocsDeleted(), Long.valueOf(72L)); + } + + @Test(groups = {"deleteAspectValues2"}, dependsOnGroups = {"deleteAspectValues1"}) + public void testDeleteAspectValuesByUrn() { + Criterion hasUrnCriterion = + new Criterion().setField("urn").setCondition(Condition.EQUAL).setValue(TEST_URN.toString()); + Filter filter = QueryUtils.getFilterFromCriteria(ImmutableList.of(hasUrnCriterion)); + DeleteAspectValuesResult result = + _elasticSearchTimeseriesAspectService.deleteAspectValues(ENTITY_NAME, ASPECT_NAME, filter); + // Of the 300 elastic docs upserted for TEST_URN, 72 were deleted by the deleteAspectValues1 test group, leaving 228.
+ assertEquals(result.getNumDocsDeleted(), Long.valueOf(228L)); + } } diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/run/DeleteEntityResponse.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/run/DeleteEntityResponse.pdl index eca128f94c..6cb00950d7 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/run/DeleteEntityResponse.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/run/DeleteEntityResponse.pdl @@ -3,4 +3,5 @@ namespace com.linkedin.metadata.run record DeleteEntityResponse { urn: string rows: long + timeseriesRows: optional long } diff --git a/metadata-models/src/main/pegasus/com/linkedin/timeseries/DeleteAspectValuesResult.pdl b/metadata-models/src/main/pegasus/com/linkedin/timeseries/DeleteAspectValuesResult.pdl new file mode 100644 index 0000000000..16977a118b --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/timeseries/DeleteAspectValuesResult.pdl @@ -0,0 +1,12 @@ +namespace com.linkedin.timeseries + +/** + * Encapsulates the response of the deleteAspectValues API so that it can be extended + * as required in the future. + */ +record DeleteAspectValuesResult { + /** + * Number of documents deleted. + */ + numDocsDeleted: long = 0 +} \ No newline at end of file diff --git a/metadata-service/restli-api/src/main/idl/com.linkedin.entity.entities.restspec.json b/metadata-service/restli-api/src/main/idl/com.linkedin.entity.entities.restspec.json index 9116dd03c8..90c186622f 100644 --- a/metadata-service/restli-api/src/main/idl/com.linkedin.entity.entities.restspec.json +++ b/metadata-service/restli-api/src/main/idl/com.linkedin.entity.entities.restspec.json @@ -86,9 +86,26 @@ "returns" : "com.linkedin.metadata.browse.BrowseResult" }, { "name" : "delete", + "doc" : "Deletes all data related to an individual urn (entity).\nService Returns: - a DeleteEntityResponse object.", "parameters" : [ { "name" : "urn", - "type" : "string" + "type" : "string", + "doc" : "- the urn of the entity." + }, { + "name" : "aspectName", + "type" : "string", + "optional" : true, + "doc" : "- the optional aspect name if you only want to delete the aspect (applicable only for timeseries aspects)." + }, { + "name" : "startTimeMillis", + "type" : "long", + "optional" : true, + "doc" : "- the optional start time (applicable only for timeseries aspects)." + }, { + "name" : "endTimeMillis", + "type" : "long", + "optional" : true, + "doc" : "- the optional end time (applicable only for timeseries aspects)." } ], "returns" : "com.linkedin.metadata.run.DeleteEntityResponse" }, { diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index cafc039167..fb4cf8de0c 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -5328,6 +5328,10 @@ }, { "name" : "rows", "type" : "long" + }, { + "name" : "timeseriesRows", + "type" : "long", + "optional" : true } ] }, { "type" : "record", @@ -5701,9 +5705,26 @@ "returns" : "com.linkedin.metadata.browse.BrowseResult" }, { "name" : "delete", + "doc" : "Deletes all data related to an individual urn (entity).\nService Returns: - a DeleteEntityResponse object.", "parameters" : [ { "name" : "urn", - "type" : "string" + "type" : "string", + "doc" : "- the urn of the entity."
+ }, { + "name" : "aspectName", + "type" : "string", + "optional" : true, + "doc" : "- the optional aspect name if only want to delete the aspect (applicable only for timeseries aspects)." + }, { + "name" : "startTimeMillis", + "type" : "long", + "optional" : true, + "doc" : "- the optional start time (applicable only for timeseries aspects)." + }, { + "name" : "endTimeMillis", + "type" : "long", + "optional" : true, + "doc" : "- the optional end time (applicable only for the timeseries aspects)." } ], "returns" : "com.linkedin.metadata.run.DeleteEntityResponse" }, { diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java index dd4e5c7f9c..dd3a95dbae 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java @@ -22,6 +22,7 @@ import com.linkedin.metadata.run.UnsafeEntityInfo; import com.linkedin.metadata.run.UnsafeEntityInfoArray; import com.linkedin.metadata.search.utils.ESUtils; import com.linkedin.metadata.systemmetadata.SystemMetadataService; +import com.linkedin.metadata.timeseries.TimeseriesAspectService; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.MetadataChangeProposal; @@ -31,6 +32,7 @@ import com.linkedin.restli.server.annotations.ActionParam; import com.linkedin.restli.server.annotations.Optional; import com.linkedin.restli.server.annotations.RestLiCollection; import com.linkedin.restli.server.resources.CollectionResourceTaskTemplate; +import com.linkedin.timeseries.DeleteAspectValuesResult; import io.opentelemetry.extension.annotations.WithSpan; import java.util.List; import java.util.Map; @@ -69,6 +71,10 @@ public class BatchIngestionRunResource extends CollectionResourceTaskTemplate get(@Nonnull String urnStr, @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) throws URISyntaxException { log.info("GET {}", urnStr); @@ -206,7 +235,8 @@ public class EntityResource extends CollectionResourceTaskTemplate { _entityService.ingestEntity(entity, auditStamp, finalSystemMetadata); - tryIndexRunId(com.datahub.util.ModelUtils.getUrnFromSnapshotUnion(entity.getValue()), systemMetadata, _entitySearchService); + tryIndexRunId(com.datahub.util.ModelUtils.getUrnFromSnapshotUnion(entity.getValue()), systemMetadata, + _entitySearchService); return null; }, MetricRegistry.name(this.getClass(), "ingest")); } @@ -247,7 +277,8 @@ public class EntityResource extends CollectionResourceTaskTemplate searchAcrossLineage(@ActionParam(PARAM_URN) @Nonnull String urnStr, @ActionParam(PARAM_DIRECTION) String direction, @ActionParam(PARAM_ENTITIES) @Optional @Nullable String[] entities, - @ActionParam(PARAM_INPUT) @Optional @Nullable String input, @ActionParam(PARAM_MAX_HOPS) @Optional @Nullable Integer maxHops, - @ActionParam(PARAM_FILTER) @Optional @Nullable Filter filter, @ActionParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion, - @ActionParam(PARAM_START) int start, @ActionParam(PARAM_COUNT) int count) throws URISyntaxException { + @ActionParam(PARAM_INPUT) @Optional @Nullable String input, + @ActionParam(PARAM_MAX_HOPS) @Optional @Nullable Integer maxHops, + @ActionParam(PARAM_FILTER) 
@Optional @Nullable Filter filter, + @ActionParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion, @ActionParam(PARAM_START) int start, + @ActionParam(PARAM_COUNT) int count) throws URISyntaxException { Urn urn = Urn.createFromString(urnStr); List<String> entityList = entities == null ? Collections.emptyList() : Arrays.asList(entities); log.info("GET SEARCH RESULTS ACROSS RELATIONSHIPS for source urn {}, direction {}, entities {} with query {}", @@ -348,8 +381,6 @@ public class EntityResource extends CollectionResourceTaskTemplate - public Task<DeleteEntityResponse> deleteEntity(@ActionParam(PARAM_URN) @Nonnull String urnStr) - throws URISyntaxException { + public Task<DeleteEntityResponse> deleteEntity(@ActionParam(PARAM_URN) @Nonnull String urnStr, + @ActionParam(PARAM_ASPECT_NAME) @Optional String aspectName, + @ActionParam(PARAM_START_TIME_MILLIS) @Optional Long startTimeMillis, + @ActionParam(PARAM_END_TIME_MILLIS) @Optional Long endTimeMillis) throws URISyntaxException { Urn urn = Urn.createFromString(urnStr); return RestliUtil.toTask(() -> { + // Find the timeseries aspects to delete. If aspectName is null, delete all. + List<String> timeseriesAspectNames = + EntitySpecUtils.getEntityTimeseriesAspectNames(_entityService.getEntityRegistry(), urn.getEntityType()); + if (aspectName != null && !timeseriesAspectNames.contains(aspectName)) { + throw new UnsupportedOperationException( + String.format("Not supported for non-timeseries aspect '%s'.", aspectName)); + } + List<String> timeseriesAspectsToDelete = + (aspectName == null) ? timeseriesAspectNames : ImmutableList.of(aspectName); + DeleteEntityResponse response = new DeleteEntityResponse(); - RollbackRunResult result = _entityService.deleteUrn(urn); + if (aspectName == null) { + RollbackRunResult result = _entityService.deleteUrn(urn); + response.setRows(result.getRowsDeletedFromEntityDeletion()); + } + Long numTimeseriesDocsDeleted = + deleteTimeseriesAspects(urn, startTimeMillis, endTimeMillis, timeseriesAspectsToDelete); + log.info("Total number of timeseries aspect docs deleted: {}", numTimeseriesDocsDeleted); response.setUrn(urnStr); - response.setRows(result.getRowsDeletedFromEntityDeletion()); + response.setTimeseriesRows(numTimeseriesDocsDeleted); return response; }, MetricRegistry.name(this.getClass(), "delete")); } + /** + * Deletes the set of timeseries aspect values for the specified aspects that are associated with the given + * entity urn between startTimeMillis and endTimeMillis. + * @param urn The entity urn whose timeseries aspect values need to be deleted. + * @param startTimeMillis The start time in milliseconds from when the aspect values need to be deleted. + * If this is null, the deletion starts from the oldest value. + * @param endTimeMillis The end time in milliseconds up to when the aspect values need to be deleted. + * If this is null, the deletion will go up to the most recent value. + * @param aspectsToDelete - The list of aspect names whose values need to be deleted. + * @return The total number of documents deleted. + */ + private Long deleteTimeseriesAspects(@Nonnull Urn urn, @Nullable Long startTimeMillis, @Nullable Long endTimeMillis, + @Nonnull List<String> aspectsToDelete) { + long totalNumberOfDocsDeleted = 0; + // Construct the filter.
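+ // The urn criterion is always included; the start/end time criteria are added only when supplied, so omitting the range deletes every value of the aspect.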
+ List<Criterion> criteria = new ArrayList<>(); + criteria.add(QueryUtils.newCriterion("urn", urn.toString())); + if (startTimeMillis != null) { + criteria.add( + QueryUtils.newCriterion(ES_FILED_TIMESTAMP, startTimeMillis.toString(), Condition.GREATER_THAN_OR_EQUAL_TO)); + } + if (endTimeMillis != null) { + criteria.add( + QueryUtils.newCriterion(ES_FILED_TIMESTAMP, endTimeMillis.toString(), Condition.LESS_THAN_OR_EQUAL_TO)); + } + final Filter filter = QueryUtils.getFilterFromCriteria(criteria); + + // Delete all the timeseries aspects by the filter. + final String entityType = urn.getEntityType(); + for (final String aspect : aspectsToDelete) { + DeleteAspectValuesResult result = _timeseriesAspectService.deleteAspectValues(entityType, aspect, filter); + totalNumberOfDocsDeleted += result.getNumDocsDeleted(); + + log.debug("Number of timeseries docs deleted for entity:{}, aspect:{}, urn:{}, startTime:{}, endTime:{}={}", + entityType, aspect, urn, startTimeMillis, endTimeMillis, result.getNumDocsDeleted()); + } + return totalNumberOfDocsDeleted; + } + @Action(name = "deleteReferences") @Nonnull @WithSpan public Task deleteReferencesTo(@ActionParam(PARAM_URN) @Nonnull String urnStr, - @ActionParam("dryRun") @Optional Boolean dry) - throws URISyntaxException { + @ActionParam("dryRun") @Optional Boolean dry) throws URISyntaxException { boolean dryRun = dry != null ? dry : false; Urn urn = Urn.createFromString(urnStr); - return RestliUtil.toTask(() -> _deleteEntityService.deleteReferencesTo(urn, dryRun), MetricRegistry.name(this.getClass(), "deleteReferences")); + return RestliUtil.toTask(() -> _deleteEntityService.deleteReferencesTo(urn, dryRun), + MetricRegistry.name(this.getClass(), "deleteReferences")); } /* @@ -473,19 +567,6 @@ public class EntityResource extends CollectionResourceTaskTemplate _entityService.listUrns(entityName, start, count), "listUrns"); } - public static ListResult toListResult(final SearchResult searchResult) { - if (searchResult == null) { - return null; - } - final ListResult listResult = new ListResult(); - listResult.setStart(searchResult.getFrom()); - listResult.setCount(searchResult.getPageSize()); - listResult.setTotal(searchResult.getNumEntities()); - listResult.setEntities( - new UrnArray(searchResult.getEntities().stream().map(SearchEntity::getEntity).collect(Collectors.toList()))); - return listResult; - } - @Action(name = ACTION_FILTER) @Nonnull @WithSpan diff --git a/smoke-test/tests/aspect_generators/__init__.py b/smoke-test/tests/aspect_generators/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/smoke-test/tests/aspect_generators/timeseries/__init__.py b/smoke-test/tests/aspect_generators/timeseries/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/smoke-test/tests/aspect_generators/timeseries/dataset_profile_gen.py b/smoke-test/tests/aspect_generators/timeseries/dataset_profile_gen.py new file mode 100644 index 0000000000..bc22b74ed1 --- /dev/null +++ b/smoke-test/tests/aspect_generators/timeseries/dataset_profile_gen.py @@ -0,0 +1,42 @@ +from typing import Iterable + +from datahub.metadata.schema_classes import (DatasetFieldProfileClass, + DatasetProfileClass, + TimeWindowSizeClass) + +from tests.utils import get_timestampmillis_at_start_of_day + + +def gen_dataset_profiles( + num_days: int = 30, +) -> Iterable[DatasetProfileClass]: + """ + Generates `num_days` number of test dataset profiles for the entity + represented by the test_dataset_urn, starting from the start time of + now - num_days + 1 day to the
start of today. + """ + num_rows: int = 100 + num_columns: int = 1 + # [-num_days + 1, -num_days + 2, ..., 0] + for relative_day_num in range(-num_days + 1, 1): + timestampMillis: int = get_timestampmillis_at_start_of_day(relative_day_num) + profile = DatasetProfileClass( + timestampMillis=timestampMillis, + eventGranularity=TimeWindowSizeClass(unit="DAY", multiple=1), + ) + profile.rowCount = num_rows + num_rows += 100 + profile.columnCount = num_columns + profile.fieldProfiles = [] + field_profile = DatasetFieldProfileClass(fieldPath="test_column") + field_profile.uniqueCount = int(num_rows / 2) + field_profile.uniqueProportion = float(0.5) + field_profile.nullCount = int(num_rows / 10) + field_profile.nullProportion = float(0.1) + field_profile.min = "10" + field_profile.max = "20" + field_profile.mean = "15" + field_profile.median = "12" + field_profile.stdev = "3" + profile.fieldProfiles.append(field_profile) + yield profile diff --git a/smoke-test/tests/cli/delete_cmd/__init__.py b/smoke-test/tests/cli/delete_cmd/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py b/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py new file mode 100644 index 0000000000..e478795586 --- /dev/null +++ b/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py @@ -0,0 +1,124 @@ +import json +import tempfile +import time +from typing import Any, Dict, List, Optional + +from click.testing import CliRunner, Result + +import datahub.emitter.mce_builder as builder +from datahub.emitter.serialization_helper import pre_json_transform +from datahub.entrypoints import datahub +from datahub.metadata.schema_classes import DatasetProfileClass +from tests.aspect_generators.timeseries.dataset_profile_gen import \ + gen_dataset_profiles +from tests.utils import get_strftime_from_timestamp_millis + +test_aspect_name: str = "datasetProfile" +test_dataset_urn: str = builder.make_dataset_urn_with_platform_instance( + "test_platform", + "test_dataset", + "test_platform_instance", + "TEST", +) + +runner = CliRunner() + + +def sync_elastic() -> None: + elastic_sync_wait_time_seconds: int = 5 + time.sleep(elastic_sync_wait_time_seconds) + + +def datahub_put_profile(dataset_profile: DatasetProfileClass) -> None: + with tempfile.NamedTemporaryFile("w+t", suffix=".json") as aspect_file: + aspect_text: str = json.dumps(pre_json_transform(dataset_profile.to_obj())) + aspect_file.write(aspect_text) + aspect_file.seek(0) + put_args: List[str] = [ + "put", + "--urn", + test_dataset_urn, + "-a", + test_aspect_name, + "-d", + aspect_file.name, + ] + put_result = runner.invoke(datahub, put_args) + assert put_result.exit_code == 0 + + +def datahub_get_and_verify_profile( + expected_profile: Optional[DatasetProfileClass], +) -> None: + # Wait for writes to stabilize in elastic + sync_elastic() + get_args: List[str] = ["get", "--urn", test_dataset_urn, "-a", test_aspect_name] + get_result: Result = runner.invoke(datahub, get_args) + assert get_result.exit_code == 0 + get_result_output_obj: Dict = json.loads(get_result.output) + if expected_profile is None: + assert not get_result_output_obj + else: + profile_from_get = DatasetProfileClass.from_obj( + get_result_output_obj["datasetProfile"] + ) + assert profile_from_get == expected_profile + + +def datahub_delete(params: List[str]) -> None: + sync_elastic() + + args: List[str] = ["delete"] + args.extend(params) + args.append("--hard") + delete_result: Result = runner.invoke(datahub, args, 
input="y\ny\n") + assert delete_result.exit_code == 0 + + +def test_timeseries_delete(wait_for_healthchecks: Any) -> None: + num_test_profiles: int = 10 + verification_batch_size: int = int(num_test_profiles / 2) + num_latest_profiles_to_delete = 2 + expected_profile_after_latest_deletion: DatasetProfileClass + delete_ts_start: str + delete_ts_end: str + # 1. Ingest `num_test_profiles` datasetProfile aspects against the test_dataset_urn via put + # and validate using get. + for i, dataset_profile in enumerate(gen_dataset_profiles(num_test_profiles)): + # Use put command to ingest the aspect value. + datahub_put_profile(dataset_profile) + # Validate against all ingested values once every verification_batch_size to reduce overall test time. Since we + # are ingesting the aspects in the ascending order of timestampMillis, get should return the one just put. + if (i % verification_batch_size) == 0: + datahub_get_and_verify_profile(dataset_profile) + + # Init the params for time-range based deletion. + if i == (num_test_profiles - num_latest_profiles_to_delete - 1): + expected_profile_after_latest_deletion = dataset_profile + elif i == (num_test_profiles - num_latest_profiles_to_delete): + delete_ts_start = get_strftime_from_timestamp_millis( + dataset_profile.timestampMillis - 100 + ) + elif i == (num_test_profiles - 1): + delete_ts_end = get_strftime_from_timestamp_millis( + dataset_profile.timestampMillis + 100 + ) + # 2. Verify time-range based deletion. + datahub_delete( + [ + "--urn", + test_dataset_urn, + "-a", + test_aspect_name, + "--start-time", + delete_ts_start, + "--end-time", + delete_ts_end, + ], + ) + assert expected_profile_after_latest_deletion is not None + datahub_get_and_verify_profile(expected_profile_after_latest_deletion) + + # 3. Delete everything via the delete command & validate that we don't get any profiles back. 
+ datahub_delete(["-p", "test_platform"]) + datahub_get_and_verify_profile(None) diff --git a/smoke-test/tests/cli/ingest_cmd/__init__.py b/smoke-test/tests/cli/ingest_cmd/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/smoke-test/tests/cli/ingest_cmd/test_timeseries_rollback.json b/smoke-test/tests/cli/ingest_cmd/test_timeseries_rollback.json new file mode 100644 index 0000000000..4e550009d8 --- /dev/null +++ b/smoke-test/tests/cli/ingest_cmd/test_timeseries_rollback.json @@ -0,0 +1,392 @@ +[ +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1650783600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 3000, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1550, \"uniqueProportion\": 0.5, \"nullCount\": 310, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1650870000000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 2900, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1500, \"uniqueProportion\": 0.5, \"nullCount\": 300, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1650956400000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 2800, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1450, \"uniqueProportion\": 0.5, \"nullCount\": 290, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651042800000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 2700, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1400, \"uniqueProportion\": 0.5, \"nullCount\": 280, \"nullProportion\": 0.1, 
\"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651129200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 2600, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1350, \"uniqueProportion\": 0.5, \"nullCount\": 270, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651215600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 2500, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1300, \"uniqueProportion\": 0.5, \"nullCount\": 260, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651302000000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 2400, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1250, \"uniqueProportion\": 0.5, \"nullCount\": 250, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651388400000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 2300, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1200, \"uniqueProportion\": 0.5, \"nullCount\": 240, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": 
"UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651474800000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 2200, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1150, \"uniqueProportion\": 0.5, \"nullCount\": 230, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651561200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 2100, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1100, \"uniqueProportion\": 0.5, \"nullCount\": 220, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651647600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 2000, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1050, \"uniqueProportion\": 0.5, \"nullCount\": 210, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651734000000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 1900, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 1000, \"uniqueProportion\": 0.5, \"nullCount\": 200, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651820400000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 1800, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", 
\"uniqueCount\": 950, \"uniqueProportion\": 0.5, \"nullCount\": 190, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651906800000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 1700, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 900, \"uniqueProportion\": 0.5, \"nullCount\": 180, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1651993200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 1600, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 850, \"uniqueProportion\": 0.5, \"nullCount\": 170, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652079600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 1500, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 800, \"uniqueProportion\": 0.5, \"nullCount\": 160, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652166000000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 1400, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 750, \"uniqueProportion\": 0.5, \"nullCount\": 150, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": 
"urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652252400000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 1300, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 700, \"uniqueProportion\": 0.5, \"nullCount\": 140, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652338800000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 1200, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 650, \"uniqueProportion\": 0.5, \"nullCount\": 130, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652425200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 1100, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 600, \"uniqueProportion\": 0.5, \"nullCount\": 120, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652511600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 1000, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 550, \"uniqueProportion\": 0.5, \"nullCount\": 110, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652598000000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": 
\"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 900, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 500, \"uniqueProportion\": 0.5, \"nullCount\": 100, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652684400000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 800, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 450, \"uniqueProportion\": 0.5, \"nullCount\": 90, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652770800000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 700, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 400, \"uniqueProportion\": 0.5, \"nullCount\": 80, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652857200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 600, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 350, \"uniqueProportion\": 0.5, \"nullCount\": 70, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1652943600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 500, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 300, \"uniqueProportion\": 0.5, \"nullCount\": 60, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null 
+}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1653030000000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 400, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 250, \"uniqueProportion\": 0.5, \"nullCount\": 50, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1653116400000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 300, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 200, \"uniqueProportion\": 0.5, \"nullCount\": 40, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1653202800000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 200, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 150, \"uniqueProportion\": 0.5, \"nullCount\": 30, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:test_rollback,rollback_test_dataset,TEST)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1653289200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 100, \"columnCount\": 1, \"fieldProfiles\": [{\"fieldPath\": \"test_column\", \"uniqueCount\": 100, \"uniqueProportion\": 0.5, \"nullCount\": 20, \"nullProportion\": 0.1, \"min\": \"10\", \"max\": \"20\", \"mean\": \"15\", \"median\": \"12\", \"stdev\": \"3\"}]}", + "contentType": "application/json" + }, + "systemMetadata": null +} +] diff --git a/smoke-test/tests/cli/ingest_cmd/test_timeseries_rollback.py b/smoke-test/tests/cli/ingest_cmd/test_timeseries_rollback.py new file mode 100644 index 0000000000..70f2d15fbe --- /dev/null +++ b/smoke-test/tests/cli/ingest_cmd/test_timeseries_rollback.py @@ -0,0 +1,60 @@ +import json +import time +from typing import Any, Dict, List, Optional + +from 
+from click.testing import CliRunner, Result
+
+import datahub.emitter.mce_builder as builder
+from datahub.emitter.serialization_helper import post_json_transform
+from datahub.entrypoints import datahub
+from datahub.metadata.schema_classes import DatasetProfileClass
+from tests.utils import ingest_file_via_rest
+
+runner = CliRunner()
+
+
+def sync_elastic() -> None:
+    elastic_sync_wait_time_seconds: int = 5
+    time.sleep(elastic_sync_wait_time_seconds)
+
+
+def datahub_rollback(run_id: str) -> None:
+    sync_elastic()
+    rollback_args: List[str] = ["ingest", "rollback", "--run-id", run_id, "-f"]
+    rollback_result: Result = runner.invoke(datahub, rollback_args)
+    assert rollback_result.exit_code == 0
+
+
+def datahub_get_and_verify_profile(
+    urn: str,
+    aspect_name: str,
+    expected_profile: Optional[DatasetProfileClass],
+) -> None:
+    # Wait for writes to stabilize in elastic
+    sync_elastic()
+    get_args: List[str] = ["get", "--urn", urn, "-a", aspect_name]
+    get_result: Result = runner.invoke(datahub, get_args)
+    assert get_result.exit_code == 0
+    get_result_output_obj: Dict = json.loads(get_result.output)
+    if expected_profile is None:
+        assert not get_result_output_obj
+    else:
+        profile_as_dict: Dict = post_json_transform(
+            get_result_output_obj["datasetProfile"]
+        )
+        profile_from_get = DatasetProfileClass.from_obj(profile_as_dict)
+        assert profile_from_get == expected_profile
+
+
+def test_timeseries_rollback(wait_for_healthchecks: Any) -> None:
+    pipeline = ingest_file_via_rest(
+        "tests/cli/ingest_cmd/test_timeseries_rollback.json"
+    )
+    test_aspect_name: str = "datasetProfile"
+    test_dataset_urn: str = builder.make_dataset_urn(
+        "test_rollback",
+        "rollback_test_dataset",
+        "TEST",
+    )
+    datahub_rollback(pipeline.config.run_id)
+    datahub_get_and_verify_profile(test_dataset_urn, test_aspect_name, None)
diff --git a/smoke-test/tests/test_stateful_ingestion.py b/smoke-test/tests/test_stateful_ingestion.py
index 4431099568..c738a67b68 100644
--- a/smoke-test/tests/test_stateful_ingestion.py
+++ b/smoke-test/tests/test_stateful_ingestion.py
@@ -79,7 +79,6 @@ def test_stateful_ingestion(wait_for_healthchecks):
         "reporting": [
             {
                 "type": "datahub",
-                "config": {"datahub_api": {"server": get_gms_url()}},
             }
         ],
     }
diff --git a/smoke-test/tests/timeline/timeline_test.py b/smoke-test/tests/timeline/timeline_test.py
index 85a581b328..0683792c7e 100644
--- a/smoke-test/tests/timeline/timeline_test.py
+++ b/smoke-test/tests/timeline/timeline_test.py
@@ -20,8 +20,7 @@ def test_all():
 
     res_data = timeline_cli.get_timeline(dataset_urn, ["TAG", "DOCUMENTATION", "TECHNICAL_SCHEMA", "GLOSSARY_TERM", "OWNER"], None, None, False)
-
-    delete_cli.delete_one_urn_cmd(dataset_urn, False, False, "dataset", None, None)
+    delete_cli.delete_one_urn_cmd(urn=dataset_urn)
     assert res_data
     assert len(res_data) == 3
     assert res_data[0]["semVerChange"] == "MINOR"
 
@@ -47,7 +46,7 @@ def test_schema():
 
     res_data = timeline_cli.get_timeline(dataset_urn, ["TECHNICAL_SCHEMA"], None, None, False)
 
-    delete_cli.delete_one_urn_cmd(dataset_urn, False, False, "dataset", None, None)
+    delete_cli.delete_one_urn_cmd(urn=dataset_urn)
     assert res_data
     assert len(res_data) == 3
     assert res_data[0]["semVerChange"] == "MINOR"
@@ -73,7 +72,7 @@ def test_glossary():
 
     res_data = timeline_cli.get_timeline(dataset_urn, ["GLOSSARY_TERM"], None, None, False)
 
-    delete_cli.delete_one_urn_cmd(dataset_urn, False, False, "dataset", None, None)
+    delete_cli.delete_one_urn_cmd(urn=dataset_urn)
     assert res_data
     assert len(res_data) == 3
res_data[0]["semVerChange"] == "MINOR" @@ -99,7 +98,7 @@ def test_documentation(): res_data = timeline_cli.get_timeline(dataset_urn, ["DOCUMENTATION"], None, None, False) - delete_cli.delete_one_urn_cmd(dataset_urn, False, False, "dataset", None, None) + delete_cli.delete_one_urn_cmd(urn=dataset_urn) assert res_data assert len(res_data) == 3 assert res_data[0]["semVerChange"] == "MINOR" @@ -125,7 +124,7 @@ def test_tags(): res_data = timeline_cli.get_timeline(dataset_urn, ["TAG"], None, None, False) - delete_cli.delete_one_urn_cmd(dataset_urn, False, False, "dataset", None, None) + delete_cli.delete_one_urn_cmd(urn=dataset_urn) assert res_data assert len(res_data) == 3 assert res_data[0]["semVerChange"] == "MINOR" @@ -151,7 +150,7 @@ def test_ownership(): res_data = timeline_cli.get_timeline(dataset_urn, ["OWNER"], None, None, False) - delete_cli.delete_one_urn_cmd(dataset_urn, False, False, "dataset", None, None) + delete_cli.delete_one_urn_cmd(urn=dataset_urn) assert res_data assert len(res_data) == 3 assert res_data[0]["semVerChange"] == "MINOR" diff --git a/smoke-test/tests/utils.py b/smoke-test/tests/utils.py index 7247caa879..b0588d3bd7 100644 --- a/smoke-test/tests/utils.py +++ b/smoke-test/tests/utils.py @@ -1,11 +1,13 @@ import json import os -from typing import Any, Tuple +from datetime import datetime, timedelta +from typing import Tuple import requests + from datahub.cli import cli_utils -from datahub.ingestion.run.pipeline import Pipeline from datahub.cli.docker import check_local_docker_containers +from datahub.ingestion.run.pipeline import Pipeline def get_frontend_session(): @@ -23,7 +25,10 @@ def get_frontend_session(): def get_admin_credentials(): - return (os.getenv("ADMIN_USERNAME", "datahub"), os.getenv("ADMIN_PASSWORD", "datahub")) + return ( + os.getenv("ADMIN_USERNAME", "datahub"), + os.getenv("ADMIN_PASSWORD", "datahub"), + ) def get_gms_url(): @@ -86,7 +91,7 @@ def check_endpoint(url): raise SystemExit(f"{url}: is Not reachable \nErr: {e}") -def ingest_file_via_rest(filename: str) -> Any: +def ingest_file_via_rest(filename: str) -> Pipeline: pipeline = Pipeline.create( { "source": { @@ -133,3 +138,30 @@ def delete_urns_from_file(filename: str) -> None: get_gms_url() + "/entities?action=delete", payload_obj, ) + + +# Fixed now value +NOW: datetime = datetime.now() + + +def get_timestampmillis_at_start_of_day(relative_day_num: int) -> int: + """ + Returns the time in milliseconds from epoch at the start of the day + corresponding to `now + relative_day_num` + + """ + time: datetime = NOW + timedelta(days=float(relative_day_num)) + time = datetime( + year=time.year, + month=time.month, + day=time.day, + hour=0, + minute=0, + second=0, + microsecond=0, + ) + return int(time.timestamp() * 1000) + + +def get_strftime_from_timestamp_millis(ts_millis: int) -> str: + return datetime.fromtimestamp(ts_millis / 1000).strftime("%Y-%m-%d %H:%M:%S")