diff --git a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py
index 6ae642296e..a767cdbee2 100644
--- a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py
+++ b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py
@@ -96,6 +96,9 @@ class DatahubIngestionRunSummaryProvider(PipelineRunListener):
         sink_type = sink_config_holder.type
         sink_class = sink_registry.get(sink_type)
         sink_config = sink_config_holder.dict().get("config") or {}
+        if sink_type == "datahub-rest":
+            sink_config["use_sync_emitter_on_async_failure"] = True
+
         sink: Sink = sink_class.create(sink_config, ctx)
 
         return cls(sink, reporter_config.report_recipe, ctx)
diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index 899adf4212..bceeea1467 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -4,7 +4,7 @@ import platform
 import sys
 import time
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, List, Optional
+from typing import Any, Dict, Iterable, List, Optional, cast
 
 import click
 
@@ -22,19 +22,27 @@ from datahub.ingestion.reporting.reporting_provider_registry import (
     reporting_provider_registry,
 )
 from datahub.ingestion.run.pipeline_config import PipelineConfig, ReporterConfig
+from datahub.ingestion.sink.file import FileSink, FileSinkConfig
 from datahub.ingestion.sink.sink_registry import sink_registry
 from datahub.ingestion.source.source_registry import source_registry
 from datahub.ingestion.transformer.transform_registry import transform_registry
+from datahub.metadata.schema_classes import MetadataChangeProposalClass
 from datahub.telemetry import stats, telemetry
 
 logger = logging.getLogger(__name__)
 
 
 class LoggingCallback(WriteCallback):
+    def __init__(self, name: str = "") -> None:
+        super().__init__()
+        self.name = name
+
     def on_success(
         self, record_envelope: RecordEnvelope, success_metadata: dict
     ) -> None:
-        logger.debug(f"sink wrote workunit {record_envelope.metadata['workunit_id']}")
+        logger.debug(
+            f"{self.name} sink wrote workunit {record_envelope.metadata['workunit_id']}"
+        )
 
     def on_failure(
         self,
@@ -43,11 +51,48 @@ class LoggingCallback(WriteCallback):
         failure_metadata: dict,
     ) -> None:
         logger.error(
-            f"failed to write record with workunit {record_envelope.metadata['workunit_id']}"
+            f"{self.name} failed to write record with workunit {record_envelope.metadata['workunit_id']}"
             f" with {failure_exception} and info {failure_metadata}"
         )
 
 
+class DeadLetterQueueCallback(WriteCallback):
+    def __init__(self, ctx: PipelineContext, config: Optional[FileSinkConfig]) -> None:
+        if not config:
+            config = FileSinkConfig.parse_obj({"filename": "failed_events.json"})
+        self.file_sink: FileSink = FileSink(ctx, config)
+        self.logging_callback = LoggingCallback(name="failure-queue")
+        logger.info(f"Failure logging enabled. Will log to {config.filename}.")
+
+    def on_success(
+        self, record_envelope: RecordEnvelope, success_metadata: dict
+    ) -> None:
+        pass
+
+    def on_failure(
+        self,
+        record_envelope: RecordEnvelope,
+        failure_exception: Exception,
+        failure_metadata: dict,
+    ) -> None:
+        if "workunit_id" in record_envelope.metadata:
+            if isinstance(record_envelope.record, MetadataChangeProposalClass):
+                mcp = cast(MetadataChangeProposalClass, record_envelope.record)
+                if mcp.systemMetadata:
+                    if not mcp.systemMetadata.properties:
+                        mcp.systemMetadata.properties = {}
+                    if "workunit_id" not in mcp.systemMetadata.properties:
+                        # update the workunit id
+                        mcp.systemMetadata.properties[
+                            "workunit_id"
+                        ] = record_envelope.metadata["workunit_id"]
+                record_envelope.record = mcp
+        self.file_sink.write_record_async(record_envelope, self.logging_callback)
+
+    def close(self) -> None:
+        self.file_sink.close()
+
+
 class PipelineInitError(Exception):
     pass
 
@@ -230,7 +275,11 @@ class Pipeline:
                     status="FAILURE"
                     if self.source.get_report().failures
                     or self.sink.get_report().failures
-                    else "SUCCESS",
+                    else "SUCCESS"
+                    if self.final_status == "completed"
+                    else "CANCELLED"
+                    if self.final_status == "cancelled"
+                    else "UNKNOWN",
                     report=self._get_structured_report(),
                     ctx=self.ctx,
                 )
@@ -270,10 +319,17 @@
         return False
 
     def run(self) -> None:
-
+        self.final_status = "unknown"
         self._notify_reporters_on_ingestion_start()
+        callback = None
         try:
-            callback = LoggingCallback()
+            callback = (
+                LoggingCallback()
+                if not self.config.failure_log.enabled
+                else DeadLetterQueueCallback(
+                    self.ctx, self.config.failure_log.log_config
+                )
+            )
             extractor: Extractor = self.extractor_class()
             for wu in itertools.islice(
                 self.source.get_workunits(),
@@ -292,8 +348,12 @@
                     if not self.dry_run:
                         self.sink.write_record_async(record_envelope, callback)
 
+                except RuntimeError:
+                    raise
+                except SystemExit:
+                    raise
                 except Exception as e:
-                    logger.error(f"Failed to extract some records due to: {e}")
+                    logger.error("Failed to process some records. Continuing.", exc_info=e)
Continuing.", e) extractor.close() if not self.dry_run: @@ -315,7 +375,15 @@ class Pipeline: self.sink.close() self.process_commits() + self.final_status = "completed" + except (SystemExit, RuntimeError) as e: + self.final_status = "cancelled" + logger.error("Caught error", e) + raise finally: + if callback and hasattr(callback, "close"): + callback.close() # type: ignore + self._notify_reporters_on_ingestion_completion() def transform(self, records: Iterable[RecordEnvelope]) -> Iterable[RecordEnvelope]: @@ -417,16 +485,18 @@ class Pipeline: num_failures_source = self._count_all_vals( self.source.get_report().failures ) + num_failures_sink = len(self.sink.get_report().failures) click.secho( - f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with {num_failures_source} failures {'so far' if currently_running else ''}; produced {workunits_produced} events", + f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with {num_failures_source+num_failures_sink} failures {'so far' if currently_running else ''}; produced {workunits_produced} events", fg="bright_red", bold=True, ) return 1 elif self.source.get_report().warnings or self.sink.get_report().warnings: num_warn_source = self._count_all_vals(self.source.get_report().warnings) + num_warn_sink = len(self.sink.get_report().warnings) click.secho( - f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with {num_warn_source} warnings {'so far' if currently_running else ''}; produced {workunits_produced} events", + f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with {num_warn_source+num_warn_sink} warnings {'so far' if currently_running else ''}; produced {workunits_produced} events", fg="yellow", bold=True, ) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index 79760fc3ff..58e523a104 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -9,6 +9,7 @@ from datahub.cli.cli_utils import get_url_and_token from datahub.configuration import config_loader from datahub.configuration.common import ConfigModel, DynamicTypedConfig from datahub.ingestion.graph.client import DatahubClientConfig +from datahub.ingestion.sink.file import FileSinkConfig logger = logging.getLogger(__name__) @@ -24,6 +25,14 @@ class ReporterConfig(DynamicTypedConfig): ) +class FailureLoggingConfig(ConfigModel): + enabled: bool = Field( + False, + description="When enabled, records that fail to be sent to DataHub are logged to disk", + ) + log_config: Optional[FileSinkConfig] = None + + class PipelineConfig(ConfigModel): # Once support for discriminated unions gets merged into Pydantic, we can # simplify this configuration and validation. 
@@ -36,6 +45,7 @@ class PipelineConfig(ConfigModel):
     run_id: str = "__DEFAULT_RUN_ID"
     datahub_api: Optional[DatahubClientConfig] = None
     pipeline_name: Optional[str] = None
+    failure_log: FailureLoggingConfig = FailureLoggingConfig()
 
     _raw_dict: Optional[
         dict
diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
index 06e6e3d8c3..0f99556c91 100644
--- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
+++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -29,7 +29,8 @@ logger = logging.getLogger(__name__)
 
 
 class DatahubRestSinkConfig(DatahubClientConfig):
-    max_pending_requests = 1000
+    max_pending_requests: int = 1000
+    use_sync_emitter_on_async_failure: bool = False
 
 
 @dataclass
@@ -158,15 +159,16 @@ class DatahubRestSink(Sink):
             write_callback.on_success(record_envelope, {})
         elif isinstance(e, OperationalError):
             # only OperationalErrors should be ignored
+            # trim exception stacktraces in all cases when reporting
+            if "stackTrace" in e.info:
+                with contextlib.suppress(Exception):
+                    e.info["stackTrace"] = "\n".join(
+                        e.info["stackTrace"].split("\n")[:3]
+                    )
+
             if not self.treat_errors_as_warnings:
                 self.report.report_failure({"error": e.message, "info": e.info})
             else:
-                # trim exception stacktraces when reporting warnings
-                if "stackTrace" in e.info:
-                    with contextlib.suppress(Exception):
-                        e.info["stackTrace"] = "\n".join(
-                            e.info["stackTrace"].split("\n")[:2]
-                        )
                 record = record_envelope.record
                 if isinstance(record, MetadataChangeProposalWrapper):
                     # include information about the entity that failed
@@ -195,13 +197,23 @@ class DatahubRestSink(Sink):
         write_callback: WriteCallback,
     ) -> None:
         record = record_envelope.record
-        write_future = self.executor.submit(self.emitter.emit, record)
-        write_future.add_done_callback(
-            functools.partial(
-                self._write_done_callback, record_envelope, write_callback
+        try:
+            write_future = self.executor.submit(self.emitter.emit, record)
+            write_future.add_done_callback(
+                functools.partial(
+                    self._write_done_callback, record_envelope, write_callback
+                )
             )
-        )
-        self.report.pending_requests += 1
+            self.report.pending_requests += 1
+        except RuntimeError:
+            if self.config.use_sync_emitter_on_async_failure:
+                try:
+                    (start, end) = self.emitter.emit(record)
+                    write_callback.on_success(record_envelope, success_metadata={})
+                except Exception as e:
+                    write_callback.on_failure(record_envelope, e, failure_metadata={})
+            else:
+                raise
 
     def get_report(self) -> SinkReport:
         return self.report
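
Usage note (not part of the diff): a minimal sketch of how the new failure_log option introduced above could be enabled in a recipe passed to Pipeline.create(). The source and sink entries are illustrative placeholders, not values taken from this change.

from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe dict; adjust source/sink to your environment.
pipeline = Pipeline.create(
    {
        "source": {"type": "file", "config": {"filename": "./mces.json"}},
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
        # New in this change: records that fail to reach DataHub are written
        # to a local file sink (defaults to failed_events.json).
        "failure_log": {
            "enabled": True,
            "log_config": {"filename": "failed_events.json"},
        },
    }
)
pipeline.run()
pipeline.pretty_print_summary()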