254 lines
8.3 KiB
Python
Raw Normal View History

import json
import types
import unittest.mock
from pathlib import Path
from typing import Any, Dict, Iterable, List, Union
import avro.schema
import click
2021-02-11 23:14:20 -08:00
from avrogen import write_schema_files
def load_schema_file(schema_file: Union[str, Path]) -> dict:
    """
    Read a JSON schema file from disk and return its parsed content.

    :param schema_file: Location of the schema file, as a string or a Path.
    :return: The value produced by parsing the file's JSON text.
    :raises OSError: If the file cannot be read.
    :raises json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # JSON documents are UTF-8; decode explicitly instead of relying on the
    # platform's locale encoding, which differs between machines.
    raw_schema_text = Path(schema_file).read_text(encoding="utf-8")
    return json.loads(raw_schema_text)
def merge_schemas(schemas_obj: List[Any]) -> str:
    """
    Combine several Avro schemas into one union schema and return it as JSON text.

    The schemas are placed in a union whose first branch is "null", parsed
    with avro, and then serialized back to an indented JSON string.
    """

    # Stand-in for avro.schema.Names.add_name that does NOT complain about
    # duplicate names: a repeated name simply overwrites the earlier entry.
    def add_name(self, name_attr, space_attr, new_schema):
        qualified = avro.schema.Name(name_attr, space_attr, self.default_namespace)
        self.names[qualified.fullname] = new_schema
        return qualified

    class MappingProxyEncoder(json.JSONEncoder):
        # Serializes types.MappingProxyType values as plain dicts; everything
        # else is delegated to the stock encoder.
        def default(self, obj):
            if not isinstance(obj, types.MappingProxyType):
                return super().default(obj)
            return dict(obj)

    union_schema = ["null"] + schemas_obj
    with unittest.mock.patch("avro.schema.Names.add_name", add_name):
        parsed_schema = avro.schema.make_avsc_object(union_schema)

    # Convert back to an Avro schema JSON representation.
    return json.dumps(parsed_schema.to_json(), cls=MappingProxyEncoder, indent=2)
# Banner placed at the very top of each generated file: it switches off
# flake8, pylint and black formatting and tells readers not to edit by hand.
autogen_header = (
    "# flake8: noqa\n"
    "# This file is autogenerated by /metadata-ingestion/scripts/avro_codegen.py\n"
    "# Do not modify manually!\n"
    "# pylint: skip-file\n"
    "# fmt: off\n"
)

# Trailer placed at the very end of each generated file; it closes the
# "fmt: off" region opened by the banner.
autogen_footer = "\n# fmt: on\n"
def suppress_checks_in_file(filepath: Union[str, Path]) -> None:
    """
    Wrap an autogenerated file with the standard banner and trailer.

    The banner (``autogen_header``) goes before the existing content and
    carries the lint/format suppression comments plus the autogeneration
    notice; the trailer (``autogen_footer``) goes after it.
    """
    with open(filepath, "r+") as handle:
        existing = handle.read()
        # Rewind and overwrite in place; the new text is never shorter than
        # the old one, so nothing stale is left behind.
        handle.seek(0)
        handle.write(autogen_header)
        handle.write(existing)
        handle.write(autogen_footer)
def add_avro_python3_warning(filepath: Path) -> None:
    """
    Prepend a runtime check for a conflicting 'avro-python3' install to a file.

    The file's existing content is kept and placed after a snippet that tries
    to import ``SchemaFromJSONData`` from ``avro.schema`` and emits a warning
    when that import succeeds.

    :param filepath: The Python file to rewrite in place.
    """
    contents = filepath.read_text()
    # NOTE: the text below is emitted verbatim as Python source, so the bodies
    # of the try/except must be indented for the generated file to compile.
    contents = f"""
# The SchemaFromJSONData method only exists in avro-python3, but is called make_avsc_object in avro.
# We can use this fact to detect conflicts between the two packages. Pip won't detect those conflicts
# because both are namespace packages, and hence are allowed to overwrite files from each other.
# This means that installation order matters, which is a pretty unintuitive outcome.
# See https://github.com/pypa/pip/issues/4625 for details.
try:
    from avro.schema import SchemaFromJSONData
    import warnings
    warnings.warn("It seems like 'avro-python3' is installed, which conflicts with the 'avro' package used by datahub. "
        + "Try running `pip uninstall avro-python3 && pip install --upgrade --force-reinstall avro` to fix this issue.")
