fix(ingest): add status aspect to dataProcessInstance (#10757)

This commit is contained in:
Harshal Sheth 2024-06-27 12:07:28 -07:00 committed by GitHub
parent d63f25faa8
commit fa2ab1bcee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 228 additions and 47 deletions

View File

@ -2,7 +2,6 @@ import logging
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterable,
List,
@ -43,9 +42,6 @@ from datahub.utilities.urns.urn_iter import list_urns, lowercase_dataset_urns
if TYPE_CHECKING:
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
logger = logging.getLogger(__name__)
@ -139,39 +135,6 @@ def auto_status_aspect(
).as_workunit()
def _default_entity_type_fn(wu: MetadataWorkUnit) -> Optional[str]:
urn = wu.get_urn()
entity_type = guess_entity_type(urn)
return entity_type
def auto_stale_entity_removal(
stale_entity_removal_handler: "StaleEntityRemovalHandler",
stream: Iterable[MetadataWorkUnit],
entity_type_fn: Callable[
[MetadataWorkUnit], Optional[str]
] = _default_entity_type_fn,
) -> Iterable[MetadataWorkUnit]:
"""
Record all entities that are found, and emit removals for any that disappeared in this run.
"""
for wu in stream:
urn = wu.get_urn()
if wu.is_primary_source:
entity_type = entity_type_fn(wu)
if entity_type is not None:
stale_entity_removal_handler.add_entity_to_state(entity_type, urn)
else:
stale_entity_removal_handler.add_urn_to_skip(urn)
yield wu
# Clean up stale entities.
yield from stale_entity_removal_handler.gen_removed_entity_workunits()
T = TypeVar("T", bound=MetadataWorkUnit)

View File

@ -9,7 +9,6 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.api.source_helpers import auto_stale_entity_removal
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
@ -28,6 +27,10 @@ from datahub.utilities.urns.urn import guess_entity_type
logger: logging.Logger = logging.getLogger(__name__)
STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {
"dataProcessInstance",
}
class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
"""
@ -59,6 +62,30 @@ class StaleEntityRemovalSourceReport(StatefulIngestionReport):
self.last_state_non_deletable_entities.append(urn)
def auto_stale_entity_removal(
stale_entity_removal_handler: "StaleEntityRemovalHandler",
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""
Record all entities that are found, and emit removals for any that disappeared in this run.
"""
for wu in stream:
urn = wu.get_urn()
if wu.is_primary_source:
entity_type = guess_entity_type(urn)
if entity_type is not None:
stale_entity_removal_handler.add_entity_to_state(entity_type, urn)
else:
stale_entity_removal_handler.add_urn_to_skip(urn)
yield wu
# Clean up stale entities.
yield from stale_entity_removal_handler.gen_removed_entity_workunits()
class StaleEntityRemovalHandler(
StatefulIngestionUsecaseHandlerBase["GenericCheckpointState"]
):
@ -285,7 +312,11 @@ class StaleEntityRemovalHandler(
for urn in last_checkpoint_state.get_urns_not_in(
type="*", other_checkpoint_state=cur_checkpoint_state
):
if not entity_supports_aspect(guess_entity_type(urn), StatusClass):
entity_type = guess_entity_type(urn)
if (
entity_type in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES
or not entity_supports_aspect(entity_type, StatusClass)
):
# If any entity does not support aspect 'status' then skip that entity urn
report.report_last_state_non_deletable_entities(urn)
continue

View File

@ -6010,6 +6010,70 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:36225e795a4597b2376996774a803b0d",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:57aa623f096cf3a28af70fe94b713907",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:a42a5b1bee156e45972e12d4156fb7a2",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fc6268f0be68fd04c310705b65efd6fe",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:dbt:tag_from_dbt",

View File

@ -24,6 +24,7 @@
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
@ -186,6 +187,7 @@
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
@ -614,5 +616,53 @@
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -624,5 +624,53 @@
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -1,9 +1,6 @@
# Adapted from https://github.com/big-data-europe/docker-hive.
version: "3"
services:
presto:
image: starburstdata/presto:350-e.15
container_name: "presto"

View File

@ -1,5 +1,4 @@
---
version: '3.8'
services:
connect:
image: confluentinc/cp-kafka-connect:7.4.0

View File

@ -1,5 +1,4 @@
---
version: "3.8"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.2

View File

@ -1,9 +1,6 @@
# Adapted from https://github.com/big-data-europe/docker-hive.
version: "3"
services:
testtrino:
image: trinodb/trino:369
container_name: "testtrino"

View File

@ -68,5 +68,21 @@
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -53,6 +53,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",

View File

@ -110,6 +110,7 @@ entities:
- dataProcessInstanceProperties
- dataProcessInstanceRelationships
- dataProcessInstanceRunEvent
- status
- name: chart
category: core
keyAspect: chartKey