diff --git a/metadata-ingestion/src/datahub/ingestion/api/sink.py b/metadata-ingestion/src/datahub/ingestion/api/sink.py index c6eb2ede95..44148c751d 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/sink.py +++ b/metadata-ingestion/src/datahub/ingestion/api/sink.py @@ -147,9 +147,6 @@ class Sink(Generic[SinkConfig, SinkReportType], Closeable, metaclass=ABCMeta): def close(self) -> None: pass - def flush(self) -> None: - pass - def configured(self) -> str: """Override this method to output a human-readable and scrubbed version of the configured sink""" return "" diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index db261f01ef..931cecccde 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -373,11 +373,13 @@ class Pipeline: ) current_version = version_stats.client.current.version - logger.debug(f""" + logger.debug( + f""" client_version: {current_version} server_default_version: {server_default_version} server_default_cli_ahead: True - """) + """ + ) self.source.get_report().warning( title="Server default CLI version is ahead of CLI version", @@ -560,8 +562,9 @@ class Pipeline: self._handle_uncaught_pipeline_exception(exc) finally: clear_global_warnings() - self.sink.flush() - self._notify_reporters_on_ingestion_completion() + + # This can't be in the finally part because this should happen after context manager exists + self._notify_reporters_on_ingestion_completion() def transform(self, records: Iterable[RecordEnvelope]) -> Iterable[RecordEnvelope]: """ diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index 828ad4b2b2..bee90ee732 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -5,7 +5,6 @@ import functools import logging import os import threading -import time import uuid from enum import auto from typing import List, Optional, Tuple, Union @@ -349,17 +348,6 @@ class DatahubRestSink(Sink[DatahubRestSinkConfig, DataHubRestSinkReport]): RecordEnvelope(item, metadata={}), NoopWriteCallback() ) - def flush(self) -> None: - """Wait for all pending records to be written.""" - i = 0 - while self.report.pending_requests > 0: - time.sleep(0.1) - i += 1 - if i % 1000 == 0: - logger.info( - f"Waiting for {self.report.pending_requests} records to be written" - ) - def close(self): with self.report.main_thread_blocking_timer: self.executor.shutdown() diff --git a/metadata-ingestion/tests/unit/api/test_pipeline.py b/metadata-ingestion/tests/unit/api/test_pipeline.py index f44b223d18..6587b9c39f 100644 --- a/metadata-ingestion/tests/unit/api/test_pipeline.py +++ b/metadata-ingestion/tests/unit/api/test_pipeline.py @@ -1,5 +1,7 @@ import pathlib -from typing import Iterable, List, cast +import time +from concurrent.futures import Future, ThreadPoolExecutor +from typing import Iterable, List, Optional, cast from unittest.mock import MagicMock, patch import pytest @@ -10,6 +12,7 @@ from datahub.configuration.common import DynamicTypedConfig from datahub.ingestion.api.committable import CommitPolicy, Committable from datahub.ingestion.api.common import RecordEnvelope from datahub.ingestion.api.decorators import platform_name +from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.transform import Transformer from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -577,3 +580,221 @@ def get_initial_mce() -> MetadataChangeEventClass: def get_status_removed_aspect() -> StatusClass: return StatusClass(removed=False) + + +class RealisticSinkReport(SinkReport): + """Sink report that simulates the real DatahubRestSinkReport behavior""" + + def __init__(self): + super().__init__() + self.pending_requests = ( + 0 # Start with 0, will be incremented by async operations + ) + + +class RealisticDatahubRestSink(Sink): + """ + Realistic simulation of DatahubRestSink that uses an executor like the real implementation. + This simulates the async behavior that can cause timing issues. + """ + + def __init__(self): + self.report = RealisticSinkReport() + self.executor = ThreadPoolExecutor( + max_workers=2, thread_name_prefix="sink-worker" + ) + self.close_called = False + self.shutdown_complete = False + + def write_record_async( + self, + record_envelope: RecordEnvelope, + write_callback: Optional[WriteCallback] = None, + ) -> None: + """Simulate async record writing like the real DatahubRestSink""" + # Increment pending requests when starting async work + self.report.pending_requests += 1 + print( + f"๐Ÿ“ Starting async write - pending_requests now: {self.report.pending_requests}" + ) + + # Submit async work to executor + future = self.executor.submit(self._process_record, record_envelope) + + # Add callback to decrement pending requests when done + future.add_done_callback(self._on_request_complete) + + def _process_record(self, record_envelope: RecordEnvelope) -> None: + """Simulate processing a record (like sending HTTP request)""" + # Simulate network delay + time.sleep(0.1) + print(f"๐ŸŒ Processed record: {record_envelope.record}") + + def _on_request_complete(self, future: Future) -> None: + """Callback when async request completes""" + self.report.pending_requests -= 1 + print( + f"โœ… Request completed - pending_requests now: {self.report.pending_requests}" + ) + + def close(self): + """Simulate the real DatahubRestSink close() method""" + print("๐Ÿ”ง Starting sink close()...") + self.close_called = True + + # Simulate the executor.shutdown() behavior + print( + f"โณ Shutting down executor with {self.report.pending_requests} pending requests..." + ) + self.executor.shutdown(wait=True) # Wait for all pending requests to complete + + # After shutdown, pending_requests should be 0 + self.report.pending_requests = 0 + self.shutdown_complete = True + print("โœ… Sink close() completed - all pending requests processed") + + +class MockReporter: + """Mock reporter that tracks when completion notification is called""" + + def __init__(self, sink=None): + self.completion_called = False + self.completion_pending_requests = None + self.sink = sink + self.start_called = False + + def on_start(self, ctx): + """Mock on_start method""" + self.start_called = True + print("๐Ÿ“Š MockReporter.on_start() called") + + def on_completion(self, status, report, ctx): + self.completion_called = True + # Check pending requests at the time of completion notification + if ( + self.sink + and hasattr(self.sink, "report") + and hasattr(self.sink.report, "pending_requests") + ): + self.completion_pending_requests = self.sink.report.pending_requests + print( + f"๐Ÿ“Š Completion notification sees {self.completion_pending_requests} pending requests" + ) + + +class TestSinkReportTimingOnClose: + """Test class for validating sink report timing when sink.close() is called""" + + def _create_real_pipeline_with_realistic_sink(self): + """Create a real Pipeline instance with a realistic sink using Pipeline.create()""" + + from datahub.ingestion.run.pipeline import Pipeline + + # Create realistic sink + sink = RealisticDatahubRestSink() + + # Create pipeline using Pipeline.create() like the existing tests do + # Use demo data source which is simpler and doesn't require external dependencies + pipeline = Pipeline.create( + { + "source": { + "type": "demo-data", + "config": {}, + }, + "sink": {"type": "console"}, # Use console sink to avoid network issues + } + ) + + # Replace the sink with our realistic sink and register it with the exit_stack + # This ensures the context manager calls close() on our realistic sink + pipeline.sink = pipeline.exit_stack.enter_context(sink) + + return pipeline, sink + + def _add_pending_requests_to_sink( + self, sink: RealisticDatahubRestSink, count: int = 3 + ) -> int: + """Add some pending requests to the sink to simulate async work""" + print(f"๐Ÿ“ Adding {count} pending requests to sink...") + for i in range(count): + sink.write_record_async(RecordEnvelope(f"record_{i}", metadata={})) + + # Give some time for work to start + time.sleep(0.05) + print(f"๐Ÿ“Š Current pending requests: {sink.report.pending_requests}") + return sink.report.pending_requests + + def test_sink_report_timing_on_close(self): + """Test that validates completion notification runs with 0 pending requests after sink.close()""" + print("\n๐Ÿงช Testing sink report timing on close...") + + # Create test pipeline with realistic sink + pipeline, sink = self._create_real_pipeline_with_realistic_sink() + + # Add pending requests to simulate async work + pending_count = self._add_pending_requests_to_sink(sink, 3) + assert pending_count > 0, "โŒ Expected pending requests to be added" + + # Create mock reporter to track completion + reporter = MockReporter(sink=sink) + pipeline.reporters = [reporter] + + # Create a wrapper that checks pending requests before calling the real method + original_notify = pipeline._notify_reporters_on_ingestion_completion + + def notify_with_pending_check(): + print("๐Ÿ”” Calling _notify_reporters_on_ingestion_completion()...") + + # Check pending requests before notifying reporters + if hasattr(pipeline.sink, "report") and hasattr( + pipeline.sink.report, "pending_requests" + ): + pending_requests = pipeline.sink.report.pending_requests + print( + f"๐Ÿ“Š Pending requests when _notify_reporters_on_ingestion_completion() runs: {pending_requests}" + ) + + # This is the key assertion - there should be no pending requests + assert pending_requests == 0, ( + f"โŒ Expected 0 pending requests when _notify_reporters_on_ingestion_completion() runs, " + f"but found {pending_requests}. This indicates a timing issue." + ) + print( + "โœ… No pending requests when _notify_reporters_on_ingestion_completion() runs" + ) + + # Call original method + original_notify() + + # Replace the method temporarily + pipeline._notify_reporters_on_ingestion_completion = notify_with_pending_check + + # Run the REAL Pipeline.run() method with the CORRECT behavior (completion notification after context manager) + print( + "๐Ÿ“ข Running REAL Pipeline.run() with correct timing (completion notification after context manager)..." + ) + pipeline.run() + + # Verify the fix: sink.close() was called by context manager before _notify_reporters_on_ingestion_completion + assert sink.close_called, "โŒ Sink close() should be called by context manager" + print("โœ… Sink close() was called by context manager") + + assert reporter.completion_called, "โŒ Completion notification should be called" + print("โœ… Completion notification was called") + + # Verify no pending requests at completion (the key fix) + assert reporter.completion_pending_requests == 0, ( + f"โŒ Expected 0 pending requests at completion, got {reporter.completion_pending_requests}. " + "This indicates the timing fix is working correctly." + ) + print("โœ… No pending requests at completion notification") + + # Verify the sink's pending_requests is also 0 + assert sink.report.pending_requests == 0, ( + f"โŒ Sink should have 0 pending requests after close() completes, got {sink.report.pending_requests}" + ) + print("โœ… Sink has 0 pending requests after close()") + + print( + "๐ŸŽ‰ Sink report timing fix test passed! The fix works with realistic async behavior." + )