import logging from typing import Optional from datahub.emitter.mce_builder import make_dataset_urn, make_user_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper # read-modify-write requires access to the DataHubGraph (RestEmitter is not enough) from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph # Imports for metadata model classes from datahub.metadata.schema_classes import ( ChangeTypeClass, OwnerClass, OwnershipClass, OwnershipTypeClass, ) log = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) # Inputs -> owner, ownership_type, dataset owner_to_add = make_user_urn("jdoe") ownership_type = OwnershipTypeClass.DATAOWNER dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD") # Some objects to help with conditional pathways later owner_class_to_add = OwnerClass(owner=owner_to_add, type=ownership_type) ownership_to_add = OwnershipClass(owners=[owner_class_to_add]) # First we get the current owners gms_endpoint = "http://localhost:8080" graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint)) current_owners: Optional[OwnershipClass] = graph.get_aspect_v2( entity_urn=dataset_urn, aspect="ownership", aspect_type=OwnershipClass, ) need_write = False if current_owners: if (owner_to_add, ownership_type) not in [ (x.owner, x.type) for x in current_owners.owners ]: # owners exist, but this owner is not present in the current owners current_owners.owners.append(owner_class_to_add) need_write = True else: # create a brand new ownership aspect current_owners = ownership_to_add need_write = True if need_write: event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( entityType="dataset", changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, aspectName="ownership", aspect=current_owners, ) graph.emit(event) log.info( f"Owner {owner_to_add}, type {ownership_type} added to dataset {dataset_urn}" ) else: log.info(f"Owner {owner_to_add} already exists, omitting write")