# Mirror of https://github.com/datahub-project/datahub.git (synced 2025-11-04 04:39:10 +00:00)
			
		
		
		
	
		
			
				
	
	
		
			241 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import glob
 | 
						|
import json
 | 
						|
import os
 | 
						|
import sys
 | 
						|
from typing import Any, Dict, List
 | 
						|
 | 
						|
 | 
						|
def get_base() -> Any:
    """Return the skeleton of the DataHub ingestion recipe JSON schema.

    A fresh dictionary is built on every call, so callers may mutate the
    result freely. The skeleton contains:

    * ``$defs`` entries for the console, file, ``datahub-rest`` and
      ``datahub-kafka`` sinks (plus their ``*_config`` objects);
    * a ``source`` property whose ``anyOf`` list is empty and is expected to
      be filled in by the caller;
    * ``transformers`` and ``sink`` properties.

    Only ``source`` is listed as required at the top level.
    """
    return {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        # NOTE(review): draft 2020-12 spells the identifier keyword "$id";
        # "id" is kept unchanged here -- confirm whether consumers rely on it.
        "id": "https://json.schemastore.org/datahub-ingestion",
        "title": "Datahub Ingestion",
        "description": "Root schema of Datahub Ingestion",
        "$defs": {
            "console_sink": {
                "type": "object",
                "properties": {
                    "type": {
                        "type": "string",
                        "enum": ["console"],
                    }
                },
                "required": ["type"],
            },
            "file_sink": {
                "type": "object",
                "properties": {
                    "type": {"type": "string", "enum": ["file"]},
                    "config": {"$ref": "#/$defs/file_sink_config"},
                },
                "required": ["type", "config"],
            },
            "file_sink_config": {
                "type": "object",
                "properties": {
                    "filename": {
                        "description": "Path to file to write to.",
                        "type": "string",
                    }
                },
                "required": ["filename"],
                "additionalProperties": False,
            },
            "datahub_rest_sink": {
                "type": "object",
                "properties": {
                    "type": {"type": "string", "enum": ["datahub-rest"]},
                    "config": {"$ref": "#/$defs/datahub_rest_sink_config"},
                },
                "required": ["type", "config"],
                "additionalProperties": False,
            },
            "datahub_rest_sink_config": {
                "type": "object",
                "properties": {
                    "ca_certificate_path": {
                        "type": "string",
                        "description": "Path to server's CA certificate for verification of HTTPS communications",
                    },
                    "client_certificate_path": {
                        "type": "string",
                        # Key was previously misspelled ("descritption"), which
                        # left this property without a description annotation.
                        "description": "Path to client's CA certificate for HTTPS communications",
                    },
                    "max_threads": {
                        "type": "number",
                        "description": "Experimental: Max parallelism for REST API calls",
                        "default": 15,
                    },
                    "retry_status_codes": {
                        "type": "array",
                        "items": {"type": "number"},
                        "description": "Retry HTTP request also on these status codes",
                        "default": [429, 502, 503, 504],
                    },
                    "server": {
                        "type": "string",
                        "description": "URL of DataHub GMS endpoint.",
                    },
                    "timeout_sec": {
                        "type": "number",
                        "description": "Per-HTTP request timeout.",
                        "default": 30,
                    },
                    "token": {
                        "type": "string",
                        "description": "Bearer token used for authentication.",
                    },
                    "extra_headers": {
                        "type": "string",
                        "description": "Extra headers which will be added to the request.",
                    },
                    "disable_ssl_verification": {
                        "type": "boolean",
                        "description": "Disable SSL verification for HTTPS communications.",
                        "default": False,
                    },
                },
                "required": ["server"],
                "additionalProperties": False,
            },
            "datahub_kafka_sink": {
                "type": "object",
                "properties": {
                    "type": {"type": "string", "enum": ["datahub-kafka"]},
                    "config": {"$ref": "#/$defs/datahub_kafka_sink_config"},
                },
                "required": ["type", "config"],
                "additionalProperties": False,
            },
            "datahub_kafka_sink_config": {
                "type": "object",
                "properties": {
                    "connection": {
                        "type": "object",
                        "properties": {
                            "bootstrap": {
                                "type": "string",
                                "description": "Kafka bootstrap URL.",
                                "default": "localhost:9092",
                            },
                            "producer_config": {
                                "type": "object",
                                "description": "Passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.SerializingProducer",
                            },
                            "schema_registry_url": {
                                "type": "string",
                                "description": "URL of schema registry being used.",
                                "default": "http://localhost:8081",
                            },
                            "schema_registry_config": {
                                "type": "object",
                                "description": "Passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.schema_registry.SchemaRegistryClient",
                            },
                        },
                        "additionalProperties": False,
                        "required": ["bootstrap", "schema_registry_url"],
                    },
                    "topic_routes": {
                        "type": "object",
                        "properties": {
                            "MetadataChangeEvent": {
                                "type": "string",
                                "description": "Overridden Kafka topic name for the MetadataChangeEvent",
                                "default": "MetadataChangeEvent",
                            },
                            "MetadataChangeProposal": {
                                "type": "string",
                                "description": "Overridden Kafka topic name for the MetadataChangeProposal",
                                "default": "MetadataChangeProposal",
                            },
                        },
                        "additionalProperties": False,
                    },
                },
                "required": ["connection"],
                "additionalProperties": False,
            },
        },
        "type": "object",
        "properties": {
            # Populated by the caller with one "$ref" per discovered source.
            "source": {"anyOf": []},
            "transformers": {
                "type": "array",
                "items": {
                    "type": "object",
                    "description": "Transformer configs see at https://docs.datahub.com/docs/metadata-ingestion/docs/transformer",
                    "properties": {
                        "type": {"type": "string", "description": "Transformer type"},
                        "config": {
                            "type": "object",
                            "description": "Transformer config",
                        },
                    },
                    "required": ["type"],
                    "additionalProperties": False,
                },
            },
            "sink": {
                "description": "sink",
                "anyOf": [
                    {"$ref": "#/$defs/datahub_kafka_sink"},
                    {"$ref": "#/$defs/datahub_rest_sink"},
                    {"$ref": "#/$defs/console_sink"},
                    {"$ref": "#/$defs/file_sink"},
                ],
            },
        },
        "required": ["source"],
    }
 | 
						|
 | 
						|
 | 
						|
