mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-11 16:58:38 +00:00
MINOR: iceberg load table retry backoff (#23579)
This commit is contained in:
parent
ef96430be7
commit
da7a2778f6
@ -11,6 +11,7 @@
|
||||
"""
|
||||
Iceberg source methods.
|
||||
"""
|
||||
import time
|
||||
import traceback
|
||||
from typing import Any, Iterable, Optional, Tuple
|
||||
|
||||
@ -169,6 +170,43 @@ class IcebergSource(DatabaseServiceSource):
|
||||
yield Either(right=schema_request)
|
||||
self.register_record_schema_request(schema_request=schema_request)
|
||||
|
||||
def _load_iceberg_table(self, table_identifier: str):
|
||||
"""
|
||||
load iceberg table properly with handling
|
||||
network connection error
|
||||
"""
|
||||
try:
|
||||
from botocore.exceptions import EndpointConnectionError
|
||||
|
||||
# Add retry logic for transient network issues
|
||||
max_retries = 3
|
||||
retry_delay = 1
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
table = self.iceberg.load_table(table_identifier)
|
||||
# Success, exit retry loop
|
||||
return table
|
||||
except (OSError, EndpointConnectionError) as e:
|
||||
if "Couldn't resolve host name" in str(
|
||||
e
|
||||
) or "NETWORK_CONNECTION" in str(e):
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning(
|
||||
f"Network error loading table {table_identifier}, "
|
||||
f"retrying in {retry_delay} seconds (attempt {attempt + 1}/{max_retries}): {e}"
|
||||
)
|
||||
time.sleep(retry_delay)
|
||||
retry_delay *= 2 # Exponential backoff
|
||||
else:
|
||||
logger.warning(f"Maximum retries reached::: {exc}")
|
||||
else:
|
||||
logger.warning(f"Other error than host connection::: {exc}")
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Could not load iceberg table properly {exc}")
|
||||
return None
|
||||
|
||||
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
|
||||
"""
|
||||
Prepares the table name to be sent to stage.
|
||||
@ -178,9 +216,14 @@ class IcebergSource(DatabaseServiceSource):
|
||||
|
||||
for table_identifier in self.iceberg.list_tables(namespace):
|
||||
try:
|
||||
table = self.iceberg.load_table(table_identifier)
|
||||
table = self._load_iceberg_table(table_identifier)
|
||||
# extract table name from table identifier, which does not include catalog name
|
||||
table_name = get_table_name_as_str(table_identifier)
|
||||
if not table:
|
||||
logger.debug(
|
||||
f"iceberg Table could not be fetched for table name = {table_name}"
|
||||
)
|
||||
continue
|
||||
table_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=Table,
|
||||
|
@ -661,9 +661,7 @@ class IcebergUnitTest(TestCase):
|
||||
|
||||
with (
|
||||
patch.object(HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST),
|
||||
patch.object(
|
||||
HiveCatalog, "load_table", side_effect=raise_no_such_iceberg_table
|
||||
),
|
||||
patch.object(self.iceberg, "_load_iceberg_table", return_value=None),
|
||||
):
|
||||
self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user