feat(cli): add user and group commands to create users and groups (#7595)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Shirshanka Das 2023-03-22 18:44:01 -07:00 committed by GitHub
parent 1938b843cf
commit f872ca9cba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 962 additions and 59 deletions

View File

@ -0,0 +1,35 @@
"""Generate JSON schemas for the CorpUser and CorpGroup file formats.

Running this module writes ``user/user.dhub.yaml_schema.json`` and
``group/group.dhub.yaml_schema.json`` relative to the current working
directory. Each file may contain either a single entity or a list of them.
"""
import json
from typing import List, Union

from pydantic import BaseModel

from datahub.api.entities.corpgroup.corpgroup import CorpGroup
from datahub.api.entities.corpuser.corpuser import CorpUser


class CorpUserList(BaseModel):
    # Root model: a bare list of users.
    __root__: List[CorpUser]


class CorpGroupList(BaseModel):
    # Root model: a bare list of groups.
    __root__: List[CorpGroup]


class CorpUserFile(BaseModel):
    # A user file holds either one user or a list of users.
    __root__: Union[CorpUser, CorpUserList]


class CorpGroupFile(BaseModel):
    # A group file holds either one group or a list of groups.
    __root__: Union[CorpGroup, CorpGroupList]


with open("user/user.dhub.yaml_schema.json", "w") as fp:
    fp.write(json.dumps(CorpUserFile.schema(), indent=4))

with open("group/group.dhub.yaml_schema.json", "w") as fp:
    fp.write(json.dumps(CorpGroupFile.schema(), indent=4))

View File

@ -0,0 +1,9 @@
id: foogroup@acryl.io
display_name: Foo Group
admins:
- datahub
members:
- bar@acryl.io # refer to a user either by id or by urn
- id: joe@acryl.io # inline specification of user
slack: "@joe_shmoe"
display_name: "Joe's Hub"

View File

@ -0,0 +1,159 @@
{
"title": "CorpGroupFile",
"anyOf": [
{
"$ref": "#/definitions/CorpGroup"
},
{
"$ref": "#/definitions/CorpGroupList"
}
],
"definitions": {
"CorpUser": {
"title": "CorpUser",
"description": "This is a CorpUser class which represents a CorpUser\n\nArgs:\n id (str): The id of the user\n display_name (Optional[str]): The name of the user to display in the UI\n email (Optional[str]): email address of this user\n title (Optional[str]): title of this user\n manager_urn (Optional[str]): direct manager of this user\n department_id (Optional[int]): department id this user belongs to\n department_name (Optional[str]): department name this user belongs to\n first_name (Optional[str]): first name of this user\n last_name (Optional[str]): last name of this user\n full_name (Optional[str]): Common name of this user, format is firstName + lastName (split by a whitespace)\n country_code (Optional[str]): two uppercase letters country code. e.g. US\n groups (List[str]): List of group ids the user belongs to\n description (Optional[str]): A description string for the user\n slack (Optional[str]): Slack handle for the user",
"type": "object",
"properties": {
"id": {
"title": "Id",
"type": "string"
},
"display_name": {
"title": "Display Name",
"type": "string"
},
"email": {
"title": "Email",
"type": "string"
},
"title": {
"title": "Title",
"type": "string"
},
"manager_urn": {
"title": "Manager Urn",
"type": "string"
},
"department_id": {
"title": "Department Id",
"type": "integer"
},
"department_name": {
"title": "Department Name",
"type": "string"
},
"first_name": {
"title": "First Name",
"type": "string"
},
"last_name": {
"title": "Last Name",
"type": "string"
},
"full_name": {
"title": "Full Name",
"type": "string"
},
"country_code": {
"title": "Country Code",
"type": "string"
},
"groups": {
"title": "Groups",
"type": "array",
"items": {
"type": "string"
}
},
"description": {
"title": "Description",
"type": "string"
},
"slack": {
"title": "Slack",
"type": "string"
}
},
"required": [
"id"
],
"additionalProperties": false
},
"CorpGroup": {
"title": "CorpGroup",
"description": "This is a CorpGroup class which represents a CorpGroup\n\nArgs:\n id (str): The id of the group\n display_name (Optional[str]): The name of the group\n email (Optional[str]): email of this group\n description (Optional[str]): A description of the group\n overrideEditable (bool): If True, group information that is editable in the UI will be overridden\n picture_link (Optional[str]): A URL which points to a picture which user wants to set as the photo for the group\n slack (Optional[str]): Slack channel for the group\n admins (List[Union[str, CorpUser]]): A list of administrator ids (or urns) for the group. You can also provide the user record for the admin inline within this section\n members (List[Union[str, CorpUser]]): A list of member ids (or urns) for the group.",
"type": "object",
"properties": {
"id": {
"title": "Id",
"type": "string"
},
"display_name": {
"title": "Display Name",
"type": "string"
},
"email": {
"title": "Email",
"type": "string"
},
"description": {
"title": "Description",
"type": "string"
},
"overrideEditable": {
"title": "Overrideeditable",
"default": false,
"type": "boolean"
},
"picture_link": {
"title": "Picture Link",
"type": "string"
},
"slack": {
"title": "Slack",
"type": "string"
},
"admins": {
"title": "Admins",
"default": [],
"type": "array",
"items": {
"anyOf": [
{
"type": "string"
},
{
"$ref": "#/definitions/CorpUser"
}
]
}
},
"members": {
"title": "Members",
"default": [],
"type": "array",
"items": {
"anyOf": [
{
"type": "string"
},
{
"$ref": "#/definitions/CorpUser"
}
]
}
}
},
"required": [
"id"
]
},
"CorpGroupList": {
"title": "CorpGroupList",
"type": "array",
"items": {
"$ref": "#/definitions/CorpGroup"
}
}
}
}

View File

@ -0,0 +1 @@
datahub group upsert -f foo.group.dhub.yaml # --override-editable

View File

@ -0,0 +1,13 @@
- id: bar@acryl.io
first_name: The
last_name: Bar
email: bar@acryl.io
slack: "@the_bar_raiser"
description: "I like raising the bar higher"
groups:
- foogroup@acryl.io
- id: datahub
slack: "@datahubproject"
phone: "1-800-GOT-META"
description: "The DataHub Project"
picture_link: "https://raw.githubusercontent.com/datahub-project/datahub/master/datahub-web-react/src/images/datahub-logo-color-stable.svg"

View File

@ -0,0 +1 @@
datahub user upsert -f bar.user.dhub.yaml # --override-editable

View File

@ -0,0 +1,90 @@
{
"title": "CorpUserFile",
"anyOf": [
{
"$ref": "#/definitions/CorpUser"
},
{
"$ref": "#/definitions/CorpUserList"
}
],
"definitions": {
"CorpUser": {
"title": "CorpUser",
"description": "This is a CorpUser class which represents a CorpUser\n\nArgs:\n id (str): The id of the user\n display_name (Optional[str]): The name of the user to display in the UI\n email (Optional[str]): email address of this user\n title (Optional[str]): title of this user\n manager_urn (Optional[str]): direct manager of this user\n department_id (Optional[int]): department id this user belongs to\n department_name (Optional[str]): department name this user belongs to\n first_name (Optional[str]): first name of this user\n last_name (Optional[str]): last name of this user\n full_name (Optional[str]): Common name of this user, format is firstName + lastName (split by a whitespace)\n country_code (Optional[str]): two uppercase letters country code. e.g. US\n groups (List[str]): List of group ids the user belongs to\n description (Optional[str]): A description string for the user\n slack (Optional[str]): Slack handle for the user",
"type": "object",
"properties": {
"id": {
"title": "Id",
"type": "string"
},
"display_name": {
"title": "Display Name",
"type": "string"
},
"email": {
"title": "Email",
"type": "string"
},
"title": {
"title": "Title",
"type": "string"
},
"manager_urn": {
"title": "Manager Urn",
"type": "string"
},
"department_id": {
"title": "Department Id",
"type": "integer"
},
"department_name": {
"title": "Department Name",
"type": "string"
},
"first_name": {
"title": "First Name",
"type": "string"
},
"last_name": {
"title": "Last Name",
"type": "string"
},
"full_name": {
"title": "Full Name",
"type": "string"
},
"country_code": {
"title": "Country Code",
"type": "string"
},
"groups": {
"title": "Groups",
"type": "array",
"items": {
"type": "string"
}
},
"description": {
"title": "Description",
"type": "string"
},
"slack": {
"title": "Slack",
"type": "string"
}
},
"required": [
"id"
],
"additionalProperties": false
},
"CorpUserList": {
"title": "CorpUserList",
"type": "array",
"items": {
"$ref": "#/definitions/CorpUser"
}
}
}
}

View File

@ -1,20 +1,44 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, Iterable, Optional, Union
import logging
from dataclasses import dataclass
from functools import lru_cache
from typing import Callable, Iterable, List, Optional, Union
import pydantic
from pydantic import BaseModel
import datahub.emitter.mce_builder as builder
from datahub.api.entities.corpuser.corpuser import CorpUser, CorpUserGenerationConfig
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import (
CorpGroupEditableInfoClass,
CorpGroupInfoClass,
GroupMembershipClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
StatusClass,
_Aspect,
)
logger = logging.getLogger(__name__)
@dataclass
class CorpGroup:
class CorpGroupGenerationConfig:
"""
A container for configuration for generation of mcp-s from CorpGroup instances
"""
override_editable: bool = False
datahub_graph: Optional[DataHubGraph] = None
class CorpGroup(BaseModel):
"""This is a CorpGroup class which represents a CorpGroup
Args:
@ -25,10 +49,11 @@ class CorpGroup:
overrideEditable (bool): If True, group information that is editable in the UI will be overridden
picture_link (Optional[str]): A URL which points to a picture which user wants to set as the photo for the group
slack (Optional[str]): Slack channel for the group
admins (List[Union[str, CorpUser]]): A list of administrator ids (or urns) for the group. You can also provide the user record for the admin inline within this section
members (List[Union[str, CorpUser]]): A list of member ids (or urns) for the group.
"""
id: str
urn: str = field(init=False)
# These are for CorpGroupInfo
display_name: Optional[str] = None
@ -39,35 +64,152 @@ class CorpGroup:
overrideEditable: bool = False
picture_link: Optional[str] = None
slack: Optional[str] = None
admins: List[Union[str, CorpUser]] = []
members: List[Union[str, CorpUser]] = []
def __post_init__(self):
self.urn = builder.make_group_urn(self.id)
@pydantic.validator("admins", "members", each_item=True)
def make_urn_if_needed(v):
    """Turn plain string entries into user urns; leave other entries untouched."""
    # Non-string items (e.g. inline user records) pass through unchanged.
    return builder.make_user_urn(v) if isinstance(v, str) else v
def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
if self.overrideEditable:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=CorpGroupEditableInfoClass(
@property
def urn(self):
    """Group urn computed from ``id`` through ``builder.make_group_urn``."""
    group_id = self.id
    return builder.make_group_urn(group_id)
def _needs_editable_aspect(self) -> bool:
return bool(self.picture_link)
def generate_mcp(
    self, generation_config: CorpGroupGenerationConfig = CorpGroupGenerationConfig()
) -> Iterable[MetadataChangeProposalWrapper]:
    """Yield the metadata change proposals describing this group.

    Emission order:
      1. proposals for every inline ``CorpUser`` listed in ``members`` or
         ``admins`` (each user urn is emitted at most once);
      2. the group's own aspects: status, then either the editable info
         (when ``override_editable`` is set) or the regular info, plus the
         editable info if ``_needs_editable_aspect()`` says so;
      3. when a ``datahub_graph`` is supplied: the group's ownership with
         the admins merged in, and each member's group membership with
         this group merged in;
      4. a status aspect for every referenced user urn.

    Args:
        generation_config: controls ``override_editable`` and supplies the
            optional graph client used to read existing ownership and
            membership aspects.

    Raises:
        Exception: if admins or members are present but no graph client
            was provided.
    """
    members = self.members or []
    admins = self.admins or []

    members_to_create: List[CorpUser] = [u for u in members if isinstance(u, CorpUser)]
    admins_to_create: List[CorpUser] = [u for u in admins if isinstance(u, CorpUser)]
    member_urns: List[str] = [u.urn if isinstance(u, CorpUser) else u for u in members]
    admin_urns: List[str] = [u.urn if isinstance(u, CorpUser) else u for u in admins]

    urns_created = set()  # dedup member creation on the way out
    for m in members_to_create + admins_to_create:
        if m.urn not in urns_created:
            yield from m.generate_mcp(
                generation_config=CorpUserGenerationConfig(
                    override_editable=generation_config.override_editable
                )
            )
            urns_created.add(m.urn)
        else:
            logger.warning(
                "Suppressing emission of member %s because we already emitted metadata for it",
                m.urn,
            )

    aspects: List[_Aspect] = [StatusClass(removed=False)]
    if generation_config.override_editable:
        aspects.append(
            CorpGroupEditableInfoClass(
                description=self.description,
                pictureLink=self.picture_link,
                slack=self.slack,
                email=self.email,
            )
        )
    else:
        aspects.append(
            CorpGroupInfoClass(
                admins=admin_urns,  # deprecated but we fill it out for consistency
                members=member_urns,  # deprecated but we fill it out for consistency
                groups=[],  # deprecated
                displayName=self.display_name,
                email=self.email,
                description=self.description,
                slack=self.slack,
            )
        )
        # picture link is only available in the editable aspect, so we have to use it if it is provided
        if self._needs_editable_aspect():
            aspects.append(
                CorpGroupEditableInfoClass(
                    description=self.description,
                    pictureLink=self.picture_link,
                    slack=self.slack,
                    email=self.email,
                )
            )
    for aspect in aspects:
        yield MetadataChangeProposalWrapper(entityUrn=self.urn, aspect=aspect)

    # Unfortunately, admins and members fields in CorpGroupInfo has been deprecated
    # So we need to emit Ownership and GroupMembership oriented to the individual users
    # TODO: Move this to PATCH MCP-s once these aspects are supported via patch.
    if generation_config.datahub_graph is not None:
        datahub_graph = generation_config.datahub_graph

        if admin_urns:
            # The ownership aspect is written to the group, so it must also be
            # read from the group; otherwise existing owners are lost and
            # each admin's write replaces the previous one.
            ownership = datahub_graph.get_aspect(
                self.urn, OwnershipClass
            ) or OwnershipClass(owners=[])
            known_owners = {owner.owner for owner in ownership.owners}
            for urn in admin_urns:
                if urn not in known_owners:
                    ownership.owners = ownership.owners + [
                        OwnerClass(owner=urn, type=OwnershipTypeClass.TECHNICAL_OWNER)
                    ]
                    known_owners.add(urn)
            yield MetadataChangeProposalWrapper(entityUrn=self.urn, aspect=ownership)

        for urn in member_urns:
            group_membership = datahub_graph.get_aspect(
                urn, GroupMembershipClass
            ) or GroupMembershipClass(groups=[])
            if self.urn not in group_membership.groups:
                group_membership.groups = sorted(
                    set(group_membership.groups + [self.urn])
                )
            yield MetadataChangeProposalWrapper(
                entityUrn=urn, aspect=group_membership
            )
    else:
        if admin_urns or member_urns:
            raise Exception(
                "Unable to emit group ownership because admins or members are non-empty, and a DataHubGraph instance was not provided."
            )

    # emit status aspects for all user urns referenced (to ensure they get created)
    for urn in set(admin_urns).union(set(member_urns)):
        yield MetadataChangeProposalWrapper(
            entityUrn=urn, aspect=StatusClass(removed=False)
        )
@staticmethod
@lru_cache(maxsize=32)
def _datahub_graph_from_datahub_rest_emitter(
    rest_emitter: DatahubRestEmitter,
) -> DataHubGraph:
    """
    Create a datahub graph instance from a REST Emitter.
    A stop-gap implementation which is expected to be removed after PATCH support is implemented
    for membership updates for users <-> groups

    The cache is keyed on the emitter only. The method is static so that the
    model instance is neither hashed nor retained by the cache; calling it
    through an instance (``self._datahub_graph_from_datahub_rest_emitter(e)``)
    keeps working.
    """
    graph = DataHubGraph(
        config=DatahubClientConfig(
            server=rest_emitter._gms_server,
            token=rest_emitter._token,
            timeout_sec=rest_emitter._connect_timeout_sec,
            retry_status_codes=rest_emitter._retry_status_codes,
            extra_headers=rest_emitter._session.headers,
            disable_ssl_verification=rest_emitter._session.verify is False,
        )
    )
    return graph
def emit(
self,
@ -75,10 +217,24 @@ class CorpGroup:
callback: Optional[Callable[[Exception, str], None]] = None,
) -> None:
"""
Emit the CorpGroup entity to Datahub
A convenience method to emit the CorpGroup entity to DataHub using an emitter.
See also: generate_mcp to have finer grain control over mcp routing
:param emitter: Datahub Emitter to emit the proccess event
:param callback: The callback method for KafkaEmitter if it is used
"""
for mcp in self.generate_mcp():
datahub_graph = emitter if isinstance(emitter, DataHubGraph) else None
if not datahub_graph:
if isinstance(emitter, DatahubRestEmitter):
# create a datahub graph instance from the emitter
# this code path exists mainly for backwards compatibility with existing callers
# who are passing in a DataHubRestEmitter today
# we won't need this in the future once PATCH support is implemented as all emitters
# will work
datahub_graph = self._datahub_graph_from_datahub_rest_emitter(emitter)
for mcp in self.generate_mcp(
generation_config=CorpGroupGenerationConfig(
override_editable=self.overrideEditable, datahub_graph=datahub_graph
)
):
emitter.emit(mcp, callback)

View File

@ -1,17 +1,33 @@
from __future__ import annotations
from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional, Union
import pydantic
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import CorpUserInfoClass, GroupMembershipClass
from datahub.metadata.schema_classes import (
CorpUserEditableInfoClass,
CorpUserInfoClass,
GroupMembershipClass,
StatusClass,
)
@dataclass
class CorpUser:
class CorpUserGenerationConfig:
"""
A holder for configuration for MCP generation from CorpUser objects
"""
override_editable: bool = False
class CorpUser(ConfigModel):
"""This is a CorpUser class which represents a CorpUser
Args:
@ -27,10 +43,13 @@ class CorpUser:
full_name (Optional[str]): Common name of this user, format is firstName + lastName (split by a whitespace)
country_code (Optional[str]): two uppercase letters country code. e.g. US
groups (List[str]): List of group ids the user belongs to
description (Optional[str]): A description string for the user
slack (Optional[str]): Slack handle for the user
picture_link (Optional[str]): A resolvable url for the user's picture icon
phone (Optional(str)): A phone number for the user
"""
id: str
urn: str = field(init=False)
display_name: Optional[str] = None
email: Optional[str] = None
title: Optional[str] = None
@ -41,35 +60,73 @@ class CorpUser:
last_name: Optional[str] = None
full_name: Optional[str] = None
country_code: Optional[str] = None
groups: List[str] = field(default_factory=list)
groups: Optional[List[str]] = None
description: Optional[str] = None
slack: Optional[str] = None
picture_link: Optional[str] = None
phone: Optional[str] = None
def __post_init__(self):
self.urn = builder.make_user_urn(self.id)
@pydantic.validator("full_name", always=True)
def full_name_can_be_built_from_first_name_last_name(v, values):
    """Fall back to "<first_name> <last_name>" when no full name was given."""
    if v:
        return v
    if "first_name" not in values and "last_name" not in values:
        # Nothing to build the name from.
        return None
    given = values.get("first_name") or ""
    family = values.get("last_name") or ""
    if family:
        return f"{given} {family}"
    return given
@property
def urn(self):
    """User urn computed from ``id`` through ``builder.make_user_urn``."""
    user_id = self.id
    return builder.make_user_urn(user_id)
def _needs_editable_aspect(self) -> bool:
return (bool)(self.slack or self.description or self.picture_link or self.phone)
def generate_group_membership_aspect(self) -> Iterable[GroupMembershipClass]:
group_membership = GroupMembershipClass(
groups=[builder.make_group_urn(group) for group in self.groups]
)
return [group_membership]
if self.groups is not None:
group_membership = GroupMembershipClass(
groups=[builder.make_group_urn(group) for group in self.groups]
)
return [group_membership]
else:
return []
def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=CorpUserInfoClass(
active=True, # Deprecated, use CorpUserStatus instead.
displayName=self.display_name,
email=self.email,
title=self.title,
managerUrn=self.manager_urn,
departmentId=self.department_id,
departmentName=self.department_name,
firstName=self.first_name,
lastName=self.last_name,
fullName=self.full_name,
countryCode=self.country_code,
),
)
yield mcp
def generate_mcp(
self, generation_config: CorpUserGenerationConfig = CorpUserGenerationConfig()
) -> Iterable[MetadataChangeProposalWrapper]:
if generation_config.override_editable or self._needs_editable_aspect():
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=CorpUserEditableInfoClass(
aboutMe=self.description,
pictureLink=self.picture_link,
displayName=self.full_name,
slack=self.slack,
email=self.email,
phone=self.phone,
),
)
yield mcp
else:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=CorpUserInfoClass(
active=True, # Deprecated, use CorpUserStatus instead.
displayName=self.display_name,
email=self.email,
title=self.title,
managerUrn=self.manager_urn,
departmentId=self.department_id,
departmentName=self.department_name,
firstName=self.first_name,
lastName=self.last_name,
fullName=self.full_name,
countryCode=self.country_code,
),
)
yield mcp
for group_membership in self.generate_group_membership_aspect():
mcp = MetadataChangeProposalWrapper(
@ -78,6 +135,11 @@ class CorpUser:
)
yield mcp
# Finally emit status
yield MetadataChangeProposalWrapper(
entityUrn=self.urn, aspect=StatusClass(removed=False)
)
def emit(
self,
emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter],

