diff --git a/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py b/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py index 166ddd40c2f..16e18f6142c 100644 --- a/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py @@ -11,6 +11,7 @@ """ Iceberg source methods. """ +import time import traceback from typing import Any, Iterable, Optional, Tuple @@ -169,6 +170,43 @@ class IcebergSource(DatabaseServiceSource): yield Either(right=schema_request) self.register_record_schema_request(schema_request=schema_request) + def _load_iceberg_table(self, table_identifier: str): + """ + load iceberg table properly with handling + network connection error + """ + try: + from botocore.exceptions import EndpointConnectionError + + # Add retry logic for transient network issues + max_retries = 3 + retry_delay = 1 + + for attempt in range(max_retries): + try: + table = self.iceberg.load_table(table_identifier) + # Success, exit retry loop + return table + except (OSError, EndpointConnectionError) as e: + if "Couldn't resolve host name" in str( + e + ) or "NETWORK_CONNECTION" in str(e): + if attempt < max_retries - 1: + logger.warning( + f"Network error loading table {table_identifier}, " + f"retrying in {retry_delay} seconds (attempt {attempt + 1}/{max_retries}): {e}" + ) + time.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + else: + logger.warning(f"Maximum retries reached::: {exc}") + else: + logger.warning(f"Other error than host connection::: {exc}") + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Could not load iceberg table properly {exc}") + return None + def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: """ Prepares the table name to be sent to stage. @@ -178,9 +216,14 @@ class IcebergSource(DatabaseServiceSource): for table_identifier in self.iceberg.list_tables(namespace): try: - table = self.iceberg.load_table(table_identifier) + table = self._load_iceberg_table(table_identifier) # extract table name from table identifier, which does not include catalog name table_name = get_table_name_as_str(table_identifier) + if not table: + logger.debug( + f"iceberg Table could not be fetched for table name = {table_name}" + ) + continue table_fqn = fqn.build( self.metadata, entity_type=Table, diff --git a/ingestion/tests/unit/topology/database/test_iceberg.py b/ingestion/tests/unit/topology/database/test_iceberg.py index 46bf98c8f70..e0739837990 100644 --- a/ingestion/tests/unit/topology/database/test_iceberg.py +++ b/ingestion/tests/unit/topology/database/test_iceberg.py @@ -661,9 +661,7 @@ class IcebergUnitTest(TestCase): with ( patch.object(HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST), - patch.object( - HiveCatalog, "load_table", side_effect=raise_no_such_iceberg_table - ), + patch.object(self.iceberg, "_load_iceberg_table", return_value=None), ): self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)