diff --git a/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_verbose.py b/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_verbose.py index 769f63cae2..97acdbee1c 100644 --- a/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_verbose.py +++ b/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_verbose.py @@ -1,6 +1,8 @@ import time import uuid +from datahub.api.entities.corpgroup.corpgroup import CorpGroup +from datahub.api.entities.corpuser.corpuser import CorpUser from datahub.api.entities.datajob.dataflow import DataFlow from datahub.api.entities.datajob.datajob import DataJob from datahub.api.entities.dataprocess.dataprocess_instance import ( @@ -22,7 +24,7 @@ dataJob.emit(emitter) dataJob2 = DataJob(flow_urn=jobFlow.urn, id="job2", name="My Job 2") dataJob2.upstream_urns.append(dataJob.urn) dataJob2.tags.add("TestTag") -dataJob2.owners.add("test@test.com") +dataJob2.owners.add("testUser") dataJob2.emit(emitter) dataJob3 = DataJob(flow_urn=jobFlow.urn, id="job3", name="My Job 3") @@ -32,6 +34,7 @@ dataJob3.emit(emitter) dataJob4 = DataJob(flow_urn=jobFlow.urn, id="job4", name="My Job 4") dataJob4.upstream_urns.append(dataJob2.urn) dataJob4.upstream_urns.append(dataJob3.urn) +dataJob4.group_owners.add("testGroup") dataJob4.emit(emitter) # Hello World @@ -105,3 +108,20 @@ jobRun4.emit_process_end( end_timestamp_millis=int(time.time() * 1000), result=InstanceRunResult.SUCCESS, ) + +user1 = CorpUser( + id="testUser", + display_name="Test User", + email="test-user@test.com", + groups=["testGroup"], +) +user1.emit(emitter) + +group1 = CorpGroup( + id="testGroup", + display_name="Test Group", + email="test-group@test.com", + slack="#test-group", + overrideEditable=True, +) +group1.emit(emitter) diff --git a/metadata-ingestion/src/datahub/api/entities/corpgroup/__init__.py b/metadata-ingestion/src/datahub/api/entities/corpgroup/__init__.py new file mode 100644 index 0000000000..dc4be421ef --- /dev/null +++ b/metadata-ingestion/src/datahub/api/entities/corpgroup/__init__.py @@ -0,0 +1 @@ +from datahub.api.entities.corpgroup.corpgroup import CorpGroup diff --git a/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py b/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py new file mode 100644 index 0000000000..25ca60ade3 --- /dev/null +++ b/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Callable, Iterable, Optional, Union, cast + +import datahub.emitter.mce_builder as builder +from datahub.emitter.kafka_emitter import DatahubKafkaEmitter +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.metadata.schema_classes import ( + ChangeTypeClass, + CorpGroupEditableInfoClass, + CorpGroupInfoClass, +) + + +@dataclass +class CorpGroup: + """This is a CorpGroup class which represents a CorpGroup + + Args: + id (str): The id of the group + display_name (Optional[str]): The name of the group + email (Optional[str]): email of this group + description (Optional[str]): A description of the group + overrideEditable (bool): If True, group information that is editable in the UI will be overridden + picture_link (Optional[str]): A URL which points to a picture which user wants to set as the photo for the group + slack (Optional[str]): Slack channel for the group + """ + + id: str + urn: str = field(init=False) + + # These are for CorpGroupInfo + display_name: Optional[str] = None + email: Optional[str] = None + description: Optional[str] = None + + # These are for CorpGroupEditableInfo + overrideEditable: bool = False + picture_link: Optional[str] = None + slack: Optional[str] = None + + def __post_init__(self): + self.urn = builder.make_group_urn(self.id) + + def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: + if self.overrideEditable: + mcp = MetadataChangeProposalWrapper( + entityType="corpgroup", + entityUrn=str(self.urn), + aspectName="corpGroupEditableInfo", + aspect=CorpGroupEditableInfoClass( + description=self.description, + pictureLink=self.picture_link, + slack=self.slack, + email=self.email, + ), + changeType=ChangeTypeClass.UPSERT, + ) + yield mcp + + mcp = MetadataChangeProposalWrapper( + entityType="corpgroup", + entityUrn=str(self.urn), + aspectName="corpGroupInfo", + aspect=CorpGroupInfoClass( + admins=[], # Deprecated, replaced by Ownership aspect + members=[], # Deprecated, replaced by GroupMembership aspect + groups=[], # Deprecated, this field is unused + displayName=self.display_name, + email=self.email, + description=self.description, + ), + changeType=ChangeTypeClass.UPSERT, + ) + yield mcp + + def emit( + self, + emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter], + callback: Optional[Callable[[Exception, str], None]] = None, + ) -> None: + """ + Emit the CorpGroup entity to Datahub + + :param emitter: Datahub Emitter to emit the proccess event + :param callback: The callback method for KafkaEmitter if it is used + """ + for mcp in self.generate_mcp(): + if type(emitter).__name__ == "DatahubKafkaEmitter": + assert callback is not None + kafka_emitter = cast("DatahubKafkaEmitter", emitter) + kafka_emitter.emit(mcp, callback) + else: + rest_emitter = cast("DatahubRestEmitter", emitter) + rest_emitter.emit(mcp) diff --git a/metadata-ingestion/src/datahub/api/entities/corpuser/__init__.py b/metadata-ingestion/src/datahub/api/entities/corpuser/__init__.py new file mode 100644 index 0000000000..ea94087746 --- /dev/null +++ b/metadata-ingestion/src/datahub/api/entities/corpuser/__init__.py @@ -0,0 +1 @@ +from datahub.api.entities.corpuser.corpuser import CorpUser diff --git a/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py b/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py new file mode 100644 index 0000000000..00fe35ded5 --- /dev/null +++ b/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Callable, Iterable, List, Optional, Union, cast + +import datahub.emitter.mce_builder as builder +from datahub.emitter.kafka_emitter import DatahubKafkaEmitter +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.metadata.schema_classes import ( + ChangeTypeClass, + CorpUserInfoClass, + GroupMembershipClass, +) + + +@dataclass +class CorpUser: + """This is a CorpUser class which represents a CorpUser + + Args: + id (str): The id of the user + display_name (Optional[str]): The name of the user to display in the UI + email (Optional[str]): email address of this user + title (Optional[str]): title of this user + manager_urn (Optional[str]): direct manager of this user + department_id (Optional[int]): department id this user belongs to + department_name (Optional[str]): department name this user belongs to + first_name (Optional[str]): first name of this user + last_name (Optional[str]): last name of this user + full_name (Optional[str]): Common name of this user, format is firstName + lastName (split by a whitespace) + country_code (Optional[str]): two uppercase letters country code. e.g. US + groups (List[str]): List of group ids the user belongs to + """ + + id: str + urn: str = field(init=False) + display_name: Optional[str] = None + email: Optional[str] = None + title: Optional[str] = None + manager_urn: Optional[str] = None + department_id: Optional[int] = None + department_name: Optional[str] = None + first_name: Optional[str] = None + last_name: Optional[str] = None + full_name: Optional[str] = None + country_code: Optional[str] = None + groups: List[str] = field(default_factory=list) + + def __post_init__(self): + self.urn = builder.make_user_urn(self.id) + + def generate_group_membership_aspect(self) -> Iterable[GroupMembershipClass]: + group_membership = GroupMembershipClass( + groups=[builder.make_group_urn(group) for group in self.groups] + ) + return [group_membership] + + def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: + mcp = MetadataChangeProposalWrapper( + entityType="corpuser", + entityUrn=str(self.urn), + aspectName="corpUserInfo", + aspect=CorpUserInfoClass( + active=True, # Deprecated, use CorpUserStatus instead. + displayName=self.display_name, + email=self.email, + title=self.title, + managerUrn=self.manager_urn, + departmentId=self.department_id, + departmentName=self.department_name, + firstName=self.first_name, + lastName=self.last_name, + fullName=self.full_name, + countryCode=self.country_code, + ), + changeType=ChangeTypeClass.UPSERT, + ) + yield mcp + + for group_membership in self.generate_group_membership_aspect(): + mcp = MetadataChangeProposalWrapper( + entityType="corpuser", + entityUrn=str(self.urn), + aspectName="groupMembership", + aspect=group_membership, + changeType=ChangeTypeClass.UPSERT, + ) + yield mcp + + def emit( + self, + emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter], + callback: Optional[Callable[[Exception, str], None]] = None, + ) -> None: + """ + Emit the CorpUser entity to Datahub + + :param emitter: Datahub Emitter to emit the proccess event + :param callback: The callback method for KafkaEmitter if it is used + """ + for mcp in self.generate_mcp(): + if type(emitter).__name__ == "DatahubKafkaEmitter": + assert callback is not None + kafka_emitter = cast("DatahubKafkaEmitter", emitter) + kafka_emitter.emit(mcp, callback) + else: + rest_emitter = cast("DatahubRestEmitter", emitter) + rest_emitter.emit(mcp) diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py index 8f33c7c3a4..1440e7799a 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py @@ -51,6 +51,8 @@ class DataJob: parent_instance (Optional[DataProcessInstanceUrn]): The parent execution's urn if applicable properties Dict[str, str]: Custom properties to set for the DataProcessInstance url (Optional[str]): Url which points to the DataJob at the orchestrator + owners Set[str]): A list of user ids that own this job. + group_owners Set[str]): A list of group ids that own this job. inlets (List[str]): List of urns the DataProcessInstance consumes outlets (List[str]): List of urns the DataProcessInstance produces input_datajob_urns: List[DataJobUrn] = field(default_factory=list) @@ -65,6 +67,7 @@ class DataJob: url: Optional[str] = None tags: Set[str] = field(default_factory=set) owners: Set[str] = field(default_factory=set) + group_owners: Set[str] = field(default_factory=set) inlets: List[DatasetUrn] = field(default_factory=list) outlets: List[DatasetUrn] = field(default_factory=list) upstream_urns: List[DataJobUrn] = field(default_factory=list) @@ -80,17 +83,20 @@ class DataJob: ) def generate_ownership_aspect(self) -> Iterable[OwnershipClass]: + owners = set([builder.make_user_urn(owner) for owner in self.owners]) | set( + [builder.make_group_urn(owner) for owner in self.group_owners] + ) ownership = OwnershipClass( owners=[ OwnerClass( - owner=builder.make_user_urn(owner), + owner=urn, type=OwnershipTypeClass.DEVELOPER, source=OwnershipSourceClass( type=OwnershipSourceTypeClass.SERVICE, # url=dag.filepath, ), ) - for owner in (self.owners or []) + for urn in (owners or []) ], lastModified=AuditStampClass( time=0,