View File

@ -0,0 +1,35 @@
import io
from pathlib import Path
from typing import Union
from datahub.configuration.common import ConfigurationError
from datahub.configuration.yaml import YamlConfigurationMechanism
def load_file(config_file: Path) -> Union[dict, list]:
    """
    Load a YAML file and return its parsed content.

    A variant of datahub.configuration.common.config_loader.load_config_file
    that does not:
    - resolve env variables
    - pretend to return a dictionary

    Intended for loading pydantic based models from external files, so the
    result may be a dict or a list.

    Raises ConfigurationError when the path is not a file or when its suffix
    is neither .yaml nor .yml.
    """
    path = config_file if isinstance(config_file, Path) else Path(config_file)

    if not path.is_file():
        raise ConfigurationError(f"Cannot open config file {path}")

    if path.suffix not in {".yaml", ".yml"}:
        raise ConfigurationError(
            f"Only .yaml and .yml are supported. Cannot process file type {path.suffix}"
        )

    loader: YamlConfigurationMechanism = YamlConfigurationMechanism()
    # Read the whole file up front and hand the loader an in-memory stream.
    return loader.load_config(io.StringIO(path.read_text()))

View File

@ -0,0 +1,60 @@
import logging
from pathlib import Path
import click
from click_default_group import DefaultGroup
from datahub.api.entities.corpgroup.corpgroup import (
CorpGroup,
CorpGroupGenerationConfig,
)
from datahub.cli.specific.file_loader import load_file
from datahub.ingestion.graph.client import get_default_graph
from datahub.telemetry import telemetry
from datahub.upgrade import upgrade
logger = logging.getLogger(__name__)
# "upsert" runs when no sub-command is named on the command line.
@click.group(cls=DefaultGroup, default="upsert")
def group() -> None:
    """A group of commands to interact with the Group entity in DataHub."""
