feat(cli): support recursive deletes (#8709)

This commit is contained in:
Harshal Sheth 2023-08-30 12:07:41 -07:00 committed by GitHub
parent dee1bc854c
commit 5032af9123
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 134 additions and 7 deletions

View File

@ -43,6 +43,9 @@ datahub delete --platform snowflake
# Filters can be combined, which will select entities that match all filters.
datahub delete --platform looker --entity-type chart
datahub delete --platform bigquery --env PROD
# You can also do recursive deletes for container and dataPlatformInstance entities.
datahub delete --urn "urn:li:container:f76..." --recursive
```
When performing hard deletes, you can optionally add the `--only-soft-deleted` flag to only hard delete entities that were previously soft deleted.
@ -122,6 +125,14 @@ datahub delete --urn "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted
datahub delete --platform snowflake --env DEV
```
#### Delete everything within a specific Snowflake DB
```shell
# You can find your container urn by navigating to the relevant
# DB in the DataHub UI and clicking the "copy urn" button.
datahub delete --urn "urn:li:container:77644901c4f574845578ebd18b7c14fa" --recursive
```
#### Delete all BigQuery datasets in the PROD environment
```shell
@ -129,6 +140,13 @@ datahub delete --platform snowflake --env DEV
datahub delete --env PROD --entity-type dataset --platform bigquery
```
#### Delete everything within a MySQL platform instance
```shell
# The instance name comes from the `platform_instance` config option in the ingestion recipe.
datahub delete --urn 'urn:li:dataPlatformInstance:(urn:li:dataPlatform:mysql,my_instance_name)' --recursive
```
#### Delete all pipelines and tasks from Airflow
```shell
@ -138,6 +156,7 @@ datahub delete --platform "airflow"
#### Delete all containers for a particular platform
```shell
# Note: this will leave S3 datasets intact.
datahub delete --entity-type container --platform s3
```

View File

@ -37,6 +37,11 @@ _DELETE_WITH_REFERENCES_TYPES = {
"glossaryNode",
}
_RECURSIVE_DELETE_TYPES = {
"container",
"dataPlatformInstance",
}
@click.group(cls=DefaultGroup, default="by-filter")
def delete() -> None:
@ -252,6 +257,12 @@ def references(urn: str, dry_run: bool, force: bool) -> None:
help="Entity type filter (e.g. dataset)",
)
@click.option("--query", required=False, type=str, help="Elasticsearch query string")
@click.option(
"--recursive",
required=False,
is_flag=True,
help="Recursively delete all contained entities (only for containers and dataPlatformInstances)",
)
@click.option(
"--start-time",
required=False,
@ -298,6 +309,7 @@ def by_filter(
platform: Optional[str],
entity_type: Optional[str],
query: Optional[str],
recursive: bool,
start_time: Optional[datetime],
end_time: Optional[datetime],
batch_size: int,
@ -308,7 +320,12 @@ def by_filter(
# Validate the cli arguments.
_validate_user_urn_and_filters(
urn=urn, entity_type=entity_type, platform=platform, env=env, query=query
urn=urn,
entity_type=entity_type,
platform=platform,
env=env,
query=query,
recursive=recursive,
)
soft_delete_filter = _validate_user_soft_delete_flags(
soft=soft, aspect=aspect, only_soft_deleted=only_soft_deleted
@ -327,11 +344,29 @@ def by_filter(
logger.info(f"Using {graph}")
# Determine which urns to delete.
delete_by_urn = bool(urn) and not recursive
if urn:
delete_by_urn = True
urns = [urn]
if recursive:
# Add children urns to the list.
if guess_entity_type(urn) == "dataPlatformInstance":
urns.extend(
graph.get_urns_by_filter(
platform_instance=urn,
status=soft_delete_filter,
batch_size=batch_size,
)
)
else:
urns.extend(
graph.get_urns_by_filter(
container=urn,
status=soft_delete_filter,
batch_size=batch_size,
)
)
else:
delete_by_urn = False
urns = list(
graph.get_urns_by_filter(
entity_types=[entity_type] if entity_type else None,
@ -348,20 +383,22 @@ def by_filter(
)
return
# Print out a summary of the urns to be deleted and confirm with the user.
if not delete_by_urn:
urns_by_type: Dict[str, List[str]] = {}
for urn in urns:
entity_type = guess_entity_type(urn)
urns_by_type.setdefault(entity_type, []).append(urn)
if len(urns_by_type) > 1:
# Display a breakdown of urns by entity type if there's multiple.
click.echo("Filter matched urns of multiple entity types")
click.echo("Found urns of multiple entity types")
for entity_type, entity_urns in urns_by_type.items():
click.echo(
f"- {len(entity_urns)} {entity_type} urn(s). Sample: {choices(entity_urns, k=min(5, len(entity_urns)))}"
)
else:
click.echo(
f"Filter matched {len(urns)} {entity_type} urn(s). Sample: {choices(urns, k=min(5, len(urns)))}"
f"Found {len(urns)} {entity_type} urn(s). Sample: {choices(urns, k=min(5, len(urns)))}"
)
if not force and not dry_run:
@ -403,6 +440,7 @@ def _validate_user_urn_and_filters(
platform: Optional[str],
env: Optional[str],
query: Optional[str],
recursive: bool,
) -> None:
# Check urn / filters options.
if urn:
@ -423,6 +461,21 @@ def _validate_user_urn_and_filters(
f"Using --env without other filters will delete all metadata in the {env} environment. Please use with caution."
)
# Check recursive flag.
if recursive:
if not urn:
raise click.UsageError(
"The --recursive flag can only be used with a single urn."
)
elif guess_entity_type(urn) not in _RECURSIVE_DELETE_TYPES:
raise click.UsageError(
f"The --recursive flag can only be used with these entity types: {_RECURSIVE_DELETE_TYPES}."
)
elif urn and guess_entity_type(urn) in _RECURSIVE_DELETE_TYPES:
logger.warning(
f"This will only delete {urn}. Use --recursive to delete all contained entities."
)
def _validate_user_soft_delete_flags(
soft: bool, aspect: Optional[str], only_soft_deleted: bool

View File

@ -16,7 +16,12 @@ from requests.models import HTTPError
from datahub.cli.cli_utils import get_url_and_token
from datahub.configuration.common import ConfigModel, GraphError, OperationalError
from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
from datahub.emitter.mce_builder import DEFAULT_ENV, Aspect, make_data_platform_urn
from datahub.emitter.mce_builder import (
DEFAULT_ENV,
Aspect,
make_data_platform_urn,
make_dataplatform_instance_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.emitter.serialization_helper import post_json_transform
@ -543,8 +548,10 @@ class DataHubGraph(DatahubRestEmitter):
*,
entity_types: Optional[List[str]] = None,
platform: Optional[str] = None,
platform_instance: Optional[str] = None,
env: Optional[str] = None,
query: Optional[str] = None,
container: Optional[str] = None,
status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED,
batch_size: int = 10000,
extraFilters: Optional[List[SearchFilterRule]] = None,
@ -557,15 +564,25 @@ class DataHubGraph(DatahubRestEmitter):
:param entity_types: List of entity types to include. If None, all entity types will be returned.
:param platform: Platform to filter on. If None, all platforms will be returned.
:param platform_instance: Platform instance to filter on. If None, all platform instances will be returned.
:param env: Environment (e.g. PROD, DEV) to filter on. If None, all environments will be returned.
:param query: Query string to filter on. If None, all entities will be returned.
:param container: A container urn that entities must be within.
This works recursively, so it will include entities within sub-containers as well.
If None, all entities will be returned.
Note that this requires browsePathV2 aspects (added in 0.10.4+).
:param status: Filter on the deletion status of the entity. The default is only return non-soft-deleted entities.
:param extraFilters: Additional filters to apply. If specified, the results will match all of the filters.
:return: An iterable of urns that match the filters.
"""
types: Optional[List[str]] = None
if entity_types is not None:
if not entity_types:
raise ValueError("entity_types cannot be an empty list")
raise ValueError(
"entity_types cannot be an empty list; use None for all entities"
)
types = [_graphql_entity_type(entity_type) for entity_type in entity_types]
@ -584,6 +601,44 @@ class DataHubGraph(DatahubRestEmitter):
}
]
# Platform instance filter.
if platform_instance:
if platform:
# Massage the platform instance into a fully qualified urn, if necessary.
platform_instance = make_dataplatform_instance_urn(
platform, platform_instance
)
# Warn if platform_instance is not a fully qualified urn.
# TODO: Change this once we have a first-class data platform instance urn type.
if guess_entity_type(platform_instance) != "dataPlatformInstance":
raise ValueError(
f"Invalid data platform instance urn: {platform_instance}"
)
andFilters += [
{
"field": "platformInstance",
"values": [platform_instance],
"condition": "EQUAL",
}
]
# Browse path v2 filter.
if container:
# Warn if container is not a fully qualified urn.
# TODO: Change this once we have a first-class container urn type.
if guess_entity_type(container) != "container":
raise ValueError(f"Invalid container urn: {container}")
andFilters += [
{
"field": "browsePathV2",
"values": [container],
"condition": "CONTAIN",
}
]
# Status filter.
if status == RemovedStatusFilter.NOT_SOFT_DELETED:
# Subtle: in some cases (e.g. when the dataset doesn't have a status aspect), the