except ImportError:
    pass
{contents}
"""
    filepath.write_text(contents)
# Source text emitted once into the generated schemas package: a helper that
# reads "<schema_name>.avsc" from the directory of the generated module.
# This is NOT passed through str.format, so its braces stay literal.
# The function body must be indented for the generated module to compile.
load_schema_method = """
import functools
import pathlib
def _load_schema(schema_name: str) -> str:
    return (pathlib.Path(__file__).parent / f"{schema_name}.avsc").read_text()
"""

# Source template emitted once per schema (filled in with
# str.format(schema_name=...)): a cached accessor named get<schema_name>Schema.
individual_schema_method = """
@functools.lru_cache(maxsize=None)
def get{schema_name}Schema() -> str:
    return _load_schema("{schema_name}")
"""
def make_load_schema_methods(schemas: Iterable[str]) -> str:
    """
    Build the source text for the schema-loading helpers.

    The result is ``load_schema_method`` followed by one copy of
    ``individual_schema_method`` per entry in ``schemas``, with the entry
    substituted for ``schema_name``.
    """
    accessor_snippets = [
        individual_schema_method.format(schema_name=name) for name in schemas
    ]
    return load_schema_method + "".join(accessor_snippets)
def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
    """
    Rewrite the generated schema classes file so aspect classes share a base class.

    The file is edited line by line:
    - an abstract ``_Aspect`` class is appended right after the ``__SCHEMAS``
      declaration line;
    - for every entry of ``aspects``, the class ``<name>Class`` is re-parented
      from ``DictWrapper`` to ``_Aspect`` and gains an ``ASPECT_NAME``
      attribute (plus ``ASPECT_TYPE`` when the entry's "Aspect" section has a
      "type" key);
    - an ``ASPECT_CLASSES`` list naming all of those classes is appended.

    :param aspects: Schema dicts, each with a "name" key and an "Aspect" dict
        that has at least a "name" key.
    :param schema_class_file: The generated Python file to rewrite in place.
    :raises KeyError: If the ``__SCHEMAS`` line or an expected class
        definition line is not present in the file.
    :raises IndexError: If no blank line followed by a ``RECORD_SCHEMA = ``
        line is found after a class definition.
    """
    schema_classes_lines = schema_class_file.read_text().splitlines()
    line_lookup_table = {line: i for i, line in enumerate(schema_classes_lines)}

    # Create the Aspect class.
    # We ensure that it cannot be instantiated directly, as
    # per https://stackoverflow.com/a/7989101/5004662.
    # NOTE: this text is emitted as Python source, so its indentation matters.
    schema_classes_lines[
        line_lookup_table["__SCHEMAS: Dict[str, RecordSchema] = {}"]
    ] += """
class _Aspect(DictWrapper):
    ASPECT_NAME: str = None # type: ignore
    ASPECT_TYPE: str = "default"
    def __init__(self):
        if type(self) is _Aspect:
            raise TypeError("_Aspect is an abstract class, and cannot be instantiated directly.")
        super().__init__()
    @classmethod
    def get_aspect_name(cls) -> str:
        return cls.ASPECT_NAME # type: ignore
    @classmethod
    def get_aspect_type(cls) -> str:
        return cls.ASPECT_TYPE
