feat(ingestion): Copy urns from previous checkpoint state on ingestion failure (#10347)

Shubham Jagtap 2024-05-07 17:36:40 +05:30 committed by GitHub
parent d08f36f14b
commit ae3f0fd5ee
9 changed files with 368 additions and 79 deletions
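
In short: when a source reports failures, the stale-entity-removal handler no longer lets the new (possibly incomplete) checkpoint shrink. Urns from the previous checkpoint that the failing run did not re-emit are carried into the new state, so a partial run cannot trigger soft-deletes later. A minimal sketch of that merge, using plain sets and a hypothetical helper name rather than the handler's actual code:

def urns_to_persist_on_failure(last_run_urns: set, current_run_urns: set) -> set:
    # Hypothetical helper, not from the commit: keep everything the failing run
    # emitted, plus whatever the previous checkpoint had that this run missed.
    return current_run_urns | (last_run_urns - current_run_urns)


previous = {"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)"}
current = {"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)"}
assert previous <= urns_to_persist_on_failure(previous, current)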

View File

@@ -26,9 +26,7 @@ class IngestionCheckpointingProviderBase(StatefulCommittable[CheckpointJobStates
     The base class for all checkpointing state provider implementations.
     """

-    def __init__(
-        self, name: str, commit_policy: CommitPolicy = CommitPolicy.ON_NO_ERRORS
-    ):
+    def __init__(self, name: str, commit_policy: CommitPolicy = CommitPolicy.ALWAYS):
         # Set the initial state to an empty dict.
         super().__init__(name, commit_policy, {})
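
Because failed runs now produce a checkpoint worth keeping (urns copied forward), the provider base class can default to CommitPolicy.ALWAYS instead of ON_NO_ERRORS. A simplified sketch of what such a policy gate amounts to, with a local enum and function standing in for DataHub's actual Committable plumbing:

from enum import Enum, auto


class CommitPolicy(Enum):
    # Local stand-in for the real enum; values mirror the names used in the diff.
    ALWAYS = auto()
    ON_NO_ERRORS = auto()


def should_commit(policy: CommitPolicy, num_failures: int) -> bool:
    # Simplified gate: ON_NO_ERRORS drops the checkpoint when the run failed,
    # ALWAYS persists it regardless.
    if policy is CommitPolicy.ALWAYS:
        return True
    return num_failures == 0


assert should_commit(CommitPolicy.ON_NO_ERRORS, num_failures=1) is False
assert should_commit(CommitPolicy.ALWAYS, num_failures=1) is True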

View File

@@ -164,6 +164,9 @@ class StaleEntityRemovalHandler(
     def is_checkpointing_enabled(self) -> bool:
         return self.checkpointing_enabled

+    def _get_state_obj(self):
+        return self.state_type_class()
+
     def create_checkpoint(self) -> Optional[Checkpoint]:
         if self.is_checkpointing_enabled() and not self._ignore_new_state():
             assert self.stateful_ingestion_config is not None
@@ -172,7 +175,7 @@ class StaleEntityRemovalHandler(
                 job_name=self.job_id,
                 pipeline_name=self.pipeline_name,
                 run_id=self.run_id,
-                state=self.state_type_class(),
+                state=self._get_state_obj(),
             )
         return None
@@ -255,9 +258,13 @@ class StaleEntityRemovalHandler(
         # If the source already had a failure, skip soft-deletion.
         # TODO: Eventually, switch this to check if anything in the pipeline had a failure so far.
         if self.source.get_report().failures:
+            for urn in last_checkpoint_state.get_urns_not_in(
+                type="*", other_checkpoint_state=cur_checkpoint_state
+            ):
+                self.add_entity_to_state("", urn)
             self.source.get_report().report_warning(
                 "stale-entity-removal",
-                "Skipping stale entity soft-deletion since source already had failures.",
+                "Skipping stale entity soft-deletion and copying urns from last state since source already had failures.",
             )
             return
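
Two things change in the handler. First, building the empty state object is factored into `_get_state_obj()`, which gives tests a one-line seam to patch. Second, when the source has failures, every urn from the last checkpoint that is missing from the current one is folded back in before the warning is reported. A standalone illustration of the patching seam, using toy classes rather than StaleEntityRemovalHandler itself:

from unittest import mock


class ToyHandler:
    # Toy stand-in for the handler: the state class is normally instantiated here.
    state_type_class = dict

    def _get_state_obj(self):
        return self.state_type_class()

    def create_checkpoint(self):
        return {"state": self._get_state_obj()}


# Tests can swap in a specific state object (e.g. one with serde="utf-8")
# by patching the small factory method instead of the whole checkpoint flow.
with mock.patch.object(ToyHandler, "_get_state_obj", return_value={"serde": "utf-8"}):
    assert ToyHandler().create_checkpoint()["state"] == {"serde": "utf-8"}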

View File

@@ -16,8 +16,8 @@
       "config": "",
       "state": {
         "formatVersion": "1.0",
-        "serde": "base85-bz2-json",
-        "payload": "LRx4!F+o`-Q(1w>5G4QrYoCBnWH=B60MH7jr`{?c0BA?5L)2-AGyu>6y;V<9hz%Mv0Bt1*)lOMzr>a0|Iq-4VtTsYONQsFPLn1EpdQS;HIy|&CvSAlRvAJwmtCEM+Rx(v_)~sVvkx3V@WX4O`=losC6yZWb2OL0@"
+        "serde": "utf-8",
+        "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}"
       },
       "runId": "dummy-test-stateful-ingestion"
     }

View File

@@ -16,8 +16,8 @@
       "config": "",
       "state": {
         "formatVersion": "1.0",
-        "serde": "base85-bz2-json",
-        "payload": "LRx4!F+o`-Q(317h`0a%NgsevWH1l}0MH7jr`{?c0B9vdZ9%mLfYG4P6;f$2G%+v`9z&~6n|e(JEPC2_Iix~CA_im)jR-zsjEK*yo|HQz#IUUHtf@DYVEme-lUW9{Xmmt~y^2jCdyY95az!{$kf#WUxB"
+        "serde": "utf-8",
+        "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\"]}"
       },
       "runId": "dummy-test-stateful-ingestion"
     }
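
The golden checkpoint states also switch serde from `base85-bz2-json` to `utf-8`, so the expected payload is readable JSON instead of an opaque compressed blob. An approximate round-trip of the two encodings, inferred from the serde names; DataHub's actual serde helpers may differ in detail:

import base64
import bz2
import json

state = {"urns": ["urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)"]}

# "utf-8" serde: the payload is simply the JSON text.
utf8_payload = json.dumps(state)

# "base85-bz2-json" serde (approximate): JSON text, bz2-compressed, base85-encoded.
compressed_payload = base64.b85encode(bz2.compress(utf8_payload.encode("utf-8"))).decode("ascii")

# Round-trip check on the compressed form.
assert json.loads(bz2.decompress(base64.b85decode(compressed_payload))) == state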

View File

@@ -0,0 +1,26 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,dummy_stateful,prod),default_stale_entity_removal)",
"changeType": "UPSERT",
"aspectName": "datahubIngestionCheckpoint",
"aspect": {
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"pipelineName": "dummy_stateful",
"platformInstanceId": "",
"config": "",
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
}
}
]

View File

@@ -0,0 +1,26 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,dummy_stateful,prod),default_stale_entity_removal)",
"changeType": "UPSERT",
"aspectName": "datahubIngestionCheckpoint",
"aspect": {
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"pipelineName": "dummy_stateful",
"platformInstanceId": "",
"config": "",
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
}
}
]

View File

@@ -0,0 +1,34 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@@ -0,0 +1,50 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
}
]
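
None of these golden MCE files contain a soft-delete: every status aspect has `removed: false`. A quick sanity check one could run against such a golden (hypothetical local path, not part of the commit):

import json

# Hypothetical local path to one of the goldens above.
with open("golden_test_stateful_ingestion_after_deleted_failure.json") as f:
    mcps = json.load(f)

# No status aspect should mark an entity as removed, i.e. the failing run
# must not have produced any soft-deletes.
assert all(
    not mcp["aspect"]["json"].get("removed", False)
    for mcp in mcps
    if mcp.get("aspectName") == "status"
)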

View File

@@ -1,7 +1,9 @@
 from dataclasses import dataclass, field as dataclass_field
 from typing import Any, Dict, Iterable, List, Optional, cast
+from unittest import mock

 import pydantic
+import pytest
 from freezegun import freeze_time
 from pydantic import Field
@@ -56,6 +58,10 @@ class DummySourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
     stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
         default=None, description="Dummy source Ingestion Config."
     )
+    report_failure: bool = Field(
+        default=False,
+        description="Should this dummy source report a failure.",
+    )


 class DummySource(StatefulIngestionSourceBase):
@@ -103,10 +109,23 @@ class DummySource(StatefulIngestionSourceBase):
                 aspect=StatusClass(removed=False),
             ).as_workunit()

+        if self.source_config.report_failure:
+            self.reporter.report_failure("Dummy error", "Error")
+
     def get_report(self) -> SourceReport:
         return self.reporter


+@pytest.fixture(scope="module")
+def mock_generic_checkpoint_state():
+    with mock.patch(
+        "datahub.ingestion.source.state.entity_removal_state.GenericCheckpointState"
+    ) as mock_checkpoint_state:
+        checkpoint_state = mock_checkpoint_state.return_value
+        checkpoint_state.serde.return_value = "utf-8"
+        yield mock_checkpoint_state
+
+
 @freeze_time(FROZEN_TIME)
 def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):
     # test stateful ingestion using dummy source
@@ -148,6 +167,10 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):
         },
     }

+    with mock.patch(
+        "datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj"
+    ) as mock_state:
+        mock_state.return_value = GenericCheckpointState(serde="utf-8")
     pipeline_run1 = None
     pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(  # type: ignore
         base_pipeline_config  # type: ignore
@@ -175,6 +198,10 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):
     assert checkpoint1
     assert checkpoint1.state

+    with mock.patch(
+        "datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj"
+    ) as mock_state:
+        mock_state.return_value = GenericCheckpointState(serde="utf-8")
     pipeline_run2 = None
     pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config)  # type: ignore
     pipeline_run2_config["source"]["config"]["dataset_patterns"] = {
@@ -225,3 +252,124 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):
         "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
     ]
     assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns)
@freeze_time(FROZEN_TIME)
def test_stateful_ingestion_failure(pytestconfig, tmp_path, mock_time):
# test stateful ingestion using the dummy source, with the pipeline execution failing in the second ingestion run
state_file_name: str = "checkpoint_state_mces_failure.json"
golden_state_file_name: str = "golden_test_checkpoint_state_failure.json"
golden_state_file_name_after_deleted: str = (
"golden_test_checkpoint_state_after_deleted_failure.json"
)
output_file_name: str = "dummy_mces_failure.json"
golden_file_name: str = "golden_test_stateful_ingestion_failure.json"
output_file_name_after_deleted: str = (
"dummy_mces_stateful_after_deleted_failure.json"
)
golden_file_name_after_deleted: str = (
"golden_test_stateful_ingestion_after_deleted_failure.json"
)
test_resources_dir = pytestconfig.rootpath / "tests/unit/stateful_ingestion/state"
base_pipeline_config = {
"run_id": "dummy-test-stateful-ingestion",
"pipeline_name": "dummy_stateful",
"source": {
"type": "tests.unit.stateful_ingestion.state.test_stateful_ingestion.DummySource",
"config": {
"stateful_ingestion": {
"enabled": True,
"remove_stale_metadata": True,
"state_provider": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{state_file_name}",
},
},
},
},
},
"sink": {
"type": "file",
"config": {},
},
}
with mock.patch(
"datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj"
) as mock_state:
mock_state.return_value = GenericCheckpointState(serde="utf-8")
pipeline_run1 = None
pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore
base_pipeline_config # type: ignore
)
pipeline_run1_config["sink"]["config"][
"filename"
] = f"{tmp_path}/{output_file_name}"
pipeline_run1 = Pipeline.create(pipeline_run1_config)
pipeline_run1.run()
pipeline_run1.raise_from_status()
pipeline_run1.pretty_print_summary()
# validate both dummy source mces and checkpoint state mces files
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / output_file_name,
golden_path=f"{test_resources_dir}/{golden_file_name}",
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / state_file_name,
golden_path=f"{test_resources_dir}/{golden_state_file_name}",
)
checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)
assert checkpoint1
assert checkpoint1.state
with mock.patch(
"datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj"
) as mock_state:
mock_state.return_value = GenericCheckpointState(serde="utf-8")
pipeline_run2 = None
pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config) # type: ignore
pipeline_run2_config["source"]["config"]["dataset_patterns"] = {
"allow": ["dummy_dataset1", "dummy_dataset2"],
}
pipeline_run2_config["source"]["config"]["report_failure"] = True
pipeline_run2_config["sink"]["config"][
"filename"
] = f"{tmp_path}/{output_file_name_after_deleted}"
pipeline_run2 = Pipeline.create(pipeline_run2_config)
pipeline_run2.run()
pipeline_run2.pretty_print_summary()
# validate both updated dummy source mces and checkpoint state mces files after deleting dataset
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / output_file_name_after_deleted,
golden_path=f"{test_resources_dir}/{golden_file_name_after_deleted}",
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / state_file_name,
golden_path=f"{test_resources_dir}/{golden_state_file_name_after_deleted}",
)
checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)
assert checkpoint2
assert checkpoint2.state
# Validate that all providers have committed successfully.
validate_all_providers_have_committed_successfully(
pipeline=pipeline_run1, expected_providers=1
)
validate_all_providers_have_committed_successfully(
pipeline=pipeline_run2, expected_providers=1
)
# Perform assertions on the states. The deleted table should still be
# part of the second state since the pipeline run failed
state1 = cast(GenericCheckpointState, checkpoint1.state)
state2 = cast(GenericCheckpointState, checkpoint2.state)
assert state1 == state2
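
The closing `state1 == state2` pins down the intent: run 2 failed, so its committed checkpoint is the previous state carried forward rather than a shrunken view of the world. A hypothetical follow-up assertion in the same vein (not in the commit), reusing the get_urns_not_in comparison the success-path test relies on:

# Nothing in run 1's state should be missing from run 2's state, so a later
# healthy run still knows about dummy_dataset3 and emits no soft-delete here.
assert not list(state1.get_urns_not_in(type="*", other_checkpoint_state=state2))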