mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-10-26 00:14:53 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			79 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			79 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| from typing import Union
 | |
| 
 | |
| from datahub.configuration.kafka import KafkaProducerConnectionConfig
 | |
| from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
 | |
| from datahub.emitter.rest_emitter import DataHubRestEmitter
 | |
| from datahub.metadata.schema_classes import (
 | |
|     FormPromptClass,
 | |
|     FormPromptTypeClass,
 | |
|     FormTypeClass,
 | |
|     OwnerClass,
 | |
|     OwnershipTypeClass,
 | |
|     StructuredPropertyParamsClass,
 | |
| )
 | |
| from datahub.metadata.urns import FormUrn
 | |
| from datahub.specific.form import FormPatchBuilder
 | |
| 
 | |
| log = logging.getLogger(__name__)
 | |
| logging.basicConfig(level=logging.INFO)
 | |
| 
 | |
| 
 | |
| # Get an emitter, either REST or Kafka, this example shows you both
 | |
| def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
 | |
|     USE_REST_EMITTER = True
 | |
|     if USE_REST_EMITTER:
 | |
|         gms_endpoint = "http://localhost:8080"
 | |
|         return DataHubRestEmitter(gms_server=gms_endpoint)
 | |
|     else:
 | |
|         kafka_server = "localhost:9092"
 | |
|         schema_registry_url = "http://localhost:8081"
 | |
|         return DatahubKafkaEmitter(
 | |
|             config=KafkaEmitterConfig(
 | |
|                 connection=KafkaProducerConnectionConfig(
 | |
|                     bootstrap=kafka_server, schema_registry_url=schema_registry_url
 | |
|                 )
 | |
|             )
 | |
|         )
 | |
| 
 | |
| 
 | |
| # input your unique form ID
 | |
| form_urn = FormUrn("metadata_initiative_1")
 | |
| 
 | |
| # example prompts to add, must reference an existing structured property
 | |
| new_prompt = FormPromptClass(
 | |
|     id="abcd",
 | |
|     title="title",
 | |
|     type=FormPromptTypeClass.STRUCTURED_PROPERTY,
 | |
|     structuredPropertyParams=StructuredPropertyParamsClass(
 | |
|         "urn:li:structuredProperty:io.acryl.test"
 | |
|     ),
 | |
|     required=True,
 | |
| )
 | |
| new_prompt2 = FormPromptClass(
 | |
|     id="1234",
 | |
|     title="title",
 | |
|     type=FormPromptTypeClass.FIELDS_STRUCTURED_PROPERTY,
 | |
|     structuredPropertyParams=StructuredPropertyParamsClass(
 | |
|         "urn:li:structuredProperty:io.acryl.test"
 | |
|     ),
 | |
|     required=True,
 | |
| )
 | |
| 
 | |
| with get_emitter() as emitter:
 | |
|     for patch_mcp in (
 | |
|         FormPatchBuilder(str(form_urn))
 | |
|         .add_owner(
 | |
|             OwnerClass(
 | |
|                 owner="urn:li:corpuser:jdoe", type=OwnershipTypeClass.TECHNICAL_OWNER
 | |
|             )
 | |
|         )
 | |
|         .set_name("New Name")
 | |
|         .set_description("New description here")
 | |
|         .set_type(FormTypeClass.VERIFICATION)
 | |
|         .set_ownership_form(True)
 | |
|         .add_prompts([new_prompt, new_prompt2])
 | |
|         .build()
 | |
|     ):
 | |
|         emitter.emit(patch_mcp)
 | 