@group.command(
    name="upsert",
)
@click.option("-f", "--file", required=True, type=click.Path(exists=True))
@click.option(
    "--override-editable",
    required=False,
    is_flag=True,
    default=False,
    help="When set, writes to the editable section of the metadata graph, overwriting writes from the UI",
)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def upsert(file: Path, override_editable: bool) -> None:
    """Create or Update a Group with embedded Users"""
    # The file may describe one group (a mapping) or several (a list).
    config_dict = load_file(file)
    group_configs = config_dict if isinstance(config_dict, list) else [config_dict]

    with get_default_graph() as emitter:
        for group_config in group_configs:
            try:
                # Parse the current entry, not the whole document: passing
                # `config_dict` here breaks list files and would re-parse the
                # same object on every iteration.
                datahub_group = CorpGroup.parse_obj(group_config)
                for mcp in datahub_group.generate_mcp(
                    generation_config=CorpGroupGenerationConfig(
                        override_editable=override_editable, datahub_graph=emitter
                    )
                ):
                    emitter.emit(mcp)
                click.secho(
                    f"Update succeeded for group {datahub_group.urn}.", fg="green"
                )
            except Exception as e:
                # Best effort: report the failure and continue with the next group.
                click.secho(
                    f"Update failed for id {group_config.get('id')}. due to {e}",
                    fg="red",
                )

