478 lines
15 KiB
Python
Raw Normal View History

import uuid
from decimal import Decimal
from typing import Any, Optional
import pytest
from pydantic import ValidationError
from pyiceberg.schema import Schema
from pyiceberg.types import (
BinaryType,
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IcebergType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
PrimitiveType,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.iceberg.iceberg import (
IcebergProfiler,
IcebergSource,
IcebergSourceConfig,
)
from datahub.ingestion.source.iceberg.iceberg_common import IcebergCatalogConfig
from datahub.metadata.com.linkedin.pegasus2avro.schema import ArrayType, SchemaField
from datahub.metadata.schema_classes import (
ArrayTypeClass,
BooleanTypeClass,
BytesTypeClass,
DateTypeClass,
FixedTypeClass,
NumberTypeClass,
RecordTypeClass,
StringTypeClass,
TimeTypeClass,
)
def with_iceberg_source() -> IcebergSource:
catalog: IcebergCatalogConfig = IcebergCatalogConfig(
name="test", type="rest", config={}
)
return IcebergSource(
ctx=PipelineContext(run_id="iceberg-source-test"),
config=IcebergSourceConfig(catalog=catalog),
)
def with_iceberg_profiler() -> IcebergProfiler:
iceberg_source_instance = with_iceberg_source()
return IcebergProfiler(
iceberg_source_instance.report, iceberg_source_instance.config.profiling
)
def assert_field(
schema_field: SchemaField,
expected_description: Optional[str],
expected_nullable: bool,
expected_type: Any,
) -> None:
assert (
schema_field.description == expected_description
), f"Field description '{schema_field.description}' is different from expected description '{expected_description}'"
assert (
schema_field.nullable == expected_nullable
), f"Field nullable '{schema_field.nullable}' is different from expected nullable '{expected_nullable}'"
assert isinstance(
schema_field.type.type, expected_type
), f"Field type {schema_field.type.type} is different from expected type {expected_type}"
def test_config_no_catalog():
"""
Test when no Iceberg catalog is provided.
"""
with pytest.raises(ValidationError, match="catalog"):
IcebergSourceConfig() # type: ignore
def test_config_catalog_not_configured():
"""
Test when an Iceberg catalog is provided, but not properly configured.
"""
with pytest.raises(ValidationError):
IcebergCatalogConfig() # type: ignore
with pytest.raises(ValidationError, match="conf"):
IcebergCatalogConfig(type="a type") # type: ignore
with pytest.raises(ValidationError, match="type"):
IcebergCatalogConfig(conf={}) # type: ignore
def test_config_for_tests():
"""
Test valid iceberg source that will be used in unit tests.
"""
with_iceberg_source()
@pytest.mark.parametrize(
"iceberg_type, expected_schema_field_type",
[
(BinaryType(), BytesTypeClass),
(BooleanType(), BooleanTypeClass),
(DateType(), DateTypeClass),
(
DecimalType(3, 2),
NumberTypeClass,
),
(DoubleType(), NumberTypeClass),
(FixedType(4), FixedTypeClass),
(FloatType(), NumberTypeClass),
(IntegerType(), NumberTypeClass),
(LongType(), NumberTypeClass),
(StringType(), StringTypeClass),
(
TimestampType(),
TimeTypeClass,
),
(
TimestamptzType(),
TimeTypeClass,
),
(TimeType(), TimeTypeClass),
(
UUIDType(),
StringTypeClass,
),
],
)
def test_iceberg_primitive_type_to_schema_field(
iceberg_type: PrimitiveType, expected_schema_field_type: Any
) -> None:
"""
Test converting a primitive typed Iceberg field to a SchemaField
"""
iceberg_source_instance = with_iceberg_source()
for column in [
NestedField(
1, "required_field", iceberg_type, True, "required field documentation"
),
NestedField(
1, "optional_field", iceberg_type, False, "optional field documentation"
),
]:
schema = Schema(column)
schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
assert (
len(schema_fields) == 1
), f"Expected 1 field, but got {len(schema_fields)}"
assert_field(
schema_fields[0],
column.doc,
column.optional,
expected_schema_field_type,
)
@pytest.mark.parametrize(
"iceberg_type, expected_array_nested_type",
[
(BinaryType(), "bytes"),
(BooleanType(), "boolean"),
(DateType(), "date"),
(
DecimalType(3, 2),
"decimal",
),
(DoubleType(), "double"),
(FixedType(4), "fixed"),
(FloatType(), "float"),
(IntegerType(), "int"),
(LongType(), "long"),
(StringType(), "string"),
(
TimestampType(),
"timestamp-micros",
),
(
TimestamptzType(),
"timestamp-micros",
),
(TimeType(), "time-micros"),
(
UUIDType(),
"uuid",
),
],
)
def test_iceberg_list_to_schema_field(
iceberg_type: PrimitiveType, expected_array_nested_type: Any
) -> None:
"""
Test converting a list typed Iceberg field to an ArrayType SchemaField, including the list nested type.
"""
for list_column in [
NestedField(
1,
"listField",
ListType(2, iceberg_type, True),
True,
"required field, required element documentation",
),
NestedField(
1,
"listField",
ListType(2, iceberg_type, False),
True,
"required field, optional element documentation",
),
NestedField(
1,
"listField",
ListType(2, iceberg_type, True),
False,
"optional field, required element documentation",
),
NestedField(
1,
"listField",
ListType(2, iceberg_type, False),
False,
"optional field, optional element documentation",
),
]:
iceberg_source_instance = with_iceberg_source()
schema = Schema(list_column)
schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
assert (
len(schema_fields) == 1
), f"Expected 1 field, but got {len(schema_fields)}"
assert_field(
schema_fields[0], list_column.doc, list_column.optional, ArrayTypeClass
)
assert isinstance(
schema_fields[0].type.type, ArrayType
), f"Field type {schema_fields[0].type.type} was expected to be {ArrayType}"
arrayType: ArrayType = schema_fields[0].type.type
assert arrayType.nestedType == [
expected_array_nested_type
], f"List Field nested type {arrayType.nestedType} was expected to be {expected_array_nested_type}"
@pytest.mark.parametrize(
"iceberg_type, expected_map_type",
[
(BinaryType(), BytesTypeClass),
(BooleanType(), BooleanTypeClass),
(DateType(), DateTypeClass),
(
DecimalType(3, 2),
NumberTypeClass,
),
(DoubleType(), NumberTypeClass),
(FixedType(4), FixedTypeClass),
(FloatType(), NumberTypeClass),
(IntegerType(), NumberTypeClass),
(LongType(), NumberTypeClass),
(StringType(), StringTypeClass),
(
TimestampType(),
TimeTypeClass,
),
(
TimestamptzType(),
TimeTypeClass,
),
(TimeType(), TimeTypeClass),
(
UUIDType(),
StringTypeClass,
),
],
)
def test_iceberg_map_to_schema_field(
iceberg_type: PrimitiveType, expected_map_type: Any
) -> None:
"""
Test converting a map typed Iceberg field to a MapType SchemaField, where the key is the same type as the value.
"""
for map_column in [
NestedField(
1,
"mapField",
MapType(11, iceberg_type, 12, iceberg_type, True),
True,
"required field, required value documentation",
),
NestedField(
1,
"mapField",
MapType(11, iceberg_type, 12, iceberg_type, False),
True,
"required field, optional value documentation",
),
NestedField(
1,
"mapField",
MapType(11, iceberg_type, 12, iceberg_type, True),
False,
"optional field, required value documentation",
),
NestedField(
1,
"mapField",
MapType(11, iceberg_type, 12, iceberg_type, False),
False,
"optional field, optional value documentation",
),
]:
iceberg_source_instance = with_iceberg_source()
schema = Schema(map_column)
schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
# Converting an Iceberg Map type will be done by creating an array of struct(key, value) records.
# The first field will be the array.
assert (
len(schema_fields) == 3
), f"Expected 3 fields, but got {len(schema_fields)}"
assert_field(
schema_fields[0], map_column.doc, map_column.optional, ArrayTypeClass
)
# The second field will be the key type
assert_field(schema_fields[1], None, False, expected_map_type)
# The third field will be the value type
assert_field(
schema_fields[2],
None,
not map_column.field_type.value_required,
expected_map_type,
)
@pytest.mark.parametrize(
"iceberg_type, expected_schema_field_type",
[
(BinaryType(), BytesTypeClass),
(BooleanType(), BooleanTypeClass),
(DateType(), DateTypeClass),
(
DecimalType(3, 2),
NumberTypeClass,
),
(DoubleType(), NumberTypeClass),
(FixedType(4), FixedTypeClass),
(FloatType(), NumberTypeClass),
(IntegerType(), NumberTypeClass),
(LongType(), NumberTypeClass),
(StringType(), StringTypeClass),
(
TimestampType(),
TimeTypeClass,
),
(
TimestamptzType(),
TimeTypeClass,
),
(TimeType(), TimeTypeClass),
(
UUIDType(),
StringTypeClass,
),
],
)
def test_iceberg_struct_to_schema_field(
iceberg_type: PrimitiveType, expected_schema_field_type: Any
) -> None:
"""
Test converting a struct typed Iceberg field to a RecordType SchemaField.
"""
field1 = NestedField(11, "field1", iceberg_type, True, "field documentation")
struct_column = NestedField(
1, "structField", StructType(field1), True, "struct documentation"
)
iceberg_source_instance = with_iceberg_source()
schema = Schema(struct_column)
schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
assert len(schema_fields) == 2, f"Expected 2 fields, but got {len(schema_fields)}"
assert_field(
schema_fields[0], struct_column.doc, struct_column.optional, RecordTypeClass
)
assert_field(
schema_fields[1], field1.doc, field1.optional, expected_schema_field_type
)
@pytest.mark.parametrize(
"value_type, value, expected_value",
[
(BinaryType(), bytes([1, 2, 3, 4, 5]), "b'\\x01\\x02\\x03\\x04\\x05'"),
(BooleanType(), True, "True"),
(DateType(), 19543, "2023-07-05"),
(DecimalType(3, 2), Decimal((0, (3, 1, 4), -2)), "3.14"),
(DoubleType(), 3.4, "3.4"),
(FixedType(4), bytes([1, 2, 3, 4]), "b'\\x01\\x02\\x03\\x04'"),
(FloatType(), 3.4, "3.4"),
(IntegerType(), 3, "3"),
(LongType(), 4294967295000, "4294967295000"),
(StringType(), "a string", "a string"),
(
TimestampType(),
1688559488157000,
"2023-07-05T12:18:08.157000",
),
(
TimestamptzType(),
1688559488157000,
"2023-07-05T12:18:08.157000+00:00",
),
(TimeType(), 40400000000, "11:13:20"),
(
UUIDType(),
uuid.UUID("00010203-0405-0607-0809-0a0b0c0d0e0f"),
"00010203-0405-0607-0809-0a0b0c0d0e0f",
),
],
)
def test_iceberg_profiler_value_render(
value_type: IcebergType, value: Any, expected_value: Optional[str]
) -> None:
iceberg_profiler_instance = with_iceberg_profiler()
assert (
iceberg_profiler_instance._render_value("a.dataset", value_type, value)
== expected_value
)
def test_avro_decimal_bytes_nullable() -> None:
"""
The following test exposes a problem with decimal (bytes) not preserving extra attributes like _nullable. Decimal (fixed) and Boolean for example do.
NOTE: This bug was by-passed by mapping the Decimal type to fixed instead of bytes.
"""
import avro.schema
decimal_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "bytes", "precision": 3, "scale": 2, "logicalType": "decimal", "native_data_type": "decimal(3, 2)", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}"""
decimal_avro_schema = avro.schema.parse(decimal_avro_schema_string)
print("\nDecimal (bytes)")
print(
f"Original avro schema string: {decimal_avro_schema_string}"
)
print(f"After avro parsing, _nullable attribute is missing: {decimal_avro_schema}")
decimal_fixed_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "fixed", "logicalType": "decimal", "precision": 3, "scale": 2, "native_data_type": "decimal(3, 2)", "_nullable": false, "name": "bogusName", "size": 16}, "name": "required_field", "doc": "required field documentation"}]}"""
decimal_fixed_avro_schema = avro.schema.parse(decimal_fixed_avro_schema_string)
print("\nDecimal (fixed)")
print(
f"Original avro schema string: {decimal_fixed_avro_schema_string}"
)
print(
f"After avro parsing, _nullable attribute is preserved: {decimal_fixed_avro_schema}"
)
boolean_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "boolean", "native_data_type": "boolean", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}"""
boolean_avro_schema = avro.schema.parse(boolean_avro_schema_string)
print("\nBoolean")
print(
f"Original avro schema string: {boolean_avro_schema_string}"
)
print(
f"After avro parsing, _nullable attribute is preserved: {boolean_avro_schema}"
)