import uuid
from collections import defaultdict
from decimal import Decimal
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
from unittest import TestCase
from unittest.mock import patch

import pytest
from pydantic import ValidationError
from pyiceberg.exceptions import (
    NoSuchIcebergTableError,
    NoSuchNamespaceError,
    NoSuchPropertyException,
    NoSuchTableError,
    ServerError,
)
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.types import (
    BinaryType,
    BooleanType,
    DateType,
    DecimalType,
    DoubleType,
    FixedType,
    FloatType,
    IcebergType,
    IntegerType,
    ListType,
    LongType,
    MapType,
    NestedField,
    PrimitiveType,
    StringType,
    StructType,
    TimestampType,
    TimestamptzType,
    TimeType,
    UUIDType,
)

from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.iceberg.iceberg import (
    IcebergProfiler,
    IcebergSource,
    IcebergSourceConfig,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import ArrayType, SchemaField
from datahub.metadata.schema_classes import (
    ArrayTypeClass,
    BooleanTypeClass,
    BytesTypeClass,
    DateTypeClass,
    FixedTypeClass,
    NumberTypeClass,
    RecordTypeClass,
    StringTypeClass,
    TimeTypeClass,
)

MCPS_PER_TABLE = 7  # assuming no profiling
MCPS_PER_NAMESPACE = 5


def with_iceberg_source(processing_threads: int = 1, **kwargs: Any) -> IcebergSource:
    catalog = {"test": {"type": "rest"}}
    config = IcebergSourceConfig(
        catalog=catalog, processing_threads=processing_threads, **kwargs
    )
    return IcebergSource(
        ctx=PipelineContext(run_id="iceberg-source-test"),
        config=config,
    )


def with_iceberg_profiler() -> IcebergProfiler:
    iceberg_source_instance = with_iceberg_source()
    return IcebergProfiler(
        iceberg_source_instance.report, iceberg_source_instance.config.profiling
    )


def assert_field(
    schema_field: SchemaField,
    expected_description: Optional[str],
    expected_nullable: bool,
    expected_type: Any,
) -> None:
    assert schema_field.description == expected_description, (
        f"Field description '{schema_field.description}' is different from expected description '{expected_description}'"
    )
    assert schema_field.nullable == expected_nullable, (
        f"Field nullable '{schema_field.nullable}' is different from expected nullable '{expected_nullable}'"
    )
    assert isinstance(schema_field.type.type, expected_type), (
        f"Field type {schema_field.type.type} is different from expected type {expected_type}"
    )


def test_config_no_catalog():
    """
    Test when no Iceberg catalog is provided.
    """
    with pytest.raises(ValidationError, match="catalog"):
        IcebergSourceConfig()  # type: ignore


def test_config_catalog_not_configured():
    """
    Test when an Iceberg catalog is provided, but not properly configured.
    """
    # When no catalog configuration is provided, the config should be invalid
    with pytest.raises(ValidationError, match="type"):
        IcebergSourceConfig(catalog={})  # type: ignore

    # When a catalog name is provided without configuration, the config should be invalid
    with pytest.raises(ValidationError):
        IcebergSourceConfig(catalog={"test": {}})


def test_config_deprecated_catalog_configuration():
    """
    Test that a deprecated Iceberg catalog configuration is converted to the current
    scheme.
""" deprecated_config = { "name": "test", "type": "rest", "config": {"uri": "http://a.uri.test", "another_prop": "another_value"}, } migrated_config = IcebergSourceConfig(catalog=deprecated_config) assert migrated_config.catalog["test"] is not None assert migrated_config.catalog["test"]["type"] == "rest" assert migrated_config.catalog["test"]["uri"] == "http://a.uri.test" assert migrated_config.catalog["test"]["another_prop"] == "another_value" def test_config_for_tests(): """ Test valid iceberg source that will be used in unit tests. """ with_iceberg_source() def test_config_support_nested_dicts(): """ Test that Iceberg source supports nested dictionaries inside its configuration, as allowed by pyiceberg. """ catalog = { "test": { "type": "rest", "nested_dict": { "nested_key": "nested_value", "nested_array": ["a1", "a2"], "subnested_dict": {"subnested_key": "subnested_value"}, }, } } test_config = IcebergSourceConfig(catalog=catalog) assert isinstance(test_config.catalog["test"]["nested_dict"], Dict) assert test_config.catalog["test"]["nested_dict"]["nested_key"] == "nested_value" assert isinstance(test_config.catalog["test"]["nested_dict"]["nested_array"], List) assert test_config.catalog["test"]["nested_dict"]["nested_array"][0] == "a1" assert isinstance( test_config.catalog["test"]["nested_dict"]["subnested_dict"], Dict ) assert ( test_config.catalog["test"]["nested_dict"]["subnested_dict"]["subnested_key"] == "subnested_value" ) @pytest.mark.parametrize( "iceberg_type, expected_schema_field_type", [ (BinaryType(), BytesTypeClass), (BooleanType(), BooleanTypeClass), (DateType(), DateTypeClass), ( DecimalType(3, 2), NumberTypeClass, ), (DoubleType(), NumberTypeClass), (FixedType(4), FixedTypeClass), (FloatType(), NumberTypeClass), (IntegerType(), NumberTypeClass), (LongType(), NumberTypeClass), (StringType(), StringTypeClass), ( TimestampType(), TimeTypeClass, ), ( TimestamptzType(), TimeTypeClass, ), (TimeType(), TimeTypeClass), ( UUIDType(), StringTypeClass, ), ], ) def test_iceberg_primitive_type_to_schema_field( iceberg_type: PrimitiveType, expected_schema_field_type: Any ) -> None: """ Test converting a primitive typed Iceberg field to a SchemaField """ iceberg_source_instance = with_iceberg_source() for column in [ NestedField( 1, "required_field", iceberg_type, True, "required field documentation" ), NestedField( 1, "optional_field", iceberg_type, False, "optional field documentation" ), ]: schema = Schema(column) schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema) assert len(schema_fields) == 1, ( f"Expected 1 field, but got {len(schema_fields)}" ) assert_field( schema_fields[0], column.doc, column.optional, expected_schema_field_type, ) @pytest.mark.parametrize( "iceberg_type, expected_array_nested_type", [ (BinaryType(), "bytes"), (BooleanType(), "boolean"), (DateType(), "date"), ( DecimalType(3, 2), "decimal", ), (DoubleType(), "double"), (FixedType(4), "fixed"), (FloatType(), "float"), (IntegerType(), "int"), (LongType(), "long"), (StringType(), "string"), ( TimestampType(), "timestamp-micros", ), ( TimestamptzType(), "timestamp-micros", ), (TimeType(), "time-micros"), ( UUIDType(), "uuid", ), ], ) def test_iceberg_list_to_schema_field( iceberg_type: PrimitiveType, expected_array_nested_type: Any ) -> None: """ Test converting a list typed Iceberg field to an ArrayType SchemaField, including the list nested type. 
""" for list_column in [ NestedField( 1, "listField", ListType(2, iceberg_type, True), True, "required field, required element documentation", ), NestedField( 1, "listField", ListType(2, iceberg_type, False), True, "required field, optional element documentation", ), NestedField( 1, "listField", ListType(2, iceberg_type, True), False, "optional field, required element documentation", ), NestedField( 1, "listField", ListType(2, iceberg_type, False), False, "optional field, optional element documentation", ), ]: iceberg_source_instance = with_iceberg_source() schema = Schema(list_column) schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema) assert len(schema_fields) == 1, ( f"Expected 1 field, but got {len(schema_fields)}" ) assert_field( schema_fields[0], list_column.doc, list_column.optional, ArrayTypeClass ) assert isinstance(schema_fields[0].type.type, ArrayType), ( f"Field type {schema_fields[0].type.type} was expected to be {ArrayType}" ) arrayType: ArrayType = schema_fields[0].type.type assert arrayType.nestedType == [expected_array_nested_type], ( f"List Field nested type {arrayType.nestedType} was expected to be {expected_array_nested_type}" ) @pytest.mark.parametrize( "iceberg_type, expected_map_type", [ (BinaryType(), BytesTypeClass), (BooleanType(), BooleanTypeClass), (DateType(), DateTypeClass), ( DecimalType(3, 2), NumberTypeClass, ), (DoubleType(), NumberTypeClass), (FixedType(4), FixedTypeClass), (FloatType(), NumberTypeClass), (IntegerType(), NumberTypeClass), (LongType(), NumberTypeClass), (StringType(), StringTypeClass), ( TimestampType(), TimeTypeClass, ), ( TimestamptzType(), TimeTypeClass, ), (TimeType(), TimeTypeClass), ( UUIDType(), StringTypeClass, ), ], ) def test_iceberg_map_to_schema_field( iceberg_type: PrimitiveType, expected_map_type: Any ) -> None: """ Test converting a map typed Iceberg field to a MapType SchemaField, where the key is the same type as the value. """ for map_column in [ NestedField( 1, "mapField", MapType(11, iceberg_type, 12, iceberg_type, True), True, "required field, required value documentation", ), NestedField( 1, "mapField", MapType(11, iceberg_type, 12, iceberg_type, False), True, "required field, optional value documentation", ), NestedField( 1, "mapField", MapType(11, iceberg_type, 12, iceberg_type, True), False, "optional field, required value documentation", ), NestedField( 1, "mapField", MapType(11, iceberg_type, 12, iceberg_type, False), False, "optional field, optional value documentation", ), ]: iceberg_source_instance = with_iceberg_source() schema = Schema(map_column) schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema) # Converting an Iceberg Map type will be done by creating an array of struct(key, value) records. # The first field will be the array. 
        assert len(schema_fields) == 3, (
            f"Expected 3 fields, but got {len(schema_fields)}"
        )
        assert_field(
            schema_fields[0], map_column.doc, map_column.optional, ArrayTypeClass
        )

        # The second field will be the key type
        assert_field(schema_fields[1], None, False, expected_map_type)

        # The third field will be the value type
        assert_field(
            schema_fields[2],
            None,
            not map_column.field_type.value_required,
            expected_map_type,
        )


