feat(ingest): add CorpUser and CorpGroup to the Python SDK (#5930)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Thomas Tauber-Marshall 2022-10-04 12:13:38 -07:00 committed by GitHub
parent 1325b8ad71
commit 6b83cab82c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 237 additions and 3 deletions

View File

@ -1,6 +1,8 @@
import time import time
import uuid import uuid
from datahub.api.entities.corpgroup.corpgroup import CorpGroup
from datahub.api.entities.corpuser.corpuser import CorpUser
from datahub.api.entities.datajob.dataflow import DataFlow from datahub.api.entities.datajob.dataflow import DataFlow
from datahub.api.entities.datajob.datajob import DataJob from datahub.api.entities.datajob.datajob import DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import ( from datahub.api.entities.dataprocess.dataprocess_instance import (
@ -22,7 +24,7 @@ dataJob.emit(emitter)
dataJob2 = DataJob(flow_urn=jobFlow.urn, id="job2", name="My Job 2") dataJob2 = DataJob(flow_urn=jobFlow.urn, id="job2", name="My Job 2")
dataJob2.upstream_urns.append(dataJob.urn) dataJob2.upstream_urns.append(dataJob.urn)
dataJob2.tags.add("TestTag") dataJob2.tags.add("TestTag")
dataJob2.owners.add("test@test.com") dataJob2.owners.add("testUser")
dataJob2.emit(emitter) dataJob2.emit(emitter)
dataJob3 = DataJob(flow_urn=jobFlow.urn, id="job3", name="My Job 3") dataJob3 = DataJob(flow_urn=jobFlow.urn, id="job3", name="My Job 3")
@ -32,6 +34,7 @@ dataJob3.emit(emitter)
dataJob4 = DataJob(flow_urn=jobFlow.urn, id="job4", name="My Job 4") dataJob4 = DataJob(flow_urn=jobFlow.urn, id="job4", name="My Job 4")
dataJob4.upstream_urns.append(dataJob2.urn) dataJob4.upstream_urns.append(dataJob2.urn)
dataJob4.upstream_urns.append(dataJob3.urn) dataJob4.upstream_urns.append(dataJob3.urn)
dataJob4.group_owners.add("testGroup")
dataJob4.emit(emitter) dataJob4.emit(emitter)
# Hello World # Hello World
@ -105,3 +108,20 @@ jobRun4.emit_process_end(
end_timestamp_millis=int(time.time() * 1000), end_timestamp_millis=int(time.time() * 1000),
result=InstanceRunResult.SUCCESS, result=InstanceRunResult.SUCCESS,
) )
# Emit a sample user. "testUser" is the same id that is added to
# dataJob2.owners, and the user is declared a member of "testGroup".
user1 = CorpUser(
id="testUser",
display_name="Test User",
email="test-user@test.com",
groups=["testGroup"],
)
user1.emit(emitter)
# Emit the sample group that owns dataJob4 (via group_owners).
# overrideEditable=True makes CorpGroup also emit a corpGroupEditableInfo
# aspect (description, picture link, slack, email) next to corpGroupInfo.
group1 = CorpGroup(
id="testGroup",
display_name="Test Group",
email="test-group@test.com",
slack="#test-group",
overrideEditable=True,
)
group1.emit(emitter)

View File

@ -0,0 +1 @@
from datahub.api.entities.corpgroup.corpgroup import CorpGroup

View File

@ -0,0 +1,97 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, Iterable, Optional, Union, cast
import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
ChangeTypeClass,
CorpGroupEditableInfoClass,
CorpGroupInfoClass,
)
@dataclass
class CorpGroup:
    """Client-side description of a CorpGroup that can be emitted to DataHub.

    ``urn`` is not a constructor argument; it is derived from ``id`` with
    ``builder.make_group_urn`` right after initialisation.

    Args:
        id (str): The id of the group
        display_name (Optional[str]): The name of the group
        email (Optional[str]): Email of this group
        description (Optional[str]): A description of the group
        overrideEditable (bool): If True, a ``corpGroupEditableInfo`` aspect is emitted too, so group information that is editable in the UI will be overridden
        picture_link (Optional[str]): A URL which points to a picture to use as the photo for the group
        slack (Optional[str]): Slack channel for the group
    """

    id: str
    urn: str = field(init=False)

    # Fields written to the CorpGroupInfo aspect
    display_name: Optional[str] = None
    email: Optional[str] = None
    description: Optional[str] = None

    # Fields written to the CorpGroupEditableInfo aspect
    overrideEditable: bool = False
    picture_link: Optional[str] = None
    slack: Optional[str] = None

    def __post_init__(self):
        # Derive the group urn once, from the user-supplied id.
        self.urn = builder.make_group_urn(self.id)

    def _as_mcp(self, aspect_name, aspect) -> MetadataChangeProposalWrapper:
        """Wrap a single aspect of this group in an UPSERT change proposal."""
        return MetadataChangeProposalWrapper(
            entityType="corpgroup",
            entityUrn=str(self.urn),
            aspectName=aspect_name,
            aspect=aspect,
            changeType=ChangeTypeClass.UPSERT,
        )

    def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
        """Lazily yield the change proposals describing this group.

        Yields the ``corpGroupEditableInfo`` proposal first, but only when
        ``overrideEditable`` is set, followed by the ``corpGroupInfo``
        proposal, which is always produced.
        """
        if self.overrideEditable:
            editable_info = CorpGroupEditableInfoClass(
                description=self.description,
                pictureLink=self.picture_link,
                slack=self.slack,
                email=self.email,
            )
            yield self._as_mcp("corpGroupEditableInfo", editable_info)

        group_info = CorpGroupInfoClass(
            admins=[],  # Deprecated, replaced by Ownership aspect
            members=[],  # Deprecated, replaced by GroupMembership aspect
            groups=[],  # Deprecated, this field is unused
            displayName=self.display_name,
            email=self.email,
            description=self.description,
        )
        yield self._as_mcp("corpGroupInfo", group_info)

    def emit(
        self,
        emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter],
        callback: Optional[Callable[[Exception, str], None]] = None,
    ) -> None:
        """
        Emit the CorpGroup entity to Datahub

        An emitter whose class is named ``DatahubKafkaEmitter`` receives each
        proposal together with ``callback`` (which must then be provided);
        any other emitter is treated as a REST emitter and receives the
        proposal alone.

        :param emitter: Datahub Emitter used to send the group's change proposals
        :param callback: The callback method for KafkaEmitter if it is used
        """
        # The emitter kind is matched by class name and cannot change mid-loop.
        via_kafka = type(emitter).__name__ == "DatahubKafkaEmitter"
        for proposal in self.generate_mcp():
            if not via_kafka:
                cast("DatahubRestEmitter", emitter).emit(proposal)
                continue
            assert callback is not None
            cast("DatahubKafkaEmitter", emitter).emit(proposal, callback)

