fix(ingest): ignore irrelevant urns from % change computation (#11583)

This commit is contained in:
Mayuri Nehate 2024-10-11 16:55:27 +05:30 committed by GitHub
parent 43c185df01
commit d0d09a09f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 47 additions and 7 deletions

View File

@ -8,6 +8,11 @@ from datahub.utilities.checkpoint_state_util import CheckpointStateUtil
from datahub.utilities.dedup_list import deduplicate_list
from datahub.utilities.urns.urn import guess_entity_type
STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {
"dataProcessInstance",
"query",
}
def pydantic_state_migrator(mapping: Dict[str, str]) -> classmethod:
# mapping would be something like:
@ -127,8 +132,11 @@ class GenericCheckpointState(CheckpointStateBase):
:param old_checkpoint_state: the old checkpoint state to compute the relative change percent against.
:return: (1-|intersection(self, old_checkpoint_state)| / |old_checkpoint_state|) * 100.0
"""
old_urns_filtered = filter_ignored_entity_types(old_checkpoint_state.urns)
return compute_percent_entities_changed(
new_entities=self.urns, old_entities=old_checkpoint_state.urns
new_entities=self.urns, old_entities=old_urns_filtered
)
def urn_count(self) -> int:
@ -153,3 +161,19 @@ def _get_entity_overlap_and_cardinalities(
new_set = set(new_entities)
old_set = set(old_entities)
return len(new_set.intersection(old_set)), len(old_set), len(new_set)
def filter_ignored_entity_types(urns: List[str]) -> List[str]:
# We previously stored ignored entity urns (e.g.dataProcessInstance) in state.
# For smoother transition from old checkpoint state, without requiring explicit
# setting of `fail_safe_threshold` due to removal of irrelevant urns from new state,
# here, we would ignore irrelevant urns from percentage entities changed computation
# This special handling can be removed after few months.
return [
urn
for urn in urns
if not any(
urn.startswith(f"urn:li:{entityType}")
for entityType in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES
)
]

View File

@ -11,7 +11,10 @@ from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.entity_removal_state import (
STATEFUL_INGESTION_IGNORED_ENTITY_TYPES,
GenericCheckpointState,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfig,
StatefulIngestionConfigBase,
@ -27,11 +30,6 @@ from datahub.utilities.urns.urn import guess_entity_type
logger: logging.Logger = logging.getLogger(__name__)
STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {
"dataProcessInstance",
"query",
}
class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
"""

View File

@ -4,6 +4,7 @@ import pytest
from datahub.ingestion.source.state.entity_removal_state import (
compute_percent_entities_changed,
filter_ignored_entity_types,
)
EntList = List[str]
@ -46,3 +47,20 @@ def test_change_percent(
new_entities=new_entities, old_entities=old_entities
)
assert actual_percent_change == expected_percent_change
def test_filter_ignored_entity_types():
assert filter_ignored_entity_types(
[
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
"urn:li:query:query1",
]
) == [
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
]