mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-10-26 00:14:53 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			108 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			108 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import time
 | |
| 
 | |
| from datahub.emitter.mce_builder import make_dataset_urn, make_term_urn
 | |
| from datahub.emitter.mcp import MetadataChangeProposalWrapper
 | |
| 
 | |
| # read-modify-write requires access to the DataHubGraph (RestEmitter is not enough)
 | |
| from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
 | |
| 
 | |
| # Imports for metadata model classes
 | |
| from datahub.metadata.schema_classes import (
 | |
|     AuditStampClass,
 | |
|     ChangeTypeClass,
 | |
|     EditableSchemaFieldInfoClass,
 | |
|     EditableSchemaMetadataClass,
 | |
|     GlossaryTermAssociationClass,
 | |
|     GlossaryTermsClass,
 | |
| )
 | |
| 
 | |
| log = logging.getLogger(__name__)
 | |
| logging.basicConfig(level=logging.INFO)
 | |
| 
 | |
| 
 | |
| def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
 | |
|     """A helper function to extract simple . path notation from the v2 field path"""
 | |
|     if not field_path.startswith("[version=2.0]"):
 | |
|         # not a v2, we assume this is a simple path
 | |
|         return field_path
 | |
|         # this is a v2 field path
 | |
|     tokens = [
 | |
|         t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
 | |
|     ]
 | |
| 
 | |
|     return ".".join(tokens)
 | |
| 
 | |
| 
 | |
| # Inputs -> the column, dataset and the term to set
 | |
| column = "address.zipcode"
 | |
| dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")
 | |
| term_to_add = make_term_urn("Classification.Location")
 | |
| 
 | |
| 
 | |
| # First we get the current editable schema metadata
 | |
| gms_endpoint = "http://localhost:8080"
 | |
| graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))
 | |
| 
 | |
| 
 | |
| current_editable_schema_metadata = graph.get_aspect(
 | |
|     entity_urn=dataset_urn, aspect_type=EditableSchemaMetadataClass
 | |
| )
 | |
| 
 | |
| 
 | |
| # Some pre-built objects to help all the conditional pathways
 | |
| now = int(time.time() * 1000)  # milliseconds since epoch
 | |
| current_timestamp = AuditStampClass(time=now, actor="urn:li:corpuser:ingestion")
 | |
| 
 | |
| term_association_to_add = GlossaryTermAssociationClass(urn=term_to_add)
 | |
| term_aspect_to_set = GlossaryTermsClass(
 | |
|     terms=[term_association_to_add], auditStamp=current_timestamp
 | |
| )
 | |
| field_info_to_set = EditableSchemaFieldInfoClass(
 | |
|     fieldPath=column, glossaryTerms=term_aspect_to_set
 | |
| )
 | |
| 
 | |
| need_write = False
 | |
| field_match = False
 | |
| if current_editable_schema_metadata:
 | |
|     for fieldInfo in current_editable_schema_metadata.editableSchemaFieldInfo:
 | |
|         if get_simple_field_path_from_v2_field_path(fieldInfo.fieldPath) == column:
 | |
|             # we have some editable schema metadata for this field
 | |
|             field_match = True
 | |
|             if fieldInfo.glossaryTerms:
 | |
|                 if term_to_add not in [x.urn for x in fieldInfo.glossaryTerms.terms]:
 | |
|                     # this tag is not present
 | |
|                     fieldInfo.glossaryTerms.terms.append(term_association_to_add)
 | |
|                     need_write = True
 | |
|             else:
 | |
|                 fieldInfo.glossaryTerms = term_aspect_to_set
 | |
|                 need_write = True
 | |
| 
 | |
|     if not field_match:
 | |
|         # this field isn't present in the editable schema metadata aspect, add it
 | |
|         field_info = field_info_to_set
 | |
|         current_editable_schema_metadata.editableSchemaFieldInfo.append(field_info)
 | |
|         need_write = True
 | |
| 
 | |
| else:
 | |
|     # create a brand new editable schema metadata aspect
 | |
|     current_editable_schema_metadata = EditableSchemaMetadataClass(
 | |
|         editableSchemaFieldInfo=[field_info_to_set],
 | |
|         created=current_timestamp,
 | |
|     )
 | |
|     need_write = True
 | |
| 
 | |
| if need_write:
 | |
|     event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
 | |
|         entityType="dataset",
 | |
|         changeType=ChangeTypeClass.UPSERT,
 | |
|         entityUrn=dataset_urn,
 | |
|         aspectName="editableSchemaMetadata",
 | |
|         aspect=current_editable_schema_metadata,
 | |
|     )
 | |
|     graph.emit(event)
 | |
|     log.info(f"Tag {term_to_add} added to column {column} of dataset {dataset_urn}")
 | |
| 
 | |
| else:
 | |
|     log.info(f"Tag {term_to_add} already attached to column {column}, omitting write")
 | 
