Mirror of https://github.com/datahub-project/datahub.git
Synced 2025-11-03 12:16:10 +00:00
feat(ingest): add more fail-safes to stateful ingestion (#8111)
parent 60dd9ef187
commit 690ed083d9
@@ -70,14 +70,15 @@ An ingestion reporting state provider is responsible for saving and retrieving the
 associated with the ingestion runs of various jobs inside the source connector of the ingestion pipeline.
 The data model used for capturing the telemetry is [DatahubIngestionRunSummary](https://github.com/datahub-project/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/datajob/datahub/DatahubIngestionRunSummary.pdl).
 A reporting ingestion state provider needs to implement the [IngestionReportingProviderBase](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_reporting_provider_base.py)
-interface and register itself with datahub by adding an entry under `datahub.ingestion.checkpointing_provider.plugins`
+interface and register itself with datahub by adding an entry under `datahub.ingestion.reporting_provider.plugins`
 key of the entry_points section in [setup.py](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/setup.py)
 with its type and implementation class as shown below.
 ```python
 entry_points = {
     # <snip other keys>
-    "datahub.ingestion.checkpointing_provider.plugins": [
-        "datahub = datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:DatahubIngestionCheckpointingProvider",
+    "datahub.ingestion.reporting_provider.plugins": [
+        "datahub = datahub.ingestion.reporting.datahub_ingestion_run_summary_provider:DatahubIngestionRunSummaryProvider",
+        "file = datahub.ingestion.reporting.file_reporter:FileReporter",
     ],
 }
 ```
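Note: entry points registered this way are discoverable at runtime via the standard library. A minimal sketch (not DataHub's actual plugin loader; `load_reporting_providers` is a hypothetical name) of how a pipeline could enumerate providers registered under this key:

```python
# Minimal sketch of entry-point discovery, not DataHub's actual loader.
# Assumes Python 3.10+ for importlib.metadata.entry_points(group=...).
from importlib.metadata import entry_points


def load_reporting_providers() -> dict:
    """Map each registered provider name to its implementation class."""
    providers = {}
    for ep in entry_points(group="datahub.ingestion.reporting_provider.plugins"):
        # ep.load() imports the module and resolves the attribute after the
        # colon, e.g. "datahub.ingestion.reporting.file_reporter:FileReporter".
        providers[ep.name] = ep.load()
    return providers


if __name__ == "__main__":
    print(load_reporting_providers())  # e.g. {"datahub": ..., "file": ...}
```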
@@ -308,8 +308,7 @@ class Pipeline:
                 status="CANCELLED"
                 if self.final_status == "cancelled"
                 else "FAILURE"
-                if self.source.get_report().failures
-                or self.sink.get_report().failures
+                if self.has_failures()
                 else "SUCCESS"
                 if self.final_status == "completed"
                 else "UNKNOWN",
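Chained conditional expressions like this one group right to left, so cancellation wins over failure, which wins over success. A standalone restatement (hypothetical helper, mirroring the expression rather than DataHub's `Pipeline` class) makes the precedence explicit:

```python
# Restatement of the status chain above as an ordinary if-chain.
def run_status(final_status: str, has_failures: bool) -> str:
    if final_status == "cancelled":
        return "CANCELLED"
    if has_failures:
        return "FAILURE"
    if final_status == "completed":
        return "SUCCESS"
    return "UNKNOWN"


assert run_status("cancelled", has_failures=True) == "CANCELLED"
assert run_status("completed", has_failures=True) == "FAILURE"  # failures trump completion
assert run_status("completed", has_failures=False) == "SUCCESS"
```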
@@ -385,6 +384,7 @@ class Pipeline:
         except SystemExit:
             raise
         except Exception as e:
+            # TODO: Transformer errors should cause the pipeline to fail.
             logger.error(
                 "Failed to process some records. Continuing.", exc_info=e
             )
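Worth noting about this handler shape: `SystemExit` subclasses `BaseException`, not `Exception`, so the broad handler would never swallow it anyway; the explicit `except SystemExit: raise` documents that intent. A tiny self-contained demonstration (independent of DataHub):

```python
# SystemExit passes through a broad `except Exception` handler by design.
import logging

logger = logging.getLogger(__name__)


def process(record: object) -> None:
    raise ValueError(f"bad record: {record!r}")


try:
    process("x")
except SystemExit:
    raise
except Exception as e:
    logger.error("Failed to process some records. Continuing.", exc_info=e)

assert not issubclass(SystemExit, Exception)
assert issubclass(SystemExit, BaseException)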
@@ -410,7 +410,7 @@ class Pipeline:
             self.sink.close()
             self.process_commits()
             self.final_status = "completed"
-        except (SystemExit, RuntimeError) as e:
+        except (SystemExit, RuntimeError, KeyboardInterrupt) as e:
             self.final_status = "cancelled"
             logger.error("Caught error", exc_info=e)
             raise
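The reason `KeyboardInterrupt` must be listed explicitly: like `SystemExit`, it derives from `BaseException`, so without it a Ctrl-C would skip this clause and the run would never be marked cancelled. A sketch under assumed names (`MiniPipeline` is a stand-in, not the real `Pipeline`):

```python
# Why KeyboardInterrupt needs to be in the tuple: it is a BaseException.
class MiniPipeline:
    final_status = "unknown"

    def run(self) -> None:
        try:
            raise KeyboardInterrupt  # simulates Ctrl-C mid-ingestion
        except (SystemExit, RuntimeError, KeyboardInterrupt):
            self.final_status = "cancelled"
            raise


p = MiniPipeline()
try:
    p.run()
except KeyboardInterrupt:
    pass
assert p.final_status == "cancelled"
```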
@@ -534,6 +534,11 @@ class Pipeline:
         else:
             return "bright_green"
 
+    def has_failures(self) -> bool:
+        return bool(
+            self.source.get_report().failures or self.sink.get_report().failures
+        )
+
     def pretty_print_summary(
         self, warnings_as_failure: bool = False, currently_running: bool = False
     ) -> int:
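The `bool()` wrapper matters because the reports' `failures` attributes are containers and `or` returns one of its operands. A hedged sketch with stand-in report objects (approximating, not reproducing, the real report shape):

```python
# Stand-in reports with a `failures` list, to show the bool() coercion.
from dataclasses import dataclass, field
from typing import List


@dataclass
class Report:
    failures: List[str] = field(default_factory=list)


def has_failures(source_report: Report, sink_report: Report) -> bool:
    # `or` would return a list here; bool() yields a clean True/False.
    return bool(source_report.failures or sink_report.failures)


assert has_failures(Report(["boom"]), Report()) is True
assert has_failures(Report(), Report()) is False
```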
@@ -128,27 +128,23 @@ class GenericCheckpointState(CheckpointStateBase):
         :return: (1-|intersection(self, old_checkpoint_state)| / |old_checkpoint_state|) * 100.0
         """
         return compute_percent_entities_changed(
-            [(self.urns, old_checkpoint_state.urns)]
+            new_entities=self.urns, old_entities=old_checkpoint_state.urns
         )
 
 
 def compute_percent_entities_changed(
-    new_old_entity_list: List[Tuple[List[str], List[str]]]
+    new_entities: List[str], old_entities: List[str]
 ) -> float:
-    old_count_all = 0
-    overlap_count_all = 0
-    for new_entities, old_entities in new_old_entity_list:
-        (overlap_count, old_count, _,) = get_entity_overlap_and_cardinalities(
-            new_entities=new_entities, old_entities=old_entities
-        )
-        overlap_count_all += overlap_count
-        old_count_all += old_count
-    if old_count_all:
-        return (1 - overlap_count_all / old_count_all) * 100.0
+    (overlap_count, old_count, _,) = _get_entity_overlap_and_cardinalities(
+        new_entities=new_entities, old_entities=old_entities
+    )
+
+    if old_count:
+        return (1 - overlap_count / old_count) * 100.0
     return 0.0
 
 
-def get_entity_overlap_and_cardinalities(
+def _get_entity_overlap_and_cardinalities(
     new_entities: List[str], old_entities: List[str]
 ) -> Tuple[int, int, int]:
     new_set = set(new_entities)
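With the list-of-pairs batching removed, the function reduces to the formula in the docstring: (1 - |new ∩ old| / |old|) × 100, with 0.0 when the old state is empty. A self-contained restatement (illustrative only; the real helper also returns cardinalities):

```python
# Restatement of the simplified delta formula from the docstring.
from typing import List


def percent_entities_changed(new_entities: List[str], old_entities: List[str]) -> float:
    old_set = set(old_entities)
    if not old_set:
        return 0.0
    overlap = len(set(new_entities) & old_set)
    return (1 - overlap / len(old_set)) * 100.0


assert percent_entities_changed(["a", "b", "c"], ["d", "c", "b", "a"]) == 25.0
assert percent_entities_changed([], ["a", "b"]) == 100.0
assert percent_entities_changed(["a"], []) == 0.0
```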
@@ -208,7 +208,7 @@ class StaleEntityRemovalHandler(
         if not self.is_checkpointing_enabled() or self._ignore_old_state():
             return
         logger.debug("Checking for stale entity removal.")
-        last_checkpoint: Optional[Checkpoint] = self.state_provider.get_last_checkpoint(
+        last_checkpoint = self.state_provider.get_last_checkpoint(
             self.job_id, self.state_type_class
         )
         if not last_checkpoint:
@@ -216,14 +216,15 @@ class StaleEntityRemovalHandler(
         cur_checkpoint = self.state_provider.get_current_checkpoint(self.job_id)
         assert cur_checkpoint is not None
         # Get the underlying states
-        last_checkpoint_state = cast(GenericCheckpointState, last_checkpoint.state)
+        last_checkpoint_state: GenericCheckpointState = last_checkpoint.state
         cur_checkpoint_state = cast(GenericCheckpointState, cur_checkpoint.state)
 
-        assert self.stateful_ingestion_config
-
         # Check if the entity delta is below the fail-safe threshold.
         entity_difference_percent = cur_checkpoint_state.get_percent_entities_changed(
             last_checkpoint_state
         )
+        assert self.stateful_ingestion_config
         if (
             entity_difference_percent
             > self.stateful_ingestion_config.fail_safe_threshold
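Swapping `cast(...)` for a plain annotation here is behavior-neutral, since it relies on `Checkpoint` now carrying its state type (see the `StateProviderWrapper` hunk below) and `typing.cast` never does anything at runtime anyway:

```python
# typing.cast performs no runtime conversion or validation; it only guides
# the static type checker, so removing it cannot change behavior.
from typing import cast

value = cast(int, "not an int")  # no error: cast never inspects the value
assert value == "not an int"
```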
@@ -234,11 +235,30 @@ class StaleEntityRemovalHandler(
         ):
             # Log the failure. This would prevent the current state from getting committed.
             self.source.get_report().report_failure(
-                "Stateful Ingestion",
+                "stale-entity-removal",
+                f"Will not soft-delete entities, since we'd be deleting {entity_difference_percent:.1f}% of the existing entities. "
+                f"To force a deletion, increase the value of 'stateful_ingestion.fail_safe_threshold' (currently {self.stateful_ingestion_config.fail_safe_threshold})",
             )
             # Bail so that we don't emit the stale entity removal workunits.
             return
 
+        if self.source.get_report().events_produced == 0:
+            # SUBTLE: By reporting this as a failure here, we also ensure that the
+            # new (empty) state doesn't get committed.
+            # TODO: Move back to using fail_safe_threshold once we're confident that we've squashed all the bugs.
+            self.source.get_report().report_failure(
+                "stale-entity-removal",
+                "Skipping stale entity soft-deletion because the source produced no events. "
+                "This is a fail-safe mechanism to prevent accidental deletion of all entities.",
+            )
+            return
+
+        # If the source already had a failure, skip soft-deletion.
+        # TODO: Eventually, switch this to check if anything in the pipeline had a failure so far.
+        if self.source.get_report().failures:
+            self.source.get_report().report_warning(
+                "stale-entity-removal",
+                "Skipping stale entity soft-deletion since source already had failures.",
+            )
+            return
+
         # Everything looks good, emit the soft-deletion workunits
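Taken together, the handler now applies three ordered guards before emitting any soft-deletes: the entity-delta threshold, the zero-events check, and the prior-failures check. A condensed sketch of that decision logic (`should_soft_delete` and its parameters are hypothetical stand-ins, not DataHub API):

```python
# Condensed sketch of the three fail-safe guards, in handler order.
from typing import List, Optional


def should_soft_delete(
    entity_difference_percent: float,
    fail_safe_threshold: float,
    events_produced: int,
    failures: List[str],
) -> Optional[str]:
    """Return a skip reason, or None when soft-deletion may proceed."""
    if entity_difference_percent > fail_safe_threshold:
        return "delta exceeds fail_safe_threshold"  # reported as a failure
    if events_produced == 0:
        return "source produced no events"  # reported as a failure
    if failures:
        return "source already had failures"  # reported as a warning
    return None


assert should_soft_delete(5.0, 20.0, 100, []) is None
assert should_soft_delete(50.0, 20.0, 100, []) is not None
assert should_soft_delete(5.0, 20.0, 0, []) == "source produced no events"
```

Note that the first two guards are reported as failures precisely so the new (possibly empty) checkpoint state is not committed, while the third is only a warning.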
@@ -321,7 +321,7 @@ class StateProviderWrapper:
     # Base-class implementations for common state management tasks.
     def get_last_checkpoint(
         self, job_id: JobId, checkpoint_state_class: Type[StateType]
-    ) -> Optional[Checkpoint]:
+    ) -> Optional[Checkpoint[StateType]]:
         if not self.is_stateful_ingestion_configured() or (
             self.stateful_ingestion_config
             and self.stateful_ingestion_config.ignore_old_state
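The tightened return type presumes `Checkpoint` is parameterized over a `TypeVar`, which is what lets callers drop the `cast` seen earlier. A minimal shape that type-checks (illustrative only, not DataHub's full `Checkpoint` class):

```python
# Minimal generic Checkpoint so Optional[Checkpoint[StateType]] carries
# the concrete state type through to callers.
from typing import Generic, Optional, TypeVar

StateType = TypeVar("StateType")


class Checkpoint(Generic[StateType]):
    def __init__(self, state: StateType) -> None:
        self.state = state


def get_last_checkpoint(
    state_class: type[StateType],
) -> Optional[Checkpoint[StateType]]:
    # With the parameterized return type, `checkpoint.state` is typed as
    # StateType for the caller, instead of an unparameterized state.
    return Checkpoint(state_class())


cp = get_last_checkpoint(dict)
assert cp is not None and cp.state == {}
```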
@@ -11,7 +11,7 @@ from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
     JobId,
 )
 from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
-from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass, StatusClass
+from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass
 
 logger = logging.getLogger(__name__)
@@ -115,14 +115,11 @@ class DatahubIngestionCheckpointingProvider(IngestionCheckpointingProviderBase):
             job_name,
         )
 
-        self.graph.emit_mcp(
-            # We don't want the state payloads to show up in search. As such, we emit the
-            # dataJob aspects as soft-deleted. This doesn't affect the ability to query
-            # them using the timeseries API.
-            MetadataChangeProposalWrapper(
-                entityUrn=datajob_urn,
-                aspect=StatusClass(removed=True),
-            )
+        # We don't want the state payloads to show up in search. As such, we emit the
+        # dataJob aspects as soft-deleted. This doesn't affect the ability to query
+        # them using the timeseries API.
+        self.graph.soft_delete_entity(
+            urn=datajob_urn,
+        )
         self.graph.emit_mcp(
             MetadataChangeProposalWrapper(
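As the removed lines show, a "soft delete" is just the Status aspect emitted with `removed=True`; the new code delegates that emission to a graph helper. A hedged sketch of the equivalent direct emission, built only from classes the old code itself used (assumes the acryl-datahub package and a configured `DataHubGraph` instance):

```python
# Sketch of the direct emission the helper replaces. `graph` is assumed to
# be a configured DataHubGraph; `datajob_urn` is an entity urn string.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass


def soft_delete(graph, datajob_urn: str) -> None:
    # The Status aspect with removed=True hides the entity from search but,
    # per the comment above, leaves it queryable via the timeseries API.
    graph.emit_mcp(
        MetadataChangeProposalWrapper(
            entityUrn=datajob_urn,
            aspect=StatusClass(removed=True),
        )
    )
```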
@@ -6,42 +6,43 @@ from datahub.ingestion.source.state.entity_removal_state import (
     compute_percent_entities_changed,
 )
 
+EntList = List[str]
-OldNewEntLists = List[Tuple[List[str], List[str]]]
 
-old_new_ent_tests: Dict[str, Tuple[OldNewEntLists, float]] = {
-    "no_change_empty_old_and_new": ([([], [])], 0.0),
-    "no_change_empty_old_and_non_empty_new": ([(["a"], [])], 0.0),
+new_old_ent_tests: Dict[str, Tuple[EntList, EntList, float]] = {
+    "no_change_empty_old_and_new": ([], [], 0.0),
+    "no_change_empty_old_and_non_empty_new": (["a"], [], 0.0),
     "no_change_non_empty_old_new_equals_old": (
-        [(["a", "b", "c"], ["c", "b", "a"])],
+        ["a", "b", "c"],
+        ["c", "b", "a"],
         0.0,
     ),
     "no_change_non_empty_old_new_superset_old": (
-        [(["a", "b", "c", "d"], ["c", "b", "a"])],
+        ["a", "b", "c", "d"],
+        ["c", "b", "a"],
         0.0,
     ),
-    "change_25_percent_delta": ([(["a", "b", "c"], ["d", "c", "b", "a"])], 25.0),
+    "change_25_percent_delta": (["a", "b", "c"], ["d", "c", "b", "a"], 25.0),
     "change_50_percent_delta": (
-        [
-            (
-                ["b", "a"],
-                ["a", "b", "c", "d"],
-            )
-        ],
+        ["b", "a"],
+        ["a", "b", "c", "d"],
         50.0,
     ),
-    "change_75_percent_delta": ([(["a"], ["a", "b", "c", "d"])], 75.0),
-    "change_100_percent_delta_empty_new": ([([], ["a", "b", "c", "d"])], 100.0),
-    "change_100_percent_delta_non_empty_new": ([(["e"], ["a", "b", "c", "d"])], 100.0),
+    "change_75_percent_delta": (["a"], ["a", "b", "c", "d"], 75.0),
+    "change_100_percent_delta_empty_new": ([], ["a", "b", "c", "d"], 100.0),
+    "change_100_percent_delta_non_empty_new": (["e"], ["a", "b", "c", "d"], 100.0),
 }
 
 
 @pytest.mark.parametrize(
-    "new_old_entity_list, expected_percent_change",
-    old_new_ent_tests.values(),
-    ids=old_new_ent_tests.keys(),
+    "new_entities, old_entities, expected_percent_change",
+    new_old_ent_tests.values(),
+    ids=new_old_ent_tests.keys(),
 )
 def test_change_percent(
-    new_old_entity_list: OldNewEntLists, expected_percent_change: float
+    new_entities: EntList, old_entities: EntList, expected_percent_change: float
 ) -> None:
-    actual_percent_change = compute_percent_entities_changed(new_old_entity_list)
+    actual_percent_change = compute_percent_entities_changed(
+        new_entities=new_entities, old_entities=old_entities
+    )
     assert actual_percent_change == expected_percent_change
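The expected values in these parametrized cases follow directly from the delta formula. A worked check of the "change_25_percent_delta" case by hand:

```python
# old = {a, b, c, d} has 4 entities; new = {a, b, c} retains 3 of them,
# so the delta is (1 - 3/4) * 100 = 25.0, matching the test's expectation.
new_entities = ["a", "b", "c"]
old_entities = ["d", "c", "b", "a"]

overlap = len(set(new_entities) & set(old_entities))
expected = (1 - overlap / len(set(old_entities))) * 100.0
assert expected == 25.0
```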