From 5f23652fd33ad5e8f0b5be7ea5ea4b3bba0710fe Mon Sep 17 00:00:00 2001
From: skrydal
Date: Thu, 11 Sep 2025 00:57:03 +0200
Subject: [PATCH] fix(ingestion/iceberg): Improve iceberg source resiliency to server errors (#14731)

---
 .../ingestion/source/iceberg/iceberg.py       | 106 +++++---
 metadata-ingestion/tests/unit/test_iceberg.py | 249 +++++++++++++++++-
 2 files changed, 317 insertions(+), 38 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
index d84b5fa7f6..89c483b2f7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -12,7 +12,7 @@ from pyiceberg.exceptions import (
     NoSuchNamespaceError,
     NoSuchPropertyException,
     NoSuchTableError,
-    ServerError,
+    RESTError,
 )
 from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
 from pyiceberg.table import Table
@@ -154,6 +154,10 @@ class IcebergSource(StatefulIngestionSourceBase):
         self.report: IcebergSourceReport = IcebergSourceReport()
         self.config: IcebergSourceConfig = config
         self.ctx: PipelineContext = ctx
+        self.stamping_processor = AutoSystemMetadata(
+            self.ctx
+        )  # single instance used only when processing namespaces
+        self.namespaces: List[Tuple[Identifier, str]] = []
 
     @classmethod
     def create(cls, config_dict: Dict, ctx: PipelineContext) -> "IcebergSource":
@@ -246,6 +250,13 @@ class IcebergSource(StatefulIngestionSourceBase):
                     context=str(namespace),
                     exc=e,
                 )
+            except RESTError as e:
+                self.report.warning(
+                    title="Iceberg REST Server Error",
+                    message="Iceberg REST Server returned error status when trying to list tables for a namespace, skipping it.",
+                    context=str(namespace),
+                    exc=e,
+                )
             except Exception as e:
                 self.report.report_failure(
                     title="Error when processing a namespace",
@@ -322,10 +333,10 @@ class IcebergSource(StatefulIngestionSourceBase):
                 context=dataset_name,
                 exc=e,
             )
-        except ServerError as e:
+        except RESTError as e:
             self.report.warning(
                 title="Iceberg REST Server Error",
-                message="Iceberg returned 500 HTTP status when trying to process a table, skipping it.",
+                message="Iceberg REST Server returned error status when trying to process a table, skipping it.",
                 context=dataset_name,
                 exc=e,
             )
@@ -365,7 +376,7 @@ class IcebergSource(StatefulIngestionSourceBase):
         )
 
         try:
-            catalog = self.config.get_catalog()
+            self.catalog = self.config.get_catalog()
         except Exception as e:
             self.report.report_failure(
                 title="Failed to initialize catalog object",
@@ -375,33 +386,7 @@ class IcebergSource(StatefulIngestionSourceBase):
             return
 
         try:
-            stamping_processor = AutoSystemMetadata(self.ctx)
-            namespace_ids = self._get_namespaces(catalog)
-            namespaces: List[Tuple[Identifier, str]] = []
-            for namespace in namespace_ids:
-                namespace_repr = ".".join(namespace)
-                LOGGER.debug(f"Processing namespace {namespace_repr}")
-                namespace_urn = make_container_urn(
-                    NamespaceKey(
-                        namespace=namespace_repr,
-                        platform=self.platform,
-                        instance=self.config.platform_instance,
-                        env=self.config.env,
-                    )
-                )
-                namespace_properties: Properties = catalog.load_namespace_properties(
-                    namespace
-                )
-                namespaces.append((namespace, namespace_urn))
-                for aspect in self._create_iceberg_namespace_aspects(
-                    namespace, namespace_properties
-                ):
-                    yield stamping_processor.stamp_wu(
-                        MetadataChangeProposalWrapper(
-                            entityUrn=namespace_urn, aspect=aspect
-                        ).as_workunit()
-                    )
-            LOGGER.debug("Namespaces ingestion completed")
+            yield from self._process_namespaces()
         except Exception as e:
             self.report.report_failure(
                 title="Failed to list namespaces",
@@ -415,13 +400,70 @@ class IcebergSource(StatefulIngestionSourceBase):
             args_list=[
                 (dataset_path, namespace_urn)
                 for dataset_path, namespace_urn in self._get_datasets(
-                    catalog, namespaces
+                    self.catalog, self.namespaces
                 )
             ],
             max_workers=self.config.processing_threads,
         ):
             yield wu
 
+    def _try_processing_namespace(
+        self, namespace: Identifier
+    ) -> Iterable[MetadataWorkUnit]:
+        namespace_repr = ".".join(namespace)
+        try:
+            LOGGER.debug(f"Processing namespace {namespace_repr}")
+            namespace_urn = make_container_urn(
+                NamespaceKey(
+                    namespace=namespace_repr,
+                    platform=self.platform,
+                    instance=self.config.platform_instance,
+                    env=self.config.env,
+                )
+            )
+
+            namespace_properties: Properties = self.catalog.load_namespace_properties(
+                namespace
+            )
+            for aspect in self._create_iceberg_namespace_aspects(
+                namespace, namespace_properties
+            ):
+                yield self.stamping_processor.stamp_wu(
+                    MetadataChangeProposalWrapper(
+                        entityUrn=namespace_urn, aspect=aspect
+                    ).as_workunit()
+                )
+            self.namespaces.append((namespace, namespace_urn))
+        except NoSuchNamespaceError as e:
+            self.report.report_warning(
+                title="Failed to retrieve namespace properties",
+                message="Couldn't find the namespace, was it deleted during the ingestion?",
+                context=namespace_repr,
+                exc=e,
+            )
+            return
+        except RESTError as e:
+            self.report.warning(
+                title="Iceberg REST Server Error",
+                message="Iceberg REST Server returned error status when trying to retrieve namespace properties, skipping it.",
+                context=str(namespace),
+                exc=e,
+            )
+        except Exception as e:
+            self.report.report_failure(
+                title="Failed to process namespace",
+                message="Unhandled exception happened during processing of the namespace",
+                context=namespace_repr,
+                exc=e,
+            )
+
+    def _process_namespaces(self) -> Iterable[MetadataWorkUnit]:
+        namespace_ids = self._get_namespaces(self.catalog)
+        for namespace in namespace_ids:
+            yield from self._try_processing_namespace(namespace)
+
+        LOGGER.debug("Namespaces ingestion completed")
+
     def _create_iceberg_table_aspects(
         self, dataset_name: str, table: Table, namespace_urn: str
     ) -> Iterable[_Aspect]:
diff --git a/metadata-ingestion/tests/unit/test_iceberg.py b/metadata-ingestion/tests/unit/test_iceberg.py
index 980ee26227..6eaeef4315 100644
--- a/metadata-ingestion/tests/unit/test_iceberg.py
+++ b/metadata-ingestion/tests/unit/test_iceberg.py
@@ -12,6 +12,7 @@ from pyiceberg.exceptions import (
     NoSuchNamespaceError,
     NoSuchPropertyException,
     NoSuchTableError,
+    RESTError,
     ServerError,
 )
 from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -573,6 +574,8 @@ class MockCatalogExceptionListingTables(MockCatalog):
     def list_tables(self, namespace: str) -> Iterable[Tuple[str, str]]:
         if namespace == ("no_such_namespace",):
             raise NoSuchNamespaceError()
+        if namespace == ("rest_error",):
+            raise RESTError()
         if namespace == ("generic_exception",):
             raise Exception()
         return super().list_tables(namespace)
@@ -583,6 +586,17 @@ class MockCatalogExceptionListingNamespaces(MockCatalog):
         raise Exception("Test exception")
 
 
+class MockCatalogExceptionRetrievingNamespaceProperties(MockCatalog):
+    def load_namespace_properties(self, namespace: Tuple[str, ...]) -> Dict[str, str]:
+        if namespace == ("no_such_namespace",):
+            raise NoSuchNamespaceError()
+        if namespace == ("rest_error",):
+            raise RESTError()
+        if namespace == ("generic_exception",):
+            raise Exception()
+        return super().load_namespace_properties(namespace)
+
+
 def test_exception_while_listing_namespaces() -> None:
     source = with_iceberg_source(processing_threads=2)
     mock_catalog = MockCatalogExceptionListingNamespaces({})
@@ -595,9 +609,9 @@ def test_exception_while_listing_namespaces() -> None:
     assert source.report.failures.total_elements == 1
 
 
-def test_known_exception_while_listing_tables() -> None:
+def test_known_exception_while_retrieving_namespace_properties() -> None:
     source = with_iceberg_source(processing_threads=2)
-    mock_catalog = MockCatalogExceptionListingTables(
+    mock_catalog = MockCatalogExceptionRetrievingNamespaceProperties(
         {
             "namespaceA": {
                 "table1": lambda: Table(
                     identifier=("namespaceA", "table1"),
                     metadata=TableMetadataV2(
                         partition_specs=[PartitionSpec(spec_id=0)],
                         location="s3://abcdefg/namespaceA/table1",
                         last_column_id=0,
                         schemas=[Schema(schema_id=0)],
                     ),
                     metadata_location="s3://abcdefg/namespaceA/table1",
                     io=PyArrowFileIO(),
                     catalog=None,
@@ -614,6 +628,7 @@
                 )
             },
             "no_such_namespace": {},
+            "rest_error": {},
             "namespaceB": {
                 "table2": lambda: Table(
                     identifier=("namespaceB", "table2"),
                     metadata=TableMetadataV2(
                         partition_specs=[PartitionSpec(spec_id=0)],
                         location="s3://abcdefg/namespaceB/table2",
                         last_column_id=0,
                         schemas=[Schema(schema_id=0)],
                     ),
                     metadata_location="s3://abcdefg/namespaceB/table2",
                     io=PyArrowFileIO(),
                     catalog=None,
                 ),
                 "table3": lambda: Table(
                     identifier=("namespaceB", "table3"),
                     metadata=TableMetadataV2(
                         partition_specs=[PartitionSpec(spec_id=0)],
                         location="s3://abcdefg/namespaceB/table3",
                         last_column_id=0,
                         schemas=[Schema(schema_id=0)],
                     ),
                     metadata_location="s3://abcdefg/namespaceB/table3",
                     io=PyArrowFileIO(),
                     catalog=None,
                 ),
             },
             "namespaceC": {
                 "table4": lambda: Table(
                     identifier=("namespaceC", "table4"),
                     metadata=TableMetadataV2(
                         partition_specs=[PartitionSpec(spec_id=0)],
                         location="s3://abcdefg/namespaceC/table4",
                         last_column_id=0,
                         schemas=[Schema(schema_id=0)],
                     ),
                     metadata_location="s3://abcdefg/namespaceC/table4",
                     io=PyArrowFileIO(),
                     catalog=None,
                 )
             },
             "namespaceD": {
                 "table5": lambda: Table(
                     identifier=("namespaceD", "table5"),
                     metadata=TableMetadataV2(
                         partition_specs=[PartitionSpec(spec_id=0)],
                         location="s3://abcdefg/namespaceA/table5",
                         last_column_id=0,
                         schemas=[Schema(schema_id=0)],
                     ),
                     metadata_location="s3://abcdefg/namespaceA/table5",
                     io=PyArrowFileIO(),
                     catalog=None,
                 )
             },
         }
     )
     with patch(
         "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
@@ -675,7 +690,7 @@
     ) as get_catalog:
         get_catalog.return_value = mock_catalog
         wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
-        # ingested 5 tables (6 MCPs each) and 5 namespaces (4 MCPs each), despite exception
+        # ingested 5 tables (6 MCPs each) and 4 namespaces (4 MCPs each), we will not ingest namespaces at all if we fail to get their properties
         expected_wu_urns = [
             "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
             "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
             "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
             "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
             "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
         ] * MCPS_PER_TABLE + [
             "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
-            "urn:li:container:9cb5e87ec392b231720f23bf00d6f6aa",
             "urn:li:container:74727446a56420d80ff3b1abf2a18087",
             "urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
             "urn:li:container:38a0583b0305ec5066cb708199f6848c",
         ] * MCPS_PER_NAMESPACE
         assert len(wu) == len(expected_wu_urns)
         urns = []
         for unit in wu:
             assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
             urns.append(unit.metadata.entityUrn)
         TestCase().assertCountEqual(
@@ -698,7 +712,224 @@
             urns,
             expected_wu_urns,
         )
-        assert source.report.warnings.total_elements == 1
+        assert source.report.warnings.total_elements == 2
         assert source.report.failures.total_elements == 0
         assert source.report.tables_scanned == 5
+
+
+def test_unknown_exception_while_retrieving_namespace_properties() -> None:
+    source = with_iceberg_source(processing_threads=2)
+    mock_catalog = MockCatalogExceptionRetrievingNamespaceProperties(
+        {
+            "namespaceA": {
+                "table1": lambda: Table(
+                    identifier=("namespaceA", "table1"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table1",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table1",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                )
+            },
+            "generic_exception": {},
+            "namespaceB": {
+                "table2": lambda: Table(
+                    identifier=("namespaceB", "table2"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceB/table2",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceB/table2",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table3": lambda: Table(
+                    identifier=("namespaceB", "table3"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceB/table3",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceB/table3",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+            },
+            "namespaceC": {
+                "table4": lambda: Table(
+                    identifier=("namespaceC", "table4"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceC/table4",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceC/table4",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                )
+            },
+            "namespaceD": {
+                "table5": lambda: Table(
+                    identifier=("namespaceD", "table5"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table5",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table5",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                )
+            },
+        }
+    )
+    with patch(
+        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
+    ) as get_catalog:
+        get_catalog.return_value = mock_catalog
+        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
+        # ingested 5 tables (6 MCPs each) and 4 namespaces (4 MCPs each), despite exception
+        expected_wu_urns = [
+            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
+        ] * MCPS_PER_TABLE + [
+            "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
+            "urn:li:container:74727446a56420d80ff3b1abf2a18087",
+            "urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
+            "urn:li:container:38a0583b0305ec5066cb708199f6848c",
+        ] * MCPS_PER_NAMESPACE
+        assert len(wu) == len(expected_wu_urns)
+        urns = []
+        for unit in wu:
+            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
+            urns.append(unit.metadata.entityUrn)
+        TestCase().assertCountEqual(
+            urns,
+            expected_wu_urns,
+        )
+        assert source.report.warnings.total_elements == 0
+        assert source.report.failures.total_elements == 1
+        assert source.report.tables_scanned == 5
+
+
+def test_known_exception_while_listing_tables() -> None:
+    source = with_iceberg_source(processing_threads=2)
+    mock_catalog = MockCatalogExceptionListingTables(
+        {
+            "namespaceA": {
+                "table1": lambda: Table(
+                    identifier=("namespaceA", "table1"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table1",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table1",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                )
+            },
+            "no_such_namespace": {},
+            "rest_error": {},
+            "namespaceB": {
+                "table2": lambda: Table(
+                    identifier=("namespaceB", "table2"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceB/table2",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceB/table2",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table3": lambda: Table(
+                    identifier=("namespaceB", "table3"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceB/table3",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceB/table3",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+            },
+            "namespaceC": {
+                "table4": lambda: Table(
+                    identifier=("namespaceC", "table4"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceC/table4",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceC/table4",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                )
+            },
+            "namespaceD": {
+                "table5": lambda: Table(
+                    identifier=("namespaceD", "table5"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table5",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table5",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                )
+            },
+        }
+    )
+    with patch(
+        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
+    ) as get_catalog:
+        get_catalog.return_value = mock_catalog
+        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
+        # ingested 5 tables (6 MCPs each) and 6 namespaces (4 MCPs each), despite exception
+        expected_wu_urns = [
+            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
+        ] * MCPS_PER_TABLE + [
+            "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
+            "urn:li:container:9cb5e87ec392b231720f23bf00d6f6aa",
+            "urn:li:container:74727446a56420d80ff3b1abf2a18087",
+            "urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
+            "urn:li:container:38a0583b0305ec5066cb708199f6848c",
+            "urn:li:container:7b510fcb61d4977da0b1707e533999d8",
+        ] * MCPS_PER_NAMESPACE
+        assert len(wu) == len(expected_wu_urns)
+        urns = []
+        for unit in wu:
+            assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
+            urns.append(unit.metadata.entityUrn)
+        TestCase().assertCountEqual(
+            urns,
+            expected_wu_urns,
+        )
+        assert source.report.warnings.total_elements == 2
         assert source.report.failures.total_elements == 0
         assert source.report.tables_scanned == 5
@@ -1009,6 +1240,9 @@ def test_handle_expected_exceptions() -> None:
     def _raise_server_error():
         raise ServerError()
 
+    def _raise_rest_error():
+        raise RESTError()
+
     def _raise_fileio_error():
         raise ValueError("Could not initialize FileIO: abc.dummy.fileio")
 
@@ -1069,6 +1303,7 @@ def test_handle_expected_exceptions() -> None:
                 "table8": _raise_no_such_iceberg_table_exception,
                 "table9": _raise_server_error,
                 "table10": _raise_fileio_error,
+                "table11": _raise_rest_error,
             }
         }
     )
@@ -1095,7 +1330,9 @@ def test_handle_expected_exceptions() -> None:
             urns,
             expected_wu_urns,
        )
-        assert source.report.warnings.total_elements == 6
+        assert (
+            source.report.warnings.total_elements == 6
+        )  # ServerError and RESTError exceptions are caught together
         assert source.report.failures.total_elements == 0
         assert source.report.tables_scanned == 4
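
Illustrative sketch (not part of the patch above): the heart of this change is replacing the narrow `except ServerError` handlers with `except RESTError`. The snippet below is a minimal, hypothetical model of that handler layout. It assumes pyiceberg's exception hierarchy in which ServerError (HTTP 5xx) is one of several RESTError subclasses, which is also what the patch's own test comment ("ServerError and RESTError exceptions are caught together") relies on; the catalog and namespace arguments are stand-ins, not DataHub code.

# Minimal sketch, assuming pyiceberg's RESTError/ServerError hierarchy; not DataHub code.
from pyiceberg.exceptions import NoSuchNamespaceError, RESTError


def list_tables_or_skip(catalog, namespace):
    """Return the namespace's tables, or an empty list when the REST catalog
    misbehaves, so one bad namespace cannot abort the whole ingestion run."""
    try:
        return catalog.list_tables(namespace)
    except NoSuchNamespaceError:
        # Namespace disappeared mid-run: the real source reports a warning and skips it.
        return []
    except RESTError:
        # Also covers ServerError (HTTP 5xx) and other REST-level failures that the
        # previous `except ServerError` handler would have let escape as hard failures.
        return []

With the broader handler, an unexpected REST-level error from the catalog is downgraded to a per-namespace (or per-table) warning instead of failing the run, which is the behaviour the updated tests assert.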
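
A second hypothetical sketch, also not taken from the patch: the namespace loop was moved out of get_workunits_internal() into _process_namespaces()/_try_processing_namespace() so that the try/except lives inside a per-namespace helper and the outer loop keeps going after a failure. The names process_all and _try_process_one below are invented for illustration only.

# Hypothetical model of the per-namespace isolation pattern; names are invented.
from typing import Dict, Iterable, Iterator, Tuple

Identifier = Tuple[str, ...]


def process_all(catalog, namespaces: Iterable[Identifier]) -> Iterator[Tuple[Identifier, Dict[str, str]]]:
    # One failing namespace is contained in its own generator, so iteration continues.
    for namespace in namespaces:
        yield from _try_process_one(catalog, namespace)


def _try_process_one(catalog, namespace: Identifier) -> Iterator[Tuple[Identifier, Dict[str, str]]]:
    try:
        properties = catalog.load_namespace_properties(namespace)  # may raise RESTError and friends
        yield namespace, properties
    except Exception as exc:  # the real code distinguishes known and unknown errors
        print(f"skipping {'.'.join(namespace)}: {exc}")  # stand-in for report.warning/report_failure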