mirror of
https://github.com/datahub-project/datahub.git
synced 2025-10-02 04:26:21 +00:00
feat(ingest(cli): add domain helper, add progressbar (#12436)
This commit is contained in:
parent
3471857893
commit
8eda51eff0
@ -287,7 +287,9 @@ A group of commands to interact with containers in DataHub.
|
|||||||
e.g. You can use this to apply a tag to all datasets recursively in this container.
|
e.g. You can use this to apply a tag to all datasets recursively in this container.
|
||||||
```shell
|
```shell
|
||||||
datahub container tag --container-urn "urn:li:container:0e9e46bd6d5cf645f33d5a8f0254bc2d" --tag-urn "urn:li:tag:tag1"
|
datahub container tag --container-urn "urn:li:container:0e9e46bd6d5cf645f33d5a8f0254bc2d" --tag-urn "urn:li:tag:tag1"
|
||||||
|
datahub container domain --container-urn "urn:li:container:3f2effd1fbe154a4d60b597263a41e41" --domain-urn "urn:li:domain:ajsajo-b832-4ab3-8881-7ed5e991a44c"
|
||||||
|
datahub container owner --container-urn "urn:li:container:3f2effd1fbe154a4d60b597263a41e41" --owner-urn "urn:li:corpGroup:eng@example.com"
|
||||||
|
datahub container term --container-urn "urn:li:container:3f2effd1fbe154a4d60b597263a41e41" --term-urn "urn:li:term:PII"
|
||||||
```
|
```
|
||||||
|
|
||||||
### check
|
### check
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
import logging
|
import logging
|
||||||
from typing import List
|
from typing import Any, List
|
||||||
|
|
||||||
import click
|
import click
|
||||||
|
import progressbar
|
||||||
|
|
||||||
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
||||||
from datahub.ingestion.graph.client import get_default_graph
|
from datahub.ingestion.graph.client import get_default_graph
|
||||||
from datahub.metadata.schema_classes import (
|
from datahub.metadata.schema_classes import (
|
||||||
|
DomainsClass,
|
||||||
GlossaryTermAssociationClass,
|
GlossaryTermAssociationClass,
|
||||||
OwnerClass,
|
OwnerClass,
|
||||||
OwnershipTypeClass,
|
OwnershipTypeClass,
|
||||||
@ -27,12 +30,12 @@ def apply_association_to_container(
|
|||||||
association_type: str,
|
association_type: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Common function to add either tags, terms, or owners to child datasets (for now).
|
Common function to add either tags, terms, domains, or owners to child datasets (for now).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
container_urn: The URN of the container
|
container_urn: The URN of the container
|
||||||
association_urn: The URN of the tag, term, or user to apply
|
association_urn: The URN of the tag, term, or user to apply
|
||||||
association_type: One of 'tag', 'term', or 'owner'
|
association_type: One of 'tag', 'term', 'domain' or 'owner'
|
||||||
"""
|
"""
|
||||||
urns: List[str] = []
|
urns: List[str] = []
|
||||||
graph = get_default_graph()
|
graph = get_default_graph()
|
||||||
@ -43,10 +46,10 @@ def apply_association_to_container(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
all_patches: List[Any] = []
|
||||||
for urn in urns:
|
for urn in urns:
|
||||||
logger.info(f"Adding {association_type} {association_urn} to {urn}")
|
|
||||||
builder = DatasetPatchBuilder(urn)
|
builder = DatasetPatchBuilder(urn)
|
||||||
|
patches: List[Any] = []
|
||||||
if association_type == "tag":
|
if association_type == "tag":
|
||||||
patches = builder.add_tag(TagAssociationClass(association_urn)).build()
|
patches = builder.add_tag(TagAssociationClass(association_urn)).build()
|
||||||
elif association_type == "term":
|
elif association_type == "term":
|
||||||
@ -60,9 +63,17 @@ def apply_association_to_container(
|
|||||||
type=OwnershipTypeClass.TECHNICAL_OWNER,
|
type=OwnershipTypeClass.TECHNICAL_OWNER,
|
||||||
)
|
)
|
||||||
).build()
|
).build()
|
||||||
|
elif association_type == "domain":
|
||||||
for mcp in patches:
|
patches = [
|
||||||
graph.emit(mcp)
|
MetadataChangeProposalWrapper(
|
||||||
|
entityUrn=urn,
|
||||||
|
aspect=DomainsClass(domains=[association_urn]),
|
||||||
|
)
|
||||||
|
]
|
||||||
|
all_patches.extend(patches)
|
||||||
|
mcps_iter = progressbar.progressbar(all_patches, redirect_stdout=True)
|
||||||
|
for mcp in mcps_iter:
|
||||||
|
graph.emit(mcp)
|
||||||
|
|
||||||
|
|
||||||
@container.command()
|
@container.command()
|
||||||
@ -83,7 +94,15 @@ def term(container_urn: str, term_urn: str) -> None:
|
|||||||
|
|
||||||
@container.command()
|
@container.command()
|
||||||
@click.option("--container-urn", required=True, type=str)
|
@click.option("--container-urn", required=True, type=str)
|
||||||
@click.option("--owner-id", required=True, type=str)
|
@click.option("--owner-urn", required=True, type=str)
|
||||||
def owner(container_urn: str, owner_id: str) -> None:
|
def owner(container_urn: str, owner_urn: str) -> None:
|
||||||
"""Add patch to add a owner to all datasets in a container"""
|
"""Add patch to add a owner to all datasets in a container"""
|
||||||
apply_association_to_container(container_urn, owner_id, "owner")
|
apply_association_to_container(container_urn, owner_urn, "owner")
|
||||||
|
|
||||||
|
|
||||||
|
@container.command()
|
||||||
|
@click.option("--container-urn", required=True, type=str)
|
||||||
|
@click.option("--domain-urn", required=True, type=str)
|
||||||
|
def domain(container_urn: str, domain_urn: str) -> None:
|
||||||
|
"""Add patch to add a domain to all datasets in a container"""
|
||||||
|
apply_association_to_container(container_urn, domain_urn, "domain")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user