build(ingest): Bump avro pin: security vulnerability (#9042)

This commit is contained in:
Andrew Sikowitz 2023-10-25 13:06:12 -04:00 committed by GitHub
parent dd5d997390
commit 8a80e858a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 99 additions and 54 deletions

View File

@ -152,7 +152,8 @@ def merge_schemas(schemas_obj: List[dict]) -> str:
return encoded
autogen_header = """# flake8: noqa
autogen_header = """# mypy: ignore-errors
# flake8: noqa
# This file is autogenerated by /metadata-ingestion/scripts/avro_codegen.py
# Do not modify manually!

View File

@ -351,8 +351,8 @@ def generate_stitched_record(
field_objects = []
for f in entity_fields:
field = avro.schema.Field(
type=f["type"],
name=f["name"],
f["type"],
f["name"],
has_default=False,
)
field_objects.append(field)

View File

@ -32,7 +32,7 @@ framework_common = {
"expandvars>=0.6.5",
"avro-gen3==0.7.11",
# "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3",
"avro>=1.10.2,<1.11",
"avro>=1.11.3,<1.12",
"python-dateutil>=2.8.0",
"tabulate",
"progressbar2",
@ -355,7 +355,11 @@ plugins: Dict[str, Set[str]] = {
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redshift": sql_common | redshift_common | usage_common | {"redshift-connector"} | sqlglot_lib,
"redshift": sql_common
| redshift_common
| usage_common
| {"redshift-connector"}
| sqlglot_lib,
"redshift-legacy": sql_common | redshift_common | sqlglot_lib,
"redshift-usage-legacy": sql_common | redshift_common | sqlglot_lib | usage_common,
"s3": {*s3_base, *data_lake_profiling},

View File

@ -1,6 +1,18 @@
import json
import logging
from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
Optional,
Type,
Union,
cast,
overload,
)
import avro.schema
@ -54,6 +66,8 @@ AvroNonNestedSchemas = Union[
avro.schema.PrimitiveSchema,
]
SchemaOrField = Union[avro.schema.Schema, avro.schema.Field]
FieldStack = List[avro.schema.Field]
# The latest avro code contains this type definition in a compatibility module,
@ -124,16 +138,22 @@ class AvroToMceSchemaConverter:
self._meta_mapping_processor = meta_mapping_processor
self._schema_tags_field = schema_tags_field
self._tag_prefix = tag_prefix
# Map of avro schema type to the conversion handler
self._avro_type_to_mce_converter_map: Dict[
avro.schema.Schema,
Callable[[ExtendedAvroNestedSchemas], Generator[SchemaField, None, None]],
# TODO: Clean up this type... perhaps refactor
self._avro_type_to_mce_converter_map: Mapping[
Union[
Type[avro.schema.Schema],
Type[avro.schema.Field],
Type[avro.schema.LogicalSchema],
],
Callable[[SchemaOrField], Iterable[SchemaField]],
] = {
avro.schema.RecordSchema: self._gen_from_non_field_nested_schemas,
avro.schema.UnionSchema: self._gen_from_non_field_nested_schemas,
avro.schema.ArraySchema: self._gen_from_non_field_nested_schemas,
avro.schema.MapSchema: self._gen_from_non_field_nested_schemas,
avro.schema.Field: self._gen_nested_schema_from_field,
avro.schema.Field: self._gen_nested_schema_from_field, # type: ignore
avro.schema.PrimitiveSchema: self._gen_non_nested_to_mce_fields,
avro.schema.FixedSchema: self._gen_non_nested_to_mce_fields,
avro.schema.EnumSchema: self._gen_non_nested_to_mce_fields,
@ -142,20 +162,22 @@ class AvroToMceSchemaConverter:
@staticmethod
def _get_type_name(
avro_schema: avro.schema.Schema, logical_if_present: bool = False
avro_schema: SchemaOrField, logical_if_present: bool = False
) -> str:
logical_type_name: Optional[str] = None
if logical_if_present:
logical_type_name = getattr(
avro_schema, "logical_type", None
) or avro_schema.props.get("logicalType")
logical_type_name = cast(
Optional[str],
getattr(avro_schema, "logical_type", None)
or avro_schema.props.get("logicalType"),
)
return logical_type_name or str(
getattr(avro_schema.type, "type", avro_schema.type)
)
@staticmethod
def _get_column_type(
avro_schema: avro.schema.Schema, logical_type: Optional[str]
avro_schema: SchemaOrField, logical_type: Optional[str]
) -> SchemaFieldDataType:
type_name: str = AvroToMceSchemaConverter._get_type_name(avro_schema)
TypeClass: Optional[Type] = AvroToMceSchemaConverter.field_type_mapping.get(
@ -186,7 +208,7 @@ class AvroToMceSchemaConverter:
)
return dt
def _is_nullable(self, schema: avro.schema.Schema) -> bool:
def _is_nullable(self, schema: SchemaOrField) -> bool:
if isinstance(schema, avro.schema.Field):
return self._is_nullable(schema.type)
if isinstance(schema, avro.schema.UnionSchema):
@ -208,7 +230,7 @@ class AvroToMceSchemaConverter:
return name_or_fullname.rsplit(".", maxsplit=1)[-1]
@staticmethod
def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str:
def _get_simple_native_type(schema: SchemaOrField) -> str:
if isinstance(schema, (avro.schema.RecordSchema, avro.schema.Field)):
# For Records, fields, always return the name.
return AvroToMceSchemaConverter._strip_namespace(schema.name)
@ -226,7 +248,7 @@ class AvroToMceSchemaConverter:
return schema.type
@staticmethod
def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str:
def _get_type_annotation(schema: SchemaOrField) -> str:
simple_native_type = AvroToMceSchemaConverter._get_simple_native_type(schema)
if simple_native_type.startswith("__struct_"):
simple_native_type = "struct"
@ -238,9 +260,23 @@ class AvroToMceSchemaConverter:
return f"[type={simple_native_type}]"
@staticmethod
@overload
def _get_underlying_type_if_option_as_union(
schema: AvroNestedSchemas, default: Optional[AvroNestedSchemas] = None
) -> AvroNestedSchemas:
schema: SchemaOrField, default: SchemaOrField
) -> SchemaOrField:
...
@staticmethod
@overload
def _get_underlying_type_if_option_as_union(
schema: SchemaOrField, default: Optional[SchemaOrField] = None
) -> Optional[SchemaOrField]:
...
@staticmethod
def _get_underlying_type_if_option_as_union(
schema: SchemaOrField, default: Optional[SchemaOrField] = None
) -> Optional[SchemaOrField]:
if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2:
(first, second) = schema.schemas
if first.type == AVRO_TYPE_NULL:
@ -258,8 +294,8 @@ class AvroToMceSchemaConverter:
def __init__(
self,
schema: avro.schema.Schema,
actual_schema: avro.schema.Schema,
schema: SchemaOrField,
actual_schema: SchemaOrField,
converter: "AvroToMceSchemaConverter",
description: Optional[str] = None,
default_value: Optional[str] = None,
@ -275,7 +311,7 @@ class AvroToMceSchemaConverter:
self._converter._prefix_name_stack.append(type_annotation)
return self
def emit(self) -> Generator[SchemaField, None, None]:
def emit(self) -> Iterable[SchemaField]:
if (
not isinstance(
self._actual_schema,
@ -307,7 +343,7 @@ class AvroToMceSchemaConverter:
description = self._description
if not description and actual_schema.props.get("doc"):
description = actual_schema.props.get("doc")
description = cast(Optional[str], actual_schema.props.get("doc"))
if self._default_value is not None:
description = f"{description if description else ''}\nField default value: {self._default_value}"
@ -320,12 +356,12 @@ class AvroToMceSchemaConverter:
native_data_type = native_data_type[
slice(len(type_prefix), len(native_data_type) - 1)
]
native_data_type = actual_schema.props.get(
"native_data_type", native_data_type
native_data_type = cast(
str, actual_schema.props.get("native_data_type", native_data_type)
)
field_path = self._converter._get_cur_field_path()
merged_props = {}
merged_props: Dict[str, Any] = {}
merged_props.update(self._schema.other_props)
merged_props.update(schema.other_props)
@ -363,12 +399,13 @@ class AvroToMceSchemaConverter:
meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION)
logical_type_name: Optional[str] = (
logical_type_name: Optional[str] = cast(
Optional[str],
# logicalType nested inside type
getattr(actual_schema, "logical_type", None)
or actual_schema.props.get("logicalType")
# bare logicalType
or self._actual_schema.props.get("logicalType")
or self._actual_schema.props.get("logicalType"),
)
field = SchemaField(
@ -392,14 +429,12 @@ class AvroToMceSchemaConverter:
def __exit__(self, exc_type, exc_val, exc_tb):
self._converter._prefix_name_stack.pop()
def _get_sub_schemas(
self, schema: ExtendedAvroNestedSchemas
) -> Generator[avro.schema.Schema, None, None]:
def _get_sub_schemas(self, schema: SchemaOrField) -> Iterable[SchemaOrField]:
"""Responsible for generation for appropriate sub-schemas for every nested AVRO type."""
def gen_items_from_list_tuple_or_scalar(
val: Any,
) -> Generator[avro.schema.Schema, None, None]:
) -> Iterable[avro.schema.Schema]:
if isinstance(val, (list, tuple)):
for i in val:
yield i
@ -433,7 +468,7 @@ class AvroToMceSchemaConverter:
def _gen_nested_schema_from_field(
self,
field: avro.schema.Field,
) -> Generator[SchemaField, None, None]:
) -> Iterable[SchemaField]:
"""Handles generation of MCE SchemaFields for an AVRO Field type."""
# NOTE: Here we only manage the field stack and trigger MCE Field generation from this field's type.
# The actual emitting of a field happens when
@ -447,7 +482,7 @@ class AvroToMceSchemaConverter:
def _gen_from_last_field(
self, schema_to_recurse: Optional[AvroNestedSchemas] = None
) -> Generator[SchemaField, None, None]:
) -> Iterable[SchemaField]:
"""Emits the field most-recent field, optionally triggering sub-schema generation under the field."""
last_field_schema = self._fields_stack[-1]
# Generate the custom-description for the field.
@ -467,8 +502,8 @@ class AvroToMceSchemaConverter:
yield from self._to_mce_fields(sub_schema)
def _gen_from_non_field_nested_schemas(
self, schema: AvroNestedSchemas
) -> Generator[SchemaField, None, None]:
self, schema: SchemaOrField
) -> Iterable[SchemaField]:
"""Handles generation of MCE SchemaFields for all standard AVRO nested types."""
# Handle recursive record definitions
recurse: bool = True
@ -511,8 +546,8 @@ class AvroToMceSchemaConverter:
yield from self._to_mce_fields(sub_schema)
def _gen_non_nested_to_mce_fields(
self, schema: AvroNonNestedSchemas
) -> Generator[SchemaField, None, None]:
self, schema: SchemaOrField
) -> Iterable[SchemaField]:
"""Handles generation of MCE SchemaFields for non-nested AVRO types."""
with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
schema,
@ -521,9 +556,7 @@ class AvroToMceSchemaConverter:
) as non_nested_emitter:
yield from non_nested_emitter.emit()
def _to_mce_fields(
self, avro_schema: avro.schema.Schema
) -> Generator[SchemaField, None, None]:
def _to_mce_fields(self, avro_schema: SchemaOrField) -> Iterable[SchemaField]:
# Invoke the relevant conversion handler for the schema element type.
schema_type = (
type(avro_schema)
@ -541,7 +574,7 @@ class AvroToMceSchemaConverter:
meta_mapping_processor: Optional[OperationProcessor] = None,
schema_tags_field: Optional[str] = None,
tag_prefix: Optional[str] = None,
) -> Generator[SchemaField, None, None]:
) -> Iterable[SchemaField]:
"""
Converts a key or value type AVRO schema string to appropriate MCE SchemaFields.
:param avro_schema_string: String representation of the AVRO schema.

View File

@ -3,7 +3,7 @@ import json
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Type
from typing import Any, Dict, Iterable, List, Optional, Type, cast
import avro.schema
import confluent_kafka
@ -316,13 +316,20 @@ class KafkaSource(StatefulIngestionSourceBase):
avro_schema = avro.schema.parse(
schema_metadata.platformSchema.documentSchema
)
description = avro_schema.doc
description = getattr(avro_schema, "doc", None)
# set the tags
all_tags: List[str] = []
for tag in avro_schema.other_props.get(
self.source_config.schema_tags_field, []
):
all_tags.append(self.source_config.tag_prefix + tag)
try:
schema_tags = cast(
Iterable[str],
avro_schema.other_props.get(
self.source_config.schema_tags_field, []
),
)
for tag in schema_tags:
all_tags.append(self.source_config.tag_prefix + tag)
except TypeError:
pass
if self.source_config.enable_meta_mapping:
meta_aspects = self.meta_processor.process(avro_schema.other_props)

View File

@ -4,7 +4,7 @@ import operator
import re
import time
from functools import reduce
from typing import Any, Dict, List, Match, Optional, Union, cast
from typing import Any, Dict, List, Mapping, Match, Optional, Union, cast
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import OwnerType
@ -111,7 +111,7 @@ class OperationProcessor:
self.owner_source_type = owner_source_type
self.match_nested_props = match_nested_props
def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]:
def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]:
# Defining the following local variables -
# operations_map - the final resulting map when operations are processed.
# Against each operation the values to be applied are stored.

View File

@ -1,14 +1,14 @@
import tempfile
from typing import List, Type
import avro.schema
import pandas as pd
import ujson
from avro import schema as avro_schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet
from datahub.ingestion.source.schema_inference import csv_tsv, json, parquet
from datahub.ingestion.source.schema_inference.avro import AvroInferrer
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BooleanTypeClass,
NumberTypeClass,
@ -123,7 +123,7 @@ def test_infer_schema_avro():
file.seek(0)
fields = avro.AvroInferrer().infer_schema(file)
fields = AvroInferrer().infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths_avro)