mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-04 07:34:44 +00:00
79 lines
2.4 KiB
Python
79 lines
2.4 KiB
Python
import logging
|
|
from typing import Union
|
|
|
|
from datahub.configuration.kafka import KafkaProducerConnectionConfig
|
|
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
|
|
from datahub.emitter.rest_emitter import DataHubRestEmitter
|
|
from datahub.metadata.schema_classes import (
|
|
FormPromptClass,
|
|
FormPromptTypeClass,
|
|
FormTypeClass,
|
|
OwnerClass,
|
|
OwnershipTypeClass,
|
|
StructuredPropertyParamsClass,
|
|
)
|
|
from datahub.metadata.urns import FormUrn
|
|
from datahub.specific.form import FormPatchBuilder
|
|
|
|
log = logging.getLogger(__name__)
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
# Get an emitter, either REST or Kafka, this example shows you both
|
|
def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
|
|
USE_REST_EMITTER = True
|
|
if USE_REST_EMITTER:
|
|
gms_endpoint = "http://localhost:8080"
|
|
return DataHubRestEmitter(gms_server=gms_endpoint)
|
|
else:
|
|
kafka_server = "localhost:9092"
|
|
schema_registry_url = "http://localhost:8081"
|
|
return DatahubKafkaEmitter(
|
|
config=KafkaEmitterConfig(
|
|
connection=KafkaProducerConnectionConfig(
|
|
bootstrap=kafka_server, schema_registry_url=schema_registry_url
|
|
)
|
|
)
|
|
)
|
|
|
|
|
|
# input your unique form ID
|
|
form_urn = FormUrn("metadata_initiative_1")
|
|
|
|
# example prompts to add, must reference an existing structured property
|
|
new_prompt = FormPromptClass(
|
|
id="abcd",
|
|
title="title",
|
|
type=FormPromptTypeClass.STRUCTURED_PROPERTY,
|
|
structuredPropertyParams=StructuredPropertyParamsClass(
|
|
"urn:li:structuredProperty:io.acryl.test"
|
|
),
|
|
required=True,
|
|
)
|
|
new_prompt2 = FormPromptClass(
|
|
id="1234",
|
|
title="title",
|
|
type=FormPromptTypeClass.FIELDS_STRUCTURED_PROPERTY,
|
|
structuredPropertyParams=StructuredPropertyParamsClass(
|
|
"urn:li:structuredProperty:io.acryl.test"
|
|
),
|
|
required=True,
|
|
)
|
|
|
|
with get_emitter() as emitter:
|
|
for patch_mcp in (
|
|
FormPatchBuilder(str(form_urn))
|
|
.add_owner(
|
|
OwnerClass(
|
|
owner="urn:li:corpuser:jdoe", type=OwnershipTypeClass.TECHNICAL_OWNER
|
|
)
|
|
)
|
|
.set_name("New Name")
|
|
.set_description("New description here")
|
|
.set_type(FormTypeClass.VERIFICATION)
|
|
.set_ownership_form(True)
|
|
.add_prompts([new_prompt, new_prompt2])
|
|
.build()
|
|
):
|
|
emitter.emit(patch_mcp)
|