# Source: datahub/metadata-ingestion/examples/library/domain_add_documentation.py
# (File-viewer metadata at capture time: 78 lines, 2.3 KiB, Python.)

import time
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import (
AuditStampClass,
DomainPropertiesClass,
InstitutionalMemoryClass,
InstitutionalMemoryMetadataClass,
)
from datahub.metadata.urns import CorpUserUrn, DomainUrn
# Both clients target the same local DataHub server: the graph client is used
# to read aspects, the REST emitter to write them.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

domain_urn = DomainUrn(id="marketing")

# Single source of truth for the description, used whether the properties
# aspect is updated in place or created from scratch.
domain_description = (
    "The Marketing domain contains all data assets related to marketing operations, "
    "campaigns, customer analytics, and brand management."
)

# Get existing properties
existing_properties = graph.get_aspect(str(domain_urn), DomainPropertiesClass)

# Update description
if existing_properties is not None:
    # Reuse the fetched aspect so its other fields are sent back unchanged.
    existing_properties.description = domain_description
    properties = existing_properties
else:
    # No properties aspect was returned: build a new one with a display name.
    properties = DomainPropertiesClass(
        name="Marketing",
        description=domain_description,
    )

# Emit properties
emitter.emit_mcp(
    MetadataChangeProposalWrapper(entityUrn=str(domain_urn), aspect=properties)
)
# Get existing institutional memory. The emit below sends the complete list of
# links, so the ones already attached are carried over.
existing_memory = graph.get_aspect(str(domain_urn), InstitutionalMemoryClass)
links_list = (
    list(existing_memory.elements)
    if existing_memory and existing_memory.elements
    else []
)

# Documentation links to attach, as (url, description) pairs.
new_links = [
    (
        "https://wiki.company.com/domains/marketing",
        "Marketing Domain Wiki - Overview and Guidelines",
    ),
    (
        "https://confluence.company.com/marketing-data-governance",
        "Marketing Data Governance Policies",
    ),
]

# One audit stamp (current time in epoch milliseconds, "datahub" user) shared
# by every link added in this run.
audit_stamp = AuditStampClass(
    time=int(time.time() * 1000), actor=str(CorpUserUrn("datahub"))
)

# Add new links, skipping URLs that are already present so that re-running the
# script does not accumulate duplicate entries.
known_urls = {link.url for link in links_list}
links_added = False
for url, description in new_links:
    if url in known_urls:
        continue
    links_list.append(
        InstitutionalMemoryMetadataClass(
            url=url,
            description=description,
            createStamp=audit_stamp,
        )
    )
    known_urls.add(url)
    links_added = True

# Emit institutional memory only when the list actually changed.
if links_added:
    emitter.emit_mcp(
        MetadataChangeProposalWrapper(
            entityUrn=str(domain_urn),
            aspect=InstitutionalMemoryClass(elements=links_list),
        )
    )