@pytest.mark.parametrize(
    "iceberg_type, expected_schema_field_type",
    [
        (BinaryType(), BytesTypeClass),
        (BooleanType(), BooleanTypeClass),
        (DateType(), DateTypeClass),
        (DecimalType(3, 2), NumberTypeClass),
        (DoubleType(), NumberTypeClass),
        (FixedType(4), FixedTypeClass),
        (FloatType(), NumberTypeClass),
        (IntegerType(), NumberTypeClass),
        (LongType(), NumberTypeClass),
        (StringType(), StringTypeClass),
        (TimestampType(), TimeTypeClass),
        (TimestamptzType(), TimeTypeClass),
        (TimeType(), TimeTypeClass),
        (UUIDType(), StringTypeClass),
    ],
)
def test_iceberg_struct_to_schema_field(
    iceberg_type: PrimitiveType, expected_schema_field_type: Any
) -> None:
    """
    Test converting a struct typed Iceberg field to a RecordType SchemaField.
    """
    field1 = NestedField(11, "field1", iceberg_type, True, "field documentation")
    struct_column = NestedField(
        1, "structField", StructType(field1), True, "struct documentation"
    )
    iceberg_source_instance = with_iceberg_source()
    schema = Schema(struct_column)
    schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
    assert len(schema_fields) == 2, f"Expected 2 fields, but got {len(schema_fields)}"
    assert_field(
        schema_fields[0], struct_column.doc, struct_column.optional, RecordTypeClass
    )
    assert_field(
        schema_fields[1], field1.doc, field1.optional, expected_schema_field_type
    )