# Script body: merge every per-source config schema found in a directory into
# the base schema returned by get_base() and write the result as JSON.

# "$defs" entries generated here: for each source, a "<name>" wrapper object
# and the "<name>_config" schema loaded from disk.
configs: Dict[str, Any] = {}
# "$defs" hoisted out of the individual config schema files.
definitions: Dict[str, Any] = {}
# One {"$ref": ...} per discovered source; appended to properties.source.anyOf.
refs: List[Dict] = []

if len(sys.argv) != 3:
    print(
        """\
Usage:
    gen_json_schema.py config_schema_dir output_file
"""
    )
    # NOTE(review): exits with status 0 although the arguments were invalid;
    # kept unchanged so existing callers see the same exit status -- confirm.
    sys.exit(0)

config_schemas_dir: str = sys.argv[1]
output_file: str = sys.argv[2]

# glob.glob() order depends on the file system; sorting keeps the generated
# schema (and the winner when two files map to the same name) reproducible.
for jfile in sorted(glob.glob(f"{config_schemas_dir}/*")):
    # The source name is the file's base name up to the first underscore.
    config_name: str = os.path.splitext(os.path.basename(jfile))[0].split("_")[0]
    print(f"ConfigName: {config_name}")

    # Context manager closes the handle promptly; JSON text is read as UTF-8.
    with open(jfile, encoding="utf-8") as f:
        data = json.load(f)

    # Wrapper object matching a recipe entry of the form
    # {"type": "<name>", "config": {...}}.
    configs[config_name] = {
        "type": "object",
        "properties": {
            "type": {"type": "string", "enum": [config_name]},
            "config": {"$ref": f"#/$defs/{config_name}_config"},
        },
        "required": ["type", "config"],
    }

    # Move nested definitions to the shared pool so they end up in the root
    # "$defs" of the combined schema.
    if "$defs" in data:
        definitions.update(data.pop("$defs"))

    configs[f"{config_name}_config"] = data
    refs.append({"$ref": f"#/$defs/{config_name}"})

base = get_base()
base["$defs"].update(configs)
# Applied last, so a hoisted definition overrides a same-named entry above.
base["$defs"].update(definitions)

print(base["properties"]["source"])

base["properties"]["source"]["anyOf"] = base["properties"]["source"]["anyOf"] + refs
with open(output_file, "w") as outfile:
    json.dump(base, outfile, indent=4)
 |