fix(ingestion/iceberg): Improve iceberg source resiliency to server errors (#14731)

skrydal 2025-09-11 00:57:03 +02:00 committed by GitHub
parent 3fbef4a632
commit 5f23652fd3
2 changed files with 317 additions and 38 deletions
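In short: the source now catches pyiceberg's broader RESTError where it previously caught only ServerError (HTTP 500s), and namespace processing is refactored so that a failing namespace is skipped with a warning instead of aborting the run. Below is a minimal sketch of that pattern, not the actual DataHub code; the helper name `tables_per_namespace` is hypothetical, and the sketch assumes (as the tests further down rely on) that pyiceberg's ServerError subclasses RESTError:

# Minimal sketch of the resiliency pattern this commit applies; names like
# `tables_per_namespace` are illustrative, not the actual DataHub code.
# Assumes pyiceberg's ServerError subclasses RESTError, so the RESTError
# handler also covers the previously handled 500 responses.
import logging
from typing import Iterable, Tuple

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchNamespaceError, RESTError
from pyiceberg.typedef import Identifier

log = logging.getLogger(__name__)


def tables_per_namespace(
    catalog: Catalog, namespaces: Iterable[Identifier]
) -> Iterable[Tuple[Identifier, Identifier]]:
    """Yield (namespace, table) pairs, skipping a namespace on error
    instead of letting it abort the whole ingestion run."""
    for namespace in namespaces:
        try:
            for table in catalog.list_tables(namespace):
                yield namespace, table
        except NoSuchNamespaceError:
            # Namespace vanished between listing and processing: warn, skip.
            log.warning("Namespace %s no longer exists, skipping", namespace)
        except RESTError:
            # Any REST catalog error status, not just 500: warn, skip.
            log.warning("REST error listing tables in %s, skipping", namespace)
        except Exception:
            # Unknown errors become failures in the report, but the loop
            # still continues with the remaining namespaces.
            log.exception("Unexpected error in namespace %s", namespace)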

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py

@@ -12,7 +12,7 @@ from pyiceberg.exceptions import (
NoSuchNamespaceError,
NoSuchPropertyException,
NoSuchTableError,
ServerError,
RESTError,
)
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
from pyiceberg.table import Table
@@ -154,6 +154,10 @@ class IcebergSource(StatefulIngestionSourceBase):
self.report: IcebergSourceReport = IcebergSourceReport()
self.config: IcebergSourceConfig = config
self.ctx: PipelineContext = ctx
self.stamping_processor = AutoSystemMetadata(
self.ctx
) # single instance used only when processing namespaces
self.namespaces: List[Tuple[Identifier, str]] = []
@classmethod
def create(cls, config_dict: Dict, ctx: PipelineContext) -> "IcebergSource":
@@ -246,6 +250,13 @@ class IcebergSource(StatefulIngestionSourceBase):
context=str(namespace),
exc=e,
)
except RESTError as e:
self.report.warning(
title="Iceberg REST Server Error",
message="Iceberg REST Server returned error status when trying to list tables for a namespace, skipping it.",
context=str(namespace),
exc=e,
)
except Exception as e:
self.report.report_failure(
title="Error when processing a namespace",
@@ -322,10 +333,10 @@ class IcebergSource(StatefulIngestionSourceBase):
context=dataset_name,
exc=e,
)
except ServerError as e:
except RESTError as e:
self.report.warning(
title="Iceberg REST Server Error",
message="Iceberg returned 500 HTTP status when trying to process a table, skipping it.",
message="Iceberg REST Server returned error status when trying to process a table, skipping it.",
context=dataset_name,
exc=e,
)
@@ -365,7 +376,7 @@ class IcebergSource(StatefulIngestionSourceBase):
)
try:
catalog = self.config.get_catalog()
self.catalog = self.config.get_catalog()
except Exception as e:
self.report.report_failure(
title="Failed to initialize catalog object",
@@ -375,33 +386,7 @@ class IcebergSource(StatefulIngestionSourceBase):
return
try:
stamping_processor = AutoSystemMetadata(self.ctx)
namespace_ids = self._get_namespaces(catalog)
namespaces: List[Tuple[Identifier, str]] = []
for namespace in namespace_ids:
namespace_repr = ".".join(namespace)
LOGGER.debug(f"Processing namespace {namespace_repr}")
namespace_urn = make_container_urn(
NamespaceKey(
namespace=namespace_repr,
platform=self.platform,
instance=self.config.platform_instance,
env=self.config.env,
)
)
namespace_properties: Properties = catalog.load_namespace_properties(
namespace
)
namespaces.append((namespace, namespace_urn))
for aspect in self._create_iceberg_namespace_aspects(
namespace, namespace_properties
):
yield stamping_processor.stamp_wu(
MetadataChangeProposalWrapper(
entityUrn=namespace_urn, aspect=aspect
).as_workunit()
)
LOGGER.debug("Namespaces ingestion completed")
yield from self._process_namespaces()
except Exception as e:
self.report.report_failure(
title="Failed to list namespaces",
@@ -415,13 +400,70 @@ class IcebergSource(StatefulIngestionSourceBase):
args_list=[
(dataset_path, namespace_urn)
for dataset_path, namespace_urn in self._get_datasets(
catalog, namespaces
self.catalog, self.namespaces
)
],
max_workers=self.config.processing_threads,
):
yield wu
def _try_processing_namespace(
self, namespace: Identifier
) -> Iterable[MetadataWorkUnit]:
namespace_repr = ".".join(namespace)
try:
LOGGER.debug(f"Processing namespace {namespace_repr}")
namespace_urn = make_container_urn(
NamespaceKey(
namespace=namespace_repr,
platform=self.platform,
instance=self.config.platform_instance,
env=self.config.env,
)
)
namespace_properties: Properties = self.catalog.load_namespace_properties(
namespace
)
for aspect in self._create_iceberg_namespace_aspects(
namespace, namespace_properties
):
yield self.stamping_processor.stamp_wu(
MetadataChangeProposalWrapper(
entityUrn=namespace_urn, aspect=aspect
).as_workunit()
)
self.namespaces.append((namespace, namespace_urn))
except NoSuchNamespaceError as e:
self.report.report_warning(
title="Failed to retrieve namespace properties",
message="Couldn't find the namespace, was it deleted during the ingestion?",
context=namespace_repr,
exc=e,
)
return
except RESTError as e:
self.report.warning(
title="Iceberg REST Server Error",
message="Iceberg REST Server returned error status when trying to retrieve namespace properties, skipping it.",
context=str(namespace),
exc=e,
)
except Exception as e:
self.report.report_failure(
title="Failed to process namespace",
message="Unhandled exception happened during processing of the namespace",
context=namespace_repr,
exc=e,
)
def _process_namespaces(self) -> Iterable[MetadataWorkUnit]:
namespace_ids = self._get_namespaces(self.catalog)
for namespace in namespace_ids:
yield from self._try_processing_namespace(namespace)
LOGGER.debug("Namespaces ingestion completed")
def _create_iceberg_table_aspects(
self, dataset_name: str, table: Table, namespace_urn: str
) -> Iterable[_Aspect]:
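One subtlety in the refactor above: in `_try_processing_namespace`, `self.namespaces.append(...)` runs only after every aspect has been yielded without raising, so a namespace that fails mid-stream is also excluded from the later per-table scan over `self.namespaces`. A toy illustration of that generator behavior, with hypothetical names:

# Toy illustration (hypothetical names): inside a generator, code after a
# failing `yield from` never runs, so the "record as processed" step is
# reached only on full success, mirroring self.namespaces.append(...).
from typing import Iterable, List

processed: List[str] = []


def emit_aspects(item: str) -> Iterable[str]:
    if item == "bad":
        raise ValueError("properties unavailable")
    yield f"aspect-for-{item}"


def try_process(item: str) -> Iterable[str]:
    try:
        yield from emit_aspects(item)  # may raise mid-stream
        processed.append(item)  # reached only if nothing raised above
    except ValueError:
        pass  # warn and skip; later stages will not see this item


for item in ("good", "bad"):
    for aspect in try_process(item):
        print(aspect)  # prints only "aspect-for-good"

print(processed)  # ['good'] -- "bad" never reaches the append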

metadata-ingestion/tests/unit/test_iceberg.py

@@ -12,6 +12,7 @@ from pyiceberg.exceptions import (
NoSuchNamespaceError,
NoSuchPropertyException,
NoSuchTableError,
RESTError,
ServerError,
)
from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -573,6 +574,8 @@ class MockCatalogExceptionListingTables(MockCatalog):
def list_tables(self, namespace: str) -> Iterable[Tuple[str, str]]:
if namespace == ("no_such_namespace",):
raise NoSuchNamespaceError()
if namespace == ("rest_error",):
raise RESTError()
if namespace == ("generic_exception",):
raise Exception()
return super().list_tables(namespace)
@@ -583,6 +586,17 @@ class MockCatalogExceptionListingNamespaces(MockCatalog):
raise Exception("Test exception")
class MockCatalogExceptionRetrievingNamespaceProperties(MockCatalog):
def load_namespace_properties(self, namespace: Tuple[str, ...]) -> Dict[str, str]:
if namespace == ("no_such_namespace",):
raise NoSuchNamespaceError()
if namespace == ("rest_error",):
raise RESTError()
if namespace == ("generic_exception",):
raise Exception()
return super().load_namespace_properties(namespace)
def test_exception_while_listing_namespaces() -> None:
source = with_iceberg_source(processing_threads=2)
mock_catalog = MockCatalogExceptionListingNamespaces({})
@@ -595,9 +609,9 @@ def test_exception_while_listing_namespaces() -> None:
assert source.report.failures.total_elements == 1
def test_known_exception_while_listing_tables() -> None:
def test_known_exception_while_retrieving_namespace_properties() -> None:
source = with_iceberg_source(processing_threads=2)
mock_catalog = MockCatalogExceptionListingTables(
mock_catalog = MockCatalogExceptionRetrievingNamespaceProperties(
{
"namespaceA": {
"table1": lambda: Table(
@@ -614,6 +628,7 @@ def test_known_exception_while_listing_tables() -> None:
)
},
"no_such_namespace": {},
"rest_error": {},
"namespaceB": {
"table2": lambda: Table(
identifier=("namespaceB", "table2"),
@@ -675,7 +690,7 @@ def test_known_exception_while_listing_tables() -> None:
) as get_catalog:
get_catalog.return_value = mock_catalog
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
# ingested 5 tables (6 MCPs each) and 5 namespaces (4 MCPs each), despite exception
# ingested 5 tables (6 MCPs each) and 4 namespaces (4 MCPs each); a namespace is not ingested at all if retrieving its properties fails
expected_wu_urns = [
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
@@ -684,7 +699,6 @@ def test_known_exception_while_listing_tables() -> None:
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
] * MCPS_PER_TABLE + [
"urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
"urn:li:container:9cb5e87ec392b231720f23bf00d6f6aa",
"urn:li:container:74727446a56420d80ff3b1abf2a18087",
"urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
"urn:li:container:38a0583b0305ec5066cb708199f6848c",
@@ -698,7 +712,224 @@ def test_known_exception_while_listing_tables() -> None:
urns,
expected_wu_urns,
)
assert source.report.warnings.total_elements == 1
assert source.report.warnings.total_elements == 2
assert source.report.failures.total_elements == 0
assert source.report.tables_scanned == 5
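(For reference, the arithmetic behind these assertions, taking MCPS_PER_TABLE = 6 and MCPS_PER_NAMESPACE = 4 as the comment above implies: 5 tables × 6 + 4 namespaces × 4 = 46 expected work units.)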
def test_unknown_exception_while_retrieving_namespace_properties() -> None:
source = with_iceberg_source(processing_threads=2)
mock_catalog = MockCatalogExceptionRetrievingNamespaceProperties(
{
"namespaceA": {
"table1": lambda: Table(
identifier=("namespaceA", "table1"),
metadata=TableMetadataV2(
partition_specs=[PartitionSpec(spec_id=0)],
location="s3://abcdefg/namespaceA/table1",
last_column_id=0,
schemas=[Schema(schema_id=0)],
),
metadata_location="s3://abcdefg/namespaceA/table1",
io=PyArrowFileIO(),
catalog=None,
)
},
"generic_exception": {},
"namespaceB": {
"table2": lambda: Table(
identifier=("namespaceB", "table2"),
metadata=TableMetadataV2(
partition_specs=[PartitionSpec(spec_id=0)],
location="s3://abcdefg/namespaceB/table2",
last_column_id=0,
schemas=[Schema(schema_id=0)],
),
metadata_location="s3://abcdefg/namespaceB/table2",
io=PyArrowFileIO(),
catalog=None,
),
"table3": lambda: Table(
identifier=("namespaceB", "table3"),
metadata=TableMetadataV2(
partition_specs=[PartitionSpec(spec_id=0)],
location="s3://abcdefg/namespaceB/table3",
last_column_id=0,
schemas=[Schema(schema_id=0)],
),
metadata_location="s3://abcdefg/namespaceB/table3",
io=PyArrowFileIO(),
catalog=None,
),
},
"namespaceC": {
"table4": lambda: Table(
identifier=("namespaceC", "table4"),
metadata=TableMetadataV2(
partition_specs=[PartitionSpec(spec_id=0)],
location="s3://abcdefg/namespaceC/table4",
last_column_id=0,
schemas=[Schema(schema_id=0)],
),
metadata_location="s3://abcdefg/namespaceC/table4",
io=PyArrowFileIO(),
catalog=None,
)
},
"namespaceD": {
"table5": lambda: Table(
identifier=("namespaceD", "table5"),
metadata=TableMetadataV2(
partition_specs=[PartitionSpec(spec_id=0)],
location="s3://abcdefg/namespaceA/table5",
last_column_id=0,
schemas=[Schema(schema_id=0)],
),
metadata_location="s3://abcdefg/namespaceA/table5",
io=PyArrowFileIO(),
catalog=None,
)
},
}
)
with patch(
"datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
) as get_catalog:
get_catalog.return_value = mock_catalog
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
# ingested 5 tables (6 MCPs each) and 4 namespaces (4 MCPs each), despite exception
expected_wu_urns = [
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
] * MCPS_PER_TABLE + [
"urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
"urn:li:container:74727446a56420d80ff3b1abf2a18087",
"urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
"urn:li:container:38a0583b0305ec5066cb708199f6848c",
] * MCPS_PER_NAMESPACE
assert len(wu) == len(expected_wu_urns)
urns = []
for unit in wu:
assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
urns.append(unit.metadata.entityUrn)
TestCase().assertCountEqual(
urns,
expected_wu_urns,
)
assert source.report.warnings.total_elements == 0
assert source.report.failures.total_elements == 1
assert source.report.tables_scanned == 5
def test_known_exception_while_listing_tables() -> None:
source = with_iceberg_source(processing_threads=2)
mock_catalog = MockCatalogExceptionListingTables(
{
"namespaceA": {
"table1": lambda: Table(
identifier=("namespaceA", "table1"),
metadata=TableMetadataV2(
partition_specs=[PartitionSpec(spec_id=0)],
location="s3://abcdefg/namespaceA/table1",
last_column_id=0,
schemas=[Schema(schema_id=0)],
),
metadata_location="s3://abcdefg/namespaceA/table1",
io=PyArrowFileIO(),
catalog=None,
)
},
"no_such_namespace": {},
"rest_error": {},
"namespaceB": {
"table2": lambda: Table(
identifier=("namespaceB", "table2"),
metadata=TableMetadataV2(
partition_specs=[PartitionSpec(spec_id=0)],
location="s3://abcdefg/namespaceB/table2",
last_column_id=0,
schemas=[Schema(schema_id=0)],
),
metadata_location="s3://abcdefg/namespaceB/table2",
io=PyArrowFileIO(),
catalog=None,
),
"table3": lambda: Table(
identifier=("namespaceB", "table3"),
metadata=TableMetadataV2(
partition_specs=[PartitionSpec(spec_id=0)],
location="s3://abcdefg/namespaceB/table3",
last_column_id=0,
schemas=[Schema(schema_id=0)],
),
metadata_location="s3://abcdefg/namespaceB/table3",
io=PyArrowFileIO(),
catalog=None,
),
},
"namespaceC": {
"table4": lambda: Table(
identifier=("namespaceC", "table4"),
metadata=TableMetadataV2(
partition_specs=[PartitionSpec(spec_id=0)],
location="s3://abcdefg/namespaceC/table4",
last_column_id=0,
schemas=[Schema(schema_id=0)],
),
metadata_location="s3://abcdefg/namespaceC/table4",
io=PyArrowFileIO(),
catalog=None,
)
},
"namespaceD": {
"table5": lambda: Table(
identifier=("namespaceD", "table5"),
metadata=TableMetadataV2(
partition_specs=[PartitionSpec(spec_id=0)],
location="s3://abcdefg/namespaceA/table5",
last_column_id=0,
schemas=[Schema(schema_id=0)],
),
metadata_location="s3://abcdefg/namespaceA/table5",
io=PyArrowFileIO(),
catalog=None,
)
},
}
)
with patch(
"datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
) as get_catalog:
get_catalog.return_value = mock_catalog
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
# ingested 5 tables (6 MCPs each) and 6 namespaces (4 MCPs each), despite exception
expected_wu_urns = [
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
] * MCPS_PER_TABLE + [
"urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
"urn:li:container:9cb5e87ec392b231720f23bf00d6f6aa",
"urn:li:container:74727446a56420d80ff3b1abf2a18087",
"urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
"urn:li:container:38a0583b0305ec5066cb708199f6848c",
"urn:li:container:7b510fcb61d4977da0b1707e533999d8",
] * MCPS_PER_NAMESPACE
assert len(wu) == len(expected_wu_urns)
urns = []
for unit in wu:
assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
urns.append(unit.metadata.entityUrn)
TestCase().assertCountEqual(
urns,
expected_wu_urns,
)
assert source.report.warnings.total_elements == 2
assert source.report.failures.total_elements == 0
assert source.report.tables_scanned == 5
@@ -1009,6 +1240,9 @@ def test_handle_expected_exceptions() -> None:
def _raise_server_error():
raise ServerError()
def _raise_rest_error():
raise RESTError()
def _raise_fileio_error():
raise ValueError("Could not initialize FileIO: abc.dummy.fileio")
@@ -1069,6 +1303,7 @@ def test_handle_expected_exceptions() -> None:
"table8": _raise_no_such_iceberg_table_exception,
"table9": _raise_server_error,
"table10": _raise_fileio_error,
"table11": _raise_rest_error,
}
}
)
@@ -1095,7 +1330,9 @@ def test_handle_expected_exceptions() -> None:
urns,
expected_wu_urns,
)
assert source.report.warnings.total_elements == 6
assert (
source.report.warnings.total_elements == 6
) # ServerError and RESTError are caught by the same handler
assert source.report.failures.total_elements == 0
assert source.report.tables_scanned == 4
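The combined count in the final assertion relies on ServerError being a subclass of RESTError in pyiceberg, so the single `except RESTError` clause handles table9 and table11 alike. A quick self-contained check of that assumption:

# Sanity check of the assumed hierarchy: if ServerError subclasses RESTError,
# a single `except RESTError` clause catches both exception types.
from pyiceberg.exceptions import RESTError, ServerError

assert issubclass(ServerError, RESTError)

for exc in (ServerError("boom"), RESTError("boom")):
    try:
        raise exc
    except RESTError as caught:
        print(f"caught {type(caught).__name__} via `except RESTError`")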