mirror of https://github.com/datahub-project/datahub.git (synced 2025-12-24 16:38:19 +00:00)

feat(cli): adding a put command and docs (#3614)

parent 192cf65c4d · commit 9f9aaaac23
docs-website/sidebars.js

@@ -55,6 +55,7 @@ module.exports = {
     "Getting Started": [
       // Serves as user guides.
       "docs/quickstart",
+      "docs/cli",
       "docs/debugging",
       "metadata-ingestion/README",
       "docs/policies",
@@ -210,6 +211,7 @@ module.exports = {
       "docs/advanced/mcp-mcl",
       "docs/advanced/field-path-spec-v2",
       "docs/advanced/monitoring",
       "docs/how/add-custom-ingestion-source",
       // WIP "docs/advanced/backfilling",
       // WIP "docs/advanced/derived-aspects",
       // WIP "docs/advanced/entity-hierarchy",
docs/cli.md (new file, 168 lines)
@@ -0,0 +1,168 @@
# DataHub CLI

DataHub comes with a friendly CLI called `datahub` that allows you to perform many common operations using just the command line.

## Install

### Using pip

We recommend using Python virtual environments (venvs) to namespace pip modules. Here's an example setup:

```shell
python3 -m venv datahub-env             # create the environment
source datahub-env/bin/activate         # activate the environment
```

**_NOTE:_** If you install `datahub` in a virtual environment, that same virtual environment must be re-activated each time a new shell window or session is created.

Once inside the virtual environment, install `datahub` using the following commands:

```console
# Requires Python 3.6+
python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip install --upgrade acryl-datahub
datahub version
# If you see "command not found", try running this instead: python3 -m datahub version
```
If you run into an error, try checking the [_common setup issues_](../metadata-ingestion/developing.md#Common-setup-issues).

## User Guide

The `datahub` CLI allows you to do many things, such as quickstarting a DataHub Docker instance locally, ingesting metadata from your sources, and retrieving or modifying metadata.
Like most command-line tools, `--help` is your best friend. Use it to discover the capabilities of the CLI and the different commands and sub-commands that are supported.

```console
datahub --help
Usage: datahub [OPTIONS] COMMAND [ARGS]...

Options:
  --debug / --no-debug
  --version             Show the version and exit.
  --help                Show this message and exit.

Commands:
  check    Helper commands for checking various aspects of DataHub.
  delete   Delete metadata from datahub using a single urn or a combination of filters
  docker   Helper commands for setting up and interacting with a local DataHub instance using Docker.
  get      Get metadata for an entity with an optional list of aspects to project
  ingest   Ingest metadata into DataHub.
  init     Configure which datahub instance to connect to
  put      Update a single aspect of an entity
  version  Print version number and exit.
```

The top-level commands listed below are described mainly to give you a high-level picture of the kinds of things you can accomplish with the CLI.
We've ordered them roughly in the order we expect you to interact with them as you get deeper into the `datahub`-verse.
### docker

The `docker` command allows you to start up a local DataHub instance using `datahub docker quickstart`. You can also check whether the docker cluster is healthy using `datahub docker check`.
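For example, a local bring-up might look like this (a sketch; exact output varies by version):

```console
# pull the compose files and start a local DataHub instance
datahub docker quickstart

# verify that the containers in the local cluster are up and healthy
datahub docker check
```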
### ingest

The `ingest` command allows you to ingest metadata from your sources using ingestion configuration files, which we call recipes. The main [ingestion page](../metadata-ingestion/README.md) contains detailed instructions on how to use the ingest command and perform advanced operations like rolling back previously ingested metadata through the `rollback` sub-command.
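A minimal sketch of the workflow (`recipe.yml` is a hypothetical file name; see the ingestion page for real recipe examples):

```console
# run an ingestion pipeline described by a recipe file
datahub ingest -c recipe.yml

# roll back a previous ingestion run by its run id
datahub ingest rollback --run-id <run-id>
```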
### check

The datahub package is composed of different plugins that allow you to connect to different metadata sources and ingest metadata from them.
The `check` command allows you to check whether all plugins are loaded correctly, as well as validate an individual MCE file.
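For instance (a sketch; the sub-command output is omitted and the file path is illustrative):

```console
# list ingestion plugins and confirm they are loaded correctly
datahub check plugins

# validate a file containing MetadataChangeEvents before ingesting it
datahub check mce-file <path/to/mce-file.json>
```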
### init

The `init` command is used to tell `datahub` where your DataHub instance is located. The CLI points at DataHub on localhost by default.
Running `datahub init` will allow you to customize the datahub instance you are communicating with.

**_Note_**: Provide your GMS instance's host when the prompt asks you for the DataHub host.

Alternatively, you can set the following environment variables if you don't want to use a config file:

```shell
DATAHUB_SKIP_CONFIG=True
DATAHUB_GMS_HOST=http://localhost:8080
DATAHUB_GMS_TOKEN= # Used for communicating with DataHub Cloud
```

The environment variables take precedence over what is in the config.
### delete

The `delete` command allows you to delete metadata from DataHub. Read this [guide](./how/delete-metadata.md) to understand how you can delete metadata from DataHub.

```console
datahub delete --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)" --soft
```
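Since hard deletes cannot be undone, it is worth previewing first; `-n`/`--dry-run` reports what would be deleted without changing anything:

```console
datahub delete --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)" --dry-run
```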
### get

The `get` command allows you to easily retrieve metadata from DataHub, by using the REST API.
For example, the following command gets the ownership aspect of the dataset `urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)`:

```console
datahub get --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)" --aspect ownership
{
  "value": {
    "com.linkedin.metadata.snapshot.DatasetSnapshot": {
      "aspects": [
        {
          "com.linkedin.metadata.key.DatasetKey": {
            "name": "SampleHiveDataset",
            "origin": "PROD",
            "platform": "urn:li:dataPlatform:hive"
          }
        },
        {
          "com.linkedin.common.Ownership": {
            "lastModified": {
              "actor": "urn:li:corpuser:jdoe",
              "time": 1581407189000
            },
            "owners": [
              {
                "owner": "urn:li:corpuser:jdoe",
                "type": "DATAOWNER"
              },
              {
                "owner": "urn:li:corpuser:datahub",
                "type": "DATAOWNER"
              }
            ]
          }
        }
      ],
      "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"
    }
  }
}
```
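Because `--aspect` may be repeated (it is a multi-value option), several aspects can be projected in one call; a sketch:

```console
datahub get --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)" --aspect ownership --aspect status
```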
### put

The `put` command allows you to write metadata into DataHub. This is a flexible way for you to issue edits to metadata from the command line.
For example, the following command instructs `datahub` to set the `ownership` aspect of the dataset `urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)` to the value in the file `ownership.json`.
The JSON in the `ownership.json` file needs to conform to the [`Ownership`](https://github.com/linkedin/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/common/Ownership.pdl) aspect model, as shown below.

```json
{
  "owners": [
    {
      "owner": "urn:li:corpUser:jdoe",
      "type": "DEVELOPER"
    },
    {
      "owner": "urn:li:corpUser:jdub",
      "type": "DATAOWNER"
    }
  ]
}
```

```console
datahub --debug put --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)" --aspect ownership -d ownership.json

[DATE_TIMESTAMP] DEBUG {datahub.cli.cli_utils:340} - Attempting to emit to DataHub GMS; using curl equivalent to:
curl -X POST -H 'User-Agent: python-requests/2.26.0' -H 'Accept-Encoding: gzip, deflate' -H 'Accept: */*' -H 'Connection: keep-alive' -H 'X-RestLi-Protocol-Version: 2.0.0' -H 'Content-Type: application/json' --data '{"proposal": {"entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", "aspectName": "ownership", "changeType": "UPSERT", "aspect": {"contentType": "application/json", "value": "{\"owners\": [{\"owner\": \"urn:li:corpUser:jdoe\", \"type\": \"DEVELOPER\"}, {\"owner\": \"urn:li:corpUser:jdub\", \"type\": \"DATAOWNER\"}]}"}}}' 'http://localhost:8080/aspects/?action=ingestProposal'
Update succeeded with status 200
```
docs/how/delete-metadata.md

@@ -8,43 +8,32 @@ Read on to find out how to perform these kinds of deletes.

 _Note: Deleting metadata should only be done with care. Always use `--dry-run` to understand what will be deleted before proceeding. Prefer soft-deletes (`--soft`) unless you really want to nuke metadata rows. Hard deletes will actually delete rows in the primary store, and recovering them will require using backups of the primary metadata store. Make sure you understand the implications of issuing soft-deletes versus hard-deletes before proceeding._

-## Accessing datahub CLI
+## The `datahub` CLI

-To use the datahub CLI you need to have the datahub Python package installed as explained in [Metadata Ingestion](../../metadata-ingestion/README.md) or you can use the `datahub-ingestion` docker image as explained in [Docker Images](../../docker/README.md). In case you are using Kubernetes you can start a pod with the `datahub-ingestion` docker image, get in the shell of the pod and you will have the access to datahub CLI in your kubernetes cluster.
+To use the datahub CLI, follow the installation and configuration guide at [DataHub CLI](../cli.md), or use the `datahub-ingestion` docker image as explained in [Docker Images](../../docker/README.md). If you are using Kubernetes, you can start a pod with the `datahub-ingestion` docker image, open a shell on the pod, and you will have access to the datahub CLI in your Kubernetes cluster.

-## Configuring DataHub CLI
-
-The CLI will point to localhost DataHub by default. Running
-
-```
-datahub init
-```
-
-will allow you to customize the datahub instance you are communicating with.
-
-_Note: Provide your GMS instance's host when the prompt asks you for the DataHub host._
-
-Alternatively, you can set the following env variables if you don't want to use a config file
-```
-DATAHUB_SKIP_CONFIG=True
-DATAHUB_GMS_HOST=http://localhost:8080
-DATAHUB_GMS_TOKEN=
-```
-
-The env variables take precedence over what is in the config.
-
 ## Delete By Urn

 To delete all the data related to a single entity, run

-### Soft Delete
+### Soft Delete (the default)

 This sets the `Status` aspect of the entity to `Removed`, which hides the entity and all its aspects from being returned by the UI.
 ```
 datahub delete --urn "<my urn>"
 ```
 or
 ```
 datahub delete --urn "<my urn>" --soft
 ```

 ### Hard Delete

 This physically deletes all rows for all aspects of the entity. This action cannot be undone, so execute this only after you are sure you want to delete all data associated with this entity.

 ```
-datahub delete --urn "<my urn>"
+datahub delete --urn "<my urn>" --hard
 ```

 You can optionally add `-n` or `--dry-run` to execute a dry run before issuing the final delete command.
@@ -95,10 +84,15 @@ datahub ingest show --run-id <run-id>

 to see more info of the run.

-Finally, run
+Alternatively, you can execute a dry-run rollback to achieve the same outcome.
+```
+datahub ingest rollback --dry-run --run-id <run-id>
+```
+
+Finally, once you are sure you want to delete this data forever, run

 ```
 datahub ingest rollback --run-id <run-id>
 ```

-To rollback all aspects added with this run and all entities created by this run.
+to roll back all aspects added with this run and all entities created by this run.
metadata-ingestion/src/datahub/cli/cli_utils.py

@@ -14,6 +14,8 @@ from pydantic import BaseModel, ValidationError
 from requests.models import Response
+from requests.sessions import Session

+from datahub.emitter.rest_emitter import _make_curl_command

 log = logging.getLogger(__name__)

 DEFAULT_GMS_HOST = "http://localhost:8080"

@@ -90,6 +92,11 @@ def first_non_null(ls: List[Optional[str]]) -> Optional[str]:
     return next((el for el in ls if el is not None and el.strip() != ""), None)


+def guess_entity_type(urn: str) -> str:
+    # The entity type is the third urn segment, e.g. "dataset" in urn:li:dataset:(...)
+    assert urn.startswith("urn:li:"), "urns must start with urn:li:"
+    return urn.split(":")[2]


 def get_session_and_host():
     session = requests.Session()

@@ -304,3 +311,41 @@ def get_entity(

     response = session.get(gms_host + endpoint)
     return response.json()


+def post_entity(
+    urn: str,
+    entity_type: str,
+    aspect_name: str,
+    aspect_value: Dict,
+    cached_session_host: Optional[Tuple[Session, str]] = None,
+) -> int:
+    # Reuse an existing session/host pair if the caller provides one.
+    if not cached_session_host:
+        session, gms_host = get_session_and_host()
+    else:
+        session, gms_host = cached_session_host
+
+    endpoint: str = "/aspects/?action=ingestProposal"
+
+    # Wrap the aspect value in a MetadataChangeProposal envelope.
+    proposal = {
+        "proposal": {
+            "entityType": entity_type,
+            "entityUrn": urn,
+            "aspectName": aspect_name,
+            "changeType": "UPSERT",
+            "aspect": {
+                "contentType": "application/json",
+                "value": json.dumps(aspect_value),
+            },
+        }
+    }
+    payload = json.dumps(proposal)
+    url = gms_host + endpoint
+    curl_command = _make_curl_command(session, "POST", url, payload)
+    log.debug(
+        "Attempting to emit to DataHub GMS; using curl equivalent to:\n%s",
+        curl_command,
+    )
+    response = session.post(url, payload)
+    response.raise_for_status()
+    return response.status_code
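For illustration, a minimal sketch of calling the new helper directly from Python (assumes a reachable GMS and default CLI config):

```python
from datahub.cli.cli_utils import post_entity

# upsert the ownership aspect of a dataset through the ingestProposal endpoint
status = post_entity(
    urn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
    entity_type="dataset",
    aspect_name="ownership",
    aspect_value={"owners": [{"owner": "urn:li:corpuser:jdoe", "type": "DATAOWNER"}]},
)
print(f"GMS returned HTTP status {status}")
```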
metadata-ingestion/src/datahub/cli/delete_cli.py

@@ -10,6 +10,7 @@ import progressbar
 from requests import sessions

 from datahub.cli import cli_utils
+from datahub.cli.cli_utils import guess_entity_type
 from datahub.emitter import rest_emitter
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.metadata.schema_classes import ChangeTypeClass, StatusClass

@@ -50,7 +51,7 @@ class DeletionResult:
 @click.command()
 @click.option("--urn", required=False, type=str)
 @click.option("-f", "--force", required=False, is_flag=True)
-@click.option("--soft", required=False, is_flag=True)
+@click.option("--soft/--hard", required=False, is_flag=True, default=True)
 @click.option("-e", "--env", required=False, type=str)
 @click.option("-p", "--platform", required=False, type=str)
 @click.option("--entity_type", required=False, type=str, default="dataset")

@@ -66,7 +67,7 @@ def delete(
     query: str,
     dry_run: bool,
 ) -> None:
-    """Delete a provided URN from datahub"""
+    """Delete metadata from datahub using a single urn or a combination of filters"""

     # First test connectivity
     try:

@@ -96,11 +97,13 @@ def delete(
     if urn:
         # Single urn based delete
         session, host = cli_utils.get_session_and_host()
+        entity_type = guess_entity_type(urn=urn)
         logger.info(f"DataHub configured with {host}")
         deletion_result: DeletionResult = delete_one_urn(
             urn,
             soft=soft,
             dry_run=dry_run,
+            entity_type=entity_type,
             cached_session_host=(session, host),
         )

@@ -124,7 +127,7 @@ def delete(
     )

     if not dry_run:
-        message = "soft delete" if soft else "delete"
+        message = "soft delete" if soft else "hard delete"
         click.echo(
             f"Took {(deletion_result.end_time_millis-deletion_result.start_time_millis)/1000.0} seconds to {message} {deletion_result.num_records} rows for {deletion_result.num_entities} entities"
         )

@@ -182,6 +185,7 @@ def delete_one_urn(
     urn: str,
     soft: bool = False,
     dry_run: bool = False,
+    entity_type: str = "dataset",
     cached_session_host: Optional[Tuple[sessions.Session, str]] = None,
     cached_emitter: Optional[rest_emitter.DatahubRestEmitter] = None,
 ) -> DeletionResult:

@@ -199,7 +203,7 @@ def delete_one_urn(
     if not dry_run:
         emitter.emit_mcp(
             MetadataChangeProposalWrapper(
-                entityType="dataset",
+                entityType=entity_type,
                 changeType=ChangeTypeClass.UPSERT,
                 entityUrn=urn,
                 aspectName="status",
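With the new `--soft/--hard` flag pair, soft deletion stays the default; a sketch of the resulting CLI behavior:

```console
# soft delete (default): sets the Status aspect to Removed
datahub delete --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"

# hard delete: physically removes all rows for all aspects of the entity
datahub delete --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)" --hard
```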
metadata-ingestion/src/datahub/cli/get_cli.py

@@ -3,6 +3,7 @@ import logging
 from typing import Any, List, Optional

 import click
+from click.exceptions import UsageError

 from datahub.cli.cli_utils import get_entity

@@ -20,7 +21,10 @@ logger = logging.getLogger(__name__)
 @click.option("-a", "--aspect", required=False, multiple=True, type=str)
 @click.pass_context
 def get(ctx: Any, urn: Optional[str], aspect: List[str]) -> None:
     """Get metadata for an entity with an optional list of aspects to project"""
     if urn is None:
+        if not ctx.args:
+            raise UsageError("Nothing for me to get. Maybe provide an urn?")
         urn = ctx.args[0]
+        logger.debug(f"Using urn from args {urn}")
     click.echo(json.dumps(get_entity(urn=urn, aspect=aspect), sort_keys=True, indent=2))
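The `ctx.args` fallback means the urn can also be passed positionally when the `--urn` option is omitted; a sketch:

```console
datahub get "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)" --aspect ownership
```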
metadata-ingestion/src/datahub/cli/ingest_cli.py

@@ -163,25 +163,27 @@ def show(run_id: str) -> None:

 @ingest.command()
 @click.option("--run-id", required=True, type=str)
-def rollback(run_id: str) -> None:
+@click.option("--dry-run", "-n", required=False, is_flag=True, default=False)
+def rollback(run_id: str, dry_run: bool) -> None:
     """Rollback a provided ingestion run to datahub"""
-    click.confirm(
-        "This will permanently delete data from DataHub. Do you want to continue?",
-        abort=True,
-    )
+    # Only ask for confirmation when data will actually be deleted.
+    if not dry_run:
+        click.confirm(
+            "This will permanently delete data from DataHub. Do you want to continue?",
+            abort=True,
+        )

-    payload_obj = {"runId": run_id, "dryRun": False}
+    payload_obj = {"runId": run_id, "dryRun": dry_run}
     structured_rows, entities_affected, aspects_affected = post_rollback_endpoint(
         payload_obj, "/runs?action=rollback"
     )

     click.echo(
-        "rolling back deletes the entities created by a run and reverts the updated aspects"
+        "Rolling back deletes the entities created by a run and reverts the updated aspects"
     )
     click.echo(
-        f"this rollback deleted {entities_affected} entities and rolled back {aspects_affected} aspects"
+        f"This rollback {'will delete' if dry_run else 'deleted'} {entities_affected} entities and {'will roll' if dry_run else 'rolled'} back {aspects_affected} aspects"
     )
     click.echo(
-        f"showing first {len(structured_rows)} of {aspects_affected} aspects reverted by this run"
+        f"showing first {len(structured_rows)} of {aspects_affected} aspects {'that will be ' if dry_run else ''}reverted by this run"
     )
     click.echo(tabulate(structured_rows, RUN_TABLE_COLUMNS, tablefmt="grid"))
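With `--dry-run`, the confirmation prompt is skipped and the endpoint is called with `"dryRun": true`, so the command only reports what would be rolled back; a sketch:

```console
datahub ingest rollback --dry-run --run-id <run-id>
```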
metadata-ingestion/src/datahub/cli/put_cli.py (new file, 34 lines)
@@ -0,0 +1,34 @@

```python
import json
import logging
from typing import Any

import click

from datahub.cli.cli_utils import guess_entity_type, post_entity

logger = logging.getLogger(__name__)


@click.command(
    name="put",
    context_settings=dict(
        ignore_unknown_options=True,
        allow_extra_args=True,
    ),
)
@click.option("--urn", required=True, type=str)
@click.option("-a", "--aspect", required=True, type=str)
@click.option("-d", "--aspect-data", required=True, type=str)
@click.pass_context
def put(ctx: Any, urn: str, aspect: str, aspect_data: str) -> None:
    """Update a single aspect of an entity"""
    # Infer the entity type (e.g. "dataset") from the urn itself.
    entity_type = guess_entity_type(urn)
    # The aspect value is read from a JSON file on disk.
    with open(aspect_data) as fp:
        aspect_obj = json.load(fp)
    status = post_entity(
        urn=urn,
        aspect_name=aspect,
        entity_type=entity_type,
        aspect_value=aspect_obj,
    )
    click.secho(f"Update succeeded with status {status}", fg="green")
```
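A small sketch of how the entity type is inferred from the urn (per `guess_entity_type` above):

```python
from datahub.cli.cli_utils import guess_entity_type

# the third urn segment names the entity type
assert guess_entity_type("urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)") == "dataset"
assert guess_entity_type("urn:li:corpuser:jdoe") == "corpuser"
```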
metadata-ingestion/src/datahub/entrypoints.py

@@ -12,6 +12,7 @@ from datahub.cli.delete_cli import delete
 from datahub.cli.docker import docker
 from datahub.cli.get_cli import get
 from datahub.cli.ingest_cli import ingest
+from datahub.cli.put_cli import put

 logger = logging.getLogger(__name__)

@@ -85,6 +86,7 @@ datahub.add_command(docker)
 datahub.add_command(ingest)
 datahub.add_command(delete)
 datahub.add_command(get)
+datahub.add_command(put)


 def main(**kwargs):