diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py index f61ef48081..05d8fb1c58 100644 --- a/metadata-ingestion/scripts/avro_codegen.py +++ b/metadata-ingestion/scripts/avro_codegen.py @@ -22,16 +22,15 @@ def merge_schemas(schemas: List[str]) -> str: schemas_obj = [json.loads(schema) for schema in schemas] merged = ["null"] + schemas_obj - # Deduplicate repeated names. - def Register(self, schema): - if schema.fullname in self._names: - # print(f"deduping {schema.fullname}") - pass - else: - self._names[schema.fullname] = schema + # Patch add_name method to NOT complain about duplicate names + def add_name(self, name_attr, space_attr, new_schema): + to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace) - with unittest.mock.patch("avro.schema.Names.Register", Register): - cleaned_schema = avro.schema.SchemaFromJSONData(merged) + self.names[to_add.fullname] = new_schema + return to_add + + with unittest.mock.patch("avro.schema.Names.add_name", add_name): + cleaned_schema = avro.schema.make_avsc_object(merged) # Convert back to an Avro schema JSON representation. class MappingProxyEncoder(json.JSONEncoder): @@ -52,7 +51,9 @@ autogen_header = """# flake8: noqa # fmt: off """ -autogen_footer = "# fmt: on\n" +autogen_footer = """ +# fmt: on +""" def suppress_checks_in_file(filepath: Union[str, Path]) -> None: @@ -71,6 +72,30 @@ def suppress_checks_in_file(filepath: Union[str, Path]) -> None: f.write(autogen_footer) +def add_avro_python3_warning(filepath: Path) -> None: + contents = filepath.read_text() + + contents = f""" +# The SchemaFromJSONData method only exists in avro-python3, but is called make_avsc_object in avro. +# We can use this fact to detect conflicts between the two packages. Pip won't detect those conflicts +# because both are namespace packages, and hence are allowed to overwrite files from each other. +# This means that installation order matters, which is a pretty unintuitive outcome. +# See https://github.com/pypa/pip/issues/4625 for details. +try: + from avro.schema import SchemaFromJSONData + import warnings + + warnings.warn("It seems like 'avro-python3' is installed, which conflicts with the 'avro' package used by datahub. " + + "Try running `pip uninstall avro-python3 && pip install --upgrade --force-reinstall avro` to fix this issue.") +except ImportError: + pass + +{contents} + """ + + filepath.write_text(contents) + + load_schema_method = """ import functools import pathlib @@ -103,9 +128,10 @@ def generate(schema_files: List[str], outdir: str) -> None: merged_schema = merge_schemas(list(schemas.values())) write_schema_files(merged_schema, outdir) - with open(f"{outdir}/__init__.py", "w"): - # Truncate this file. - pass + + # Schema files post-processing. + (Path(outdir) / "__init__.py").write_text("# This file is intentionally empty.\n") + add_avro_python3_warning(Path(outdir) / "schema_classes.py") # Save raw schema files in codegen as well. schema_save_dir = Path(outdir) / "schemas" diff --git a/metadata-ingestion/scripts/codegen.sh b/metadata-ingestion/scripts/codegen.sh index 496debdec2..f96a1e2e30 100755 --- a/metadata-ingestion/scripts/codegen.sh +++ b/metadata-ingestion/scripts/codegen.sh @@ -1,5 +1,5 @@ #!/bin/bash -set -euxo pipefail +set -euo pipefail OUTDIR=./src/datahub/metadata @@ -8,7 +8,7 @@ DATAHUB_ROOT=.. SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin" FILES="$SCHEMAS_ROOT/mxe/MetadataChangeEvent.avsc $SCHEMAS_ROOT/mxe/MetadataChangeProposal.avsc $SCHEMAS_ROOT/usage/UsageAggregation.avsc" # Since we depend on jq, check if jq is installed -if ! which jq; then +if ! which jq > /dev/null; then echo "jq is not installed. Please install jq and rerun (https://stedolan.github.io/jq/)" exit 1 fi diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 8ea5438546..903c3aa6bb 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -38,8 +38,8 @@ framework_common = { "entrypoints", "docker", "expandvars>=0.6.5", - "avro-gen3==0.6.0", - "avro-python3>=1.8.2", + "avro-gen3==0.7.1", + "avro>=1.10.2", "python-dateutil>=2.8.0", "stackprinter", "tabulate", diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py index 4873358293..0a30c80ebb 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py @@ -51,6 +51,11 @@ AvroNonNestedSchemas = Union[ FieldStack = List[avro.schema.Field] +# The latest avro code contains this type definition in a compatibility module, +# but that has not yet been released to PyPI. In the interim, we define it ourselves. +# https://github.com/apache/avro/blob/e5811b404ac01fac0d0d6e223d62441554c9cbe9/lang/py/avro/compatibility.py#L48 +AVRO_TYPE_NULL = "null" + # ------------------------------------------------------------------------------ # AvroToMceSchemaConverter @@ -62,7 +67,7 @@ class AvroToMceSchemaConverter: version_string: str = "[version=2.0]" field_type_mapping: Dict[str, Any] = { - "null": NullTypeClass, + AVRO_TYPE_NULL: NullTypeClass, "bool": BooleanTypeClass, "boolean": BooleanTypeClass, "int": NumberTypeClass, @@ -121,26 +126,30 @@ class AvroToMceSchemaConverter: if isinstance(schema, avro.schema.UnionSchema): return any(self._is_nullable(sub_schema) for sub_schema in schema.schemas) elif isinstance(schema, avro.schema.PrimitiveSchema): - return schema.name == "null" + return schema.type == AVRO_TYPE_NULL else: return False def _get_cur_field_path(self) -> str: return ".".join(self._prefix_name_stack) + @staticmethod + def _strip_namespace(name_or_fullname: str) -> str: + return name_or_fullname.rsplit(".", maxsplit=1)[-1] + @staticmethod def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str: if isinstance(schema, (avro.schema.RecordSchema, avro.schema.Field)): # For Records, fields, always return the name. - return schema.name + return AvroToMceSchemaConverter._strip_namespace(schema.name) # For optional, use the underlying non-null type if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2: # Optional types as unions in AVRO. Return underlying non-null sub-type. (first, second) = schema.schemas - if first.type == avro.schema.NULL: + if first.type == AVRO_TYPE_NULL: return second.type - elif second.type == avro.schema.NULL: + elif second.type == AVRO_TYPE_NULL: return first.type # For everything else, use the schema's type @@ -160,9 +169,9 @@ class AvroToMceSchemaConverter: ) -> AvroNestedSchemas: if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2: (first, second) = schema.schemas - if first.type == avro.schema.NULL: + if first.type == AVRO_TYPE_NULL: return second - elif second.type == avro.schema.NULL: + elif second.type == AVRO_TYPE_NULL: return first return default @@ -273,7 +282,7 @@ class AvroToMceSchemaConverter: yield is_option_as_union_type else: for sub_schema in schema.schemas: - if sub_schema.type != avro.schema.NULL: + if sub_schema.type != AVRO_TYPE_NULL: yield sub_schema # Record type elif isinstance(schema, avro.schema.RecordSchema): diff --git a/metadata-ingestion/tests/unit/test_schema_util.py b/metadata-ingestion/tests/unit/test_schema_util.py index 971bbfa65b..cb2a2516f6 100644 --- a/metadata-ingestion/tests/unit/test_schema_util.py +++ b/metadata-ingestion/tests/unit/test_schema_util.py @@ -131,7 +131,7 @@ def test_avro_schema_to_mce_fields_record_with_two_fields(): { "type": "record", "name": "some.event.name", - "namespace": "some.event.namespace", + "namespace": "not.relevant.namespace", "fields": [ { "name": "a", @@ -165,12 +165,35 @@ def test_avro_schema_to_mce_fields_toplevel_isnt_a_record(): assret_field_paths_match(fields, expected_field_paths) +def test_avro_schema_namespacing(): + schema = """ +{ + "type": "record", + "name": "name", + "namespace": "should.not.show.up.namespace", + "fields": [ + { + "name": "aStringField", + "type": "string", + "doc": "some docs", + "default": "this is custom, default value" + } + ] +} +""" + fields = avro_schema_to_mce_fields(schema) + expected_field_paths = [ + "[version=2.0].[type=name].[type=string].aStringField", + ] + assret_field_paths_match(fields, expected_field_paths) + + def test_avro_schema_to_mce_fields_with_default(): schema = """ { "type": "record", "name": "some.event.name", - "namespace": "some.event.namespace", + "namespace": "not.relevant.namespace", "fields": [ { "name": "aStringField",