import uuid
from collections import defaultdict
from decimal import Decimal
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
from unittest import TestCase
from unittest.mock import patch

import pytest
from pydantic import ValidationError
from pyiceberg.exceptions import (
    NoSuchIcebergTableError,
    NoSuchNamespaceError,
    NoSuchPropertyException,
    NoSuchTableError,
    ServerError,
)
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.types import (
    BinaryType,
    BooleanType,
    DateType,
    DecimalType,
    DoubleType,
    FixedType,
    FloatType,
    IcebergType,
    IntegerType,
    ListType,
    LongType,
    MapType,
    NestedField,
    PrimitiveType,
    StringType,
    StructType,
    TimestampType,
    TimestamptzType,
    TimeType,
    UUIDType,
)

from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.iceberg.iceberg import (
    IcebergProfiler,
    IcebergSource,
    IcebergSourceConfig,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import ArrayType, SchemaField
from datahub.metadata.schema_classes import (
    ArrayTypeClass,
    BooleanTypeClass,
    BytesTypeClass,
    DateTypeClass,
    FixedTypeClass,
    NumberTypeClass,
    RecordTypeClass,
    StringTypeClass,
    TimeTypeClass,
)

MCPS_PER_TABLE = 7  # assuming no profiling
MCPS_PER_NAMESPACE = 5
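# Note: these per-entity counts mirror the set of aspects the source emits for
# each table and namespace (status, properties, schema, container membership,
# and so on); the exact breakdown depends on the source version, so the
# work-unit arithmetic in the tests below is driven by these two constants.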


def with_iceberg_source(processing_threads: int = 1, **kwargs: Any) -> IcebergSource:
    catalog = {"test": {"type": "rest"}}
    config = IcebergSourceConfig(
        catalog=catalog, processing_threads=processing_threads, **kwargs
    )
    return IcebergSource(
        ctx=PipelineContext(run_id="iceberg-source-test"),
        config=config,
    )


def with_iceberg_profiler() -> IcebergProfiler:
    iceberg_source_instance = with_iceberg_source()
    return IcebergProfiler(
        iceberg_source_instance.report, iceberg_source_instance.config.profiling
    )


def assert_field(
    schema_field: SchemaField,
    expected_description: Optional[str],
    expected_nullable: bool,
    expected_type: Any,
) -> None:
    assert schema_field.description == expected_description, (
        f"Field description '{schema_field.description}' is different from expected description '{expected_description}'"
    )
    assert schema_field.nullable == expected_nullable, (
        f"Field nullable '{schema_field.nullable}' is different from expected nullable '{expected_nullable}'"
    )
    assert isinstance(schema_field.type.type, expected_type), (
        f"Field type {schema_field.type.type} is different from expected type {expected_type}"
    )


def test_config_no_catalog():
    """
    Test when no Iceberg catalog is provided.
    """
    with pytest.raises(ValidationError, match="catalog"):
        IcebergSourceConfig()  # type: ignore


def test_config_catalog_not_configured():
    """
    Test when an Iceberg catalog is provided, but not properly configured.
    """
    # When no catalog configuration is provided, the config should be invalid
    with pytest.raises(ValidationError, match="type"):
        IcebergSourceConfig(catalog={})  # type: ignore

    # When a catalog name is provided without configuration, the config should be invalid
    with pytest.raises(ValidationError):
        IcebergSourceConfig(catalog={"test": {}})


def test_config_deprecated_catalog_configuration():
    """
    Test that a deprecated Iceberg catalog configuration is converted to the current scheme.
    """
    deprecated_config = {
        "name": "test",
        "type": "rest",
        "config": {"uri": "http://a.uri.test", "another_prop": "another_value"},
    }
    migrated_config = IcebergSourceConfig(catalog=deprecated_config)
    assert migrated_config.catalog["test"] is not None
    assert migrated_config.catalog["test"]["type"] == "rest"
    assert migrated_config.catalog["test"]["uri"] == "http://a.uri.test"
    assert migrated_config.catalog["test"]["another_prop"] == "another_value"
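
    # Shape check: the deprecated {"name": ..., "type": ..., "config": {...}}
    # layout is flattened into {"<name>": {"type": ..., **config}}, which is
    # exactly what the asserts above verify.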


def test_config_for_tests():
    """
    Test valid iceberg source that will be used in unit tests.
    """
    with_iceberg_source()


def test_config_support_nested_dicts():
    """
    Test that Iceberg source supports nested dictionaries inside its configuration, as allowed by pyiceberg.
    """
    catalog = {
        "test": {
            "type": "rest",
            "nested_dict": {
                "nested_key": "nested_value",
                "nested_array": ["a1", "a2"],
                "subnested_dict": {"subnested_key": "subnested_value"},
            },
        }
    }
    test_config = IcebergSourceConfig(catalog=catalog)
    assert isinstance(test_config.catalog["test"]["nested_dict"], Dict)
    assert test_config.catalog["test"]["nested_dict"]["nested_key"] == "nested_value"
    assert isinstance(test_config.catalog["test"]["nested_dict"]["nested_array"], List)
    assert test_config.catalog["test"]["nested_dict"]["nested_array"][0] == "a1"
    assert isinstance(
        test_config.catalog["test"]["nested_dict"]["subnested_dict"], Dict
    )
    assert (
        test_config.catalog["test"]["nested_dict"]["subnested_dict"]["subnested_key"]
        == "subnested_value"
    )


@pytest.mark.parametrize(
    "iceberg_type, expected_schema_field_type",
    [
        (BinaryType(), BytesTypeClass),
        (BooleanType(), BooleanTypeClass),
        (DateType(), DateTypeClass),
        (
            DecimalType(3, 2),
            NumberTypeClass,
        ),
        (DoubleType(), NumberTypeClass),
        (FixedType(4), FixedTypeClass),
        (FloatType(), NumberTypeClass),
        (IntegerType(), NumberTypeClass),
        (LongType(), NumberTypeClass),
        (StringType(), StringTypeClass),
        (
            TimestampType(),
            TimeTypeClass,
        ),
        (
            TimestamptzType(),
            TimeTypeClass,
        ),
        (TimeType(), TimeTypeClass),
        (
            UUIDType(),
            StringTypeClass,
        ),
    ],
)
def test_iceberg_primitive_type_to_schema_field(
    iceberg_type: PrimitiveType, expected_schema_field_type: Any
) -> None:
    """
    Test converting a primitive typed Iceberg field to a SchemaField
    """
    iceberg_source_instance = with_iceberg_source()
    for column in [
        NestedField(
            1, "required_field", iceberg_type, True, "required field documentation"
        ),
        NestedField(
            1, "optional_field", iceberg_type, False, "optional field documentation"
        ),
    ]:
        schema = Schema(column)
        schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
        assert len(schema_fields) == 1, (
            f"Expected 1 field, but got {len(schema_fields)}"
        )
        assert_field(
            schema_fields[0],
            column.doc,
            column.optional,
            expected_schema_field_type,
        )


@pytest.mark.parametrize(
    "iceberg_type, expected_array_nested_type",
    [
        (BinaryType(), "bytes"),
        (BooleanType(), "boolean"),
        (DateType(), "date"),
        (
            DecimalType(3, 2),
            "decimal",
        ),
        (DoubleType(), "double"),
        (FixedType(4), "fixed"),
        (FloatType(), "float"),
        (IntegerType(), "int"),
        (LongType(), "long"),
        (StringType(), "string"),
        (
            TimestampType(),
            "timestamp-micros",
        ),
        (
            TimestamptzType(),
            "timestamp-micros",
        ),
        (TimeType(), "time-micros"),
        (
            UUIDType(),
            "uuid",
        ),
    ],
)
def test_iceberg_list_to_schema_field(
    iceberg_type: PrimitiveType, expected_array_nested_type: Any
) -> None:
    """
    Test converting a list typed Iceberg field to an ArrayType SchemaField, including the list nested type.
    """
    for list_column in [
        NestedField(
            1,
            "listField",
            ListType(2, iceberg_type, True),
            True,
            "required field, required element documentation",
        ),
        NestedField(
            1,
            "listField",
            ListType(2, iceberg_type, False),
            True,
            "required field, optional element documentation",
        ),
        NestedField(
            1,
            "listField",
            ListType(2, iceberg_type, True),
            False,
            "optional field, required element documentation",
        ),
        NestedField(
            1,
            "listField",
            ListType(2, iceberg_type, False),
            False,
            "optional field, optional element documentation",
        ),
    ]:
        iceberg_source_instance = with_iceberg_source()
        schema = Schema(list_column)
        schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
        assert len(schema_fields) == 1, (
            f"Expected 1 field, but got {len(schema_fields)}"
        )
        assert_field(
            schema_fields[0], list_column.doc, list_column.optional, ArrayTypeClass
        )
        assert isinstance(schema_fields[0].type.type, ArrayType), (
            f"Field type {schema_fields[0].type.type} was expected to be {ArrayType}"
        )
        arrayType: ArrayType = schema_fields[0].type.type
        assert arrayType.nestedType == [expected_array_nested_type], (
            f"List Field nested type {arrayType.nestedType} was expected to be {expected_array_nested_type}"
        )


@pytest.mark.parametrize(
    "iceberg_type, expected_map_type",
    [
        (BinaryType(), BytesTypeClass),
        (BooleanType(), BooleanTypeClass),
        (DateType(), DateTypeClass),
        (
            DecimalType(3, 2),
            NumberTypeClass,
        ),
        (DoubleType(), NumberTypeClass),
        (FixedType(4), FixedTypeClass),
        (FloatType(), NumberTypeClass),
        (IntegerType(), NumberTypeClass),
        (LongType(), NumberTypeClass),
        (StringType(), StringTypeClass),
        (
            TimestampType(),
            TimeTypeClass,
        ),
        (
            TimestamptzType(),
            TimeTypeClass,
        ),
        (TimeType(), TimeTypeClass),
        (
            UUIDType(),
            StringTypeClass,
        ),
    ],
)
def test_iceberg_map_to_schema_field(
    iceberg_type: PrimitiveType, expected_map_type: Any
) -> None:
    """
    Test converting a map typed Iceberg field to a MapType SchemaField, where the key is the same type as the value.
    """
    for map_column in [
        NestedField(
            1,
            "mapField",
            MapType(11, iceberg_type, 12, iceberg_type, True),
            True,
            "required field, required value documentation",
        ),
        NestedField(
            1,
            "mapField",
            MapType(11, iceberg_type, 12, iceberg_type, False),
            True,
            "required field, optional value documentation",
        ),
        NestedField(
            1,
            "mapField",
            MapType(11, iceberg_type, 12, iceberg_type, True),
            False,
            "optional field, required value documentation",
        ),
        NestedField(
            1,
            "mapField",
            MapType(11, iceberg_type, 12, iceberg_type, False),
            False,
            "optional field, optional value documentation",
        ),
    ]:
        iceberg_source_instance = with_iceberg_source()
        schema = Schema(map_column)
        schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
        # Converting an Iceberg Map type is done by creating an array of struct(key, value) records.
        # The first field will be the array.
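        # In other words, map<K, V> is modeled roughly as array<struct<key: K, value: V>>,
        # which is why three SchemaFields are expected below: the array itself,
        # then its key, then its value.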
        assert len(schema_fields) == 3, (
            f"Expected 3 fields, but got {len(schema_fields)}"
        )
        assert_field(
            schema_fields[0], map_column.doc, map_column.optional, ArrayTypeClass
        )

        # The second field will be the key type
        assert_field(schema_fields[1], None, False, expected_map_type)

        # The third field will be the value type
        assert_field(
            schema_fields[2],
            None,
            not map_column.field_type.value_required,
            expected_map_type,
        )


@pytest.mark.parametrize(
    "iceberg_type, expected_schema_field_type",
    [
        (BinaryType(), BytesTypeClass),
        (BooleanType(), BooleanTypeClass),
        (DateType(), DateTypeClass),
        (
            DecimalType(3, 2),
            NumberTypeClass,
        ),
        (DoubleType(), NumberTypeClass),
        (FixedType(4), FixedTypeClass),
        (FloatType(), NumberTypeClass),
        (IntegerType(), NumberTypeClass),
        (LongType(), NumberTypeClass),
        (StringType(), StringTypeClass),
        (
            TimestampType(),
            TimeTypeClass,
        ),
        (
            TimestamptzType(),
            TimeTypeClass,
        ),
        (TimeType(), TimeTypeClass),
        (
            UUIDType(),
            StringTypeClass,
        ),
    ],
)
def test_iceberg_struct_to_schema_field(
    iceberg_type: PrimitiveType, expected_schema_field_type: Any
) -> None:
    """
    Test converting a struct typed Iceberg field to a RecordType SchemaField.
    """
    field1 = NestedField(11, "field1", iceberg_type, True, "field documentation")
    struct_column = NestedField(
        1, "structField", StructType(field1), True, "struct documentation"
    )
    iceberg_source_instance = with_iceberg_source()
    schema = Schema(struct_column)
    schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
    assert len(schema_fields) == 2, f"Expected 2 fields, but got {len(schema_fields)}"
    assert_field(
        schema_fields[0], struct_column.doc, struct_column.optional, RecordTypeClass
    )
    assert_field(
        schema_fields[1], field1.doc, field1.optional, expected_schema_field_type
    )


@pytest.mark.parametrize(
    "value_type, value, expected_value",
    [
        (BinaryType(), bytes([1, 2, 3, 4, 5]), "b'\\x01\\x02\\x03\\x04\\x05'"),
        (BooleanType(), True, "True"),
        (DateType(), 19543, "2023-07-05"),
        (DecimalType(3, 2), Decimal((0, (3, 1, 4), -2)), "3.14"),
        (DoubleType(), 3.4, "3.4"),
        (FixedType(4), bytes([1, 2, 3, 4]), "b'\\x01\\x02\\x03\\x04'"),
        (FloatType(), 3.4, "3.4"),
        (IntegerType(), 3, "3"),
        (LongType(), 4294967295000, "4294967295000"),
        (StringType(), "a string", "a string"),
        (
            TimestampType(),
            1688559488157000,
            "2023-07-05T12:18:08.157000",
        ),
        (
            TimestamptzType(),
            1688559488157000,
            "2023-07-05T12:18:08.157000+00:00",
        ),
        (TimeType(), 40400000000, "11:13:20"),
        (
            UUIDType(),
            uuid.UUID("00010203-0405-0607-0809-0a0b0c0d0e0f"),
            "00010203-0405-0607-0809-0a0b0c0d0e0f",
        ),
    ],
)
def test_iceberg_profiler_value_render(
    value_type: IcebergType, value: Any, expected_value: Optional[str]
) -> None:
    iceberg_profiler_instance = with_iceberg_profiler()
    assert (
        iceberg_profiler_instance._render_value("a.dataset", value_type, value)
        == expected_value
    )


def test_avro_decimal_bytes_nullable() -> None:
    """
    This test exposes a problem with decimal (bytes) not preserving extra attributes like _nullable. Decimal (fixed) and Boolean, for example, do.
    NOTE: This bug was bypassed by mapping the Decimal type to fixed instead of bytes.
    """
    import avro.schema

    decimal_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "bytes", "precision": 3, "scale": 2, "logicalType": "decimal", "native_data_type": "decimal(3, 2)", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}"""
    decimal_avro_schema = avro.schema.parse(decimal_avro_schema_string)
    print("\nDecimal (bytes)")
    print(f"Original avro schema string: {decimal_avro_schema_string}")
    print(f"After avro parsing, _nullable attribute is missing: {decimal_avro_schema}")

    decimal_fixed_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "fixed", "logicalType": "decimal", "precision": 3, "scale": 2, "native_data_type": "decimal(3, 2)", "_nullable": false, "name": "bogusName", "size": 16}, "name": "required_field", "doc": "required field documentation"}]}"""
    decimal_fixed_avro_schema = avro.schema.parse(decimal_fixed_avro_schema_string)
    print("\nDecimal (fixed)")
    print(f"Original avro schema string: {decimal_fixed_avro_schema_string}")
    print(
        f"After avro parsing, _nullable attribute is preserved: {decimal_fixed_avro_schema}"
    )

    boolean_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "boolean", "native_data_type": "boolean", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}"""
    boolean_avro_schema = avro.schema.parse(boolean_avro_schema_string)
    print("\nBoolean")
    print(f"Original avro schema string: {boolean_avro_schema_string}")
    print(
        f"After avro parsing, _nullable attribute is preserved: {boolean_avro_schema}"
    )


class MockCatalog:
    def __init__(
        self,
        tables: Dict[str, Dict[str, Callable[[], Table]]],
        namespace_properties: Optional[Dict[str, Dict[str, str]]] = None,
    ):
        """
        :param tables: Dictionary containing namespaces as keys and dictionaries containing names of tables (keys) and
            their metadata as values
        :param namespace_properties: Optional mapping of namespace name to that namespace's properties; defaults to
            empty properties for every namespace
        """
        self.tables = tables
        self.namespace_properties = (
            namespace_properties if namespace_properties else defaultdict(dict)
        )

    def list_namespaces(self) -> Iterable[Tuple[str]]:
        return [(key,) for key in self.tables]

    def list_tables(self, namespace: Tuple[str, ...]) -> Iterable[Tuple[str, str]]:
        return [(namespace[0], table) for table in self.tables[namespace[0]]]

    def load_table(self, dataset_path: Tuple[str, str]) -> Table:
        return self.tables[dataset_path[0]][dataset_path[1]]()

    def load_namespace_properties(self, namespace: Tuple[str, ...]) -> Dict[str, str]:
        return self.namespace_properties[namespace[0]]


class MockCatalogExceptionListingTables(MockCatalog):
    def list_tables(self, namespace: Tuple[str, ...]) -> Iterable[Tuple[str, str]]:
        if namespace == ("no_such_namespace",):
            raise NoSuchNamespaceError()
        if namespace == ("generic_exception",):
            raise Exception()
        return super().list_tables(namespace)


class MockCatalogExceptionListingNamespaces(MockCatalog):
    def list_namespaces(self) -> Iterable[Tuple[str]]:
        raise Exception("Test exception")


def test_exception_while_listing_namespaces() -> None:
    source = with_iceberg_source(processing_threads=2)
    mock_catalog = MockCatalogExceptionListingNamespaces({})
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wus = [*source.get_workunits_internal()]
        assert len(wus) == 0
        assert source.report.failures.total_elements == 1


def test_known_exception_while_listing_tables() -> None:
    source = with_iceberg_source(processing_threads=2)
    mock_catalog = MockCatalogExceptionListingTables(
        {
            "namespaceA": {
                "table1": lambda: Table(
                    identifier=("namespaceA", "table1"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table1",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table1",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
            },
            "no_such_namespace": {},
            "namespaceB": {
                "table2": lambda: Table(
                    identifier=("namespaceB", "table2"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceB/table2",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceB/table2",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table3": lambda: Table(
                    identifier=("namespaceB", "table3"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceB/table3",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceB/table3",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
            },
            "namespaceC": {
                "table4": lambda: Table(
                    identifier=("namespaceC", "table4"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceC/table4",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceC/table4",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
            },
            "namespaceD": {
                "table5": lambda: Table(
                    identifier=("namespaceD", "table5"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table5",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table5",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
            },
        }
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
        # ingested 5 tables (MCPS_PER_TABLE MCPs each) and 5 namespaces (MCPS_PER_NAMESPACE MCPs each), despite the exception
        expected_wu_urns = [
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
        ] * MCPS_PER_TABLE + [
            "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
            "urn:li:container:9cb5e87ec392b231720f23bf00d6f6aa",
            "urn:li:container:74727446a56420d80ff3b1abf2a18087",
            "urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
            "urn:li:container:38a0583b0305ec5066cb708199f6848c",
        ] * MCPS_PER_NAMESPACE
        assert len(wu) == len(expected_wu_urns)
        urns = []
        for unit in wu:
            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
            urns.append(unit.metadata.entityUrn)
        TestCase().assertCountEqual(
            urns,
            expected_wu_urns,
        )
        assert source.report.warnings.total_elements == 1
        assert source.report.failures.total_elements == 0
        assert source.report.tables_scanned == 5


def test_unknown_exception_while_listing_tables() -> None:
    source = with_iceberg_source(processing_threads=2)
    mock_catalog = MockCatalogExceptionListingTables(
        {
            "namespaceA": {
                "table1": lambda: Table(
                    identifier=("namespaceA", "table1"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table1",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table1",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
            },
            "generic_exception": {},
            "namespaceB": {
                "table2": lambda: Table(
                    identifier=("namespaceB", "table2"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceB/table2",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceB/table2",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table3": lambda: Table(
                    identifier=("namespaceB", "table3"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceB/table3",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceB/table3",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
            },
            "namespaceC": {
                "table4": lambda: Table(
                    identifier=("namespaceC", "table4"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceC/table4",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceC/table4",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
            },
            "namespaceD": {
                "table5": lambda: Table(
                    identifier=("namespaceD", "table5"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table5",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table5",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
            },
        }
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
        # ingested 5 tables (MCPS_PER_TABLE MCPs each) and 5 namespaces (MCPS_PER_NAMESPACE MCPs each), despite the exception
        expected_wu_urns = [
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
        ] * MCPS_PER_TABLE + [
            "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
            "urn:li:container:be99158f9640329f4394315e1d8dacf3",
            "urn:li:container:74727446a56420d80ff3b1abf2a18087",
            "urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
            "urn:li:container:38a0583b0305ec5066cb708199f6848c",
        ] * MCPS_PER_NAMESPACE
        assert len(wu) == len(expected_wu_urns)
        urns = []
        for unit in wu:
            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
            urns.append(unit.metadata.entityUrn)
        TestCase().assertCountEqual(
            urns,
            expected_wu_urns,
        )
        assert source.report.warnings.total_elements == 0
        assert source.report.failures.total_elements == 1
        assert source.report.tables_scanned == 5


def test_proper_run_with_multiple_namespaces() -> None:
    source = with_iceberg_source(processing_threads=3)
    mock_catalog = MockCatalog(
        {
            "namespaceA": {
                "table1": lambda: Table(
                    identifier=("namespaceA", "table1"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table1",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table1",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
            },
            "namespaceB": {},
        }
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
        # ingested 1 table (MCPS_PER_TABLE MCPs) and 2 namespaces (MCPS_PER_NAMESPACE MCPs each)
        expected_wu_urns = [
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
        ] * MCPS_PER_TABLE + [
            "urn:li:container:74727446a56420d80ff3b1abf2a18087",
            "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
        ] * MCPS_PER_NAMESPACE
        assert len(wu) == len(expected_wu_urns)
        urns = []
        for unit in wu:
            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
            urns.append(unit.metadata.entityUrn)
        TestCase().assertCountEqual(
            urns,
            expected_wu_urns,
        )


def test_filtering() -> None:
    source = with_iceberg_source(
        processing_threads=1,
        table_pattern=AllowDenyPattern(deny=[".*abcd.*"]),
        namespace_pattern=AllowDenyPattern(allow=["namespace1"]),
    )
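    # Given these patterns, only namespace1 is allowed, and within it any table
    # whose name matches ".*abcd.*" is denied.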
    mock_catalog = MockCatalog(
        {
            "namespace1": {
                "table_xyz": lambda: Table(
                    identifier=("namespace1", "table_xyz"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespace1/table_xyz",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespace1/table_xyz",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "JKLtable": lambda: Table(
                    identifier=("namespace1", "JKLtable"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespace1/JKLtable",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespace1/JKLtable",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table_abcd": lambda: Table(
                    identifier=("namespace1", "table_abcd"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespace1/table_abcd",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespace1/table_abcd",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "aaabcd": lambda: Table(
                    identifier=("namespace1", "aaabcd"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespace1/aaabcd",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespace1/aaabcd",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
            },
            "namespace2": {
                "foo": lambda: Table(
                    identifier=("namespace2", "foo"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespace2/foo",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespace2/foo",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "bar": lambda: Table(
                    identifier=("namespace2", "bar"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespace2/bar",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespace2/bar",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
            },
            "namespace3": {
                "sales": lambda: Table(
                    identifier=("namespace3", "sales"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespace3/sales",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespace3/sales",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "products": lambda: Table(
                    identifier=("namespace3", "products"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespace3/products",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespace3/products",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
            },
        }
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
        # ingested 2 tables (MCPS_PER_TABLE MCPs each) and 1 namespace (MCPS_PER_NAMESPACE MCPs)
        expected_wu_urns = [
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespace1.table_xyz,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespace1.JKLtable,PROD)",
        ] * MCPS_PER_TABLE + [
            "urn:li:container:075fc8fdac17b0eb3482e73052e875f1",
        ] * MCPS_PER_NAMESPACE
        assert len(wu) == len(expected_wu_urns)
        urns = []
        for unit in wu:
            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
            urns.append(unit.metadata.entityUrn)
        TestCase().assertCountEqual(
            urns,
            expected_wu_urns,
        )
        assert source.report.tables_scanned == 2


def test_handle_expected_exceptions() -> None:
    source = with_iceberg_source(processing_threads=3)

    def _raise_no_such_property_exception():
        raise NoSuchPropertyException()

    def _raise_no_such_iceberg_table_exception():
        raise NoSuchIcebergTableError()

    def _raise_file_not_found_error():
        raise FileNotFoundError()

    def _raise_no_such_table_exception():
        raise NoSuchTableError()

    def _raise_server_error():
        raise ServerError()

    def _raise_fileio_error():
        raise ValueError("Could not initialize FileIO: abc.dummy.fileio")

    mock_catalog = MockCatalog(
        {
            "namespaceA": {
                "table1": lambda: Table(
                    identifier=("namespaceA", "table1"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table1",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table1",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table2": lambda: Table(
                    identifier=("namespaceA", "table2"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table2",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table2",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table3": lambda: Table(
                    identifier=("namespaceA", "table3"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table3",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table3",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table4": lambda: Table(
                    identifier=("namespaceA", "table4"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table4",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table4",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table5": _raise_no_such_property_exception,
                "table6": _raise_no_such_table_exception,
                "table7": _raise_file_not_found_error,
                "table8": _raise_no_such_iceberg_table_exception,
                "table9": _raise_server_error,
                "table10": _raise_fileio_error,
            }
        }
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
        # ingested 4 tables (MCPS_PER_TABLE MCPs each) and 1 namespace (MCPS_PER_NAMESPACE MCPs)
        expected_wu_urns = [
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
        ] * MCPS_PER_TABLE + [
            "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
        ] * MCPS_PER_NAMESPACE
        assert len(wu) == len(expected_wu_urns)
        urns = []
        for unit in wu:
            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
            urns.append(unit.metadata.entityUrn)
        TestCase().assertCountEqual(
            urns,
            expected_wu_urns,
        )
        assert source.report.warnings.total_elements == 6
        assert source.report.failures.total_elements == 0
        assert source.report.tables_scanned == 4


def test_handle_unexpected_exceptions() -> None:
    source = with_iceberg_source(processing_threads=3)

    def _raise_exception():
        raise Exception()

    def _raise_other_value_error_exception():
        raise ValueError("Other value exception")
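
    # Unlike the exceptions in test_handle_expected_exceptions, these two are
    # not recognized by the source and should surface as failures (see the
    # report assertions at the end of this test).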

    mock_catalog = MockCatalog(
        {
            "namespaceA": {
                "table1": lambda: Table(
                    identifier=("namespaceA", "table1"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table1",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table1",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table2": lambda: Table(
                    identifier=("namespaceA", "table2"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table2",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table2",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table3": lambda: Table(
                    identifier=("namespaceA", "table3"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table3",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table3",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table4": lambda: Table(
                    identifier=("namespaceA", "table4"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table4",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table4",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table5": _raise_exception,
                "table6": _raise_other_value_error_exception,
            }
        }
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
        # ingested 4 tables (MCPS_PER_TABLE MCPs each) and 1 namespace (MCPS_PER_NAMESPACE MCPs)
        expected_wu_urns = [
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
        ] * MCPS_PER_TABLE + [
            "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
        ] * MCPS_PER_NAMESPACE
        # Redundant with assertCountEqual below, but kept as a fast-fail check
        assert len(wu) == len(expected_wu_urns)
        urns = []
        for unit in wu:
            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
            urns.append(unit.metadata.entityUrn)
        TestCase().assertCountEqual(
            urns,
            expected_wu_urns,
        )
        assert source.report.warnings.total_elements == 0
        assert source.report.failures.total_elements == 1
        assert source.report.tables_scanned == 4
        # Needed to make sure all failures are recognized properly
        failures = [f for f in source.report.failures]
        TestCase().assertCountEqual(
            failures[0].context,
            [
                "('namespaceA', 'table6') <class 'ValueError'>: Other value exception",
                "('namespaceA', 'table5') <class 'Exception'>: ",
            ],
        )


def test_ingesting_namespace_properties() -> None:
    source = with_iceberg_source(processing_threads=2)
    custom_properties = {"prop1": "foo", "prop2": "bar"}
    mock_catalog = MockCatalog(
        tables={
            "namespaceA": {},  # mapped to urn:li:container:390e031441265aae5b7b7ae8d51b0c1f
            "namespaceB": {},  # mapped to urn:li:container:74727446a56420d80ff3b1abf2a18087
        },
        namespace_properties={"namespaceA": {}, "namespaceB": custom_properties},
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
        aspects: Dict[str, Dict[str, Any]] = defaultdict(dict)
        for unit in wu:
            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
            mcpw: MetadataChangeProposalWrapper = unit.metadata
            assert mcpw.entityUrn
            assert mcpw.aspectName
            aspects[mcpw.entityUrn][mcpw.aspectName] = mcpw.aspect
        assert (
            aspects["urn:li:container:390e031441265aae5b7b7ae8d51b0c1f"][
                "containerProperties"
            ].customProperties
            == {}
        )
        assert (
            aspects["urn:li:container:74727446a56420d80ff3b1abf2a18087"][
                "containerProperties"
            ].customProperties
            == custom_properties
        )