mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-13 02:57:03 +00:00
fix(ingest): switch to avro from deprecated avro-python3 (#3412)
This commit is contained in:
parent
c6a657939a
commit
6e5d1fe42f
@ -22,16 +22,15 @@ def merge_schemas(schemas: List[str]) -> str:
|
|||||||
schemas_obj = [json.loads(schema) for schema in schemas]
|
schemas_obj = [json.loads(schema) for schema in schemas]
|
||||||
merged = ["null"] + schemas_obj
|
merged = ["null"] + schemas_obj
|
||||||
|
|
||||||
# Deduplicate repeated names.
|
# Patch add_name method to NOT complain about duplicate names
|
||||||
def Register(self, schema):
|
def add_name(self, name_attr, space_attr, new_schema):
|
||||||
if schema.fullname in self._names:
|
to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace)
|
||||||
# print(f"deduping {schema.fullname}")
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
self._names[schema.fullname] = schema
|
|
||||||
|
|
||||||
with unittest.mock.patch("avro.schema.Names.Register", Register):
|
self.names[to_add.fullname] = new_schema
|
||||||
cleaned_schema = avro.schema.SchemaFromJSONData(merged)
|
return to_add
|
||||||
|
|
||||||
|
with unittest.mock.patch("avro.schema.Names.add_name", add_name):
|
||||||
|
cleaned_schema = avro.schema.make_avsc_object(merged)
|
||||||
|
|
||||||
# Convert back to an Avro schema JSON representation.
|
# Convert back to an Avro schema JSON representation.
|
||||||
class MappingProxyEncoder(json.JSONEncoder):
|
class MappingProxyEncoder(json.JSONEncoder):
|
||||||
@ -52,7 +51,9 @@ autogen_header = """# flake8: noqa
|
|||||||
|
|
||||||
# fmt: off
|
# fmt: off
|
||||||
"""
|
"""
|
||||||
autogen_footer = "# fmt: on\n"
|
autogen_footer = """
|
||||||
|
# fmt: on
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
def suppress_checks_in_file(filepath: Union[str, Path]) -> None:
|
def suppress_checks_in_file(filepath: Union[str, Path]) -> None:
|
||||||
@ -71,6 +72,30 @@ def suppress_checks_in_file(filepath: Union[str, Path]) -> None:
|
|||||||
f.write(autogen_footer)
|
f.write(autogen_footer)
|
||||||
|
|
||||||
|
|
||||||
|
def add_avro_python3_warning(filepath: Path) -> None:
|
||||||
|
contents = filepath.read_text()
|
||||||
|
|
||||||
|
contents = f"""
|
||||||
|
# The SchemaFromJSONData method only exists in avro-python3, but is called make_avsc_object in avro.
|
||||||
|
# We can use this fact to detect conflicts between the two packages. Pip won't detect those conflicts
|
||||||
|
# because both are namespace packages, and hence are allowed to overwrite files from each other.
|
||||||
|
# This means that installation order matters, which is a pretty unintuitive outcome.
|
||||||
|
# See https://github.com/pypa/pip/issues/4625 for details.
|
||||||
|
try:
|
||||||
|
from avro.schema import SchemaFromJSONData
|
||||||
|
import warnings
|
||||||
|
|
||||||
|
warnings.warn("It seems like 'avro-python3' is installed, which conflicts with the 'avro' package used by datahub. "
|
||||||
|
+ "Try running `pip uninstall avro-python3 && pip install --upgrade --force-reinstall avro` to fix this issue.")
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
{contents}
|
||||||
|
"""
|
||||||
|
|
||||||
|
filepath.write_text(contents)
|
||||||
|
|
||||||
|
|
||||||
load_schema_method = """
|
load_schema_method = """
|
||||||
import functools
|
import functools
|
||||||
import pathlib
|
import pathlib
|
||||||
@ -103,9 +128,10 @@ def generate(schema_files: List[str], outdir: str) -> None:
|
|||||||
merged_schema = merge_schemas(list(schemas.values()))
|
merged_schema = merge_schemas(list(schemas.values()))
|
||||||
|
|
||||||
write_schema_files(merged_schema, outdir)
|
write_schema_files(merged_schema, outdir)
|
||||||
with open(f"{outdir}/__init__.py", "w"):
|
|
||||||
# Truncate this file.
|
# Schema files post-processing.
|
||||||
pass
|
(Path(outdir) / "__init__.py").write_text("# This file is intentionally empty.\n")
|
||||||
|
add_avro_python3_warning(Path(outdir) / "schema_classes.py")
|
||||||
|
|
||||||
# Save raw schema files in codegen as well.
|
# Save raw schema files in codegen as well.
|
||||||
schema_save_dir = Path(outdir) / "schemas"
|
schema_save_dir = Path(outdir) / "schemas"
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
set -euxo pipefail
|
set -euo pipefail
|
||||||
|
|
||||||
OUTDIR=./src/datahub/metadata
|
OUTDIR=./src/datahub/metadata
|
||||||
|
|
||||||
@ -8,7 +8,7 @@ DATAHUB_ROOT=..
|
|||||||
SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
|
SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
|
||||||
FILES="$SCHEMAS_ROOT/mxe/MetadataChangeEvent.avsc $SCHEMAS_ROOT/mxe/MetadataChangeProposal.avsc $SCHEMAS_ROOT/usage/UsageAggregation.avsc"
|
FILES="$SCHEMAS_ROOT/mxe/MetadataChangeEvent.avsc $SCHEMAS_ROOT/mxe/MetadataChangeProposal.avsc $SCHEMAS_ROOT/usage/UsageAggregation.avsc"
|
||||||
# Since we depend on jq, check if jq is installed
|
# Since we depend on jq, check if jq is installed
|
||||||
if ! which jq; then
|
if ! which jq > /dev/null; then
|
||||||
echo "jq is not installed. Please install jq and rerun (https://stedolan.github.io/jq/)"
|
echo "jq is not installed. Please install jq and rerun (https://stedolan.github.io/jq/)"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|||||||
@ -38,8 +38,8 @@ framework_common = {
|
|||||||
"entrypoints",
|
"entrypoints",
|
||||||
"docker",
|
"docker",
|
||||||
"expandvars>=0.6.5",
|
"expandvars>=0.6.5",
|
||||||
"avro-gen3==0.6.0",
|
"avro-gen3==0.7.1",
|
||||||
"avro-python3>=1.8.2",
|
"avro>=1.10.2",
|
||||||
"python-dateutil>=2.8.0",
|
"python-dateutil>=2.8.0",
|
||||||
"stackprinter",
|
"stackprinter",
|
||||||
"tabulate",
|
"tabulate",
|
||||||
|
|||||||
@ -51,6 +51,11 @@ AvroNonNestedSchemas = Union[
|
|||||||
|
|
||||||
FieldStack = List[avro.schema.Field]
|
FieldStack = List[avro.schema.Field]
|
||||||
|
|
||||||
|
# The latest avro code contains this type definition in a compatibility module,
|
||||||
|
# but that has not yet been released to PyPI. In the interim, we define it ourselves.
|
||||||
|
# https://github.com/apache/avro/blob/e5811b404ac01fac0d0d6e223d62441554c9cbe9/lang/py/avro/compatibility.py#L48
|
||||||
|
AVRO_TYPE_NULL = "null"
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# AvroToMceSchemaConverter
|
# AvroToMceSchemaConverter
|
||||||
|
|
||||||
@ -62,7 +67,7 @@ class AvroToMceSchemaConverter:
|
|||||||
version_string: str = "[version=2.0]"
|
version_string: str = "[version=2.0]"
|
||||||
|
|
||||||
field_type_mapping: Dict[str, Any] = {
|
field_type_mapping: Dict[str, Any] = {
|
||||||
"null": NullTypeClass,
|
AVRO_TYPE_NULL: NullTypeClass,
|
||||||
"bool": BooleanTypeClass,
|
"bool": BooleanTypeClass,
|
||||||
"boolean": BooleanTypeClass,
|
"boolean": BooleanTypeClass,
|
||||||
"int": NumberTypeClass,
|
"int": NumberTypeClass,
|
||||||
@ -121,26 +126,30 @@ class AvroToMceSchemaConverter:
|
|||||||
if isinstance(schema, avro.schema.UnionSchema):
|
if isinstance(schema, avro.schema.UnionSchema):
|
||||||
return any(self._is_nullable(sub_schema) for sub_schema in schema.schemas)
|
return any(self._is_nullable(sub_schema) for sub_schema in schema.schemas)
|
||||||
elif isinstance(schema, avro.schema.PrimitiveSchema):
|
elif isinstance(schema, avro.schema.PrimitiveSchema):
|
||||||
return schema.name == "null"
|
return schema.type == AVRO_TYPE_NULL
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _get_cur_field_path(self) -> str:
|
def _get_cur_field_path(self) -> str:
|
||||||
return ".".join(self._prefix_name_stack)
|
return ".".join(self._prefix_name_stack)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _strip_namespace(name_or_fullname: str) -> str:
|
||||||
|
return name_or_fullname.rsplit(".", maxsplit=1)[-1]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str:
|
def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str:
|
||||||
if isinstance(schema, (avro.schema.RecordSchema, avro.schema.Field)):
|
if isinstance(schema, (avro.schema.RecordSchema, avro.schema.Field)):
|
||||||
# For Records, fields, always return the name.
|
# For Records, fields, always return the name.
|
||||||
return schema.name
|
return AvroToMceSchemaConverter._strip_namespace(schema.name)
|
||||||
|
|
||||||
# For optional, use the underlying non-null type
|
# For optional, use the underlying non-null type
|
||||||
if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2:
|
if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2:
|
||||||
# Optional types as unions in AVRO. Return underlying non-null sub-type.
|
# Optional types as unions in AVRO. Return underlying non-null sub-type.
|
||||||
(first, second) = schema.schemas
|
(first, second) = schema.schemas
|
||||||
if first.type == avro.schema.NULL:
|
if first.type == AVRO_TYPE_NULL:
|
||||||
return second.type
|
return second.type
|
||||||
elif second.type == avro.schema.NULL:
|
elif second.type == AVRO_TYPE_NULL:
|
||||||
return first.type
|
return first.type
|
||||||
|
|
||||||
# For everything else, use the schema's type
|
# For everything else, use the schema's type
|
||||||
@ -160,9 +169,9 @@ class AvroToMceSchemaConverter:
|
|||||||
) -> AvroNestedSchemas:
|
) -> AvroNestedSchemas:
|
||||||
if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2:
|
if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2:
|
||||||
(first, second) = schema.schemas
|
(first, second) = schema.schemas
|
||||||
if first.type == avro.schema.NULL:
|
if first.type == AVRO_TYPE_NULL:
|
||||||
return second
|
return second
|
||||||
elif second.type == avro.schema.NULL:
|
elif second.type == AVRO_TYPE_NULL:
|
||||||
return first
|
return first
|
||||||
return default
|
return default
|
||||||
|
|
||||||
@ -273,7 +282,7 @@ class AvroToMceSchemaConverter:
|
|||||||
yield is_option_as_union_type
|
yield is_option_as_union_type
|
||||||
else:
|
else:
|
||||||
for sub_schema in schema.schemas:
|
for sub_schema in schema.schemas:
|
||||||
if sub_schema.type != avro.schema.NULL:
|
if sub_schema.type != AVRO_TYPE_NULL:
|
||||||
yield sub_schema
|
yield sub_schema
|
||||||
# Record type
|
# Record type
|
||||||
elif isinstance(schema, avro.schema.RecordSchema):
|
elif isinstance(schema, avro.schema.RecordSchema):
|
||||||
|
|||||||
@ -131,7 +131,7 @@ def test_avro_schema_to_mce_fields_record_with_two_fields():
|
|||||||
{
|
{
|
||||||
"type": "record",
|
"type": "record",
|
||||||
"name": "some.event.name",
|
"name": "some.event.name",
|
||||||
"namespace": "some.event.namespace",
|
"namespace": "not.relevant.namespace",
|
||||||
"fields": [
|
"fields": [
|
||||||
{
|
{
|
||||||
"name": "a",
|
"name": "a",
|
||||||
@ -165,12 +165,35 @@ def test_avro_schema_to_mce_fields_toplevel_isnt_a_record():
|
|||||||
assret_field_paths_match(fields, expected_field_paths)
|
assret_field_paths_match(fields, expected_field_paths)
|
||||||
|
|
||||||
|
|
||||||
|
def test_avro_schema_namespacing():
|
||||||
|
schema = """
|
||||||
|
{
|
||||||
|
"type": "record",
|
||||||
|
"name": "name",
|
||||||
|
"namespace": "should.not.show.up.namespace",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"name": "aStringField",
|
||||||
|
"type": "string",
|
||||||
|
"doc": "some docs",
|
||||||
|
"default": "this is custom, default value"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
fields = avro_schema_to_mce_fields(schema)
|
||||||
|
expected_field_paths = [
|
||||||
|
"[version=2.0].[type=name].[type=string].aStringField",
|
||||||
|
]
|
||||||
|
assret_field_paths_match(fields, expected_field_paths)
|
||||||
|
|
||||||
|
|
||||||
def test_avro_schema_to_mce_fields_with_default():
|
def test_avro_schema_to_mce_fields_with_default():
|
||||||
schema = """
|
schema = """
|
||||||
{
|
{
|
||||||
"type": "record",
|
"type": "record",
|
||||||
"name": "some.event.name",
|
"name": "some.event.name",
|
||||||
"namespace": "some.event.namespace",
|
"namespace": "not.relevant.namespace",
|
||||||
"fields": [
|
"fields": [
|
||||||
{
|
{
|
||||||
"name": "aStringField",
|
"name": "aStringField",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user