@pytest.mark.parametrize(
    "value_type, value, expected_value",
    [
        (BinaryType(), bytes([1, 2, 3, 4, 5]), "b'\\x01\\x02\\x03\\x04\\x05'"),
        (BooleanType(), True, "True"),
        (DateType(), 19543, "2023-07-05"),
        (DecimalType(3, 2), Decimal((0, (3, 1, 4), -2)), "3.14"),
        (DoubleType(), 3.4, "3.4"),
        (FixedType(4), bytes([1, 2, 3, 4]), "b'\\x01\\x02\\x03\\x04'"),
        (FloatType(), 3.4, "3.4"),
        (IntegerType(), 3, "3"),
        (LongType(), 4294967295000, "4294967295000"),
        (StringType(), "a string", "a string"),
        (TimestampType(), 1688559488157000, "2023-07-05T12:18:08.157000"),
        (TimestamptzType(), 1688559488157000, "2023-07-05T12:18:08.157000+00:00"),
        (TimeType(), 40400000000, "11:13:20"),
        (
            UUIDType(),
            uuid.UUID("00010203-0405-0607-0809-0a0b0c0d0e0f"),
            "00010203-0405-0607-0809-0a0b0c0d0e0f",
        ),
    ],
)
def test_iceberg_profiler_value_render(
    value_type: IcebergType, value: Any, expected_value: Optional[str]
) -> None:
    iceberg_profiler_instance = with_iceberg_profiler()
    assert (
        iceberg_profiler_instance._render_value("a.dataset", value_type, value)
        == expected_value
    )


