fix(ingest): serialisation of structured report (#14973)

This commit is contained in:
Aseem Bansal 2025-10-10 19:29:56 +05:30 committed by GitHub
parent 602f40d01d
commit 7b5680efbc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 21 deletions

View File

@ -4,7 +4,6 @@ from contextlib import AbstractContextManager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime, timezone from datetime import datetime, timezone
from enum import Enum from enum import Enum
from typing import Tuple
from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.stats_collections import TopKDict from datahub.utilities.stats_collections import TopKDict
@ -38,9 +37,7 @@ class IngestionStageReport:
ingestion_high_stage_seconds: dict[IngestionHighStage, float] = field( ingestion_high_stage_seconds: dict[IngestionHighStage, float] = field(
default_factory=lambda: defaultdict(float) default_factory=lambda: defaultdict(float)
) )
ingestion_stage_durations: TopKDict[Tuple[IngestionHighStage, str], float] = field( ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict)
default_factory=TopKDict
)
def new_stage( def new_stage(
self, stage: str, high_stage: IngestionHighStage = IngestionHighStage._UNDEFINED self, stage: str, high_stage: IngestionHighStage = IngestionHighStage._UNDEFINED
@ -81,9 +78,9 @@ class IngestionStageContext(AbstractContextManager):
f"Time spent in stage <{self._ingestion_stage}>: {elapsed} seconds", f"Time spent in stage <{self._ingestion_stage}>: {elapsed} seconds",
stacklevel=2, stacklevel=2,
) )
self._report.ingestion_stage_durations[ # Store tuple as string to avoid serialization errors
(self._high_stage, self._ingestion_stage) key = f"({self._high_stage.value}, {self._ingestion_stage})"
] = elapsed self._report.ingestion_stage_durations[key] = elapsed
else: else:
logger.info( logger.info(
f"Time spent in stage <{self._high_stage.value}>: {elapsed} seconds", f"Time spent in stage <{self._high_stage.value}>: {elapsed} seconds",

View File

@ -14,8 +14,8 @@ def test_ingestion_stage_context_records_duration():
pass pass
assert len(report.ingestion_stage_durations) == 1 assert len(report.ingestion_stage_durations) == 1
key = next(iter(report.ingestion_stage_durations.keys())) key = next(iter(report.ingestion_stage_durations.keys()))
assert key[0] == IngestionHighStage._UNDEFINED assert "Ingestion" in key
assert "Test Stage" in key[1] assert "Test Stage" in key
def test_ingestion_stage_context_handles_exceptions(): def test_ingestion_stage_context_handles_exceptions():
@ -27,7 +27,8 @@ def test_ingestion_stage_context_handles_exceptions():
pass pass
assert len(report.ingestion_stage_durations) == 1 assert len(report.ingestion_stage_durations) == 1
key = next(iter(report.ingestion_stage_durations.keys())) key = next(iter(report.ingestion_stage_durations.keys()))
assert "Test Stage" in key[1] assert "Ingestion" in key
assert "Test Stage" in key
def test_ingestion_stage_context_report_handles_multiple_stages(): def test_ingestion_stage_context_report_handles_multiple_stages():
@ -45,9 +46,9 @@ def test_ingestion_stage_context_report_handles_multiple_stages():
) )
sorted_stages = list(sorted(report.ingestion_stage_durations.keys())) sorted_stages = list(sorted(report.ingestion_stage_durations.keys()))
assert "Test Stage 1" in sorted_stages[0][1] assert "Test Stage 1" in sorted_stages[0]
assert "Test Stage 2" in sorted_stages[1][1] assert "Test Stage 2" in sorted_stages[1]
assert "Test Stage 3" in sorted_stages[2][1] assert "Test Stage 3" in sorted_stages[2]
def test_ingestion_stage_context_report_handles_nested_stages(): def test_ingestion_stage_context_report_handles_nested_stages():
@ -64,14 +65,14 @@ def test_ingestion_stage_context_report_handles_nested_stages():
for duration in report.ingestion_stage_durations.values() for duration in report.ingestion_stage_durations.values()
) )
sorted_stages = list(sorted(report.ingestion_stage_durations.keys())) sorted_stages = list(sorted(report.ingestion_stage_durations.keys()))
assert "Inner1" in sorted_stages[0][1] assert "Inner1" in sorted_stages[0]
assert "Inner2" in sorted_stages[1][1] assert "Inner2" in sorted_stages[1]
assert "Outer" in sorted_stages[2][1] assert "Outer" in sorted_stages[2]
# Check that outer stage duration >= sum of inner stage durations # Check that outer stage duration >= sum of inner stage durations
outer_key = [k for k in report.ingestion_stage_durations if "Outer" in k[1]][0] outer_key = [k for k in report.ingestion_stage_durations if "Outer" in k][0]
inner1_key = [k for k in report.ingestion_stage_durations if "Inner1" in k[1]][0] inner1_key = [k for k in report.ingestion_stage_durations if "Inner1" in k][0]
inner2_key = [k for k in report.ingestion_stage_durations if "Inner2" in k[1]][0] inner2_key = [k for k in report.ingestion_stage_durations if "Inner2" in k][0]
outer_duration = report.ingestion_stage_durations[outer_key] outer_duration = report.ingestion_stage_durations[outer_key]
inner1_duration = report.ingestion_stage_durations[inner1_key] inner1_duration = report.ingestion_stage_durations[inner1_key]
@ -96,6 +97,6 @@ def test_ingestion_stage_with_high_stage():
time.sleep(0.1) time.sleep(0.1)
assert len(report.ingestion_stage_durations) == 1 assert len(report.ingestion_stage_durations) == 1
key = next(iter(report.ingestion_stage_durations.keys())) key = next(iter(report.ingestion_stage_durations.keys()))
assert key[0] == IngestionHighStage.PROFILING assert "Profiling" in key
assert "Test Stage" in key[1] assert "Test Stage" in key
assert report.ingestion_high_stage_seconds[IngestionHighStage.PROFILING] > 0 assert report.ingestion_high_stage_seconds[IngestionHighStage.PROFILING] > 0