View File

@ -0,0 +1 @@
from datahub.api.entities.corpuser.corpuser import CorpUser

View File

@ -0,0 +1,109 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, Iterable, List, Optional, Union, cast
import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
ChangeTypeClass,
CorpUserInfoClass,
GroupMembershipClass,
)
@dataclass
class CorpUser:
    """Client-side description of a CorpUser that can be emitted to DataHub.

    ``urn`` is not a constructor argument; it is derived from ``id`` with
    ``builder.make_user_urn`` right after initialisation.

    Args:
        id (str): The id of the user
        display_name (Optional[str]): The name of the user to display in the UI
        email (Optional[str]): Email address of this user
        title (Optional[str]): Title of this user
        manager_urn (Optional[str]): Direct manager of this user
        department_id (Optional[int]): Department id this user belongs to
        department_name (Optional[str]): Department name this user belongs to
        first_name (Optional[str]): First name of this user
        last_name (Optional[str]): Last name of this user
        full_name (Optional[str]): Common name of this user, format is firstName + lastName (split by a whitespace)
        country_code (Optional[str]): Two uppercase letters country code. e.g. US
        groups (List[str]): List of group ids the user belongs to
    """

    id: str
    urn: str = field(init=False)
    display_name: Optional[str] = None
    email: Optional[str] = None
    title: Optional[str] = None
    manager_urn: Optional[str] = None
    department_id: Optional[int] = None
    department_name: Optional[str] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    full_name: Optional[str] = None
    country_code: Optional[str] = None
    groups: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Derive the user urn once, from the user-supplied id.
        self.urn = builder.make_user_urn(self.id)

    def _as_mcp(self, aspect_name, aspect) -> MetadataChangeProposalWrapper:
        """Wrap a single aspect of this user in an UPSERT change proposal."""
        return MetadataChangeProposalWrapper(
            entityType="corpuser",
            entityUrn=str(self.urn),
            aspectName=aspect_name,
            aspect=aspect,
            changeType=ChangeTypeClass.UPSERT,
        )

    def generate_group_membership_aspect(self) -> Iterable[GroupMembershipClass]:
        """Return a one-element list holding the user's GroupMembership aspect.

        Every id in ``groups`` is converted to a group urn first.
        """
        group_urns = list(map(builder.make_group_urn, self.groups))
        return [GroupMembershipClass(groups=group_urns)]

    def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
        """Lazily yield the change proposals describing this user.

        Yields the ``corpUserInfo`` proposal first, then one
        ``groupMembership`` proposal per aspect returned by
        ``generate_group_membership_aspect``.
        """
        user_info = CorpUserInfoClass(
            active=True,  # Deprecated, use CorpUserStatus instead.
            displayName=self.display_name,
            email=self.email,
            title=self.title,
            managerUrn=self.manager_urn,
            departmentId=self.department_id,
            departmentName=self.department_name,
            firstName=self.first_name,
            lastName=self.last_name,
            fullName=self.full_name,
            countryCode=self.country_code,
        )
        yield self._as_mcp("corpUserInfo", user_info)

        for membership in self.generate_group_membership_aspect():
            yield self._as_mcp("groupMembership", membership)

    def emit(
        self,
        emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter],
        callback: Optional[Callable[[Exception, str], None]] = None,
    ) -> None:
        """
        Emit the CorpUser entity to Datahub

        An emitter whose class is named ``DatahubKafkaEmitter`` receives each
        proposal together with ``callback`` (which must then be provided);
        any other emitter is treated as a REST emitter and receives the
        proposal alone.

        :param emitter: Datahub Emitter used to send the user's change proposals
        :param callback: The callback method for KafkaEmitter if it is used
        """
        # The emitter kind is matched by class name and cannot change mid-loop.
        via_kafka = type(emitter).__name__ == "DatahubKafkaEmitter"
        for proposal in self.generate_mcp():
            if not via_kafka:
                cast("DatahubRestEmitter", emitter).emit(proposal)
                continue
            assert callback is not None
            cast("DatahubKafkaEmitter", emitter).emit(proposal, callback)