View File

@ -0,0 +1,56 @@
import logging
import pathlib
from pathlib import Path
import click
from click_default_group import DefaultGroup
from datahub.api.entities.corpuser.corpuser import CorpUser, CorpUserGenerationConfig
from datahub.cli.specific.file_loader import load_file
from datahub.ingestion.graph.client import get_default_graph
from datahub.telemetry import telemetry
from datahub.upgrade import upgrade
logger = logging.getLogger(__name__)
# "upsert" runs when no sub-command is named on the command line.
@click.group(cls=DefaultGroup, default="upsert")
def user() -> None:
    """A group of commands to interact with the User entity in DataHub."""
@user.command(
    name="upsert",
)
@click.option("-f", "--file", required=True, type=click.Path(exists=True))
@click.option(
    "--override-editable",
    required=False,
    default=False,
    is_flag=True,
    help="Use this flag to overwrite the information that is set via the UI",
)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def upsert(file: Path, override_editable: bool) -> None:
    """Create or Update a User in DataHub"""
    # A file may hold one user (a mapping) or several (a list); normalise to a list.
    loaded = load_file(pathlib.Path(file))
    user_configs = [loaded] if not isinstance(loaded, list) else loaded

    with get_default_graph() as emitter:
        for user_config in user_configs:
            try:
                datahub_user: CorpUser = CorpUser.parse_obj(user_config)
                mcp_options = CorpUserGenerationConfig(
                    override_editable=override_editable
                )
                for mcp in datahub_user.generate_mcp(generation_config=mcp_options):
                    emitter.emit(mcp)
                click.secho(f"Update succeeded for urn {datahub_user.urn}.", fg="green")
            except Exception as e:
                # Report the failure and move on to the next user.
                click.secho(
                    f"Update failed for id {user_config.get('id')}. due to {e}",
                    fg="red",
                )

