2022-06-06 09:31:44 +02:00
import glob
import json
import os
import sys
from typing import Any , Dict , List
def get_base() -> Dict[str, Any]:
    """Return the hand-written skeleton of the DataHub ingestion JSON schema.

    The skeleton contains the definitions of the built-in sinks (console,
    file, datahub-rest, datahub-kafka) and the root ``source`` /
    ``transformers`` / ``sink`` properties. ``properties.source.anyOf`` is
    returned empty; the script part of this file appends one ``$ref`` per
    discovered source config to it.

    Returns:
        A freshly built dictionary on every call, so callers may mutate the
        result without affecting later calls.
    """
    return {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        # NOTE(review): draft 2020-12 spells this keyword "$id"; it is left
        # as "id" so the emitted document stays unchanged - confirm with the
        # consumers of the schema before renaming.
        "id": "https://json.schemastore.org/datahub-ingestion",
        "title": "Datahub Ingestion",
        "description": "Root schema of Datahub Ingestion",
        "definitions": {
            "console_sink": {
                "type": "object",
                "properties": {
                    "type": {
                        "type": "string",
                        "enum": ["console"],
                    }
                },
                "required": ["type"],
            },
            "file_sink": {
                "type": "object",
                "properties": {
                    "type": {"type": "string", "enum": ["file"]},
                    "config": {"$ref": "#/definitions/file_sink_config"},
                },
                "required": ["type", "config"],
            },
            "file_sink_config": {
                "type": "object",
                "properties": {
                    "filename": {
                        "description": "Path to file to write to.",
                        "type": "string",
                    }
                },
                "required": ["filename"],
                "additionalProperties": False,
            },
            "datahub_rest_sink": {
                "type": "object",
                "properties": {
                    "type": {"type": "string", "enum": ["datahub-rest"]},
                    "config": {"$ref": "#/definitions/datahub_rest_sink_config"},
                },
                "required": ["type", "config"],
                "additionalProperties": False,
            },
            "datahub_rest_sink_config": {
                "type": "object",
                "properties": {
                    "ca_certificate_path": {
                        "type": "string",
                        "description": "Path to server's CA certificate for verification of HTTPS communications",
                    },
                    "client_certificate_path": {
                        "type": "string",
                        # Key was misspelled "descritption", which schema
                        # tooling does not recognise as a description.
                        "description": "Path to client's CA certificate for HTTPS communications",
                    },
                    "max_threads": {
                        "type": "number",
                        "description": "Experimental: Max parallelism for REST API calls",
                        "default": 15,
                    },
                    "retry_status_codes": {
                        "type": "array",
                        "items": {"type": "number"},
                        "description": "Retry HTTP request also on these status codes",
                        "default": [429, 502, 503, 504],
                    },
                    "server": {
                        "type": "string",
                        "description": "URL of DataHub GMS endpoint.",
                    },
                    "timeout_sec": {
                        "type": "number",
                        "description": "Per-HTTP request timeout.",
                        "default": 30,
                    },
                    "token": {
                        "type": "string",
                        "description": "Bearer token used for authentication.",
                    },
                    "extra_headers": {
                        "type": "string",
                        "description": "Extra headers which will be added to the request.",
                    },
                    "disable_ssl_verification": {
                        "type": "boolean",
                        "description": "Disable SSL verification for HTTPS communications.",
                        "default": False,
                    },
                },
                "required": ["server"],
                "additionalProperties": False,
            },
            "datahub_kafka_sink": {
                "type": "object",
                "properties": {
                    "type": {"type": "string", "enum": ["datahub-kafka"]},
                    "config": {"$ref": "#/definitions/datahub_kafka_sink_config"},
                },
                "required": ["type", "config"],
                "additionalProperties": False,
            },
            "datahub_kafka_sink_config": {
                "type": "object",
                "properties": {
                    "connection": {
                        "type": "object",
                        "properties": {
                            "bootstrap": {
                                "type": "string",
                                "description": "Kafka bootstrap URL.",
                                "default": "localhost:9092",
                            },
                            "producer_config": {
                                "type": "object",
                                "description": "Passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.SerializingProducer",
                            },
                            "schema_registry_url": {
                                "type": "string",
                                "description": "URL of schema registry being used.",
                                "default": "http://localhost:8081",
                            },
                            "schema_registry_config": {
                                "type": "object",
                                "description": "Passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.schema_registry.SchemaRegistryClient",
                            },
                        },
                        "additionalProperties": False,
                        "required": ["bootstrap", "schema_registry_url"],
                    },
                    "topic_routes": {
                        "type": "object",
                        "properties": {
                            "MetadataChangeEvent": {
                                "type": "string",
                                "description": "Overridden Kafka topic name for the MetadataChangeEvent",
                                "default": "MetadataChangeEvent",
                            },
                            "MetadataChangeProposal": {
                                "type": "string",
                                "description": "Overridden Kafka topic name for the MetadataChangeProposal",
                                "default": "MetadataChangeProposal",
                            },
                        },
                        "additionalProperties": False,
                    },
                },
                "required": ["connection"],
                "additionalProperties": False,
            },
        },
        "type": "object",
        "properties": {
            # Intentionally empty: filled with one $ref per source config by
            # the generator script below.
            "source": {"anyOf": []},
            "transformers": {
                "type": "array",
                "items": {
                    "type": "object",
                    "description": "Transformer configs see at https://docs.datahub.com/docs/metadata-ingestion/docs/transformer",
                    "properties": {
                        "type": {"type": "string", "description": "Transformer type"},
                        "config": {
                            "type": "object",
                            "description": "Transformer config",
                        },
                    },
                    "required": ["type"],
                    "additionalProperties": False,
                },
            },
            "sink": {
                "description": "sink",
                "anyOf": [
                    {"$ref": "#/definitions/datahub_kafka_sink"},
                    {"$ref": "#/definitions/datahub_rest_sink"},
                    {"$ref": "#/definitions/console_sink"},
                    {"$ref": "#/definitions/file_sink"},
                ],
            },
        },
        "required": ["source"],
    }
