feat(platform): adds side-effect report for rollbacks (#4482)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
This commit is contained in:
Pedro Silva 2022-03-31 01:33:35 +01:00 committed by GitHub
parent 0be0689093
commit 306ddff13e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 303 additions and 53 deletions

View File

@ -101,11 +101,14 @@ datahub ingest rollback --run-id <run-id>
to rollback all aspects added with this run and all entities created by this run.
:::note
### Unsafe Entities and Rollback
Since datahub v0.8.29, the `rollback` endpoint will now perform a *soft delete* of the entities ingested by a given run `<run-id>`.
This was done to preserve potential changes that were made directly via DataHub's UI and are not part of the ingestion run itself, so that this information can be retrieved later on if the same deleted entity is re-ingested.
> **_NOTE:_** Preservation of unsafe entities has been added in datahub `0.8.32`. Read on to understand what it means and how it works.
If you wish to keep old behaviour (hard delete), please use the `--hard-delete` flag (short-hand: `-d`).
In some cases, entities that were initially ingested by a run might have had further modifications to their metadata (e.g. adding terms, tags, or documentation) through the UI or other means. During a rollback of the ingestion run that initially created these entities (technically, if the key aspects for these entities are being rolled back), the rollback process will analyse the metadata graph for aspects that would be left "dangling" and will:
1. Leave these aspects untouched in the database, and soft-delete the entity. A re-ingestion of these entities will result in this additional metadata becoming visible again in the UI, so you don't lose any of your work.
2. The datahub cli will save information about these unsafe entities as a CSV for operators to later review and decide on next steps (keep or remove).
:::
The rollback command will report how many entities have such aspects and save the urns of these entities as a CSV under a rollback reports directory, which defaults to `rollback-reports` under the current directory where the CLI is run, and can be configured further using the `--report-dir` command line argument.
The operator can use `datahub get --urn <>` to inspect the aspects that were left behind and either keep them (do nothing) or delete the entity (and its aspects) completely using `datahub delete --urn <urn> --hard`. If the operator wishes to remove all the metadata associated with these unsafe entities, they can re-issue the rollback command with the `--nuke` flag.

View File

@ -234,24 +234,29 @@ def parse_run_restli_response(response: requests.Response) -> dict:
def post_rollback_endpoint(
payload_obj: dict,
path: str,
) -> typing.Tuple[typing.List[typing.List[str]], int, int]:
) -> typing.Tuple[typing.List[typing.List[str]], int, int, int, int, typing.List[dict]]:
session, gms_host = get_session_and_host()
url = gms_host + path
payload = json.dumps(payload_obj)
response = session.post(url, payload)
summary = parse_run_restli_response(response)
rows = summary.get("aspectRowSummaries", [])
entities_affected = summary.get("entitiesAffected", 0)
aspects_reverted = summary.get("aspectsReverted", 0)
aspects_affected = summary.get("aspectsAffected", 0)
unsafe_entity_count = summary.get("unsafeEntitiesCount", 0)
unsafe_entities = summary.get("unsafeEntities", [])
rolled_back_aspects = list(
filter(lambda row: row["runId"] == payload_obj["runId"], rows)
)
if len(rows) == 0:
click.secho(f"No entities found. Payload used: {payload}", fg="yellow")
local_timezone = datetime.now().astimezone().tzinfo
structured_rows = [
structured_rolled_back_results = [
[
row.get("urn"),
row.get("aspectName"),
@ -260,10 +265,17 @@ def post_rollback_endpoint(
)
+ f" ({local_timezone})",
]
for row in rows
for row in rolled_back_aspects
]
return structured_rows, entities_affected, aspects_affected
return (
structured_rolled_back_results,
entities_affected,
aspects_reverted,
aspects_affected,
unsafe_entity_count,
unsafe_entities,
)
def post_delete_endpoint(
@ -707,4 +719,7 @@ def get_aspects_for_entity(
except Exception as e:
log.error(f"Error on {json.dumps(aspect_dict)}", e)
return {k: v for (k, v) in aspect_map.items() if k in aspects}
if aspects:
return {k: v for (k, v) in aspect_map.items() if k in aspects}
else:
return {k: v for (k, v) in aspect_map.items()}

View File

@ -72,6 +72,9 @@ def delete_for_registry(
structured_rows,
entities_affected,
aspects_affected,
unsafe_aspects,
unsafe_entity_count,
unsafe_entities,
) = cli_utils.post_rollback_endpoint(registry_delete, "/entities?action=deleteAll")
deletion_result.num_entities = entities_affected
deletion_result.num_records = aspects_affected

View File

@ -1,5 +1,7 @@
import csv
import json
import logging
import os
import pathlib
import sys
from datetime import datetime
@ -182,24 +184,29 @@ def show(run_id: str) -> None:
"""Describe a provided ingestion run to datahub"""
payload_obj = {"runId": run_id, "dryRun": True, "hardDelete": True}
structured_rows, entities_affected, aspects_affected = post_rollback_endpoint(
payload_obj, "/runs?action=rollback"
)
(
structured_rows,
entities_affected,
aspects_modified,
aspects_affected,
unsafe_entity_count,
unsafe_entities,
) = post_rollback_endpoint(payload_obj, "/runs?action=rollback")
if aspects_affected >= ELASTIC_MAX_PAGE_SIZE:
if aspects_modified >= ELASTIC_MAX_PAGE_SIZE:
click.echo(
f"this run created at least {entities_affected} new entities and updated at least {aspects_affected} aspects"
f"this run created at least {entities_affected} new entities and updated at least {aspects_modified} aspects"
)
else:
click.echo(
f"this run created {entities_affected} new entities and updated {aspects_affected} aspects"
f"this run created {entities_affected} new entities and updated {aspects_modified} aspects"
)
click.echo(
"rolling back will delete the entities created and revert the updated aspects"
)
click.echo()
click.echo(
f"showing first {len(structured_rows)} of {aspects_affected} aspects touched by this run"
f"showing first {len(structured_rows)} of {aspects_modified} aspects touched by this run"
)
click.echo(tabulate(structured_rows, RUN_TABLE_COLUMNS, tablefmt="grid"))
@ -208,9 +215,18 @@ def show(run_id: str) -> None:
@click.option("--run-id", required=True, type=str)
@click.option("-f", "--force", required=False, is_flag=True)
@click.option("--dry-run", "-n", required=False, is_flag=True, default=False)
@click.option("--hard-delete", "-d", required=False, is_flag=True, default=False)
@click.option("--safe/--nuke", required=False, is_flag=True, default=True)
@click.option(
"--report-dir",
required=False,
type=str,
default="./rollback-reports",
help="Path to directory where rollback reports will be saved to",
)
@telemetry.with_telemetry
def rollback(run_id: str, force: bool, dry_run: bool, hard_delete: bool) -> None:
def rollback(
run_id: str, force: bool, dry_run: bool, safe: bool, report_dir: str
) -> None:
"""Rollback a provided ingestion run to datahub"""
cli_utils.test_connectivity_complain_exit("ingest")
@ -221,18 +237,57 @@ def rollback(run_id: str, force: bool, dry_run: bool, hard_delete: bool) -> None
abort=True,
)
payload_obj = {"runId": run_id, "dryRun": dry_run, "hardDelete": hard_delete}
structured_rows, entities_affected, aspects_affected = post_rollback_endpoint(
payload_obj, "/runs?action=rollback"
)
payload_obj = {"runId": run_id, "dryRun": dry_run, "safe": safe}
(
structured_rows,
entities_affected,
aspects_reverted,
aspects_affected,
unsafe_entity_count,
unsafe_entities,
) = post_rollback_endpoint(payload_obj, "/runs?action=rollback")
click.echo(
"Rolling back deletes the entities created by a run and reverts the updated aspects"
)
click.echo(
f"This rollback {'will' if dry_run else ''} {'delete' if dry_run else 'deleted'} {entities_affected} entities and {'will roll' if dry_run else 'rolled'} back {aspects_affected} aspects"
f"This rollback {'will' if dry_run else ''} {'delete' if dry_run else 'deleted'} {entities_affected} entities and {'will roll' if dry_run else 'rolled'} back {aspects_reverted} aspects"
)
click.echo(
f"showing first {len(structured_rows)} of {aspects_affected} aspects {'that will be' if dry_run else ''} reverted by this run"
f"showing first {len(structured_rows)} of {aspects_reverted} aspects {'that will be ' if dry_run else ''}reverted by this run"
)
click.echo(tabulate(structured_rows, RUN_TABLE_COLUMNS, tablefmt="grid"))
if aspects_affected > 0:
if safe:
click.echo(
f"WARNING: This rollback {'will hide' if dry_run else 'has hidden'} {aspects_affected} aspects related to {unsafe_entity_count} entities being rolled back that are not part ingestion run id."
)
else:
click.echo(
f"WARNING: This rollback {'will delete' if dry_run else 'has deleted'} {aspects_affected} aspects related to {unsafe_entity_count} entities being rolled back that are not part ingestion run id."
)
if unsafe_entity_count > 0:
now = datetime.now()
current_time = now.strftime("%Y-%m-%d %H:%M:%S")
try:
folder_name = report_dir + "/" + current_time
ingestion_config_file_name = folder_name + "/config.json"
os.makedirs(os.path.dirname(ingestion_config_file_name), exist_ok=True)
with open(ingestion_config_file_name, "w") as file_handle:
json.dump({"run_id": run_id}, file_handle)
csv_file_name = folder_name + "/unsafe_entities.csv"
with open(csv_file_name, "w") as file_handle:
writer = csv.writer(file_handle)
writer.writerow(["urn"])
for row in unsafe_entities:
writer.writerow([row.get("urn")])
except IOError as e:
print(e)
sys.exit("Unable to write reports to " + report_dir)

View File

@ -127,7 +127,13 @@ public class ElasticSearchSystemMetadataService implements SystemMetadataService
return findByParams(Collections.singletonMap(FIELD_RUNID, runId), includeSoftDeleted);
}
private List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted) {
@Override
public List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap(FIELD_URN, urn), includeSoftDeleted);
}
@Override
public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted) {
SearchResponse searchResponse = _esDAO.findByParams(systemMetaParams, includeSoftDeleted);
if (searchResponse != null) {
SearchHits hits = searchResponse.getHits();

View File

@ -4,6 +4,7 @@ import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.run.IngestionRunSummary;
import com.linkedin.mxe.SystemMetadata;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@ -24,6 +25,10 @@ public interface SystemMetadataService {
List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted);
List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted);
List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted);
List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted);
List<IngestionRunSummary> listRuns(Integer pageOffset, Integer pageSize, boolean includeSoftDeleted);