View File

@ -164,15 +164,34 @@ def assertion_urn_to_key(assertion_urn: str) -> Optional[AssertionKeyClass]:
def make_user_urn(username: str) -> str:
    """
    Makes a user urn if the input is not a user urn already

    Falsy input (empty string or None) is formatted as-is, matching the
    other ``make_*_urn`` helpers, instead of raising on ``startswith``.
    """
    if username and username.startswith("urn:li:corpuser:"):
        return username
    else:
        return f"urn:li:corpuser:{username}"
def make_group_urn(groupname: str) -> str:
    """
    Return ``groupname`` unchanged when it already is a group urn,
    otherwise prefix it to form one.
    """
    prefix = "urn:li:corpGroup:"
    already_urn = bool(groupname) and groupname.startswith(prefix)
    return groupname if already_urn else f"{prefix}{groupname}"
def make_tag_urn(tag: str) -> str:
    """
    Return ``tag`` unchanged when it already is a tag urn,
    otherwise prefix it to form one.
    """
    if not tag or not tag.startswith("urn:li:tag:"):
        return f"urn:li:tag:{tag}"
    return tag
def make_owner_urn(owner: str, owner_type: OwnerType) -> str:
@ -180,7 +199,13 @@ def make_owner_urn(owner: str, owner_type: OwnerType) -> str:
def make_term_urn(term: str) -> str:
    """
    Return ``term`` unchanged when it already is a glossary term urn,
    otherwise prefix it to form one.
    """
    urn_prefix = "urn:li:glossaryTerm:"
    if term:
        if term.startswith(urn_prefix):
            return term
    return f"{urn_prefix}{term}"
