mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-10-31 18:59:23 +00:00 
			
		
		
		
	
		
			
	
	
		
			79 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			79 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | import logging | ||
|  | from typing import Union | ||
|  | 
 | ||
|  | from datahub.configuration.kafka import KafkaProducerConnectionConfig | ||
|  | from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig | ||
|  | from datahub.emitter.rest_emitter import DataHubRestEmitter | ||
|  | from datahub.metadata.schema_classes import ( | ||
|  |     FormPromptClass, | ||
|  |     FormPromptTypeClass, | ||
|  |     FormTypeClass, | ||
|  |     OwnerClass, | ||
|  |     OwnershipTypeClass, | ||
|  |     StructuredPropertyParamsClass, | ||
|  | ) | ||
|  | from datahub.metadata.urns import FormUrn | ||
|  | from datahub.specific.form import FormPatchBuilder | ||
|  | 
 | ||
# Set up logging at INFO level, then create the logger named after this module.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
|  | 
 | ||
|  | 
 | ||
# Get an emitter, either REST or Kafka, this example shows you both
def get_emitter(
    *, use_rest_emitter: bool = True
) -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
    """Create the emitter used to send metadata changes to DataHub.

    Args:
        use_rest_emitter: When true (the default), return a REST emitter
            pointed at ``http://localhost:8080``. When false, return a Kafka
            emitter using the broker at ``localhost:9092`` and the schema
            registry at ``http://localhost:8081``.

    Returns:
        A ``DataHubRestEmitter`` or a ``DatahubKafkaEmitter``, depending on
        ``use_rest_emitter``.
    """
    if use_rest_emitter:
        gms_endpoint = "http://localhost:8080"
        return DataHubRestEmitter(gms_server=gms_endpoint)

    # Kafka path: previously unreachable behind a hard-coded local flag.
    kafka_server = "localhost:9092"
    schema_registry_url = "http://localhost:8081"
    return DatahubKafkaEmitter(
        config=KafkaEmitterConfig(
            connection=KafkaProducerConnectionConfig(
                bootstrap=kafka_server, schema_registry_url=schema_registry_url
            )
        )
    )
|  | 
 | ||
|  | 
 | ||
# Identifier of the form to patch; replace it with your own unique form ID.
form_urn = FormUrn("metadata_initiative_1")

# Example prompts to add. They must reference an existing structured property;
# both examples here point at the same property URN.
property_urn = "urn:li:structuredProperty:io.acryl.test"
new_prompt = FormPromptClass(
    id="abcd",
    title="title",
    type=FormPromptTypeClass.STRUCTURED_PROPERTY,
    structuredPropertyParams=StructuredPropertyParamsClass(property_urn),
    required=True,
)
new_prompt2 = FormPromptClass(
    id="1234",
    title="title",
    type=FormPromptTypeClass.FIELDS_STRUCTURED_PROPERTY,
    structuredPropertyParams=StructuredPropertyParamsClass(property_urn),
    required=True,
)

with get_emitter() as emitter:
    # Apply each change to the patch builder one step at a time, keeping the
    # object returned by every call so the sequence matches a fluent chain.
    builder = FormPatchBuilder(str(form_urn))
    builder = builder.add_owner(
        OwnerClass(
            owner="urn:li:corpuser:jdoe",
            type=OwnershipTypeClass.TECHNICAL_OWNER,
        )
    )
    builder = builder.set_name("New Name")
    builder = builder.set_description("New description here")
    builder = builder.set_type(FormTypeClass.VERIFICATION)
    builder = builder.set_ownership_form(True)
    builder = builder.add_prompts([new_prompt, new_prompt2])

    # Emit every item produced by build() through the open emitter.
    for patch_mcp in builder.build():
        emitter.emit(patch_mcp)