fix(ingest/pipeline): Fix for slow ingestion and incomplete ingestion report metrics (#14735)

Tamas Nemeth 2025-09-11 16:07:47 +02:00 committed by GitHub
parent d0519ddce3
commit 01932d3f87
4 changed files with 229 additions and 20 deletions
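
In short, the diff removes the polling flush() from DatahubRestSink (it slept in 100 ms steps while report.pending_requests was nonzero) and moves _notify_reporters_on_ingestion_completion() out of the finally block so it runs only after the ExitStack has closed the sink, i.e. after close() has drained the executor. A minimal, self-contained sketch of the resulting ordering; DemoSink and run_like_fixed_pipeline are illustrative stand-ins, not names from this diff:

import contextlib


class DemoSink:
    """Stand-in for a sink whose close() drains pending async work."""

    def __init__(self) -> None:
        self.pending_requests = 2

    def close(self) -> None:
        # The real DatahubRestSink blocks in executor.shutdown() here until
        # every queued emit request has finished.
        self.pending_requests = 0


def run_like_fixed_pipeline() -> int:
    sink = DemoSink()
    with contextlib.ExitStack() as stack:
        stack.enter_context(contextlib.closing(sink))
        pass  # produce, transform, and write records here
    # The ExitStack has already called sink.close() by this point, so a
    # completion report taken now sees zero pending requests.
    return sink.pending_requests


if __name__ == "__main__":
    assert run_like_fixed_pipeline() == 0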

View File

@@ -147,9 +147,6 @@ class Sink(Generic[SinkConfig, SinkReportType], Closeable, metaclass=ABCMeta):
     def close(self) -> None:
         pass
 
-    def flush(self) -> None:
-        pass
-
     def configured(self) -> str:
         """Override this method to output a human-readable and scrubbed version of the configured sink"""
         return ""

View File

@@ -373,11 +373,13 @@ class Pipeline:
             )
 
             current_version = version_stats.client.current.version
-            logger.debug(f"""
+            logger.debug(
+                f"""
             client_version: {current_version}
             server_default_version: {server_default_version}
             server_default_cli_ahead: True
-            """)
+            """
+            )
 
             self.source.get_report().warning(
                 title="Server default CLI version is ahead of CLI version",
@@ -560,8 +562,9 @@ class Pipeline:
                 self._handle_uncaught_pipeline_exception(exc)
             finally:
                 clear_global_warnings()
-                self.sink.flush()
-                self._notify_reporters_on_ingestion_completion()
+
+        # This can't be in the finally part because this should happen after the context manager exits
+        self._notify_reporters_on_ingestion_completion()
 
     def transform(self, records: Iterable[RecordEnvelope]) -> Iterable[RecordEnvelope]:
         """

View File

@@ -5,7 +5,6 @@ import functools
 import logging
 import os
 import threading
-import time
 import uuid
 from enum import auto
 from typing import List, Optional, Tuple, Union
@@ -349,17 +348,6 @@ class DatahubRestSink(Sink[DatahubRestSinkConfig, DataHubRestSinkReport]):
             RecordEnvelope(item, metadata={}), NoopWriteCallback()
         )
 
-    def flush(self) -> None:
-        """Wait for all pending records to be written."""
-        i = 0
-        while self.report.pending_requests > 0:
-            time.sleep(0.1)
-            i += 1
-            if i % 1000 == 0:
-                logger.info(
-                    f"Waiting for {self.report.pending_requests} records to be written"
-                )
-
     def close(self):
         with self.report.main_thread_blocking_timer:
             self.executor.shutdown()
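
With flush() gone, waiting for in-flight requests is left entirely to close(), which already blocks in executor.shutdown(). A short sketch of the difference; the commented-out loop paraphrases the removed method, and the executor usage below is generic concurrent.futures code, not DataHub code:

import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
futures = [executor.submit(time.sleep, 0.1) for _ in range(8)]

# Removed approach (paraphrased): poll a counter in 100 ms steps until it
# reaches zero, logging roughly every 100 seconds while requests remain.
#   while report.pending_requests > 0:
#       time.sleep(0.1)

# Remaining approach: close() already calls shutdown(), which blocks until
# every queued task has finished.
executor.shutdown(wait=True)
assert all(f.done() for f in futures)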

View File

@@ -1,5 +1,7 @@
 import pathlib
-from typing import Iterable, List, cast
+import time
+from concurrent.futures import Future, ThreadPoolExecutor
+from typing import Iterable, List, Optional, cast
 from unittest.mock import MagicMock, patch
 
 import pytest
@@ -10,6 +12,7 @@ from datahub.configuration.common import DynamicTypedConfig
 from datahub.ingestion.api.committable import CommitPolicy, Committable
 from datahub.ingestion.api.common import RecordEnvelope
 from datahub.ingestion.api.decorators import platform_name
+from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.transform import Transformer
 from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -577,3 +580,221 @@ def get_initial_mce() -> MetadataChangeEventClass:
 
 def get_status_removed_aspect() -> StatusClass:
     return StatusClass(removed=False)
+
+
+class RealisticSinkReport(SinkReport):
+    """Sink report that simulates the real DatahubRestSinkReport behavior"""
+
+    def __init__(self):
+        super().__init__()
+        self.pending_requests = (
+            0  # Start with 0, will be incremented by async operations
+        )
+
+
+class RealisticDatahubRestSink(Sink):
+    """
+    Realistic simulation of DatahubRestSink that uses an executor like the real implementation.
+    This simulates the async behavior that can cause timing issues.
+    """
+
+    def __init__(self):
+        self.report = RealisticSinkReport()
+        self.executor = ThreadPoolExecutor(
+            max_workers=2, thread_name_prefix="sink-worker"
+        )
+        self.close_called = False
+        self.shutdown_complete = False
+
+    def write_record_async(
+        self,
+        record_envelope: RecordEnvelope,
+        write_callback: Optional[WriteCallback] = None,
+    ) -> None:
+        """Simulate async record writing like the real DatahubRestSink"""
+        # Increment pending requests when starting async work
+        self.report.pending_requests += 1
+        print(
+            f"📝 Starting async write - pending_requests now: {self.report.pending_requests}"
+        )
+
+        # Submit async work to executor
+        future = self.executor.submit(self._process_record, record_envelope)
+
+        # Add callback to decrement pending requests when done
+        future.add_done_callback(self._on_request_complete)
+
+    def _process_record(self, record_envelope: RecordEnvelope) -> None:
+        """Simulate processing a record (like sending HTTP request)"""
+        # Simulate network delay
+        time.sleep(0.1)
+        print(f"🌐 Processed record: {record_envelope.record}")
+
+    def _on_request_complete(self, future: Future) -> None:
+        """Callback when async request completes"""
+        self.report.pending_requests -= 1
+        print(
+            f"✅ Request completed - pending_requests now: {self.report.pending_requests}"
+        )
+
+    def close(self):
+        """Simulate the real DatahubRestSink close() method"""
+        print("🔧 Starting sink close()...")
+        self.close_called = True
+
+        # Simulate the executor.shutdown() behavior
+        print(
+            f"⏳ Shutting down executor with {self.report.pending_requests} pending requests..."
+        )
+        self.executor.shutdown(wait=True)  # Wait for all pending requests to complete
+
+        # After shutdown, pending_requests should be 0
+        self.report.pending_requests = 0
+        self.shutdown_complete = True
+        print("✅ Sink close() completed - all pending requests processed")
+
+
+class MockReporter:
+    """Mock reporter that tracks when completion notification is called"""
+
+    def __init__(self, sink=None):
+        self.completion_called = False
+        self.completion_pending_requests = None
+        self.sink = sink
+        self.start_called = False
+
+    def on_start(self, ctx):
+        """Mock on_start method"""
+        self.start_called = True
+        print("📊 MockReporter.on_start() called")
+
+    def on_completion(self, status, report, ctx):
+        self.completion_called = True
+        # Check pending requests at the time of completion notification
+        if (
+            self.sink
+            and hasattr(self.sink, "report")
+            and hasattr(self.sink.report, "pending_requests")
+        ):
+            self.completion_pending_requests = self.sink.report.pending_requests
+            print(
+                f"📊 Completion notification sees {self.completion_pending_requests} pending requests"
+            )
+
+
+class TestSinkReportTimingOnClose:
+    """Test class for validating sink report timing when sink.close() is called"""
+
+    def _create_real_pipeline_with_realistic_sink(self):
+        """Create a real Pipeline instance with a realistic sink using Pipeline.create()"""
+        from datahub.ingestion.run.pipeline import Pipeline
+
+        # Create realistic sink
+        sink = RealisticDatahubRestSink()
+
+        # Create pipeline using Pipeline.create() like the existing tests do
+        # Use demo data source which is simpler and doesn't require external dependencies
+        pipeline = Pipeline.create(
+            {
+                "source": {
+                    "type": "demo-data",
+                    "config": {},
+                },
+                "sink": {"type": "console"},  # Use console sink to avoid network issues
+            }
+        )
+
+        # Replace the sink with our realistic sink and register it with the exit_stack
+        # This ensures the context manager calls close() on our realistic sink
+        pipeline.sink = pipeline.exit_stack.enter_context(sink)
+
+        return pipeline, sink
+
+    def _add_pending_requests_to_sink(
+        self, sink: RealisticDatahubRestSink, count: int = 3
+    ) -> int:
+        """Add some pending requests to the sink to simulate async work"""
+        print(f"📝 Adding {count} pending requests to sink...")
+        for i in range(count):
+            sink.write_record_async(RecordEnvelope(f"record_{i}", metadata={}))
+
+        # Give some time for work to start
+        time.sleep(0.05)
+        print(f"📊 Current pending requests: {sink.report.pending_requests}")
+        return sink.report.pending_requests
+
+    def test_sink_report_timing_on_close(self):
+        """Test that validates completion notification runs with 0 pending requests after sink.close()"""
+        print("\n🧪 Testing sink report timing on close...")
+
+        # Create test pipeline with realistic sink
+        pipeline, sink = self._create_real_pipeline_with_realistic_sink()
+
+        # Add pending requests to simulate async work
+        pending_count = self._add_pending_requests_to_sink(sink, 3)
+        assert pending_count > 0, "❌ Expected pending requests to be added"
+
+        # Create mock reporter to track completion
+        reporter = MockReporter(sink=sink)
+        pipeline.reporters = [reporter]
+
+        # Create a wrapper that checks pending requests before calling the real method
+        original_notify = pipeline._notify_reporters_on_ingestion_completion
+
+        def notify_with_pending_check():
+            print("🔔 Calling _notify_reporters_on_ingestion_completion()...")
+
+            # Check pending requests before notifying reporters
+            if hasattr(pipeline.sink, "report") and hasattr(
+                pipeline.sink.report, "pending_requests"
+            ):
+                pending_requests = pipeline.sink.report.pending_requests
+                print(
+                    f"📊 Pending requests when _notify_reporters_on_ingestion_completion() runs: {pending_requests}"
+                )
+
+                # This is the key assertion - there should be no pending requests
+                assert pending_requests == 0, (
+                    f"❌ Expected 0 pending requests when _notify_reporters_on_ingestion_completion() runs, "
+                    f"but found {pending_requests}. This indicates a timing issue."
+                )
+                print(
+                    "✅ No pending requests when _notify_reporters_on_ingestion_completion() runs"
+                )
+
+            # Call original method
+            original_notify()
+
+        # Replace the method temporarily
+        pipeline._notify_reporters_on_ingestion_completion = notify_with_pending_check
+
+        # Run the REAL Pipeline.run() method with the CORRECT behavior (completion notification after context manager)
+        print(
+            "📢 Running REAL Pipeline.run() with correct timing (completion notification after context manager)..."
+        )
+        pipeline.run()
+
+        # Verify the fix: sink.close() was called by context manager before _notify_reporters_on_ingestion_completion
+        assert sink.close_called, "❌ Sink close() should be called by context manager"
+        print("✅ Sink close() was called by context manager")
+
+        assert reporter.completion_called, "❌ Completion notification should be called"
+        print("✅ Completion notification was called")
+
+        # Verify no pending requests at completion (the key fix)
+        assert reporter.completion_pending_requests == 0, (
+            f"❌ Expected 0 pending requests at completion, got {reporter.completion_pending_requests}. "
+            "This indicates a timing issue."
+        )
+        print("✅ No pending requests at completion notification")
+
+        # Verify the sink's pending_requests is also 0
+        assert sink.report.pending_requests == 0, (
+            f"❌ Sink should have 0 pending requests after close() completes, got {sink.report.pending_requests}"
+        )
+        print("✅ Sink has 0 pending requests after close()")
+
+        print(
+            "🎉 Sink report timing fix test passed! The fix works with realistic async behavior."
+        )
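
The new test can be selected by name with pytest's keyword filter, for example via pytest.main (the equivalent CLI flags work too; the working directory and module path depend on the repository layout):

import pytest

# -k selects the new test by name; -s keeps the print() progress output visible.
pytest.main(["-k", "test_sink_report_timing_on_close", "-s"])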