View File

@ -51,6 +51,8 @@ class DataJob:
parent_instance (Optional[DataProcessInstanceUrn]): The parent execution's urn if applicable parent_instance (Optional[DataProcessInstanceUrn]): The parent execution's urn if applicable
properties Dict[str, str]: Custom properties to set for the DataProcessInstance properties Dict[str, str]: Custom properties to set for the DataProcessInstance
url (Optional[str]): Url which points to the DataJob at the orchestrator url (Optional[str]): Url which points to the DataJob at the orchestrator
owners (Set[str]): A set of user ids that own this job.
group_owners (Set[str]): A set of group ids that own this job.
inlets (List[str]): List of urns the DataProcessInstance consumes inlets (List[str]): List of urns the DataProcessInstance consumes
outlets (List[str]): List of urns the DataProcessInstance produces outlets (List[str]): List of urns the DataProcessInstance produces
input_datajob_urns: List[DataJobUrn] = field(default_factory=list) input_datajob_urns: List[DataJobUrn] = field(default_factory=list)
@ -65,6 +67,7 @@ class DataJob:
url: Optional[str] = None url: Optional[str] = None
tags: Set[str] = field(default_factory=set) tags: Set[str] = field(default_factory=set)
owners: Set[str] = field(default_factory=set) owners: Set[str] = field(default_factory=set)
group_owners: Set[str] = field(default_factory=set)
inlets: List[DatasetUrn] = field(default_factory=list) inlets: List[DatasetUrn] = field(default_factory=list)
outlets: List[DatasetUrn] = field(default_factory=list) outlets: List[DatasetUrn] = field(default_factory=list)
upstream_urns: List[DataJobUrn] = field(default_factory=list) upstream_urns: List[DataJobUrn] = field(default_factory=list)
@ -80,17 +83,20 @@ class DataJob:
) )
def generate_ownership_aspect(self) -> Iterable[OwnershipClass]: def generate_ownership_aspect(self) -> Iterable[OwnershipClass]:
owners = set([builder.make_user_urn(owner) for owner in self.owners]) | set(
[builder.make_group_urn(owner) for owner in self.group_owners]
)
ownership = OwnershipClass( ownership = OwnershipClass(
owners=[ owners=[
OwnerClass( OwnerClass(
owner=builder.make_user_urn(owner), owner=urn,
type=OwnershipTypeClass.DEVELOPER, type=OwnershipTypeClass.DEVELOPER,
source=OwnershipSourceClass( source=OwnershipSourceClass(
type=OwnershipSourceTypeClass.SERVICE, type=OwnershipSourceTypeClass.SERVICE,
# url=dag.filepath, # url=dag.filepath,
), ),
) )
for owner in (self.owners or []) for urn in (owners or [])
], ],
lastModified=AuditStampClass( lastModified=AuditStampClass(
time=0, time=0,