"""

    for aspect in aspects:
        class_name = f'{aspect["name"]}Class'
        aspect_info = aspect["Aspect"]

        # Make the aspects inherit from the Aspect class.
        class_def_line = line_lookup_table[f"class {class_name}(DictWrapper):"]
        schema_classes_lines[class_def_line] = f"class {class_name}(_Aspect):"

        # Define the ASPECT_NAME class attribute.
        # There's always an empty line between the docstring and the RECORD_SCHEMA class attribute.
        # We need to find it and insert our line of code there.
        empty_line = class_def_line + 1
        while not (
            schema_classes_lines[empty_line].strip() == ""
            and schema_classes_lines[empty_line + 1]
            .strip()
            .startswith("RECORD_SCHEMA = ")
        ):
            empty_line += 1

        # The injected attributes must sit at the same indentation as the
        # class body, so copy it from the RECORD_SCHEMA line that follows.
        record_schema_line = schema_classes_lines[empty_line + 1]
        indent = record_schema_line[
            : len(record_schema_line) - len(record_schema_line.lstrip())
        ]

        injected = f"\n\n{indent}ASPECT_NAME = '{aspect_info['name']}'"
        if "type" in aspect_info:
            injected += f"\n{indent}ASPECT_TYPE = '{aspect_info['type']}'"
        schema_classes_lines[empty_line] = injected

    # Finally, generate a big list of all available aspects.
    aspect_class_names = ",\n    ".join(f"{aspect['name']}Class" for aspect in aspects)
    schema_classes_lines.append(
        f"""
from typing import Type
ASPECT_CLASSES: List[Type[_Aspect]] = [
    {aspect_class_names}
]
"""
    )

    schema_class_file.write_text("\n".join(schema_classes_lines))
# CLI entry point: collects the required schemas plus every aspect schema
# under SCHEMAS_PATH, generates Python classes for them into OUTDIR, then
# post-processes the generated files.
# (Documented with comments on purpose: a docstring on a click command
# becomes part of its --help output.)
@click.command()
@click.argument(
    "schemas_path", type=click.Path(exists=True, file_okay=False), required=True
)
@click.argument("outdir", type=click.Path(), required=True)
def generate(schemas_path: str, outdir: str) -> None:
    # Schemas that must be present, as POSIX paths relative to schemas_path.
    # Entries are removed as they are found, so leftovers are the missing ones.
    required_schema_files = {
        "mxe/MetadataChangeEvent.avsc",
        "mxe/MetadataChangeProposal.avsc",
        "usage/UsageAggregation.avsc",
        "mxe/MetadataChangeLog.avsc",
        "mxe/PlatformEvent.avsc",
        "platform/event/v1/EntityChangeEvent.avsc",
        "metadata/query/filter/Filter.avsc",  # temporarily added to test reserved keywords support
    }

    schemas_root = Path(schemas_path)
    output_root = Path(outdir)

    # Find all the aspect schemas / other important schemas.
    aspect_file_stems: List[str] = []
    schema_files: List[Path] = []
    for candidate in schemas_root.glob("**/*.avsc"):
        relative_path = candidate.relative_to(schemas_root).as_posix()
        if relative_path in required_schema_files:
            required_schema_files.remove(relative_path)
            schema_files.append(candidate)
            continue
        # Everything else is only kept when its schema has an "Aspect" section.
        if load_schema_file(candidate).get("Aspect"):
            schema_files.append(candidate)
            aspect_file_stems.append(candidate.stem)

    assert not required_schema_files, f"Schema files not found: {required_schema_files}"

    # Parsed schemas keyed by file stem.
    schemas: Dict[str, dict] = {
        Path(path).stem: load_schema_file(path) for path in schema_files
    }

    write_schema_files(merge_schemas(list(schemas.values())), outdir)

    # Schema files post-processing.
    classes_file = output_root / "schema_classes.py"
    (output_root / "__init__.py").write_text("# This file is intentionally empty.\n")
    add_avro_python3_warning(classes_file)
    aspect_schemas = [schemas[stem] for stem in aspect_file_stems]
    annotate_aspects(aspect_schemas, classes_file)

    # Save raw schema files in codegen as well.
    schema_save_dir = output_root / "schemas"
    schema_save_dir.mkdir()
    for stem, schema in schemas.items():
        raw_schema_path = schema_save_dir / f"{stem}.avsc"
        raw_schema_path.write_text(json.dumps(schema, indent=2))

    # Add load_schema method.
    with open(schema_save_dir / "__init__.py", "a") as schema_dir_init:
        schema_dir_init.write(make_load_schema_methods(schemas.keys()))

    # Add headers for all generated files
    for generated_file in output_root.glob("**/*.py"):
        suppress_checks_in_file(generated_file)
# Run the click command only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    generate()