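"""Convert an Avro schema file into a DataHub SchemaMetadata aspect.

The aspect is written to a local JSON file by default and can also be pushed
to a DataHub server with --to-server.

Example invocation (script name and values are illustrative):

    python avro2datahub.py -i event.avsc --platform kafka -o event_metadata.json
"""
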
import hashlib
import os

import click
from avro.schema import parse as parse_avro, RecordSchema

import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter
from datahub.ingestion.extractor.schema_util import AvroToMceSchemaConverter
from datahub.ingestion.graph.client import get_default_graph


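# Helper: MD5-hash the string form of a schema. (Not used by the CLI flow
# below, which derives its hash from the Avro schema fingerprint instead.)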
def get_schema_hash(schema):
    # Convert schema to string if it isn't already
    schema_str = str(schema)

    # Create MD5 hash
    schema_hash = hashlib.md5(schema_str.encode("utf-8")).hexdigest()

    return schema_hash


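# CLI entry point: parse the given .avsc file, build a SchemaMetadata aspect
# for the corresponding dataset, and emit it to a local JSON file (the default)
# and/or to a DataHub server (--to-server).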
@click.command(name="avro2datahub")
@click.option("--input-file", "-i", type=click.Path(exists=True), required=True)
@click.option("--platform", type=str, required=True)
@click.option("--output-file", "-o", type=click.Path(), default="metadata.py.json")
@click.option("--to-file", "-f", is_flag=True, default=True)
@click.option("--to-server", "-s", is_flag=True, default=False)
def generate_schema_file_from_avro_schema(
    input_file: str, platform: str, output_file: str, to_file: bool, to_server: bool
):
    avro_schema_file = input_file
    output_file_name = output_file
    platform_urn = make_data_platform_urn(platform)
    converter = AvroToMceSchemaConverter(is_key_schema=False)

    # Delete the output file if it exists
    if os.path.exists(output_file_name):
        os.remove(output_file_name)

    with open(avro_schema_file) as f:
        raw_string = f.read()
        avro_schema = parse_avro(raw_string)

        # Print the schema's parsing canonical form (useful for debugging).
        canonical_form = avro_schema.canonical_form
        print(
            f"Schema canonical form: Length ({len(canonical_form)}); {canonical_form}"
        )

        # Get the fingerprint bytes and convert them to a hex string, which is
        # used as the hash on the SchemaMetadata aspect.
        md5_bytes = avro_schema.fingerprint("md5")
        avro_schema_hash = md5_bytes.hex()

        assert isinstance(
            avro_schema, RecordSchema
        ), "This command only works for Avro records"
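        # The dataset URN is derived from the record's fully qualified name.
        # Illustrative example: --platform kafka and a record com.example.MyEvent
        # yield urn:li:dataset:(urn:li:dataPlatform:kafka,com.example.MyEvent,PROD),
        # where PROD is make_dataset_urn's default environment.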
        dataset_urn = make_dataset_urn(
            platform=platform_urn,
            name=(
                f"{avro_schema.namespace}.{avro_schema.name}"
                if avro_schema.namespace
                else avro_schema.name
            ),
        )
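        # Convert the Avro record's fields into DataHub SchemaFieldClass entries.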
        schema_fields = list(
            converter.to_mce_fields(avro_schema, is_key_schema=False)
        )
        schema_metadata = models.SchemaMetadataClass(
            schemaName=avro_schema.name,
            platform=platform_urn,
            version=0,
            hash=avro_schema_hash,
            platformSchema=models.OtherSchemaClass(rawSchema=raw_string),
            fields=schema_fields,
        )
        assert schema_metadata.validate()
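        # Emit the aspect wrapped in a MetadataChangeProposal: to the local JSON
        # output file, to the DataHub server configured for the datahub CLI
        # (via get_default_graph()), or both.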
        if to_file:
            with SynchronizedFileEmitter(output_file_name) as file_emitter:
                file_emitter.emit(
                    MetadataChangeProposalWrapper(
                        entityUrn=dataset_urn, aspect=schema_metadata
                    )
                )
        if to_server:
            with get_default_graph() as graph:
                graph.emit(
                    MetadataChangeProposalWrapper(
                        entityUrn=dataset_urn, aspect=schema_metadata
                    )
                )

    print(f"Wrote metadata to {output_file}")


if __name__ == "__main__":
    generate_schema_file_from_avro_schema()
