From fa2ab1bcee01143bc20b896facdb5a7795dec5e7 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 27 Jun 2024 12:07:28 -0700 Subject: [PATCH] fix(ingest): add status aspect to dataProcessInstance (#10757) --- .../datahub/ingestion/api/source_helpers.py | 37 ----------- .../state/stale_entity_removal_handler.py | 35 +++++++++- ...bt_test_test_model_performance_golden.json | 64 +++++++++++++++++++ ...nowflake_empty_connection_user_golden.json | 50 +++++++++++++++ .../fivetran/fivetran_snowflake_golden.json | 48 ++++++++++++++ .../hive-metastore/docker-compose.yml | 3 - .../kafka-connect/docker-compose.override.yml | 1 - .../integration/kafka/docker-compose.yml | 1 - .../integration/trino/docker-compose.yml | 3 - .../state/golden_test_stateful_ingestion.json | 16 +++++ ...test_stateful_ingestion_after_deleted.json | 16 +++++ .../src/main/resources/entity-registry.yml | 1 + 12 files changed, 228 insertions(+), 47 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 8cc2cc565d..2de129d7d0 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -2,7 +2,6 @@ import logging from datetime import datetime, timezone from typing import ( TYPE_CHECKING, - Callable, Dict, Iterable, List, @@ -43,9 +42,6 @@ from datahub.utilities.urns.urn_iter import list_urns, lowercase_dataset_urns if TYPE_CHECKING: from datahub.ingestion.api.source import SourceReport - from datahub.ingestion.source.state.stale_entity_removal_handler import ( - StaleEntityRemovalHandler, - ) logger = logging.getLogger(__name__) @@ -139,39 +135,6 @@ def auto_status_aspect( ).as_workunit() -def _default_entity_type_fn(wu: MetadataWorkUnit) -> Optional[str]: - urn = wu.get_urn() - entity_type = guess_entity_type(urn) - return entity_type - - -def auto_stale_entity_removal( - stale_entity_removal_handler: "StaleEntityRemovalHandler", - stream: Iterable[MetadataWorkUnit], - entity_type_fn: Callable[ - [MetadataWorkUnit], Optional[str] - ] = _default_entity_type_fn, -) -> Iterable[MetadataWorkUnit]: - """ - Record all entities that are found, and emit removals for any that disappeared in this run. - """ - - for wu in stream: - urn = wu.get_urn() - - if wu.is_primary_source: - entity_type = entity_type_fn(wu) - if entity_type is not None: - stale_entity_removal_handler.add_entity_to_state(entity_type, urn) - else: - stale_entity_removal_handler.add_urn_to_skip(urn) - - yield wu - - # Clean up stale entities. - yield from stale_entity_removal_handler.gen_removed_entity_workunits() - - T = TypeVar("T", bound=MetadataWorkUnit) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index 97c9dd9e24..ee1ccdff78 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -9,7 +9,6 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import entity_supports_aspect from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId -from datahub.ingestion.api.source_helpers import auto_stale_entity_removal from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.state.checkpoint import Checkpoint from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState @@ -28,6 +27,10 @@ from datahub.utilities.urns.urn import guess_entity_type logger: logging.Logger = logging.getLogger(__name__) +STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = { + "dataProcessInstance", +} + class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig): """ @@ -59,6 +62,30 @@ class StaleEntityRemovalSourceReport(StatefulIngestionReport): self.last_state_non_deletable_entities.append(urn) +def auto_stale_entity_removal( + stale_entity_removal_handler: "StaleEntityRemovalHandler", + stream: Iterable[MetadataWorkUnit], +) -> Iterable[MetadataWorkUnit]: + """ + Record all entities that are found, and emit removals for any that disappeared in this run. + """ + + for wu in stream: + urn = wu.get_urn() + + if wu.is_primary_source: + entity_type = guess_entity_type(urn) + if entity_type is not None: + stale_entity_removal_handler.add_entity_to_state(entity_type, urn) + else: + stale_entity_removal_handler.add_urn_to_skip(urn) + + yield wu + + # Clean up stale entities. + yield from stale_entity_removal_handler.gen_removed_entity_workunits() + + class StaleEntityRemovalHandler( StatefulIngestionUsecaseHandlerBase["GenericCheckpointState"] ): @@ -285,7 +312,11 @@ class StaleEntityRemovalHandler( for urn in last_checkpoint_state.get_urns_not_in( type="*", other_checkpoint_state=cur_checkpoint_state ): - if not entity_supports_aspect(guess_entity_type(urn), StatusClass): + entity_type = guess_entity_type(urn) + if ( + entity_type in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES + or not entity_supports_aspect(entity_type, StatusClass) + ): # If any entity does not support aspect 'status' then skip that entity urn report.report_last_state_non_deletable_entities(urn) continue diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_test_model_performance_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_test_model_performance_golden.json index 201924744d..a9b7df7c2b 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_test_model_performance_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_test_model_performance_golden.json @@ -6010,6 +6010,70 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:36225e795a4597b2376996774a803b0d", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "dbt-model-performance", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:57aa623f096cf3a28af70fe94b713907", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "dbt-model-performance", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:a42a5b1bee156e45972e12d4156fb7a2", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "dbt-model-performance", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:fc6268f0be68fd04c310705b65efd6fe", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "dbt-model-performance", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "tag", "entityUrn": "urn:li:tag:dbt:tag_from_dbt", diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json index 65f8620d0b..d2ae437605 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json @@ -24,6 +24,7 @@ "aspect": { "json": { "owners": [], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:fivetran" @@ -186,6 +187,7 @@ "aspect": { "json": { "owners": [], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:fivetran" @@ -614,5 +616,53 @@ "runId": "powerbi-test", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json index 8545f43348..59e545183a 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json @@ -624,5 +624,53 @@ "runId": "powerbi-test", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/hive-metastore/docker-compose.yml b/metadata-ingestion/tests/integration/hive-metastore/docker-compose.yml index bc52779d71..d6a6701451 100644 --- a/metadata-ingestion/tests/integration/hive-metastore/docker-compose.yml +++ b/metadata-ingestion/tests/integration/hive-metastore/docker-compose.yml @@ -1,9 +1,6 @@ # Adapted from https://github.com/big-data-europe/docker-hive. -version: "3" - services: - presto: image: starburstdata/presto:350-e.15 container_name: "presto" diff --git a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml index bcd8c25a1d..85b6a310a0 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml +++ b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml @@ -1,5 +1,4 @@ --- -version: '3.8' services: connect: image: confluentinc/cp-kafka-connect:7.4.0 diff --git a/metadata-ingestion/tests/integration/kafka/docker-compose.yml b/metadata-ingestion/tests/integration/kafka/docker-compose.yml index 0a4422e075..044842850f 100644 --- a/metadata-ingestion/tests/integration/kafka/docker-compose.yml +++ b/metadata-ingestion/tests/integration/kafka/docker-compose.yml @@ -1,5 +1,4 @@ --- -version: "3.8" services: zookeeper: image: confluentinc/cp-zookeeper:7.2.2 diff --git a/metadata-ingestion/tests/integration/trino/docker-compose.yml b/metadata-ingestion/tests/integration/trino/docker-compose.yml index 59ba91e46d..5982e6af2d 100644 --- a/metadata-ingestion/tests/integration/trino/docker-compose.yml +++ b/metadata-ingestion/tests/integration/trino/docker-compose.yml @@ -1,9 +1,6 @@ # Adapted from https://github.com/big-data-europe/docker-hive. -version: "3" - services: - testtrino: image: trinodb/trino:369 container_name: "testtrino" diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json index c5d0df1aeb..adf11a2833 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json @@ -68,5 +68,21 @@ "runId": "dummy-test-stateful-ingestion", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json index c1bdc8ffee..e4893642d6 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json @@ -53,6 +53,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index 9c05c3d485..ed19cd3a1d 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -110,6 +110,7 @@ entities: - dataProcessInstanceProperties - dataProcessInstanceRelationships - dataProcessInstanceRunEvent + - status - name: chart category: core keyAspect: chartKey