View File

@ -5,4 +5,7 @@ record RollbackResponse {
entitiesAffected: long
aspectsAffected: long
entitiesDeleted: optional long
aspectsReverted: optional long
unsafeEntitiesCount: optional long
unsafeEntities: array[UnsafeEntityInfo]
}

View File

@ -0,0 +1,11 @@
namespace com.linkedin.metadata.run
/**
 * This record serves as a holder of information for entities that were unsafe to fully delete as part of a rollback operation.
 */
record UnsafeEntityInfo {
/**
 * Urn of the entity that was unsafe to fully delete as part of the rollback operation.
 */
urn: string
}

View File

@ -38,9 +38,16 @@
"type" : "boolean",
"optional" : true
}, {
"annotations" : {
"deprecated" : { }
},
"name" : "hardDelete",
"type" : "boolean",
"optional" : true
}, {
"name" : "safe",
"type" : "boolean",
"optional" : true
} ],
"returns" : "com.linkedin.metadata.run.RollbackResponse"
} ],

View File

@ -5178,8 +5178,31 @@
"name" : "entitiesDeleted",
"type" : "long",
"optional" : true
}, {
"name" : "aspectsReverted",
"type" : "long",
"optional" : true
}, {
"name" : "unsafeEntitiesCount",
"type" : "long",
"optional" : true
}, {
"name" : "unsafeEntities",
"type" : {
"type" : "array",
"items" : {
"type" : "record",
"name" : "UnsafeEntityInfo",
"doc" : " This record serves as a holder of information for entities that were unsafe to fully delete as part of a rollback operation.",
"fields" : [ {
"name" : "urn",
"type" : "string",
"doc" : "Name of the entity this aspect information instance refers to."
} ]
}
}
} ]
}, {
}, "com.linkedin.metadata.run.UnsafeEntityInfo", {
"type" : "record",
"name" : "AggregationMetadata",
"namespace" : "com.linkedin.metadata.search",

View File

@ -3201,8 +3201,31 @@
"name" : "entitiesDeleted",
"type" : "long",
"optional" : true
}, {
"name" : "aspectsReverted",
"type" : "long",
"optional" : true
}, {
"name" : "unsafeEntitiesCount",
"type" : "long",
"optional" : true
}, {
"name" : "unsafeEntities",
"type" : {
"type" : "array",
"items" : {
"type" : "record",
"name" : "UnsafeEntityInfo",
"doc" : " This record serves as a holder of information for entities that were unsafe to fully delete as part of a rollback operation.",
"fields" : [ {
"name" : "urn",
"type" : "string",
"doc" : "Name of the entity this aspect information instance refers to."
} ]
}
}
} ]
}, "com.linkedin.ml.metadata.BaseData", "com.linkedin.ml.metadata.CaveatDetails", "com.linkedin.ml.metadata.CaveatsAndRecommendations", "com.linkedin.ml.metadata.EthicalConsiderations", "com.linkedin.ml.metadata.EvaluationData", "com.linkedin.ml.metadata.HyperParameterValueType", "com.linkedin.ml.metadata.IntendedUse", "com.linkedin.ml.metadata.IntendedUserType", "com.linkedin.ml.metadata.MLFeatureProperties", "com.linkedin.ml.metadata.MLHyperParam", "com.linkedin.ml.metadata.MLMetric", "com.linkedin.ml.metadata.MLModelFactorPrompts", "com.linkedin.ml.metadata.MLModelFactors", "com.linkedin.ml.metadata.MLModelProperties", "com.linkedin.ml.metadata.Metrics", "com.linkedin.ml.metadata.QuantitativeAnalyses", "com.linkedin.ml.metadata.ResultsType", "com.linkedin.ml.metadata.SourceCode", "com.linkedin.ml.metadata.SourceCodeUrl", "com.linkedin.ml.metadata.SourceCodeUrlType", "com.linkedin.ml.metadata.TrainingData", "com.linkedin.schema.ArrayType", "com.linkedin.schema.BinaryJsonSchema", "com.linkedin.schema.BooleanType", "com.linkedin.schema.BytesType", "com.linkedin.schema.DatasetFieldForeignKey", "com.linkedin.schema.DateType", "com.linkedin.schema.EditableSchemaFieldInfo", "com.linkedin.schema.EditableSchemaMetadata", "com.linkedin.schema.EnumType", "com.linkedin.schema.EspressoSchema", "com.linkedin.schema.FixedType", "com.linkedin.schema.ForeignKeyConstraint", "com.linkedin.schema.ForeignKeySpec", "com.linkedin.schema.KafkaSchema", "com.linkedin.schema.KeyValueSchema", "com.linkedin.schema.MapType", "com.linkedin.schema.MySqlDDL", "com.linkedin.schema.NullType", "com.linkedin.schema.NumberType", "com.linkedin.schema.OracleDDL", "com.linkedin.schema.OrcSchema", "com.linkedin.schema.OtherSchema", "com.linkedin.schema.PrestoDDL", "com.linkedin.schema.RecordType", "com.linkedin.schema.SchemaField", "com.linkedin.schema.SchemaFieldDataType", "com.linkedin.schema.SchemaMetadata", "com.linkedin.schema.SchemaMetadataKey", "com.linkedin.schema.Schemaless", 
"com.linkedin.schema.StringType", "com.linkedin.schema.TimeType", "com.linkedin.schema.UnionType", "com.linkedin.schema.UrnForeignKey", "com.linkedin.tag.TagProperties" ],
}, "com.linkedin.metadata.run.UnsafeEntityInfo", "com.linkedin.ml.metadata.BaseData", "com.linkedin.ml.metadata.CaveatDetails", "com.linkedin.ml.metadata.CaveatsAndRecommendations", "com.linkedin.ml.metadata.EthicalConsiderations", "com.linkedin.ml.metadata.EvaluationData", "com.linkedin.ml.metadata.HyperParameterValueType", "com.linkedin.ml.metadata.IntendedUse", "com.linkedin.ml.metadata.IntendedUserType", "com.linkedin.ml.metadata.MLFeatureProperties", "com.linkedin.ml.metadata.MLHyperParam", "com.linkedin.ml.metadata.MLMetric", "com.linkedin.ml.metadata.MLModelFactorPrompts", "com.linkedin.ml.metadata.MLModelFactors", "com.linkedin.ml.metadata.MLModelProperties", "com.linkedin.ml.metadata.Metrics", "com.linkedin.ml.metadata.QuantitativeAnalyses", "com.linkedin.ml.metadata.ResultsType", "com.linkedin.ml.metadata.SourceCode", "com.linkedin.ml.metadata.SourceCodeUrl", "com.linkedin.ml.metadata.SourceCodeUrlType", "com.linkedin.ml.metadata.TrainingData", "com.linkedin.schema.ArrayType", "com.linkedin.schema.BinaryJsonSchema", "com.linkedin.schema.BooleanType", "com.linkedin.schema.BytesType", "com.linkedin.schema.DatasetFieldForeignKey", "com.linkedin.schema.DateType", "com.linkedin.schema.EditableSchemaFieldInfo", "com.linkedin.schema.EditableSchemaMetadata", "com.linkedin.schema.EnumType", "com.linkedin.schema.EspressoSchema", "com.linkedin.schema.FixedType", "com.linkedin.schema.ForeignKeyConstraint", "com.linkedin.schema.ForeignKeySpec", "com.linkedin.schema.KafkaSchema", "com.linkedin.schema.KeyValueSchema", "com.linkedin.schema.MapType", "com.linkedin.schema.MySqlDDL", "com.linkedin.schema.NullType", "com.linkedin.schema.NumberType", "com.linkedin.schema.OracleDDL", "com.linkedin.schema.OrcSchema", "com.linkedin.schema.OtherSchema", "com.linkedin.schema.PrestoDDL", "com.linkedin.schema.RecordType", "com.linkedin.schema.SchemaField", "com.linkedin.schema.SchemaFieldDataType", "com.linkedin.schema.SchemaMetadata", "com.linkedin.schema.SchemaMetadataKey", 
"com.linkedin.schema.Schemaless", "com.linkedin.schema.StringType", "com.linkedin.schema.TimeType", "com.linkedin.schema.UnionType", "com.linkedin.schema.UrnForeignKey", "com.linkedin.tag.TagProperties" ],
"schema" : {
"name" : "runs",
"namespace" : "com.linkedin.entity",
@ -3243,9 +3266,16 @@
"type" : "boolean",
"optional" : true
}, {
"annotations" : {
"deprecated" : { }
},
"name" : "hardDelete",
"type" : "boolean",
"optional" : true
}, {
"name" : "safe",
"type" : "boolean",
"optional" : true
} ],
"returns" : "com.linkedin.metadata.run.RollbackResponse"
} ],

