datahub/metadata-ingestion/examples/library/query_add_subjects.py

42 lines
1.3 KiB
Python

# metadata-ingestion/examples/library/query_add_subjects.py
import logging
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig
from datahub.metadata.schema_classes import QuerySubjectClass, QuerySubjectsClass
from datahub.metadata.urns import DatasetUrn, QueryUrn
# Example script: add a dataset as a subject of an existing query entity.
#
# Flow: fetch the query's current QuerySubjects aspect, append the new dataset
# subject if it is not already listed, and emit the combined list back.
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

query_urn = QueryUrn("my-unique-query-id")

# The graph client is used to read the current aspect; the REST emitter is
# used to write the updated one. Both point at the same local server.
graph = DataHubGraph(DataHubGraphConfig(server="http://localhost:8080"))
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

existing_subjects = graph.get_aspect(
    entity_urn=query_urn.urn(),
    aspect_type=QuerySubjectsClass,
)
# Start from the subjects already on the query (empty when nothing came back).
# Copy the list so the fetched aspect object itself is not mutated.
subjects = list(existing_subjects.subjects) if existing_subjects else []

new_dataset_urn = DatasetUrn.from_string(
    "urn:li:dataset:(urn:li:dataPlatform:postgres,public.customers,PROD)"
)
new_subject = QuerySubjectClass(entity=new_dataset_urn.urn())

# NOTE(review): the membership test relies on QuerySubjectClass comparing by
# value rather than identity -- confirm against the generated schema classes.
if new_subject in subjects:
    # Nothing changed, so skip the write instead of re-emitting the same list
    # and reporting an addition that did not happen.
    log.info(
        "Subject %s already present on query %s; nothing to emit",
        new_dataset_urn,
        query_urn,
    )
else:
    subjects.append(new_subject)
    # The emitted aspect carries the full list: previously fetched subjects
    # plus the new one.
    query_subjects_aspect = QuerySubjectsClass(subjects=subjects)
    event = MetadataChangeProposalWrapper(
        entityUrn=query_urn.urn(),
        aspect=query_subjects_aspect,
    )
    emitter.emit(event)
    log.info("Added subject to query %s", query_urn)