From d0d09a09f8b70054c68f26373663d0424bd81413 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Fri, 11 Oct 2024 16:55:27 +0530 Subject: [PATCH] fix(ingest): ignore irrelevant urns from % change computation (#11583) --- .../source/state/entity_removal_state.py | 26 ++++++++++++++++++- .../state/stale_entity_removal_handler.py | 10 +++---- .../test_stale_entity_removal_handler.py | 18 +++++++++++++ 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py b/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py index f011aa7bdd..318395d4e6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py @@ -8,6 +8,11 @@ from datahub.utilities.checkpoint_state_util import CheckpointStateUtil from datahub.utilities.dedup_list import deduplicate_list from datahub.utilities.urns.urn import guess_entity_type +STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = { + "dataProcessInstance", + "query", +} + def pydantic_state_migrator(mapping: Dict[str, str]) -> classmethod: # mapping would be something like: @@ -127,8 +132,11 @@ class GenericCheckpointState(CheckpointStateBase): :param old_checkpoint_state: the old checkpoint state to compute the relative change percent against. :return: (1-|intersection(self, old_checkpoint_state)| / |old_checkpoint_state|) * 100.0 """ + + old_urns_filtered = filter_ignored_entity_types(old_checkpoint_state.urns) + return compute_percent_entities_changed( - new_entities=self.urns, old_entities=old_checkpoint_state.urns + new_entities=self.urns, old_entities=old_urns_filtered ) def urn_count(self) -> int: @@ -153,3 +161,19 @@ def _get_entity_overlap_and_cardinalities( new_set = set(new_entities) old_set = set(old_entities) return len(new_set.intersection(old_set)), len(old_set), len(new_set) + + +def filter_ignored_entity_types(urns: List[str]) -> List[str]: + # We previously stored ignored entity urns (e.g.dataProcessInstance) in state. + # For smoother transition from old checkpoint state, without requiring explicit + # setting of `fail_safe_threshold` due to removal of irrelevant urns from new state, + # here, we would ignore irrelevant urns from percentage entities changed computation + # This special handling can be removed after few months. + return [ + urn + for urn in urns + if not any( + urn.startswith(f"urn:li:{entityType}") + for entityType in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES + ) + ] diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index 9d77e13a0f..d4fcbf0992 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -11,7 +11,10 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.state.checkpoint import Checkpoint -from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState +from datahub.ingestion.source.state.entity_removal_state import ( + STATEFUL_INGESTION_IGNORED_ENTITY_TYPES, + GenericCheckpointState, +) from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionConfig, StatefulIngestionConfigBase, @@ -27,11 +30,6 @@ from datahub.utilities.urns.urn import guess_entity_type logger: logging.Logger = logging.getLogger(__name__) -STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = { - "dataProcessInstance", - "query", -} - class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig): """ diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stale_entity_removal_handler.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stale_entity_removal_handler.py index 62a42b954d..be2d8bac12 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stale_entity_removal_handler.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stale_entity_removal_handler.py @@ -4,6 +4,7 @@ import pytest from datahub.ingestion.source.state.entity_removal_state import ( compute_percent_entities_changed, + filter_ignored_entity_types, ) EntList = List[str] @@ -46,3 +47,20 @@ def test_change_percent( new_entities=new_entities, old_entities=old_entities ) assert actual_percent_change == expected_percent_change + + +def test_filter_ignored_entity_types(): + + assert filter_ignored_entity_types( + [ + "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", + "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619", + "urn:li:query:query1", + ] + ) == [ + "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", + ]