def test_avro_decimal_bytes_nullable() -> None:
    """
    The following test exposes a problem with decimal (bytes) not preserving extra
    attributes like _nullable.  Decimal (fixed) and Boolean, for example, do.

    NOTE: This bug was bypassed by mapping the Decimal type to fixed instead of bytes.
""" import avro.schema decimal_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "bytes", "precision": 3, "scale": 2, "logicalType": "decimal", "native_data_type": "decimal(3, 2)", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}""" decimal_avro_schema = avro.schema.parse(decimal_avro_schema_string) print("\nDecimal (bytes)") print( f"Original avro schema string: {decimal_avro_schema_string}" ) print(f"After avro parsing, _nullable attribute is missing: {decimal_avro_schema}") decimal_fixed_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "fixed", "logicalType": "decimal", "precision": 3, "scale": 2, "native_data_type": "decimal(3, 2)", "_nullable": false, "name": "bogusName", "size": 16}, "name": "required_field", "doc": "required field documentation"}]}""" decimal_fixed_avro_schema = avro.schema.parse(decimal_fixed_avro_schema_string) print("\nDecimal (fixed)") print( f"Original avro schema string: {decimal_fixed_avro_schema_string}" ) print( f"After avro parsing, _nullable attribute is preserved: {decimal_fixed_avro_schema}" ) boolean_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "boolean", "native_data_type": "boolean", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}""" boolean_avro_schema = avro.schema.parse(boolean_avro_schema_string) print("\nBoolean") print( f"Original avro schema string: {boolean_avro_schema_string}" ) print( f"After avro parsing, _nullable attribute is preserved: {boolean_avro_schema}" ) class MockCatalog: def __init__( self, tables: Dict[str, Dict[str, Callable[[], Table]]], namespace_properties: Optional[Dict[str, Dict[str, str]]] = None, ): """ :param tables: Dictionary containing namespaces as keys and dictionaries containing names of tables (keys) and their metadata as values """ self.tables = tables self.namespace_properties = ( namespace_properties if namespace_properties else defaultdict(dict) ) def list_namespaces(self) -> Iterable[Tuple[str]]: return [*[(key,) for key in self.tables]] def list_tables(self, namespace: str) -> Iterable[Tuple[str, str]]: return [(namespace[0], table) for table in self.tables[namespace[0]]] def load_table(self, dataset_path: Tuple[str, str]) -> Table: return self.tables[dataset_path[0]][dataset_path[1]]() def load_namespace_properties(self, namespace: Tuple[str, ...]) -> Dict[str, str]: return self.namespace_properties[namespace[0]] class MockCatalogExceptionListingTables(MockCatalog): def list_tables(self, namespace: str) -> Iterable[Tuple[str, str]]: if namespace == ("no_such_namespace",): raise NoSuchNamespaceError() if namespace == ("generic_exception",): raise Exception() return super().list_tables(namespace) class MockCatalogExceptionListingNamespaces(MockCatalog): def list_namespaces(self) -> Iterable[Tuple[str]]: raise Exception("Test exception") def test_exception_while_listing_namespaces() -> None: source = with_iceberg_source(processing_threads=2) mock_catalog = MockCatalogExceptionListingNamespaces({}) with patch( "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog" ) as get_catalog: get_catalog.return_value = mock_catalog wus = [*source.get_workunits_internal()] assert len(wus) == 0 assert source.report.failures.total_elements == 1 def test_known_exception_while_listing_tables() -> None: source = with_iceberg_source(processing_threads=2) mock_catalog = 
        {
            "namespaceA": {
                "table1": lambda: Table(
                    identifier=("namespaceA", "table1"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table1",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table1",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
            },
            "no_such_namespace": {},
            "namespaceB": {
                "table2": lambda: Table(
                    identifier=("namespaceB", "table2"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceB/table2",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceB/table2",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table3": lambda: Table(
                    identifier=("namespaceB", "table3"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceB/table3",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceB/table3",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
            },
            "namespaceC": {
                "table4": lambda: Table(
                    identifier=("namespaceC", "table4"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceC/table4",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceC/table4",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
            },
            "namespaceD": {
                "table5": lambda: Table(
                    identifier=("namespaceD", "table5"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table5",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table5",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
            },
        }
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]

        # ingested 5 tables (7 MCPs each) and 5 namespaces (5 MCPs each), despite the exception
        expected_wu_urns = [
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
        ] * MCPS_PER_TABLE + [
            "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
            "urn:li:container:9cb5e87ec392b231720f23bf00d6f6aa",
            "urn:li:container:74727446a56420d80ff3b1abf2a18087",
            "urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
            "urn:li:container:38a0583b0305ec5066cb708199f6848c",
        ] * MCPS_PER_NAMESPACE
        assert len(wu) == len(expected_wu_urns)
        urns = []
        for unit in wu:
            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
            urns.append(unit.metadata.entityUrn)
        TestCase().assertCountEqual(
            urns,
            expected_wu_urns,
        )
        assert source.report.warnings.total_elements == 1
        assert source.report.failures.total_elements == 0
        assert source.report.tables_scanned == 5


def test_unknown_exception_while_listing_tables() -> None:
    source = with_iceberg_source(processing_threads=2)
    mock_catalog = MockCatalogExceptionListingTables(
        {
            "namespaceA": {
                "table1": lambda: Table(
                    identifier=("namespaceA", "table1"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table1",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table1",
                    io=PyArrowFileIO(),
                    catalog=None,
                )
}, "generic_exception": {}, "namespaceB": { "table2": lambda: Table( identifier=("namespaceB", "table2"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespaceB/table2", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespaceB/table2", io=PyArrowFileIO(), catalog=None, ), "table3": lambda: Table( identifier=("namespaceB", "table3"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespaceB/table3", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespaceB/table3", io=PyArrowFileIO(), catalog=None, ), }, "namespaceC": { "table4": lambda: Table( identifier=("namespaceC", "table4"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespaceC/table4", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespaceC/table4", io=PyArrowFileIO(), catalog=None, ) }, "namespaceD": { "table5": lambda: Table( identifier=("namespaceD", "table5"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespaceA/table5", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespaceA/table5", io=PyArrowFileIO(), catalog=None, ) }, } ) with patch( "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog" ) as get_catalog: get_catalog.return_value = mock_catalog wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()] # ingested 5 tables (6 MCPs each) and 5 namespaces (4 MCPs each), despite exception expected_wu_urns = [ "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)", "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)", "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)", "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)", "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)", ] * MCPS_PER_TABLE + [ "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f", "urn:li:container:be99158f9640329f4394315e1d8dacf3", "urn:li:container:74727446a56420d80ff3b1abf2a18087", "urn:li:container:3f9a24213cca64ab22e409d1b9a94789", "urn:li:container:38a0583b0305ec5066cb708199f6848c", ] * MCPS_PER_NAMESPACE assert len(wu) == len(expected_wu_urns) urns = [] for unit in wu: assert isinstance(unit.metadata, MetadataChangeProposalWrapper) urns.append(unit.metadata.entityUrn) TestCase().assertCountEqual( urns, expected_wu_urns, ) assert source.report.warnings.total_elements == 0 assert source.report.failures.total_elements == 1 assert source.report.tables_scanned == 5 def test_proper_run_with_multiple_namespaces() -> None: source = with_iceberg_source(processing_threads=3) mock_catalog = MockCatalog( { "namespaceA": { "table1": lambda: Table( identifier=("namespaceA", "table1"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespaceA/table1", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespaceA/table1", io=PyArrowFileIO(), catalog=None, ) }, "namespaceB": {}, } ) with patch( "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog" ) as get_catalog: get_catalog.return_value = mock_catalog wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()] # ingested 1 table (6 MCPs) and 2 namespaces (4 MCPs each), despite exception expected_wu_urns = [ 
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)", ] * MCPS_PER_TABLE + [ "urn:li:container:74727446a56420d80ff3b1abf2a18087", "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f", ] * MCPS_PER_NAMESPACE assert len(wu) == len(expected_wu_urns) urns = [] for unit in wu: assert isinstance(unit.metadata, MetadataChangeProposalWrapper) urns.append(unit.metadata.entityUrn) TestCase().assertCountEqual( urns, expected_wu_urns, ) def test_filtering() -> None: source = with_iceberg_source( processing_threads=1, table_pattern=AllowDenyPattern(deny=[".*abcd.*"]), namespace_pattern=AllowDenyPattern(allow=["namespace1"]), ) mock_catalog = MockCatalog( { "namespace1": { "table_xyz": lambda: Table( identifier=("namespace1", "table_xyz"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespace1/table_xyz", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespace1/table_xyz", io=PyArrowFileIO(), catalog=None, ), "JKLtable": lambda: Table( identifier=("namespace1", "JKLtable"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespace1/JKLtable", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespace1/JKLtable", io=PyArrowFileIO(), catalog=None, ), "table_abcd": lambda: Table( identifier=("namespace1", "table_abcd"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespace1/table_abcd", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespace1/table_abcd", io=PyArrowFileIO(), catalog=None, ), "aaabcd": lambda: Table( identifier=("namespace1", "aaabcd"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespace1/aaabcd", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespace1/aaabcd", io=PyArrowFileIO(), catalog=None, ), }, "namespace2": { "foo": lambda: Table( identifier=("namespace2", "foo"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespace2/foo", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespace2/foo", io=PyArrowFileIO(), catalog=None, ), "bar": lambda: Table( identifier=("namespace2", "bar"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespace2/bar", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespace2/bar", io=PyArrowFileIO(), catalog=None, ), }, "namespace3": { "sales": lambda: Table( identifier=("namespace3", "sales"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespace3/sales", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespace3/sales", io=PyArrowFileIO(), catalog=None, ), "products": lambda: Table( identifier=("namespace2", "bar"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespace3/products", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespace3/products", io=PyArrowFileIO(), catalog=None, ), }, } ) with patch( "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog" ) as get_catalog: get_catalog.return_value = mock_catalog wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()] # ingested 2 tables (6 MCPs each) and 1 namespace (4 MCPs) expected_wu_urns = 
[ "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespace1.table_xyz,PROD)", "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespace1.JKLtable,PROD)", ] * MCPS_PER_TABLE + [ "urn:li:container:075fc8fdac17b0eb3482e73052e875f1", ] * MCPS_PER_NAMESPACE assert len(wu) == len(expected_wu_urns) urns = [] for unit in wu: assert isinstance(unit.metadata, MetadataChangeProposalWrapper) urns.append(unit.metadata.entityUrn) TestCase().assertCountEqual( urns, expected_wu_urns, ) assert source.report.tables_scanned == 2 def test_handle_expected_exceptions() -> None: source = with_iceberg_source(processing_threads=3) def _raise_no_such_property_exception(): raise NoSuchPropertyException() def _raise_no_such_iceberg_table_exception(): raise NoSuchIcebergTableError() def _raise_file_not_found_error(): raise FileNotFoundError() def _raise_no_such_table_exception(): raise NoSuchTableError() def _raise_server_error(): raise ServerError() def _raise_fileio_error(): raise ValueError("Could not initialize FileIO: abc.dummy.fileio") mock_catalog = MockCatalog( { "namespaceA": { "table1": lambda: Table( identifier=("namespaceA", "table1"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespaceA/table1", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespaceA/table1", io=PyArrowFileIO(), catalog=None, ), "table2": lambda: Table( identifier=("namespaceA", "table2"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespaceA/table2", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespaceA/table2", io=PyArrowFileIO(), catalog=None, ), "table3": lambda: Table( identifier=("namespaceA", "table3"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespaceA/table3", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespaceA/table3", io=PyArrowFileIO(), catalog=None, ), "table4": lambda: Table( identifier=("namespaceA", "table4"), metadata=TableMetadataV2( partition_specs=[PartitionSpec(spec_id=0)], location="s3://abcdefg/namespaceA/table4", last_column_id=0, schemas=[Schema(schema_id=0)], ), metadata_location="s3://abcdefg/namespaceA/table4", io=PyArrowFileIO(), catalog=None, ), "table5": _raise_no_such_property_exception, "table6": _raise_no_such_table_exception, "table7": _raise_file_not_found_error, "table8": _raise_no_such_iceberg_table_exception, "table9": _raise_server_error, "table10": _raise_fileio_error, } } ) with patch( "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog" ) as get_catalog: get_catalog.return_value = mock_catalog wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()] # ingested 4 tables (6 MCPs each) and 1 namespace (4 MCPs) expected_wu_urns = [ "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)", "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)", "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)", "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)", ] * MCPS_PER_TABLE + [ "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f", ] * MCPS_PER_NAMESPACE assert len(wu) == len(expected_wu_urns) urns = [] for unit in wu: assert isinstance(unit.metadata, MetadataChangeProposalWrapper) urns.append(unit.metadata.entityUrn) TestCase().assertCountEqual( urns, expected_wu_urns, ) assert source.report.warnings.total_elements == 6 assert 
        assert source.report.failures.total_elements == 0
        assert source.report.tables_scanned == 4


def test_handle_unexpected_exceptions() -> None:
    source = with_iceberg_source(processing_threads=3)

    def _raise_exception():
        raise Exception()

    def _raise_other_value_error_exception():
        raise ValueError("Other value exception")

    mock_catalog = MockCatalog(
        {
            "namespaceA": {
                "table1": lambda: Table(
                    identifier=("namespaceA", "table1"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table1",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table1",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table2": lambda: Table(
                    identifier=("namespaceA", "table2"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table2",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table2",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table3": lambda: Table(
                    identifier=("namespaceA", "table3"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table3",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table3",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table4": lambda: Table(
                    identifier=("namespaceA", "table4"),
                    metadata=TableMetadataV2(
                        partition_specs=[PartitionSpec(spec_id=0)],
                        location="s3://abcdefg/namespaceA/table4",
                        last_column_id=0,
                        schemas=[Schema(schema_id=0)],
                    ),
                    metadata_location="s3://abcdefg/namespaceA/table4",
                    io=PyArrowFileIO(),
                    catalog=None,
                ),
                "table5": _raise_exception,
                "table6": _raise_other_value_error_exception,
            }
        }
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]

        # ingested 4 tables (7 MCPs each) and 1 namespace (5 MCPs)
        expected_wu_urns = [
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
        ] * MCPS_PER_TABLE + [
            "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
        ] * MCPS_PER_NAMESPACE
        # assert len(wu) == len(expected_wu_urns)
        urns = []
        for unit in wu:
            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
            urns.append(unit.metadata.entityUrn)
        TestCase().assertCountEqual(
            urns,
            expected_wu_urns,
        )
        assert source.report.warnings.total_elements == 0
        assert source.report.failures.total_elements == 1
        assert source.report.tables_scanned == 4
        # Needed to make sure all failures are recognized properly
        failures = [f for f in source.report.failures]
        TestCase().assertCountEqual(
            failures[0].context,
            [
                "('namespaceA', 'table6') : Other value exception",
                "('namespaceA', 'table5') : ",
            ],
        )


def test_ingesting_namespace_properties() -> None:
    source = with_iceberg_source(processing_threads=2)
    custom_properties = {"prop1": "foo", "prop2": "bar"}
    mock_catalog = MockCatalog(
        tables={
            "namespaceA": {},  # mapped to urn:li:container:390e031441265aae5b7b7ae8d51b0c1f
            "namespaceB": {},  # mapped to urn:li:container:74727446a56420d80ff3b1abf2a18087
        },
        namespace_properties={"namespaceA": {}, "namespaceB": custom_properties},
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]

        aspects: Dict[str, Dict[str, Any]] = defaultdict(dict)
        for unit in wu:
            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
            mcpw: MetadataChangeProposalWrapper = unit.metadata
            assert mcpw.entityUrn
            assert mcpw.aspectName
            aspects[mcpw.entityUrn][mcpw.aspectName] = mcpw.aspect

        assert (
            aspects["urn:li:container:390e031441265aae5b7b7ae8d51b0c1f"][
                "containerProperties"
            ].customProperties
            == {}
        )
        assert (
            aspects["urn:li:container:74727446a56420d80ff3b1abf2a18087"][
                "containerProperties"
            ].customProperties
            == custom_properties
        )