refactor(ingest): clean up pipeline init error handling (#6817)

This commit is contained in:
Harshal Sheth 2022-12-20 22:21:28 -05:00 committed by GitHub
parent 88e40a9069
commit 2c911ccf7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 48 deletions

View File

@@ -77,8 +77,8 @@ class NoopWriteCallback(WriteCallback):
pass
SinkReportType = TypeVar("SinkReportType", bound=SinkReport)
SinkConfig = TypeVar("SinkConfig", bound=ConfigModel)
SinkReportType = TypeVar("SinkReportType", bound=SinkReport, covariant=True)
SinkConfig = TypeVar("SinkConfig", bound=ConfigModel, covariant=True)
Self = TypeVar("Self", bound="Sink")

View File

@@ -1,3 +1,4 @@
import contextlib
import itertools
import logging
import os
@@ -5,19 +6,23 @@ import platform
import sys
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, NoReturn, Optional, cast
from typing import Any, Dict, Iterable, Iterator, List, Optional, cast
import click
import humanfriendly
import psutil
import datahub
from datahub.configuration.common import IgnorableError, PipelineExecutionError
from datahub.configuration.common import (
ConfigModel,
IgnorableError,
PipelineExecutionError,
)
from datahub.ingestion.api.committable import CommitPolicy
from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope
from datahub.ingestion.api.pipeline_run_listener import PipelineRunListener
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.sink import Sink, WriteCallback
from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
from datahub.ingestion.api.source import Extractor, Source
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.extractor.extractor_registry import extractor_registry
@@ -101,6 +106,16 @@ class PipelineInitError(Exception):
pass
@contextlib.contextmanager
def _add_init_error_context(step: str) -> Iterator[None]:
"""Enriches any exceptions raised with information about the step that failed."""
try:
yield
except Exception as e:
raise PipelineInitError(f"Failed to {step}: {e}") from e
@dataclass
class CliReport(Report):
cli_version: str = datahub.nice_version_name()
@@ -121,12 +136,9 @@ class Pipeline:
ctx: PipelineContext
source: Source
extractor: Extractor
sink: Sink
sink: Sink[ConfigModel, SinkReport]
transformers: List[Transformer]
def _raise_initialization_error(self, e: Exception, msg: str) -> NoReturn:
raise PipelineInitError(f"{msg}: {e}") from e
def __init__(
self,
config: PipelineConfig,
@@ -146,7 +158,7 @@ class Pipeline:
self.last_time_printed = int(time.time())
self.cli_report = CliReport()
try:
with _add_init_error_context("set up framework context"):
self.ctx = PipelineContext(
run_id=self.config.run_id,
datahub_api=self.config.datahub_api,
@@ -155,64 +167,43 @@ class Pipeline:
preview_mode=preview_mode,
pipeline_config=self.config,
)
except Exception as e:
self._raise_initialization_error(e, "Failed to set up framework context")
sink_type = self.config.sink.type
try:
with _add_init_error_context(f"find a registered sink for type {sink_type}"):
sink_class = sink_registry.get(sink_type)
except Exception as e:
self._raise_initialization_error(
e, f"Failed to find a registered sink for type {sink_type}"
)
try:
with _add_init_error_context(f"configure the sink ({sink_type})"):
sink_config = self.config.sink.dict().get("config") or {}
self.sink: Sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")
self.sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type {self.config.sink.type} ({sink_class}) configured")
logger.info(f"Sink configured successfully. {self.sink.configured()}")
except Exception as e:
self._raise_initialization_error(
e, f"Failed to configure sink ({sink_type})"
)
# once a sink is configured, we can configure reporting immediately to get observability
try:
with _add_init_error_context("configure reporters"):
self._configure_reporting(report_to, no_default_report)
except Exception as e:
self._raise_initialization_error(e, "Failed to configure reporters")
try:
source_type = self.config.source.type
source_type = self.config.source.type
with _add_init_error_context(
f"find a registered source for type {source_type}"
):
source_class = source_registry.get(source_type)
except Exception as e:
self._raise_initialization_error(e, "Failed to create source")
try:
self.source: Source = source_class.create(
with _add_init_error_context(f"configure the source ({source_type})"):
self.source = source_class.create(
self.config.source.dict().get("config", {}), self.ctx
)
logger.debug(f"Source type:{source_type},{source_class} configured")
logger.debug(f"Source type {source_type} ({source_class}) configured")
logger.info("Source configured successfully.")
except Exception as e:
self._raise_initialization_error(
e, f"Failed to configure source ({source_type})"
)
try:
extractor_class = extractor_registry.get(self.config.source.extractor)
extractor_type = self.config.source.extractor
with _add_init_error_context(f"configure the extractor ({extractor_type})"):
extractor_class = extractor_registry.get(extractor_type)
self.extractor = extractor_class(
self.config.source.extractor_config, self.ctx
)
except Exception as e:
self._raise_initialization_error(
e, f"Failed to configure extractor ({self.config.source.extractor})"
)
try:
with _add_init_error_context("configure transformers"):
self._configure_transforms()
except ValueError as e:
self._raise_initialization_error(e, "Failed to configure transformers")
def _configure_transforms(self) -> None:
self.transformers = []
@@ -523,7 +514,7 @@ class Pipeline:
)
num_failures_sink = len(self.sink.get_report().failures)
click.secho(
f"{'' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with at least {num_failures_source+num_failures_sink} failures {'so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
f"{'' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with at least {num_failures_source+num_failures_sink} failures{' so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
fg=self._get_text_color(
running=currently_running,
failures=True,