def make_data_flow_urn(

View File

@ -20,6 +20,8 @@ from datahub.cli.get_cli import get
from datahub.cli.ingest_cli import ingest
from datahub.cli.migrate import migrate
from datahub.cli.put_cli import put
from datahub.cli.specific.group_cli import group
from datahub.cli.specific.user_cli import user
from datahub.cli.state_cli import state
from datahub.cli.telemetry import telemetry as telemetry_cli
from datahub.cli.timeline_cli import timeline
@ -147,6 +149,8 @@ datahub.add_command(state)
datahub.add_command(telemetry_cli)
datahub.add_command(migrate)
datahub.add_command(timeline)
datahub.add_command(user)
datahub.add_command(group)
try:
from datahub.cli.lite_cli import lite

View File

@ -0,0 +1,120 @@
import json
import sys
import tempfile
from typing import Any, Dict, Iterable, List
import yaml
from click.testing import CliRunner, Result
from datahub.api.entities.corpgroup.corpgroup import CorpGroup
from datahub.entrypoints import datahub
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
import time
import requests_wrapper as requests
runner = CliRunner(mix_stderr=False)
def sync_elastic() -> None:
    """Sleep for the configured Elasticsearch refresh interval."""
    refresh_seconds = requests.ELASTICSEARCH_REFRESH_INTERVAL_SECONDS
    time.sleep(refresh_seconds)
def datahub_upsert_group(group: CorpGroup) -> None:
    """Dump ``group`` to a temporary YAML file and run ``datahub group upsert`` on it."""
    with tempfile.NamedTemporaryFile("w+t", suffix=".yaml") as group_file:
        yaml.dump(group.dict(), group_file)
        # Make sure the content is on disk before the CLI opens the file by name.
        group_file.flush()
        group_create_result = runner.invoke(
            datahub, ["group", "upsert", "-f", group_file.name]
        )
        assert group_create_result.exit_code == 0
def gen_datahub_groups(num_groups: int) -> Iterable[CorpGroup]:
    """Yield ``num_groups`` groups, numbered from 0, with one admin and one member each."""
    for idx in range(num_groups):
        yield CorpGroup(
            id=f"group_{idx}",
            display_name=f"Group {idx}",
            email=f"group_{idx}@datahubproject.io",
            description=f"The Group {idx}",
            picture_link=f"https://images.google.com/group{idx}.jpg",
            slack=f"@group{idx}",
            admins=["user1"],
            members=["user2"],
        )
def datahub_get_group(group_urn: str):
    """Run ``datahub get --urn <group_urn>`` and return the decoded JSON output."""
    get_result: Result = runner.invoke(datahub, ["get", "--urn", group_urn])
    assert get_result.exit_code == 0
    try:
        parsed: Dict = json.loads(get_result.stdout)
    except json.JSONDecodeError:
        # Show the raw output to make the decoding failure diagnosable.
        print("Failed to decode: " + get_result.stdout, file=sys.stderr)
        raise
    return parsed
def get_group_ownership(user_urn: str) -> List[str]:
    """Return the urns of entities with an incoming "OwnedBy" relationship to ``user_urn``.

    The graph client is used as a context manager so that it is closed once
    the lookup is done.
    """
    with get_default_graph() as graph:
        entities = graph.get_related_entities(
            entity_urn=user_urn,
            relationship_types="OwnedBy",
            direction=DataHubGraph.RelationshipDirection.INCOMING,
        )
        # Materialise inside the `with` block in case the result is lazy.
        return [entity.urn for entity in entities]
def get_group_membership(user_urn: str) -> List[str]:
    """Return the urns reached from ``user_urn`` via outgoing "IsMemberOfGroup" relationships.

    The graph client is used as a context manager so that it is closed once
    the lookup is done.
    """
    with get_default_graph() as graph:
        entities = graph.get_related_entities(
            entity_urn=user_urn,
            relationship_types="IsMemberOfGroup",
            direction=DataHubGraph.RelationshipDirection.OUTGOING,
        )
        # Materialise inside the `with` block in case the result is lazy.
        return [entity.urn for entity in entities]
def test_group_upsert(wait_for_healthchecks: Any) -> None:
    """Upsert ten groups through the CLI and verify aspects, ownership and membership."""
    num_groups: int = 10
    for i, datahub_group in enumerate(gen_datahub_groups(num_groups)):
        datahub_upsert_group(datahub_group)

        expected = {
            "corpGroupEditableInfo": {
                "description": f"The Group {i}",
                "email": f"group_{i}@datahubproject.io",
                "pictureLink": f"https://images.google.com/group{i}.jpg",
                "slack": f"@group{i}",
            },
            "corpGroupInfo": {
                "admins": ["urn:li:corpuser:user1"],
                "description": f"The Group {i}",
                "displayName": f"Group {i}",
                "email": f"group_{i}@datahubproject.io",
                "groups": [],
                "members": ["urn:li:corpuser:user2"],
                "slack": f"@group{i}",
            },
            "corpGroupKey": {"name": f"group_{i}"},
            "ownership": {
                "lastModified": {"actor": "urn:li:corpuser:unknown", "time": 0},
                "owners": [
                    {"owner": "urn:li:corpuser:user1", "type": "TECHNICAL_OWNER"}
                ],
            },
            "status": {"removed": False},
        }
        group_dict = datahub_get_group(f"urn:li:corpGroup:group_{i}")
        assert group_dict == expected

    # Wait for the refresh interval before querying relationships.
    sync_elastic()
    groups_owned = get_group_ownership("urn:li:corpuser:user1")
    groups_partof = get_group_membership("urn:li:corpuser:user2")

    all_groups = sorted(f"urn:li:corpGroup:group_{n}" for n in range(num_groups))
    assert sorted(groups_owned) == all_groups
    assert sorted(groups_partof) == all_groups

View File

@ -0,0 +1,77 @@
import json
import sys
import tempfile
from typing import Any, Dict, Iterable, List
import yaml
from click.testing import CliRunner, Result
from datahub.api.entities.corpuser.corpuser import CorpUser
from datahub.entrypoints import datahub
runner = CliRunner(mix_stderr=False)
def datahub_upsert_user(user: CorpUser) -> None:
    """Dump ``user`` to a temporary YAML file and run ``datahub user upsert`` on it."""
    with tempfile.NamedTemporaryFile("w+t", suffix=".yaml") as user_file:
        yaml.dump(user.dict(), user_file)
        # Make sure the content is on disk before the CLI opens the file by name.
        user_file.flush()
        user_create_result = runner.invoke(
            datahub, ["user", "upsert", "-f", user_file.name]
        )
        assert user_create_result.exit_code == 0
def gen_datahub_users(num_users: int) -> Iterable[CorpUser]:
    """Yield ``num_users`` fully populated users, numbered from 0."""
    for idx in range(num_users):
        yield CorpUser(
            id=f"user_{idx}",
            display_name=f"User {idx}",
            email=f"user_{idx}@datahubproject.io",
            title=f"User {idx}",
            first_name="User",
            last_name=f"{idx}",
            groups=[f"urn:li:corpGroup:group_{idx}"],
            description=f"The User {idx}",
            slack=f"@user{idx}",
            picture_link=f"https://images.google.com/user{idx}.jpg",
            phone=f"1-800-USER-{idx}",
        )
def datahub_get_user(user_urn: str):
    """Run ``datahub get --urn <user_urn>`` and return the decoded JSON output."""
    get_result: Result = runner.invoke(datahub, ["get", "--urn", user_urn])
    assert get_result.exit_code == 0
    try:
        parsed: Dict = json.loads(get_result.stdout)
    except json.JSONDecodeError:
        # Show the raw output to make the decoding failure diagnosable.
        print("Failed to decode: " + get_result.stdout, file=sys.stderr)
        raise
    return parsed
def test_user_upsert(wait_for_healthchecks: Any) -> None:
    """Upsert ten users through the CLI and verify the aspects read back for each."""
    num_user_profiles: int = 10
    for i, datahub_user in enumerate(gen_datahub_users(num_user_profiles)):
        datahub_upsert_user(datahub_user)

        expected = {
            "corpUserEditableInfo": {
                "aboutMe": f"The User {i}",
                "displayName": f"User {i}",
                "email": f"user_{i}@datahubproject.io",
                "phone": f"1-800-USER-{i}",
                "pictureLink": f"https://images.google.com/user{i}.jpg",
                "skills": [],
                "slack": f"@user{i}",
                "teams": [],
            },
            "corpUserKey": {"username": f"user_{i}"},
            "groupMembership": {"groups": [f"urn:li:corpGroup:group_{i}"]},
            "status": {"removed": False},
        }
        user_dict = datahub_get_user(f"urn:li:corpuser:user_{i}")
        assert user_dict == expected