mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-16 19:33:41 +00:00
78 lines
2.3 KiB
Python
78 lines
2.3 KiB
Python
import time
|
|
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
from datahub.emitter.rest_emitter import DatahubRestEmitter
|
|
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
|
|
from datahub.metadata.schema_classes import (
|
|
AuditStampClass,
|
|
DomainPropertiesClass,
|
|
InstitutionalMemoryClass,
|
|
InstitutionalMemoryMetadataClass,
|
|
)
|
|
from datahub.metadata.urns import CorpUserUrn, DomainUrn
|
|
|
|
# Both clients point at the same local DataHub GMS endpoint. In this script
# `graph` is used to read aspects and `emitter` is used to write them.
gms_server = "http://localhost:8080"
graph = DataHubGraph(DatahubClientConfig(server=gms_server))
emitter = DatahubRestEmitter(gms_server=gms_server)

# URN of the domain whose metadata this script updates.
domain_urn = DomainUrn(id="marketing")
|
|
|
|
# Description to store on the domain, whether or not it already has properties.
domain_description = (
    "The Marketing domain contains all data assets related to marketing operations, "
    "campaigns, customer analytics, and brand management."
)

# Start from the stored properties when there are any, so that only the
# description is overwritten; otherwise build a fresh aspect from scratch.
properties = graph.get_aspect(str(domain_urn), DomainPropertiesClass)
if not properties:
    properties = DomainPropertiesClass(
        name="Marketing",
        description=domain_description,
    )
else:
    properties.description = domain_description

# Write the properties aspect back to the domain.
properties_proposal = MetadataChangeProposalWrapper(
    entityUrn=str(domain_urn),
    aspect=properties,
)
emitter.emit_mcp(properties_proposal)
|
|
|
|
# Fetch the links already attached to the domain so that new links are added
# to them instead of replacing them.
existing_memory = graph.get_aspect(str(domain_urn), InstitutionalMemoryClass)
links_list = (
    list(existing_memory.elements)
    if existing_memory and existing_memory.elements
    else []
)

# All links created by this run share one creation stamp (epoch milliseconds).
audit_stamp = AuditStampClass(
    time=int(time.time() * 1000), actor=str(CorpUserUrn("datahub"))
)

# (url, description) pairs this script wants present on the domain.
new_links = [
    (
        "https://wiki.company.com/domains/marketing",
        "Marketing Domain Wiki - Overview and Guidelines",
    ),
    (
        "https://confluence.company.com/marketing-data-governance",
        "Marketing Data Governance Policies",
    ),
]

# Skip URLs that are already linked: without this check every re-run of the
# script would append the same links again and accumulate duplicates.
known_urls = {link.url for link in links_list}
links_added = False
for link_url, link_description in new_links:
    if link_url in known_urls:
        continue
    links_list.append(
        InstitutionalMemoryMetadataClass(
            url=link_url,
            description=link_description,
            createStamp=audit_stamp,
        )
    )
    known_urls.add(link_url)
    links_added = True

# Emit institutional memory only when the list of links actually changed.
if links_added:
    emitter.emit_mcp(
        MetadataChangeProposalWrapper(
            entityUrn=str(domain_urn),
            aspect=InstitutionalMemoryClass(elements=links_list),
        )
    )
|