79 lines
2.4 KiB
Python

import logging
from typing import Union
from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.metadata.schema_classes import (
FormPromptClass,
FormPromptTypeClass,
FormTypeClass,
OwnerClass,
OwnershipTypeClass,
StructuredPropertyParamsClass,
)
from datahub.metadata.urns import FormUrn
from datahub.specific.form import FormPatchBuilder
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Get an emitter, either REST or Kafka, this example shows you both
def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
USE_REST_EMITTER = True
if USE_REST_EMITTER:
gms_endpoint = "http://localhost:8080"
return DataHubRestEmitter(gms_server=gms_endpoint)
else:
kafka_server = "localhost:9092"
schema_registry_url = "http://localhost:8081"
return DatahubKafkaEmitter(
config=KafkaEmitterConfig(
connection=KafkaProducerConnectionConfig(
bootstrap=kafka_server, schema_registry_url=schema_registry_url
)
)
)
# input your unique form ID
form_urn = FormUrn("metadata_initiative_1")
# example prompts to add, must reference an existing structured property
new_prompt = FormPromptClass(
id="abcd",
title="title",
type=FormPromptTypeClass.STRUCTURED_PROPERTY,
structuredPropertyParams=StructuredPropertyParamsClass(
"urn:li:structuredProperty:io.acryl.test"
),
required=True,
)
new_prompt2 = FormPromptClass(
id="1234",
title="title",
type=FormPromptTypeClass.FIELDS_STRUCTURED_PROPERTY,
structuredPropertyParams=StructuredPropertyParamsClass(
"urn:li:structuredProperty:io.acryl.test"
),
required=True,
)
with get_emitter() as emitter:
for patch_mcp in (
FormPatchBuilder(str(form_urn))
.add_owner(
OwnerClass(
owner="urn:li:corpuser:jdoe", type=OwnershipTypeClass.TECHNICAL_OWNER
)
)
.set_name("New Name")
.set_description("New description here")
.set_type(FormTypeClass.VERIFICATION)
.set_ownership_form(True)
.add_prompts([new_prompt, new_prompt2])
.build()
):
emitter.emit(patch_mcp)