mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-12 17:34:18 +00:00
feat(datahub-lite): adding tab completion, small serialization fixes (#7079)
This commit is contained in:
parent
bdcc356cc5
commit
f31ff9c91e
@ -1,3 +1,6 @@
|
|||||||
|
import Tabs from '@theme/Tabs';
|
||||||
|
import TabItem from '@theme/TabItem';
|
||||||
|
|
||||||
# DataHub Lite (Experimental)
|
# DataHub Lite (Experimental)
|
||||||
|
|
||||||
## What is it?
|
## What is it?
|
||||||
@ -112,6 +115,12 @@ The `datahub lite` group of commands provides a set of capabilities for you to e
|
|||||||
|
|
||||||
Listing functions like a directory structure that is customized based on the kind of system being explored. DataHub's metadata is automatically organized into databases, tables, views, dashboards, charts, etc.
|
Listing functions like a directory structure that is customized based on the kind of system being explored. DataHub's metadata is automatically organized into databases, tables, views, dashboards, charts, etc.
|
||||||
|
|
||||||
|
:::note
|
||||||
|
|
||||||
|
Using the `ls` command below is much more pleasant when you have tab completion enabled on your shell. Check out the [Setting up Tab Completion](#tab-completion) section at the bottom of the guide.
|
||||||
|
|
||||||
|
:::
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
> datahub lite ls /
|
> datahub lite ls /
|
||||||
databases
|
databases
|
||||||
@ -133,6 +142,9 @@ metadata_index
|
|||||||
metadata_aspect_v2
|
metadata_aspect_v2
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
### Read (get)
|
### Read (get)
|
||||||
|
|
||||||
Once you have located a path of interest, you can read metadata at that entity, by issuing a **get**. You can additionally filter the metadata retrieved from an entity by the aspect type of the metadata (e.g. to request the schema, filter by the **schemaMetadata** aspect).
|
Once you have located a path of interest, you can read metadata at that entity, by issuing a **get**. You can additionally filter the metadata retrieved from an entity by the aspect type of the metadata (e.g. to request the schema, filter by the **schemaMetadata** aspect).
|
||||||
@ -157,7 +169,7 @@ Get metadata for an entity by path
|
|||||||
</summary>
|
</summary>
|
||||||
|
|
||||||
```json
|
```json
|
||||||
> datahub lite get /databases/mysql/instances/default/databases/datahub/tables/metadata_aspect_v2
|
> datahub lite get --path /databases/mysql/instances/default/databases/datahub/tables/metadata_aspect_v2
|
||||||
{
|
{
|
||||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)",
|
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)",
|
||||||
"container": {
|
"container": {
|
||||||
@ -313,7 +325,7 @@ Get metadata for an entity by path
|
|||||||
#### Get metadata for an entity filtered by specific aspect
|
#### Get metadata for an entity filtered by specific aspect
|
||||||
|
|
||||||
```json
|
```json
|
||||||
> datahub lite get /databases/mysql/instances/default/databases/datahub/tables/metadata_aspect_v2 --aspect status
|
> datahub lite get --path /databases/mysql/instances/default/databases/datahub/tables/metadata_aspect_v2 --aspect status
|
||||||
{
|
{
|
||||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)",
|
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)",
|
||||||
"status": {
|
"status": {
|
||||||
@ -324,10 +336,17 @@ Get metadata for an entity by path
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
:::note
|
||||||
|
|
||||||
|
Using the `get` command by path is much more pleasant when you have tab completion enabled on your shell. Check out the [Setting up Tab Completion](#tab-completion) section at the bottom of the guide.
|
||||||
|
|
||||||
|
:::
|
||||||
|
|
||||||
|
|
||||||
#### Get metadata using the urn of the entity
|
#### Get metadata using the urn of the entity
|
||||||
|
|
||||||
```json
|
```json
|
||||||
> datahub lite get "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)" --aspect status
|
> datahub lite get --urn "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)" --aspect status
|
||||||
{
|
{
|
||||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)",
|
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)",
|
||||||
"status": {
|
"status": {
|
||||||
@ -344,7 +363,7 @@ Get metadata with additional details (systemMetadata)
|
|||||||
</summary>
|
</summary>
|
||||||
|
|
||||||
```json
|
```json
|
||||||
> datahub lite get /databases/mysql/instances/default/databases/datahub/tables/metadata_aspect_v2 --aspect status --verbose
|
> datahub lite get --path /databases/mysql/instances/default/databases/datahub/tables/metadata_aspect_v2 --aspect status --verbose
|
||||||
{
|
{
|
||||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)",
|
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)",
|
||||||
"status": {
|
"status": {
|
||||||
@ -519,3 +538,80 @@ DataHub Lite maintains a few derived tables to make access possible via both the
|
|||||||
## Caveat Emptor!
|
## Caveat Emptor!
|
||||||
|
|
||||||
DataHub Lite is a very new project. Do not use it for production use-cases. The APIs and storage formats are subject to change as we get feedback from early adopters. That said, we are really interested in accepting PRs and suggestions for improvements to this fledgling project.
|
DataHub Lite is a very new project. Do not use it for production use-cases. The APIs and storage formats are subject to change as we get feedback from early adopters. That said, we are really interested in accepting PRs and suggestions for improvements to this fledgling project.
|
||||||
|
|
||||||
|
|
||||||
|
## Advanced Options
|
||||||
|
|
||||||
|
### Tab Completion
|
||||||
|
|
||||||
|
Using `datahub lite` commands like `ls` or `get` is much more pleasant when you have tab completion enabled on your shell. Tab completion is supported on the command line through the [Click Shell completion](https://click.palletsprojects.com/en/8.1.x/shell-completion/) module.
|
||||||
|
To set up shell completion for your shell, follow the instructions below:
|
||||||
|
|
||||||
|
#### Option 1 (inline eval)
|
||||||
|
<Tabs>
|
||||||
|
<TabItem value="zsh" label="Zsh" default>
|
||||||
|
|
||||||
|
Add this to ~/.zshrc:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
eval "$(_DATAHUB_COMPLETE=zsh_source datahub)"
|
||||||
|
```
|
||||||
|
|
||||||
|
</TabItem>
|
||||||
|
<TabItem value="bash" label="Bash">
|
||||||
|
|
||||||
|
Add this to ~/.bashrc:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
eval "$(_DATAHUB_COMPLETE=bash_source datahub)"
|
||||||
|
```
|
||||||
|
|
||||||
|
</TabItem>
|
||||||
|
|
||||||
|
</Tabs>
|
||||||
|
|
||||||
|
#### Option 2 (external completion script)
|
||||||
|
|
||||||
|
Using eval means that the command is invoked and evaluated every time a shell is started, which can delay shell responsiveness. To speed it up, write the generated script to a file, then source that.
|
||||||
|
|
||||||
|
<Tabs>
|
||||||
|
<TabItem value="zsh" label="Zsh" default>
|
||||||
|
|
||||||
|
Save the script somewhere.
|
||||||
|
|
||||||
|
```shell
|
||||||
|
_DATAHUB_COMPLETE=zsh_source datahub > ~/.datahub-complete.zsh
|
||||||
|
```
|
||||||
|
|
||||||
|
Source the file in ~/.zshrc.
|
||||||
|
|
||||||
|
```shell
|
||||||
|
. ~/.datahub-complete.zsh
|
||||||
|
```
|
||||||
|
|
||||||
|
</TabItem>
|
||||||
|
<TabItem value="bash" label="Bash">
|
||||||
|
|
||||||
|
```shell
|
||||||
|
_DATAHUB_COMPLETE=bash_source datahub > ~/.datahub-complete.bash
|
||||||
|
```
|
||||||
|
|
||||||
|
Source the file in ~/.bashrc.
|
||||||
|
|
||||||
|
```shell
|
||||||
|
. ~/.datahub-complete.bash
|
||||||
|
```
|
||||||
|
|
||||||
|
</TabItem>
|
||||||
|
|
||||||
|
<TabItem value="fish" label="Fish">
|
||||||
|
|
||||||
|
Save the script to ~/.config/fish/completions/datahub.fish:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
_DATAHUB_COMPLETE=fish_source datahub > ~/.config/fish/completions/datahub.fish
|
||||||
|
```
|
||||||
|
|
||||||
|
</TabItem>
|
||||||
|
</Tabs>
|
||||||
|
|
||||||
|
|||||||
@ -6,6 +6,7 @@ from datetime import datetime
|
|||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
|
|
||||||
import click
|
import click
|
||||||
|
from click.shell_completion import CompletionItem
|
||||||
from click_default_group import DefaultGroup
|
from click_default_group import DefaultGroup
|
||||||
|
|
||||||
from datahub.cli.cli_utils import (
|
from datahub.cli.cli_utils import (
|
||||||
@ -73,9 +74,36 @@ def list_urns() -> None:
|
|||||||
click.echo(result)
|
click.echo(result)
|
||||||
|
|
||||||
|
|
||||||
|
class CompleteablePath(click.ParamType):
    """Click parameter type that offers shell tab completion for DataHub Lite paths.

    Candidates are produced by listing the local DataHub Lite store at the
    partially typed path.
    """

    name = "path"

    def shell_complete(
        self, ctx: click.Context, param: click.Parameter, incomplete: str
    ) -> List[CompletionItem]:
        """Return completion candidates for the partially typed path.

        Args:
            ctx: Click context of the command being completed (unused).
            param: Click parameter being completed (unused).
            incomplete: Text typed so far; an empty value is listed as "/".

        Returns:
            One CompletionItem per non-leaf entry found at the path, or an
            empty list when the lookup fails for any reason.
        """
        path = incomplete or "/"
        try:
            # Opening the store sits inside the try block so that a failure
            # to open it yields "no suggestions" rather than an exception
            # surfacing in the middle of shell completion.
            lite = _get_datahub_lite(read_only=True)
            completions = lite.ls(path)
            return [
                # Prefer the suggested path attached to the entry when there is one.
                CompletionItem(browseable.auto_complete.suggested_path, type="plain")
                if browseable.auto_complete
                # Otherwise append the entry name to what was typed, collapsing
                # the double slash produced when the input already ends in "/".
                else CompletionItem(
                    f"{incomplete}/{browseable.name}".replace("//", "/")
                )
                for browseable in completions
                if not browseable.leaf
            ]
        except Exception as e:
            # Completion is best-effort: log at debug level and offer nothing.
            logger.debug(f"failed with {e}")
            return []
|
||||||
|
|
||||||
|
|
||||||
@lite.command(context_settings=dict(allow_extra_args=True))
|
@lite.command(context_settings=dict(allow_extra_args=True))
|
||||||
@click.option("--urn", required=False, type=str, help="Get metadata rooted at an urn")
|
@click.option("--urn", required=False, type=str, help="Get metadata rooted at an urn")
|
||||||
@click.option("--path", required=False, type=str, help="Get metadata rooted at a path")
|
@click.option(
|
||||||
|
"--path",
|
||||||
|
required=False,
|
||||||
|
type=CompleteablePath(),
|
||||||
|
help="Get metadata rooted at a path",
|
||||||
|
)
|
||||||
@click.option("-a", "--aspect", required=False, multiple=True, type=str)
|
@click.option("-a", "--aspect", required=False, multiple=True, type=str)
|
||||||
@click.option("--asof", required=False, type=click.DateTime(formats=["%Y-%m-%d"]))
|
@click.option("--asof", required=False, type=click.DateTime(formats=["%Y-%m-%d"]))
|
||||||
@click.option("--verbose", required=False, is_flag=True, default=False)
|
@click.option("--verbose", required=False, is_flag=True, default=False)
|
||||||
@ -90,7 +118,7 @@ def get(
|
|||||||
verbose: bool,
|
verbose: bool,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Get one or more metadata elements"""
|
"""Get one or more metadata elements"""
|
||||||
|
start_time = time.time()
|
||||||
if urn is None and path is None:
|
if urn is None and path is None:
|
||||||
if not ctx.args:
|
if not ctx.args:
|
||||||
raise click.UsageError(
|
raise click.UsageError(
|
||||||
@ -142,6 +170,8 @@ def get(
|
|||||||
indent=2,
|
indent=2,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
end_time = time.time()
|
||||||
|
logger.debug(f"Time taken: {int((end_time - start_time)*1000.0)} millis")
|
||||||
|
|
||||||
|
|
||||||
@lite.command()
|
@lite.command()
|
||||||
@ -182,16 +212,19 @@ def serve(port: int) -> None:
|
|||||||
|
|
||||||
|
|
||||||
@lite.command(context_settings=dict(allow_extra_args=True))
|
@lite.command(context_settings=dict(allow_extra_args=True))
|
||||||
@click.argument("path", required=False)
|
@click.argument("path", required=False, type=CompleteablePath())
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
@telemetry.with_telemetry
|
@telemetry.with_telemetry
|
||||||
def ls(ctx: click.Context, path: Optional[str]) -> None:
|
def ls(ctx: click.Context, path: Optional[str]) -> None:
|
||||||
"""List at a path"""
|
"""List at a path"""
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
path = path or "/"
|
path = path or "/"
|
||||||
lite = _get_datahub_lite(read_only=True)
|
lite = _get_datahub_lite(read_only=True)
|
||||||
try:
|
try:
|
||||||
browseables = lite.ls(path)
|
browseables = lite.ls(path)
|
||||||
|
end_time = time.time()
|
||||||
|
logger.debug(f"Time taken: {int((end_time - start_time)*1000.0)} millis")
|
||||||
auto_complete: List[AutoComplete] = [
|
auto_complete: List[AutoComplete] = [
|
||||||
b.auto_complete for b in browseables if b.auto_complete is not None
|
b.auto_complete for b in browseables if b.auto_complete is not None
|
||||||
]
|
]
|
||||||
|
|||||||
@ -10,6 +10,7 @@ from datahub.configuration.common import ConfigModel
|
|||||||
from datahub.emitter.aspect import ASPECT_MAP
|
from datahub.emitter.aspect import ASPECT_MAP
|
||||||
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
||||||
from datahub.emitter.mcp_builder import mcps_from_mce
|
from datahub.emitter.mcp_builder import mcps_from_mce
|
||||||
|
from datahub.emitter.serialization_helper import post_json_transform
|
||||||
from datahub.lite.lite_local import (
|
from datahub.lite.lite_local import (
|
||||||
AutoComplete,
|
AutoComplete,
|
||||||
Browseable,
|
Browseable,
|
||||||
@ -260,7 +261,7 @@ class DuckDBLite(DataHubLiteLocal[DuckDBLiteConfig]):
|
|||||||
aspect: Union[dict, _Aspect] = json.loads(r[2])
|
aspect: Union[dict, _Aspect] = json.loads(r[2])
|
||||||
if typed:
|
if typed:
|
||||||
assert isinstance(aspect, dict)
|
assert isinstance(aspect, dict)
|
||||||
aspect = ASPECT_MAP[aspect_name].from_obj(aspect)
|
aspect = ASPECT_MAP[aspect_name].from_obj(post_json_transform(aspect))
|
||||||
|
|
||||||
result_map[aspect_name] = {"value": aspect}
|
result_map[aspect_name] = {"value": aspect}
|
||||||
if details:
|
if details:
|
||||||
@ -496,7 +497,9 @@ class DuckDBLite(DataHubLiteLocal[DuckDBLiteConfig]):
|
|||||||
aspect_name in ASPECT_MAP
|
aspect_name in ASPECT_MAP
|
||||||
), f"Missing aspect name {aspect_name} in the registry"
|
), f"Missing aspect name {aspect_name} in the registry"
|
||||||
try:
|
try:
|
||||||
aspect_payload = ASPECT_MAP[aspect_name].from_obj(aspect_payload)
|
aspect_payload = ASPECT_MAP[aspect_name].from_obj(
|
||||||
|
post_json_transform(aspect_payload)
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Failed to process urn: {urn}, aspect_name: {aspect_name}, metadata: {aspect_payload}",
|
f"Failed to process urn: {urn}, aspect_name: {aspect_name}, metadata: {aspect_payload}",
|
||||||
@ -524,7 +527,7 @@ class DuckDBLite(DataHubLiteLocal[DuckDBLiteConfig]):
|
|||||||
for r in results.fetchall():
|
for r in results.fetchall():
|
||||||
urn = r[0]
|
urn = r[0]
|
||||||
aspect_name = r[1]
|
aspect_name = r[1]
|
||||||
aspect_metadata = ASPECT_MAP[aspect_name].from_obj(json.loads(r[2])) # type: ignore
|
aspect_metadata = ASPECT_MAP[aspect_name].from_obj(post_json_transform(json.loads(r[2]))) # type: ignore
|
||||||
system_metadata = SystemMetadataClass.from_obj(json.loads(r[3]))
|
system_metadata = SystemMetadataClass.from_obj(json.loads(r[3]))
|
||||||
mcp = MetadataChangeProposalWrapper(
|
mcp = MetadataChangeProposalWrapper(
|
||||||
entityUrn=urn,
|
entityUrn=urn,
|
||||||
@ -560,17 +563,17 @@ class DuckDBLite(DataHubLiteLocal[DuckDBLiteConfig]):
|
|||||||
"iceberg",
|
"iceberg",
|
||||||
"trino",
|
"trino",
|
||||||
],
|
],
|
||||||
"streaming_systems": ["kafka"],
|
"streaming": ["kafka"],
|
||||||
"orchestrators": ["airflow", "spark"],
|
"orchestrators": ["airflow", "spark"],
|
||||||
"data_movers": ["kafka-connect", "nifi"],
|
"data_movers": ["kafka-connect", "nifi"],
|
||||||
"transformation_tools": ["dbt"],
|
"transformation_tools": ["dbt"],
|
||||||
"data_quality_tools": ["great-expectations"],
|
"data_quality": ["great-expectations"],
|
||||||
}
|
}
|
||||||
for k, v in category_to_platform_map.items():
|
for k, v in category_to_platform_map.items():
|
||||||
if data_platform_urn.get_entity_id_as_string() in v:
|
if data_platform_urn.get_entity_id_as_string() in v:
|
||||||
return Urn(entity_type="systemNode", entity_id=[k])
|
return Urn(entity_type="systemNode", entity_id=[k])
|
||||||
|
|
||||||
logger.warning(
|
logger.debug(
|
||||||
f"Failed to find category for platform {data_platform_urn}, mapping to generic data_platform"
|
f"Failed to find category for platform {data_platform_urn}, mapping to generic data_platform"
|
||||||
)
|
)
|
||||||
return Urn(entity_type="systemNode", entity_id=["data_platforms"])
|
return Urn(entity_type="systemNode", entity_id=["data_platforms"])
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user