Pedro Silva e8f6c4cabd
feat(cli) Changes rollback behaviour to apply soft deletes by default (#4358)
* Changes rollback behaviour to apply soft deletes by default

Summary:
Addresses the feature request for a flag in the delete command that only deletes aspects touched by an ingestion run, plus a flag to delete everything. This is done by changing the default behaviour of a rollback operation: by default, it no longer deletes an entity when the entity's keyAspect is being rolled back.

Instead the key aspect is kept and a StatusAspect is upserted with removed=true, effectively making a soft delete.
Another PR will follow to perform garbage collection on these soft deleted entities.

To keep the old behaviour, a new --hard-delete parameter was added to the CLI ingest rollback command.
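The default and --hard-delete rollback paths described above can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not DataHub's implementation: the `Entity` class, the `rollback` function, and the aspect names "key" and "status" are hypothetical stand-ins for the real key aspect and Status aspect.

```python
from dataclasses import dataclass, field
from typing import Dict, Set

# Hypothetical aspect names standing in for the entity key aspect and the Status aspect.
KEY_ASPECT = "key"
STATUS_ASPECT = "status"


@dataclass
class Entity:
    # Hypothetical model: aspect name -> aspect payload.
    aspects: Dict[str, dict] = field(default_factory=dict)


def rollback(
    store: Dict[str, Entity],
    urn: str,
    run_aspects: Set[str],
    hard_delete: bool = False,
) -> None:
    """Roll back the aspects one ingestion run wrote for a single entity (sketch)."""
    entity = store.get(urn)
    if entity is None:
        return
    if KEY_ASPECT in run_aspects and hard_delete:
        # Old behaviour (--hard-delete): rolling back the key aspect removes the entity.
        del store[urn]
        return
    # Remove only the aspects touched by this run, never the key aspect.
    for name in run_aspects - {KEY_ASPECT}:
        entity.aspects.pop(name, None)
    if KEY_ASPECT in run_aspects:
        # New default: keep the key aspect and upsert status removed=true (soft delete).
        entity.aspects[STATUS_ASPECT] = {"removed": True}
```

In this model a soft-deleted entity keeps its key, so a later garbage-collection pass (the follow-up mentioned above) could find it by looking for a status of removed=true.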

* Adds restli specs

* Fixes deleteAspect endpoint & adds support for nested transactions

* Enable regression test & fix docker-compose for local development

* Add generated quickstart

* Fix quickstart generation script

* Adds missing env var to docker-compose-without-neo4j

* Sets status removed=true when ingesting resources

* Adds soft deletes for Elasticsearch + soft delete flags across ingestion sub-commands

* Makes Elasticsearch consistent

* Update tests with new behaviour

* apply review comments

* apply review comment

* Forces Elasticsearch to add documents with status removed=false when ingesting

* Reset gradle properties to default

* Fix tests
2022-03-15 12:05:52 -07:00


import json
from typing import Any

import requests

from datahub.cli import cli_utils
from datahub.ingestion.run.pipeline import Pipeline

# Local endpoints for the DataHub GMS backend and the frontend.
GMS_ENDPOINT = "http://localhost:8080"
FRONTEND_ENDPOINT = "http://localhost:9002"


def ingest_file_via_rest(filename: str) -> Any:
    """Ingest a JSON file of MCEs/MCPs into GMS through the datahub-rest sink."""
    pipeline = Pipeline.create(
        {
            "source": {
                "type": "file",
                "config": {"filename": filename},
            },
            "sink": {
                "type": "datahub-rest",
                "config": {"server": GMS_ENDPOINT},
            },
        }
    )
    pipeline.run()
    # Fail loudly if the pipeline reported any errors.
    pipeline.raise_from_status()
    return pipeline


def delete_urns_from_file(filename: str) -> None:
    """Delete every entity whose URN appears in a JSON file of MCEs/MCPs."""
    session = requests.Session()
    session.headers.update(
        {
            "X-RestLi-Protocol-Version": "2.0.0",
            "Content-Type": "application/json",
        }
    )

    with open(filename) as f:
        d = json.load(f)
        for entry in d:
            # MCPs carry the URN at the top level; MCEs nest it in a snapshot union.
            if "entityUrn" in entry:
                urn = entry["entityUrn"]
            else:
                snapshot_union = entry["proposedSnapshot"]
                snapshot = list(snapshot_union.values())[0]
                urn = snapshot["urn"]
            payload_obj = {"urn": urn}
            cli_utils.post_delete_endpoint_with_session_and_url(
                session,
                GMS_ENDPOINT + "/entities?action=delete",
                payload_obj,
            )
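The URN extraction inside `delete_urns_from_file` can also be factored into a pure helper, which makes it testable without a running GMS. This is a sketch only: `extract_urn` and `extract_urns` are hypothetical names, and the helper assumes the same two entry shapes handled above (an MCP with a top-level `entityUrn`, or an MCE whose `proposedSnapshot` union holds a single snapshot with a `urn` field).

```python
import json
from typing import Any, Dict, List


def extract_urn(entry: Dict[str, Any]) -> str:
    """Return the URN of one file-source entry (hypothetical helper)."""
    if "entityUrn" in entry:
        # MetadataChangeProposal: the URN is a top-level field.
        return entry["entityUrn"]
    # MetadataChangeEvent: proposedSnapshot is a union keyed by snapshot type.
    snapshot = next(iter(entry["proposedSnapshot"].values()))
    return snapshot["urn"]


def extract_urns(filename: str) -> List[str]:
    """Read a JSON list of MCEs/MCPs and return their URNs in file order."""
    with open(filename) as f:
        return [extract_urn(entry) for entry in json.load(f)]
```

With such a helper, the delete loop reduces to one POST per URN, and the parsing rules can be checked against small fixtures.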