View File

@ -5,6 +5,8 @@ import com.linkedin.metadata.aspect.VersionedAspect;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.RollbackRunResult;
import com.linkedin.metadata.restli.RestliUtil;
import com.linkedin.metadata.run.UnsafeEntityInfo;
import com.linkedin.metadata.run.UnsafeEntityInfoArray;
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.run.AspectRowSummaryArray;
import com.linkedin.metadata.run.IngestionRunSummary;
@ -18,14 +20,16 @@ import com.linkedin.restli.server.annotations.Optional;
import com.linkedin.restli.server.annotations.RestLiCollection;
import com.linkedin.restli.server.resources.CollectionResourceTaskTemplate;
import io.opentelemetry.extension.annotations.WithSpan;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -37,6 +41,7 @@ public class BatchIngestionRunResource extends CollectionResourceTaskTemplate<St
private static final Integer DEFAULT_OFFSET = 0;
private static final Integer DEFAULT_PAGE_SIZE = 100;
private static final Integer DEFAULT_UNSAFE_ENTITIES_PAGE_SIZE = 1000000;
private static final boolean DEFAULT_INCLUDE_SOFT_DELETED = false;
private static final boolean DEFAULT_HARD_DELETE = false;
private static final Integer ELASTIC_MAX_PAGE_SIZE = 10000;
@ -57,10 +62,16 @@ public class BatchIngestionRunResource extends CollectionResourceTaskTemplate<St
@Nonnull
@WithSpan
public Task<RollbackResponse> rollback(@ActionParam("runId") @Nonnull String runId,
@ActionParam("dryRun") @Optional Boolean dryRun, @ActionParam("hardDelete") @Optional Boolean hardDelete) {
@ActionParam("dryRun") @Optional Boolean dryRun,
@Deprecated @ActionParam("hardDelete") @Optional Boolean hardDelete,
@ActionParam("safe") @Optional Boolean safe) {
log.info("ROLLBACK RUN runId: {} dry run: {}", runId, dryRun);
boolean doHardDelete = hardDelete != null ? hardDelete : DEFAULT_HARD_DELETE;
boolean doHardDelete = safe != null ? !safe : hardDelete != null ? hardDelete : DEFAULT_HARD_DELETE;
if (safe != null && hardDelete != null) {
log.warn("Both Safe & hardDelete flags were defined, honouring safe flag as hardDelete is deprecated");
}
return RestliUtil.toTask(() -> {
if (runId.equals(EntityService.DEFAULT_RUN_ID)) {
@ -75,22 +86,59 @@ public class BatchIngestionRunResource extends CollectionResourceTaskTemplate<St
log.info("found {} rows to delete...", stringifyRowCount(aspectRowsToDelete.size()));
if (dryRun) {
if (!doHardDelete) {
aspectRowsToDelete.removeIf(AspectRowSummary::isKeyAspect);
}
final Map<Boolean, List<AspectRowSummary>> aspectsSplitByIsKeyAspects = aspectRowsToDelete.stream()
.collect(Collectors.partitioningBy(AspectRowSummary::isKeyAspect));
response.setAspectsAffected(aspectRowsToDelete.size());
response.setEntitiesAffected(
aspectRowsToDelete.stream().collect(Collectors.groupingBy(AspectRowSummary::getUrn)).keySet().size());
response.setEntitiesDeleted(aspectRowsToDelete.stream().filter(row -> row.isKeyAspect()).count());
response.setAspectRowSummaries(
new AspectRowSummaryArray(aspectRowsToDelete.subList(0, Math.min(100, aspectRowsToDelete.size()))));
return response;
final List<AspectRowSummary> keyAspects = aspectsSplitByIsKeyAspects.get(true);
long entitiesDeleted = keyAspects.size();
long aspectsReverted = aspectRowsToDelete.size();
final long affectedEntities = aspectRowsToDelete.stream()
.collect(Collectors.groupingBy(AspectRowSummary::getUrn)).keySet().size();
final AspectRowSummaryArray rowSummaries = new AspectRowSummaryArray(
aspectRowsToDelete.subList(0, Math.min(100, aspectRowsToDelete.size())));
// If we are soft deleting, remove key aspects from count of aspects being deleted
if (!doHardDelete) {
aspectsReverted -= keyAspects.size();
rowSummaries.removeIf(AspectRowSummary::isKeyAspect);
}
// Compute the aspects that exist referencing the key aspects we are deleting
final List<AspectRowSummary> affectedAspectsList = keyAspects.stream()
.map((AspectRowSummary urn) -> _systemMetadataService.findByUrn(urn.getUrn(), false))
.flatMap(List::stream)
.filter(row -> !row.getRunId().equals(runId))
.collect(Collectors.toList());
long affectedAspects = affectedAspectsList.size();
long unsafeEntitiesCount = affectedAspectsList.stream()
.collect(Collectors.groupingBy(AspectRowSummary::getUrn)).keySet().size();
final List<UnsafeEntityInfo> unsafeEntityInfos = affectedAspectsList.stream().map(AspectRowSummary::getUrn)
.distinct()
.map(urn -> {
UnsafeEntityInfo unsafeEntityInfo = new UnsafeEntityInfo();
unsafeEntityInfo.setUrn(urn);
return unsafeEntityInfo;
})
// Return at most 1 million rows
.limit(DEFAULT_UNSAFE_ENTITIES_PAGE_SIZE)
.collect(Collectors.toList());
return response.setAspectsAffected(affectedAspects)
.setAspectsReverted(aspectsReverted)
.setEntitiesAffected(affectedEntities)
.setEntitiesDeleted(entitiesDeleted)
.setUnsafeEntitiesCount(unsafeEntitiesCount)
.setUnsafeEntities(new UnsafeEntityInfoArray(unsafeEntityInfos))
.setAspectRowSummaries(rowSummaries);
}
RollbackRunResult rollbackRunResult = _entityService.rollbackRun(aspectRowsToDelete, runId, doHardDelete);
List<AspectRowSummary> deletedRows = rollbackRunResult.getRowsRolledBack();
Integer rowsDeletedFromEntityDeletion = rollbackRunResult.getRowsDeletedFromEntityDeletion();
final List<AspectRowSummary> deletedRows = rollbackRunResult.getRowsRolledBack();
int rowsDeletedFromEntityDeletion = rollbackRunResult.getRowsDeletedFromEntityDeletion();
// since elastic limits how many rows we can access at once, we need to iteratively delete
while (aspectRowsToDelete.size() >= ELASTIC_MAX_PAGE_SIZE) {
@ -104,11 +152,52 @@ public class BatchIngestionRunResource extends CollectionResourceTaskTemplate<St
}
log.info("finished deleting {} rows", deletedRows.size());
response.setAspectsAffected(deletedRows.size() + rowsDeletedFromEntityDeletion);
response.setEntitiesAffected(deletedRows.stream().filter(row -> row.isKeyAspect()).count());
response.setAspectRowSummaries(
new AspectRowSummaryArray(deletedRows.subList(0, Math.min(100, deletedRows.size()))));
return response;
int aspectsReverted = deletedRows.size() + rowsDeletedFromEntityDeletion;
final Map<Boolean, List<AspectRowSummary>> aspectsSplitByIsKeyAspects = aspectRowsToDelete.stream()
.collect(Collectors.partitioningBy(AspectRowSummary::isKeyAspect));
final List<AspectRowSummary> keyAspects = aspectsSplitByIsKeyAspects.get(true);
final long entitiesDeleted = keyAspects.size();
final long affectedEntities = deletedRows.stream()
.collect(Collectors.groupingBy(AspectRowSummary::getUrn)).keySet().size();
final AspectRowSummaryArray rowSummaries = new AspectRowSummaryArray(
aspectRowsToDelete.subList(0, Math.min(100, aspectRowsToDelete.size())));
log.info("computing aspects affected by this rollback...");
// Compute the aspects that exist referencing the key aspects we are deleting
final List<AspectRowSummary> affectedAspectsList = keyAspects.stream()
.map((AspectRowSummary urn) -> _systemMetadataService.findByUrn(urn.getUrn(), false))
.flatMap(List::stream)
.filter(row -> !row.getRunId().equals(runId))
.collect(Collectors.toList());
long affectedAspects = affectedAspectsList.size();
long unsafeEntitiesCount = affectedAspectsList.stream()
.collect(Collectors.groupingBy(AspectRowSummary::getUrn)).keySet().size();
final List<UnsafeEntityInfo> unsafeEntityInfos = affectedAspectsList.stream().map(AspectRowSummary::getUrn)
.distinct()
.map(urn -> {
UnsafeEntityInfo unsafeEntityInfo = new UnsafeEntityInfo();
unsafeEntityInfo.setUrn(urn);
return unsafeEntityInfo;
})
// Return at most 1 million rows
.limit(DEFAULT_UNSAFE_ENTITIES_PAGE_SIZE)
.collect(Collectors.toList());
log.info("calculation done.");
return response.setAspectsAffected(affectedAspects)
.setAspectsReverted(aspectsReverted)
.setEntitiesAffected(affectedEntities)
.setEntitiesDeleted(entitiesDeleted)
.setUnsafeEntitiesCount(unsafeEntitiesCount)
.setUnsafeEntities(new UnsafeEntityInfoArray(unsafeEntityInfos))
.setAspectRowSummaries(rowSummaries);
}, MetricRegistry.name(this.getClass(), "rollback"));
}
@ -116,7 +205,7 @@ public class BatchIngestionRunResource extends CollectionResourceTaskTemplate<St
if (size < ELASTIC_MAX_PAGE_SIZE) {
return String.valueOf(size);
} else {
return "at least " + String.valueOf(size);
return "at least " + size;
}
}