42 lines
1.0 KiB
Python

import logging
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
AuditStampClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)
from datahub.metadata.urns import CorpUserUrn, FormUrn
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Form URN to add owner to
form_urn = FormUrn("metadata_initiative_2024")
# Create ownership aspect
ownership = OwnershipClass(
owners=[
OwnerClass(
owner=str(CorpUserUrn("governance_team")),
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
],
lastModified=AuditStampClass(
time=0, actor="urn:li:corpuser:datahub", impersonator=None
),
)
# Create and emit metadata change proposal
event = MetadataChangeProposalWrapper(
entityUrn=str(form_urn),
aspect=ownership,
)
rest_emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
rest_emitter.emit(event)
log.info(f"Added owner to form {form_urn}")