feat(ingest): enable stateful ingestion safety threshold (#10516)

This commit is contained in:
Harshal Sheth 2024-05-29 12:01:04 -07:00 committed by GitHub
parent 013425a9a9
commit 37bc423b50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 18 additions and 61 deletions

View File

@ -34,4 +34,5 @@ def inspect(pipeline_name: str, platform: str) -> None:
click.secho("No ingestion state found.", fg="red")
exit(1)
logger.info(f"Found ingestion state with {len(checkpoint.state.urns)} URNs.")
click.echo(json.dumps(checkpoint.state.urns, indent=2))

View File

@ -1445,7 +1445,7 @@ class DBTSourceBase(StatefulIngestionSourceBase):
yield MetadataChangeProposalWrapper(
entityUrn=node_datahub_urn,
aspect=upstreams_lineage_class,
).as_workunit()
).as_workunit(is_primary_source=False)
def extract_query_tag_aspects(
self,

View File

@ -37,7 +37,7 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
description="Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
)
fail_safe_threshold: float = pydantic.Field(
default=100.0,
default=40.0,
description="Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.",
le=100.0,
ge=0.0,
@ -224,6 +224,8 @@ class StaleEntityRemovalHandler(
assert self.stateful_ingestion_config
copy_previous_state_and_fail = False
# Check if the entity delta is below the fail-safe threshold.
entity_difference_percent = cur_checkpoint_state.get_percent_entities_changed(
last_checkpoint_state
@ -242,30 +244,32 @@ class StaleEntityRemovalHandler(
f"Will not soft-delete entities, since we'd be deleting {entity_difference_percent:.1f}% of the existing entities. "
f"To force a deletion, increase the value of 'stateful_ingestion.fail_safe_threshold' (currently {self.stateful_ingestion_config.fail_safe_threshold})",
)
return
copy_previous_state_and_fail = True
if self.source.get_report().events_produced == 0:
# SUBTLE: By reporting this as a failure here, we also ensure that the
# new (empty) state doesn't get committed.
# TODO: Move back to using fail_safe_threshold once we're confident that we've squashed all the bugs.
self.source.get_report().report_failure(
"stale-entity-removal",
"Skipping stale entity soft-deletion because the source produced no events. "
"This is a fail-safe mechanism to prevent accidental deletion of all entities.",
)
return
copy_previous_state_and_fail = True
# If the source already had a failure, skip soft-deletion.
# TODO: Eventually, switch this to check if anything in the pipeline had a failure so far.
# TODO: Eventually, switch this to check if anything in the pipeline had a failure so far, not just the source.
if self.source.get_report().failures:
for urn in last_checkpoint_state.get_urns_not_in(
type="*", other_checkpoint_state=cur_checkpoint_state
):
self.add_entity_to_state("", urn)
self.source.get_report().report_warning(
"stale-entity-removal",
"Skipping stale entity soft-deletion and coping urns from last state since source already had failures.",
"Skipping stale entity soft-deletion and copying urns from last state since source already had failures.",
)
copy_previous_state_and_fail = True
if copy_previous_state_and_fail:
logger.info(
f"Copying urns from last state (size {last_checkpoint_state.urns}) to current state (size {cur_checkpoint_state.urns}) "
"to ensure stale entities from previous runs are deleted on the next successful run."
)
for urn in last_checkpoint_state.urns:
self.add_entity_to_state("", urn)
return
# Everything looks good, emit the soft-deletion workunits

View File

@ -4258,54 +4258,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.dbt_postgres.an-aliased-view-for-monthly-billing,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-test-with-non-incremental-lineage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.dbt_postgres.an-aliased-view-for-payments,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-test-with-non-incremental-lineage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.dbt_postgres.payments_by_customer_by_month,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-test-with-non-incremental-lineage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:dbt:column_tag",