# Script body: merge every per-source config schema found in a directory into
# the base schema returned by get_base() and write the result as JSON.

# "<name>" wrapper objects and "<name>_config" schemas, keyed by definition name.
configs: Dict[str, Any] = {}
# Shared sub-definitions hoisted out of the individual config schema files.
definitions: Dict[str, Any] = {}
# One {"$ref": ...} entry per discovered source, appended to source.anyOf.
refs: List[Dict] = []

if len(sys.argv) != 3:
    print(
        """\
Usage:
    gen_json_schema.py config_schema_dir output_file
"""
    )
    # NOTE(review): a usage error exits with status 0, so calling build
    # scripts cannot detect it - confirm whether that is intentional.
    sys.exit(0)

config_schemas_dir: str = sys.argv[1]
output_file: str = sys.argv[2]

# glob order depends on the file system; sort so the generated schema (and
# which file wins when shared definitions collide) is reproducible.
for jfile in sorted(glob.glob(f"{config_schemas_dir}/*")):
    # The config name is the file's base name up to the first underscore.
    # NOTE(review): a source name that itself contains "_" would be
    # truncated here - verify against the naming of the input files.
    config_name: str = os.path.splitext(os.path.basename(jfile))[0].split("_")[0]
    print(f"ConfigName: {config_name}")

    # Context manager closes the handle even if the JSON is malformed.
    with open(jfile, encoding="utf-8") as schema_file:
        data = json.load(schema_file)

    # Wrapper object selecting this source by "type" and pointing at its config.
    configs[config_name] = {
        "type": "object",
        "properties": {
            "type": {"type": "string", "enum": [config_name]},
            "config": {"$ref": f"#/definitions/{config_name}_config"},
        },
        "required": ["type", "config"],
    }

    # Move nested definitions to the top level so "#/definitions/..." refs
    # inside the config schema resolve in the merged document.
    if "definitions" in data:
        definitions.update(data.pop("definitions"))

    configs[f"{config_name}_config"] = data
    refs.append({"$ref": f"#/definitions/{config_name}"})

base = get_base()
base["definitions"].update(configs)
base["definitions"].update(definitions)

print(base["properties"]["source"])
base["properties"]["source"]["anyOf"] = base["properties"]["source"]["anyOf"] + refs

with open(output_file, "w", encoding="utf-8") as outfile:
    json.dump(base, outfile, indent=4)