From 7b5680efbcf10e9c3dc6f44df9853d63a6ff13bc Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Fri, 10 Oct 2025 19:29:56 +0530 Subject: [PATCH] fix(ingest): serialisation of structured report (#14973) --- .../source_report/ingestion_stage.py | 11 +++---- .../unit/reporting/test_ingestion_stage.py | 29 ++++++++++--------- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py index 0c211263f5..132ccb85ad 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py @@ -4,7 +4,6 @@ from contextlib import AbstractContextManager from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum -from typing import Tuple from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.stats_collections import TopKDict @@ -38,9 +37,7 @@ class IngestionStageReport: ingestion_high_stage_seconds: dict[IngestionHighStage, float] = field( default_factory=lambda: defaultdict(float) ) - ingestion_stage_durations: TopKDict[Tuple[IngestionHighStage, str], float] = field( - default_factory=TopKDict - ) + ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict) def new_stage( self, stage: str, high_stage: IngestionHighStage = IngestionHighStage._UNDEFINED @@ -81,9 +78,9 @@ class IngestionStageContext(AbstractContextManager): f"Time spent in stage <{self._ingestion_stage}>: {elapsed} seconds", stacklevel=2, ) - self._report.ingestion_stage_durations[ - (self._high_stage, self._ingestion_stage) - ] = elapsed + # Store tuple as string to avoid serialization errors + key = f"({self._high_stage.value}, {self._ingestion_stage})" + self._report.ingestion_stage_durations[key] = elapsed else: logger.info( f"Time spent in stage <{self._high_stage.value}>: {elapsed} seconds", diff --git a/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py index 4a8dbafae7..694044caef 100644 --- a/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py +++ b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py @@ -14,8 +14,8 @@ def test_ingestion_stage_context_records_duration(): pass assert len(report.ingestion_stage_durations) == 1 key = next(iter(report.ingestion_stage_durations.keys())) - assert key[0] == IngestionHighStage._UNDEFINED - assert "Test Stage" in key[1] + assert "Ingestion" in key + assert "Test Stage" in key def test_ingestion_stage_context_handles_exceptions(): @@ -27,7 +27,8 @@ def test_ingestion_stage_context_handles_exceptions(): pass assert len(report.ingestion_stage_durations) == 1 key = next(iter(report.ingestion_stage_durations.keys())) - assert "Test Stage" in key[1] + assert "Ingestion" in key + assert "Test Stage" in key def test_ingestion_stage_context_report_handles_multiple_stages(): @@ -45,9 +46,9 @@ def test_ingestion_stage_context_report_handles_multiple_stages(): ) sorted_stages = list(sorted(report.ingestion_stage_durations.keys())) - assert "Test Stage 1" in sorted_stages[0][1] - assert "Test Stage 2" in sorted_stages[1][1] - assert "Test Stage 3" in sorted_stages[2][1] + assert "Test Stage 1" in sorted_stages[0] + assert "Test Stage 2" in sorted_stages[1] + assert "Test Stage 3" in sorted_stages[2] def test_ingestion_stage_context_report_handles_nested_stages(): @@ -64,14 +65,14 @@ def test_ingestion_stage_context_report_handles_nested_stages(): for duration in report.ingestion_stage_durations.values() ) sorted_stages = list(sorted(report.ingestion_stage_durations.keys())) - assert "Inner1" in sorted_stages[0][1] - assert "Inner2" in sorted_stages[1][1] - assert "Outer" in sorted_stages[2][1] + assert "Inner1" in sorted_stages[0] + assert "Inner2" in sorted_stages[1] + assert "Outer" in sorted_stages[2] # Check that outer stage duration >= sum of inner stage durations - outer_key = [k for k in report.ingestion_stage_durations if "Outer" in k[1]][0] - inner1_key = [k for k in report.ingestion_stage_durations if "Inner1" in k[1]][0] - inner2_key = [k for k in report.ingestion_stage_durations if "Inner2" in k[1]][0] + outer_key = [k for k in report.ingestion_stage_durations if "Outer" in k][0] + inner1_key = [k for k in report.ingestion_stage_durations if "Inner1" in k][0] + inner2_key = [k for k in report.ingestion_stage_durations if "Inner2" in k][0] outer_duration = report.ingestion_stage_durations[outer_key] inner1_duration = report.ingestion_stage_durations[inner1_key] @@ -96,6 +97,6 @@ def test_ingestion_stage_with_high_stage(): time.sleep(0.1) assert len(report.ingestion_stage_durations) == 1 key = next(iter(report.ingestion_stage_durations.keys())) - assert key[0] == IngestionHighStage.PROFILING - assert "Test Stage" in key[1] + assert "Profiling" in key + assert "Test Stage" in key assert report.ingestion_high_stage_seconds[IngestionHighStage.PROFILING] > 0