From 457f96e8c5f00cc8525bb7afe90f00f86c6b093c Mon Sep 17 00:00:00 2001 From: skrydal Date: Mon, 13 Jan 2025 14:37:09 +0100 Subject: [PATCH] feat(ingestion/iceberg): Improve iceberg connector logging (#12317) --- .../ingestion/source/iceberg/iceberg.py | 13 ++++- .../source/iceberg/iceberg_common.py | 58 ++++++++++++++++--- .../source/iceberg/iceberg_profiler.py | 4 +- 3 files changed, 62 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py index 76f24bfd63..8101f01105 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py @@ -203,7 +203,9 @@ class IcebergSource(StatefulIngestionSourceBase): with PerfTimer() as timer: table = thread_local.local_catalog.load_table(dataset_path) time_taken = timer.elapsed_seconds() - self.report.report_table_load_time(time_taken) + self.report.report_table_load_time( + time_taken, dataset_name, table.metadata_location + ) LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}") yield from self._create_iceberg_workunit(dataset_name, table) except NoSuchPropertyException as e: @@ -247,7 +249,10 @@ class IcebergSource(StatefulIngestionSourceBase): f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it." ) except Exception as e: - self.report.report_failure("general", f"Failed to create workunit: {e}") + self.report.report_failure( + "general", + f"Failed to create workunit for dataset {dataset_name}: {e}", + ) LOGGER.exception( f"Exception while processing table {dataset_path}, skipping it.", ) @@ -312,7 +317,9 @@ class IcebergSource(StatefulIngestionSourceBase): dataset_snapshot.aspects.append(schema_metadata) mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) - self.report.report_table_processing_time(timer.elapsed_seconds()) + self.report.report_table_processing_time( + timer.elapsed_seconds(), dataset_name, table.metadata_location + ) yield MetadataWorkUnit(id=dataset_name, mce=mce) dpi_aspect = self._get_dataplatform_instance_aspect(dataset_urn=dataset_urn) diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py index 4a7f6bf4d6..83fe3d1c07 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List, Optional from humanfriendly import format_timespan from pydantic import Field, validator from pyiceberg.catalog import Catalog, load_catalog +from sortedcontainers import SortedList from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.source_common import DatasetSourceConfigMixin @@ -146,19 +147,40 @@ class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin) return load_catalog(name=catalog_name, **catalog_config) +class TopTableTimings: + _VALUE_FIELD: str = "timing" + top_entites: SortedList + _size: int + + def __init__(self, size: int = 10): + self._size = size + self.top_entites = SortedList(key=lambda x: -x.get(self._VALUE_FIELD, 0)) + + def add(self, entity: Dict[str, Any]) -> None: + if self._VALUE_FIELD not in entity: + return + self.top_entites.add(entity) + if len(self.top_entites) > self._size: + self.top_entites.pop() + + def __str__(self) -> str: + if len(self.top_entites) == 0: + return "no timings reported" + return str(list(self.top_entites)) + + class TimingClass: - times: List[int] + times: SortedList def __init__(self): - self.times = [] + self.times = SortedList() - def add_timing(self, t): - self.times.append(t) + def add_timing(self, t: float) -> None: + self.times.add(t) - def __str__(self): + def __str__(self) -> str: if len(self.times) == 0: return "no timings reported" - self.times.sort() total = sum(self.times) avg = total / len(self.times) return str( @@ -180,6 +202,9 @@ class IcebergSourceReport(StaleEntityRemovalSourceReport): load_table_timings: TimingClass = field(default_factory=TimingClass) processing_table_timings: TimingClass = field(default_factory=TimingClass) profiling_table_timings: TimingClass = field(default_factory=TimingClass) + tables_load_timings: TopTableTimings = field(default_factory=TopTableTimings) + tables_profile_timings: TopTableTimings = field(default_factory=TopTableTimings) + tables_process_timings: TopTableTimings = field(default_factory=TopTableTimings) listed_namespaces: int = 0 total_listed_tables: int = 0 tables_listed_per_namespace: TopKDict[str, int] = field( @@ -201,11 +226,26 @@ class IcebergSourceReport(StaleEntityRemovalSourceReport): def report_dropped(self, ent_name: str) -> None: self.filtered.append(ent_name) - def report_table_load_time(self, t: float) -> None: + def report_table_load_time( + self, t: float, table_name: str, table_metadata_location: str + ) -> None: self.load_table_timings.add_timing(t) + self.tables_load_timings.add( + {"table": table_name, "timing": t, "metadata_file": table_metadata_location} + ) - def report_table_processing_time(self, t: float) -> None: + def report_table_processing_time( + self, t: float, table_name: str, table_metadata_location: str + ) -> None: self.processing_table_timings.add_timing(t) + self.tables_process_timings.add( + {"table": table_name, "timing": t, "metadata_file": table_metadata_location} + ) - def report_table_profiling_time(self, t: float) -> None: + def report_table_profiling_time( + self, t: float, table_name: str, table_metadata_location: str + ) -> None: self.profiling_table_timings.add_timing(t) + self.tables_profile_timings.add( + {"table": table_name, "timing": t, "metadata_file": table_metadata_location} + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py index 9cc6dd0854..7642cabbd1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py @@ -204,7 +204,9 @@ class IcebergProfiler: ) dataset_profile.fieldProfiles.append(column_profile) time_taken = timer.elapsed_seconds() - self.report.report_table_profiling_time(time_taken) + self.report.report_table_profiling_time( + time_taken, dataset_name, table.metadata_location + ) LOGGER.debug( f"Finished profiling of dataset: {dataset_name} in {time_taken}" )