fix(iceberg): Change how MapType is mapped to Avro to support complex Map key types. (#5060)

This commit is contained in:
cccs-eric 2022-06-08 22:43:10 -04:00 committed by GitHub
parent da273802be
commit f25a5f043d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 51 deletions

View File

@ -313,16 +313,30 @@ def _parse_datatype(type: IcebergTypes.Type, nullable: bool = False) -> Dict[str
"_nullable": nullable,
}
elif type.is_map_type():
# The Iceberg Map type will be handled differently. The idea is to translate the map
# similar to the Map.Entry struct of Java i.e. as an array of map_entry struct, where
# the map_entry struct has a key field and a value field. The key and value type can
# be complex or primitive types.
map_type: IcebergTypes.MapType = type
kt = _parse_datatype(map_type.key_type())
vt = _parse_datatype(map_type.value_type())
# keys are assumed to be strings in avro map
map_entry: Dict[str, Any] = {
"type": "record",
"name": _gen_name("__map_entry_"),
"fields": [
{
"name": "key",
"type": _parse_datatype(map_type.key_type(), False),
},
{
"name": "value",
"type": _parse_datatype(map_type.value_type(), True),
},
],
}
return {
"type": "map",
"values": vt,
"native_data_type": str(map_type),
"key_type": kt,
"key_native_data_type": repr(map_type.key_type()),
"type": "array",
"items": map_entry,
"native_data_type": str(type),
"_nullable": nullable,
}
elif type.is_struct_type():
structType: IcebergTypes.StructType = type
@ -340,7 +354,7 @@ def _parse_struct_fields(parts: Tuple[NestedField], nullable: bool) -> Dict[str,
fields.append({"name": field_name, "type": field_type, "doc": nested_field.doc})
return {
"type": "record",
"name": "__struct_{}".format(str(uuid.uuid4()).replace("-", "")),
"name": _gen_name("__struct_"),
"fields": fields,
"native_data_type": "struct<{}>".format(parts),
"_nullable": nullable,
@ -367,7 +381,7 @@ def _parse_basic_datatype(
fixed_type: IcebergTypes.FixedType = type
return {
"type": "fixed",
"name": "name", # TODO: Pass-in field name since it is required by Avro spec
"name": _gen_name("__fixed_"),
"size": fixed_type.length,
"native_data_type": repr(fixed_type),
"_nullable": nullable,
@ -380,7 +394,9 @@ def _parse_basic_datatype(
return {
# "type": "bytes", # when using bytes, avro drops _nullable attribute and others. See unit test.
"type": "fixed", # to fix avro bug ^ resolved by using a fixed type
"name": "bogus", # to fix avro bug ^ resolved by using a fixed type
"name": _gen_name(
"__fixed_"
), # to fix avro bug ^ resolved by using a fixed type
"size": 1, # to fix avro bug ^ resolved by using a fixed type
"logicalType": "decimal",
"precision": decimal_type.precision,
@ -431,3 +447,7 @@ def _parse_basic_datatype(
}
return {"type": "null", "native_data_type": repr(type)}
def _gen_name(prefix: str) -> str:
return f"{prefix}{str(uuid.uuid4()).replace('-', '')}"

View File

@ -4,7 +4,7 @@ import pytest
if sys.version_info < (3, 7):
pytest.skip("iceberg not available for python < 3.7", allow_module_level=True)
from typing import Any
from typing import Any, Optional
from iceberg.api import types as IcebergTypes
from iceberg.api.types.types import NestedField
@ -13,18 +13,13 @@ from datahub.configuration.common import ConfigurationError
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.azure.azure_common import AdlsSourceConfig
from datahub.ingestion.source.iceberg.iceberg import IcebergSource, IcebergSourceConfig
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayType,
MapType,
SchemaField,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import ArrayType, SchemaField
from datahub.metadata.schema_classes import (
ArrayTypeClass,
BooleanTypeClass,
BytesTypeClass,
DateTypeClass,
FixedTypeClass,
MapTypeClass,
NumberTypeClass,
RecordTypeClass,
StringTypeClass,
@ -44,7 +39,7 @@ def with_iceberg_source() -> IcebergSource:
def assert_field(
schema_field: SchemaField,
expected_description: str,
expected_description: Optional[str],
expected_nullable: bool,
expected_type: Any,
) -> None:
@ -267,64 +262,62 @@ def test_iceberg_list_to_schema_field(
@pytest.mark.parametrize(
"iceberg_type, expected_map_value_type",
"iceberg_type, expected_map_type",
[
(IcebergTypes.BinaryType.get(), "bytes"),
(IcebergTypes.BooleanType.get(), "boolean"),
(IcebergTypes.DateType.get(), "date"),
(IcebergTypes.BinaryType.get(), BytesTypeClass),
(IcebergTypes.BooleanType.get(), BooleanTypeClass),
(IcebergTypes.DateType.get(), DateTypeClass),
(
IcebergTypes.DecimalType.of(3, 2),
"decimal",
NumberTypeClass,
),
(IcebergTypes.DoubleType.get(), "double"),
(IcebergTypes.FixedType.of_length(4), "fixed"),
(IcebergTypes.FloatType.get(), "float"),
(IcebergTypes.IntegerType.get(), "int"),
(IcebergTypes.LongType.get(), "long"),
(IcebergTypes.StringType.get(), "string"),
(IcebergTypes.DoubleType.get(), NumberTypeClass),
(IcebergTypes.FixedType.of_length(4), FixedTypeClass),
(IcebergTypes.FloatType.get(), NumberTypeClass),
(IcebergTypes.IntegerType.get(), NumberTypeClass),
(IcebergTypes.LongType.get(), NumberTypeClass),
(IcebergTypes.StringType.get(), StringTypeClass),
(
IcebergTypes.TimestampType.with_timezone(),
"timestamp-micros",
TimeTypeClass,
),
(
IcebergTypes.TimestampType.without_timezone(),
"timestamp-micros",
TimeTypeClass,
),
(IcebergTypes.TimeType.get(), "time-micros"),
(IcebergTypes.TimeType.get(), TimeTypeClass),
(
IcebergTypes.UUIDType.get(),
"uuid",
StringTypeClass,
),
],
)
def test_iceberg_map_to_schema_field(
iceberg_type: IcebergTypes.PrimitiveType, expected_map_value_type: Any
iceberg_type: IcebergTypes.PrimitiveType, expected_map_type: Any
) -> None:
"""
Test converting a map typed Iceberg field to a MapType SchemaField, including the map value type.
Test converting a map typed Iceberg field to a MapType SchemaField, where the key is the same type as the value.
"""
map_column: NestedField = NestedField.required(
1,
"mapField",
IcebergTypes.MapType.of_required(
11, 12, IcebergTypes.StringType.get(), iceberg_type
),
IcebergTypes.MapType.of_required(11, 12, iceberg_type, iceberg_type),
"documentation",
)
iceberg_source_instance = with_iceberg_source()
schema_fields = iceberg_source_instance._get_schema_fields_for_column(map_column)
assert len(schema_fields) == 1, f"Expected 1 field, but got {len(schema_fields)}"
assert_field(schema_fields[0], map_column.doc, map_column.is_optional, MapTypeClass)
assert isinstance(
schema_fields[0].type.type, MapType
), f"Field type {schema_fields[0].type.type} was expected to be {MapType}"
mapType: MapType = schema_fields[0].type.type
assert (
mapType.keyType == "string"
), f"Map key type {mapType.keyType} should always be a string"
assert (
mapType.valueType == expected_map_value_type
), f"Map value type {mapType.valueType} was expected to be {expected_map_value_type}"
# Converting an Iceberg Map type will be done by creating an array of struct(key, value) records.
# The first field will be the array.
assert len(schema_fields) == 3, f"Expected 3 fields, but got {len(schema_fields)}"
assert_field(
schema_fields[0], map_column.doc, map_column.is_optional, ArrayTypeClass
)
# The second field will be the key type
assert_field(schema_fields[1], None, False, expected_map_type)
# The third field will be the value type
assert_field(schema_fields[2], None, True, expected_map_type)
@pytest.mark.parametrize(