feat(ingestion/iceberg): Improve iceberg connector logging (#12317)

skrydal authored 2025-01-13 14:37:09 +01:00, committed by GitHub
parent 8d48622c0f
commit 457f96e8c5
3 changed files with 62 additions and 13 deletions

View File

@@ -203,7 +203,9 @@ class IcebergSource(StatefulIngestionSourceBase):
                 with PerfTimer() as timer:
                     table = thread_local.local_catalog.load_table(dataset_path)
                     time_taken = timer.elapsed_seconds()
-                    self.report.report_table_load_time(time_taken)
+                    self.report.report_table_load_time(
+                        time_taken, dataset_name, table.metadata_location
+                    )
                 LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}")
                 yield from self._create_iceberg_workunit(dataset_name, table)
             except NoSuchPropertyException as e:
@@ -247,7 +249,10 @@ class IcebergSource(StatefulIngestionSourceBase):
                     f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it."
                 )
             except Exception as e:
-                self.report.report_failure("general", f"Failed to create workunit: {e}")
+                self.report.report_failure(
+                    "general",
+                    f"Failed to create workunit for dataset {dataset_name}: {e}",
+                )
                 LOGGER.exception(
                     f"Exception while processing table {dataset_path}, skipping it.",
                 )
@@ -312,7 +317,9 @@ class IcebergSource(StatefulIngestionSourceBase):
         dataset_snapshot.aspects.append(schema_metadata)
 
         mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
-        self.report.report_table_processing_time(timer.elapsed_seconds())
+        self.report.report_table_processing_time(
+            timer.elapsed_seconds(), dataset_name, table.metadata_location
+        )
         yield MetadataWorkUnit(id=dataset_name, mce=mce)
 
         dpi_aspect = self._get_dataplatform_instance_aspect(dataset_urn=dataset_urn)

View File

@@ -5,6 +5,7 @@ from typing import Any, Dict, List, Optional
 from humanfriendly import format_timespan
 from pydantic import Field, validator
 from pyiceberg.catalog import Catalog, load_catalog
+from sortedcontainers import SortedList
 
 from datahub.configuration.common import AllowDenyPattern, ConfigModel
 from datahub.configuration.source_common import DatasetSourceConfigMixin
@@ -146,19 +147,40 @@ class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
         return load_catalog(name=catalog_name, **catalog_config)
 
 
+class TopTableTimings:
+    _VALUE_FIELD: str = "timing"
+    top_entites: SortedList
+    _size: int
+
+    def __init__(self, size: int = 10):
+        self._size = size
+        self.top_entites = SortedList(key=lambda x: -x.get(self._VALUE_FIELD, 0))
+
+    def add(self, entity: Dict[str, Any]) -> None:
+        if self._VALUE_FIELD not in entity:
+            return
+        self.top_entites.add(entity)
+        if len(self.top_entites) > self._size:
+            self.top_entites.pop()
+
+    def __str__(self) -> str:
+        if len(self.top_entites) == 0:
+            return "no timings reported"
+        return str(list(self.top_entites))
+
+
 class TimingClass:
-    times: List[int]
+    times: SortedList
 
     def __init__(self):
-        self.times = []
+        self.times = SortedList()
 
-    def add_timing(self, t):
-        self.times.append(t)
+    def add_timing(self, t: float) -> None:
+        self.times.add(t)
 
-    def __str__(self):
+    def __str__(self) -> str:
         if len(self.times) == 0:
             return "no timings reported"
-        self.times.sort()
         total = sum(self.times)
         avg = total / len(self.times)
         return str(
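For readers skimming the hunk above: the key idea behind the new TopTableTimings container is that SortedList orders entries by the negated "timing" value, so the slowest tables sort first and pop() evicts the current fastest entry once the size cap is exceeded. A minimal standalone sketch of that behaviour follows; the table names, timings, and the cap of 3 are made up for illustration, and only the sortedcontainers package is assumed.

from sortedcontainers import SortedList

# Entries are dicts keyed by "timing"; the negated sort key keeps the
# slowest tables at the front of the list.
top = SortedList(key=lambda e: -e.get("timing", 0))
cap = 3  # hypothetical cap, mirroring TopTableTimings(size=3)

for name, timing in [("db.a", 1.2), ("db.b", 9.7), ("db.c", 0.4), ("db.d", 5.1)]:
    top.add({"table": name, "timing": timing})
    if len(top) > cap:
        # pop() removes the last element, i.e. the fastest remaining table
        top.pop()

print([e["table"] for e in top])  # ['db.b', 'db.d', 'db.a'], slowest first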
@@ -180,6 +202,9 @@ class IcebergSourceReport(StaleEntityRemovalSourceReport):
     load_table_timings: TimingClass = field(default_factory=TimingClass)
     processing_table_timings: TimingClass = field(default_factory=TimingClass)
     profiling_table_timings: TimingClass = field(default_factory=TimingClass)
+    tables_load_timings: TopTableTimings = field(default_factory=TopTableTimings)
+    tables_profile_timings: TopTableTimings = field(default_factory=TopTableTimings)
+    tables_process_timings: TopTableTimings = field(default_factory=TopTableTimings)
     listed_namespaces: int = 0
     total_listed_tables: int = 0
     tables_listed_per_namespace: TopKDict[str, int] = field(
@@ -201,11 +226,26 @@ class IcebergSourceReport(StaleEntityRemovalSourceReport):
     def report_dropped(self, ent_name: str) -> None:
         self.filtered.append(ent_name)
 
-    def report_table_load_time(self, t: float) -> None:
+    def report_table_load_time(
+        self, t: float, table_name: str, table_metadata_location: str
+    ) -> None:
         self.load_table_timings.add_timing(t)
+        self.tables_load_timings.add(
+            {"table": table_name, "timing": t, "metadata_file": table_metadata_location}
+        )
 
-    def report_table_processing_time(self, t: float) -> None:
+    def report_table_processing_time(
+        self, t: float, table_name: str, table_metadata_location: str
+    ) -> None:
         self.processing_table_timings.add_timing(t)
+        self.tables_process_timings.add(
+            {"table": table_name, "timing": t, "metadata_file": table_metadata_location}
+        )
 
-    def report_table_profiling_time(self, t: float) -> None:
+    def report_table_profiling_time(
+        self, t: float, table_name: str, table_metadata_location: str
+    ) -> None:
         self.profiling_table_timings.add_timing(t)
+        self.tables_profile_timings.add(
+            {"table": table_name, "timing": t, "metadata_file": table_metadata_location}
+        )
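To make the new reporting surface concrete: each report_* method still feeds the aggregate TimingClass list and additionally records a per-table dict of the form {"table": ..., "timing": ..., "metadata_file": ...} into the matching TopTableTimings. The sketch below shows that wiring in a self-contained way; DemoReport, the table name, and the metadata path are stand-ins for illustration, not DataHub's actual IcebergSourceReport.

import time
from dataclasses import dataclass, field

from sortedcontainers import SortedList


@dataclass
class DemoReport:
    # Aggregate timings (as in TimingClass) next to per-table records
    # (as in TopTableTimings), mirroring the pairing this PR introduces.
    load_table_timings: SortedList = field(default_factory=SortedList)
    tables_load_timings: SortedList = field(
        default_factory=lambda: SortedList(key=lambda e: -e["timing"])
    )

    def report_table_load_time(
        self, t: float, table_name: str, table_metadata_location: str
    ) -> None:
        self.load_table_timings.add(t)
        self.tables_load_timings.add(
            {"table": table_name, "timing": t, "metadata_file": table_metadata_location}
        )
        if len(self.tables_load_timings) > 10:
            self.tables_load_timings.pop()  # keep only the ten slowest loads


report = DemoReport()
start = time.monotonic()
# ... catalog.load_table(...) would run here ...
report.report_table_load_time(
    time.monotonic() - start,
    "db.events",
    "s3://bucket/warehouse/db/events/metadata/00001.json",
)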

View File

@@ -204,7 +204,9 @@ class IcebergProfiler:
                 )
             dataset_profile.fieldProfiles.append(column_profile)
         time_taken = timer.elapsed_seconds()
-        self.report.report_table_profiling_time(time_taken)
+        self.report.report_table_profiling_time(
+            time_taken, dataset_name, table.metadata_location
+        )
         LOGGER.debug(
             f"Finished profiling of dataset: {dataset_name} in {time_taken}"
         )