feat(ingest): add support for an event failure log, reporting cancelled runs on cli (#5694)

Shirshanka Das 2022-08-21 23:15:34 -07:00 committed by GitHub
parent 2afa3a2b2e
commit 7a58381eb6
4 changed files with 117 additions and 22 deletions
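
In practice, the new behavior is driven by a failure_log block in the recipe. Below is a minimal sketch of enabling it when running a pipeline programmatically; the source, server URL, and filenames are illustrative assumptions, while the failure_log keys come from the FailureLoggingConfig added in this commit.

# Sketch: opt into the new failure log from a recipe dict.
# Source/sink values here are illustrative assumptions.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {"type": "file", "config": {"filename": "events.json"}},
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
        "failure_log": {
            "enabled": True,
            # optional; defaults to failed_events.json when omitted
            "log_config": {"filename": "failed_events.json"},
        },
    }
)
pipeline.run()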


@@ -96,6 +96,9 @@ class DatahubIngestionRunSummaryProvider(PipelineRunListener):
        sink_type = sink_config_holder.type
        sink_class = sink_registry.get(sink_type)
        sink_config = sink_config_holder.dict().get("config") or {}
        if sink_type == "datahub-rest":
            # fall back to the synchronous emitter so the run summary can
            # still be emitted if the async executor has already shut down
            sink_config["use_sync_emitter_on_async_failure"] = True
        sink: Sink = sink_class.create(sink_config, ctx)
        return cls(sink, reporter_config.report_recipe, ctx)


@@ -4,7 +4,7 @@ import platform
import sys
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, cast

import click
@@ -22,19 +22,27 @@ from datahub.ingestion.reporting.reporting_provider_registry import (
    reporting_provider_registry,
)
from datahub.ingestion.run.pipeline_config import PipelineConfig, ReporterConfig
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.ingestion.sink.sink_registry import sink_registry
from datahub.ingestion.source.source_registry import source_registry
from datahub.ingestion.transformer.transform_registry import transform_registry
from datahub.metadata.schema_classes import MetadataChangeProposalClass
from datahub.telemetry import stats, telemetry

logger = logging.getLogger(__name__)


class LoggingCallback(WriteCallback):
    def __init__(self, name: str = "") -> None:
        super().__init__()
        self.name = name

    def on_success(
        self, record_envelope: RecordEnvelope, success_metadata: dict
    ) -> None:
        logger.debug(
            f"{self.name} sink wrote workunit {record_envelope.metadata['workunit_id']}"
        )

    def on_failure(
        self,
@@ -43,11 +51,48 @@ class LoggingCallback(WriteCallback):
        failure_metadata: dict,
    ) -> None:
        logger.error(
            f"{self.name} failed to write record with workunit {record_envelope.metadata['workunit_id']}"
            f" with {failure_exception} and info {failure_metadata}"
        )


class DeadLetterQueueCallback(WriteCallback):
    def __init__(self, ctx: PipelineContext, config: Optional[FileSinkConfig]) -> None:
        if not config:
            config = FileSinkConfig.parse_obj({"filename": "failed_events.json"})
        self.file_sink: FileSink = FileSink(ctx, config)
        self.logging_callback = LoggingCallback(name="failure-queue")
        logger.info(f"Failure logging enabled. Will log to {config.filename}.")

    def on_success(
        self, record_envelope: RecordEnvelope, success_metadata: dict
    ) -> None:
        pass

    def on_failure(
        self,
        record_envelope: RecordEnvelope,
        failure_exception: Exception,
        failure_metadata: dict,
    ) -> None:
        if "workunit_id" in record_envelope.metadata:
            if isinstance(record_envelope.record, MetadataChangeProposalClass):
                mcp = cast(MetadataChangeProposalClass, record_envelope.record)
                if mcp.systemMetadata:
                    if not mcp.systemMetadata.properties:
                        mcp.systemMetadata.properties = {}
                    if "workunit_id" not in mcp.systemMetadata.properties:
                        # stamp the workunit id into system metadata so the
                        # failed event can be traced back to its workunit
                        mcp.systemMetadata.properties[
                            "workunit_id"
                        ] = record_envelope.metadata["workunit_id"]
                    record_envelope.record = mcp
        self.file_sink.write_record_async(record_envelope, self.logging_callback)

    def close(self) -> None:
        self.file_sink.close()


class PipelineInitError(Exception):
    pass
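
Because the dead-letter queue reuses the standard file sink, a failed-events file can later be replayed with an ordinary file-source recipe. A hedged sketch, assuming the default failed_events.json path and a locally reachable DataHub server:

# Sketch: replay previously failed events through the REST sink.
# Paths and server URL are illustrative assumptions.
from datahub.ingestion.run.pipeline import Pipeline

replay = Pipeline.create(
    {
        "source": {"type": "file", "config": {"filename": "failed_events.json"}},
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
replay.run()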
@@ -230,7 +275,11 @@ class Pipeline:
            status="FAILURE"
            if self.source.get_report().failures
            or self.sink.get_report().failures
            else "SUCCESS"
            if self.final_status == "completed"
            else "CANCELLED"
            if self.final_status == "cancelled"
            else "UNKNOWN",
            report=self._get_structured_report(),
            ctx=self.ctx,
        )
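
The chained conditional above is dense; it encodes this precedence (failures trump the run's final status), shown here as an equivalent standalone function:

# Equivalent to the chained conditional above (sketch).
def _run_status(has_failures: bool, final_status: str) -> str:
    if has_failures:
        return "FAILURE"
    if final_status == "completed":
        return "SUCCESS"
    if final_status == "cancelled":
        return "CANCELLED"
    return "UNKNOWN"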
@@ -270,10 +319,17 @@ class Pipeline:
            return False

    def run(self) -> None:
        self.final_status = "unknown"
        self._notify_reporters_on_ingestion_start()
        callback = None
        try:
            callback = (
                LoggingCallback()
                if not self.config.failure_log.enabled
                else DeadLetterQueueCallback(
                    self.ctx, self.config.failure_log.log_config
                )
            )
            extractor: Extractor = self.extractor_class()
            for wu in itertools.islice(
                self.source.get_workunits(),
@@ -292,8 +348,12 @@ class Pipeline:
                        if not self.dry_run:
                            self.sink.write_record_async(record_envelope, callback)
                except RuntimeError:
                    raise
                except SystemExit:
                    raise
                except Exception as e:
                    logger.error(f"Failed to process some records. Continuing. Error: {e}")

                extractor.close()
                if not self.dry_run:
@@ -315,7 +375,15 @@ class Pipeline:
            self.sink.close()
            self.process_commits()
            self.final_status = "completed"
        except (SystemExit, RuntimeError) as e:
            self.final_status = "cancelled"
            logger.error(f"Caught error: {e}")
            raise
        finally:
            if callback and hasattr(callback, "close"):
                callback.close()  # type: ignore
            self._notify_reporters_on_ingestion_completion()

    def transform(self, records: Iterable[RecordEnvelope]) -> Iterable[RecordEnvelope]:
@@ -417,16 +485,18 @@ class Pipeline:
            num_failures_source = self._count_all_vals(
                self.source.get_report().failures
            )
            num_failures_sink = len(self.sink.get_report().failures)
            click.secho(
                f"{'' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with {num_failures_source + num_failures_sink} failures {'so far' if currently_running else ''}; produced {workunits_produced} events",
                fg="bright_red",
                bold=True,
            )
            return 1
        elif self.source.get_report().warnings or self.sink.get_report().warnings:
            num_warn_source = self._count_all_vals(self.source.get_report().warnings)
            num_warn_sink = len(self.sink.get_report().warnings)
            click.secho(
                f"{'' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with {num_warn_source + num_warn_sink} warnings {'so far' if currently_running else ''}; produced {workunits_produced} events",
                fg="yellow",
                bold=True,
            )
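
_count_all_vals is not part of this diff; a plausible sketch, assuming the report's failures and warnings are dicts mapping keys to lists of messages (an assumption, not the confirmed implementation):

# Hypothetical sketch of the helper used above; not shown in this commit.
from typing import Dict, List

def _count_all_vals(d: Dict[str, List[str]]) -> int:
    # sum the number of entries across all keys
    return sum(len(v) for v in d.values())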


@@ -9,6 +9,7 @@ from datahub.cli.cli_utils import get_url_and_token
from datahub.configuration import config_loader
from datahub.configuration.common import ConfigModel, DynamicTypedConfig
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub.ingestion.sink.file import FileSinkConfig

logger = logging.getLogger(__name__)
@@ -24,6 +25,14 @@ class ReporterConfig(DynamicTypedConfig):
    )


class FailureLoggingConfig(ConfigModel):
    enabled: bool = Field(
        False,
        description="When enabled, records that fail to be sent to DataHub are logged to disk",
    )
    log_config: Optional[FileSinkConfig] = None


class PipelineConfig(ConfigModel):
    # Once support for discriminated unions gets merged into Pydantic, we can
    # simplify this configuration and validation.
@@ -36,6 +45,7 @@ class PipelineConfig(ConfigModel):
    run_id: str = "__DEFAULT_RUN_ID"
    datahub_api: Optional[DatahubClientConfig] = None
    pipeline_name: Optional[str] = None
    failure_log: FailureLoggingConfig = FailureLoggingConfig()

    _raw_dict: Optional[
        dict
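
The new block parses like any other pydantic ConfigModel. A small sketch of round-tripping it, using only field names visible in this diff:

# Sketch: parsing the new failure_log config block on its own.
from datahub.ingestion.run.pipeline_config import FailureLoggingConfig

cfg = FailureLoggingConfig.parse_obj(
    {"enabled": True, "log_config": {"filename": "failed_events.json"}}
)
assert cfg.enabled and cfg.log_config is not None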


@@ -29,7 +29,8 @@ logger = logging.getLogger(__name__)


class DatahubRestSinkConfig(DatahubClientConfig):
    max_pending_requests: int = 1000
    use_sync_emitter_on_async_failure: bool = False


@dataclass
@@ -158,15 +159,16 @@ class DatahubRestSink(Sink):
            write_callback.on_success(record_envelope, {})
        elif isinstance(e, OperationalError):
            # only OperationalErrors should be ignored
            # trim exception stacktraces in all cases when reporting
            if "stackTrace" in e.info:
                with contextlib.suppress(Exception):
                    e.info["stackTrace"] = "\n".join(
                        e.info["stackTrace"].split("\n")[:3]
                    )
            if not self.treat_errors_as_warnings:
                self.report.report_failure({"error": e.message, "info": e.info})
            else:
                record = record_envelope.record
                if isinstance(record, MetadataChangeProposalWrapper):
                    # include information about the entity that failed
@@ -195,13 +197,23 @@ class DatahubRestSink(Sink):
        write_callback: WriteCallback,
    ) -> None:
        record = record_envelope.record
        try:
            write_future = self.executor.submit(self.emitter.emit, record)
            write_future.add_done_callback(
                functools.partial(
                    self._write_done_callback, record_envelope, write_callback
                )
            )
            self.report.pending_requests += 1
        except RuntimeError:
            if self.config.use_sync_emitter_on_async_failure:
                try:
                    (start, end) = self.emitter.emit(record)
                    write_callback.on_success(record_envelope, success_metadata={})
                except Exception as e:
                    write_callback.on_failure(record_envelope, e, failure_metadata={})
            else:
                raise
    def get_report(self) -> SinkReport:
        return self.report
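
The RuntimeError branch exists because concurrent.futures executors refuse new work once shut down, which can happen when events arrive during teardown. A standalone illustration of that stdlib behavior (not DataHub code):

# ThreadPoolExecutor.submit raises RuntimeError after shutdown -- the
# case the sync-emitter fallback above recovers from.
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown()
try:
    executor.submit(print, "late event")
except RuntimeError as e:
    print(f"async submit failed: {e}")  # cannot schedule new futures after shutdown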