Currently, DataHub supports the [Stale Metadata Removal](./stateful.md#stale-entity-removal) and
the [Redundant Run Elimination](./stateful.md#redundant-run-elimination) use-cases on top of the more generic stateful ingestion
capability available to sources. This document describes how to add support for these two use-cases to new sources.
## Adding Stale Metadata Removal to a Source
Adding the stale metadata removal use-case to a new source involves the following steps.
1. Defining the new checkpoint state that stores the list of entities emitted from a specific ingestion run.
2. Modifying the `SourceConfig` associated with the source to use a custom `stateful_ingestion` config param.
3. Modifying the `SourceReport` associated with the source to include soft-deleted entities in the report.
4. Modifying the `Source` to
1. Instantiate the StaleEntityRemovalHandler object
2. Add entities from the current run to the state object
3. Emit stale metadata removal workunits
The [datahub.ingestion.source.state.stale_entity_removal_handler](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py) module provides the supporting infrastructure for all the steps described
above and substantially simplifies the implementation on the source side. Below is a detailed explanation of each of these
steps along with examples.
### 1. Defining the checkpoint state for the source.
The checkpoint state class is responsible for tracking the entities emitted from each ingestion run. If none of the existing states meet the needs of the new source, a new checkpoint state must be created. The state must
inherit from the `StaleEntityCheckpointStateBase` abstract class shown below, and implement each of its abstract methods.
```python
class StaleEntityCheckpointStateBase(CheckpointStateBase, ABC, Generic[Derived]):
    """
    Defines the abstract interface for the checkpoint states that are used for stale entity removal.
    Examples include the sql_common state for tracking table & view urns,
    the dbt state that tracks node & assertion urns, and the kafka state tracking topic urns.
    """

    # The abstract methods (for declaring the supported entity types, adding
    # urns to the state, and diffing against another checkpoint state) are
    # elided here; see the linked module for the full definition.
```

### 2. Modifying the SourceConfig
The source's config must inherit from `StatefulIngestionConfigBase`, and should declare a field named `stateful_ingestion` of type `Optional[StatefulStaleMetadataRemovalConfig]`.
Examples:
1. The [DBTStatefulIngestionConfig](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/dbt.py#L131)
   and the [DBTConfig](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/dbt.py#L317).
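To make the checkpoint-state interface concrete, here is a standalone sketch in plain Python (the class and method names below are illustrative, not DataHub's actual classes) of a state that tracks urns per entity type and can compute the urns missing from a newer state, which is the core operation behind stale entity removal:

```python
from typing import Dict, Iterable, Set


class SketchCheckpointState:
    """Toy stand-in for a stale-entity checkpoint state implementation."""

    def __init__(self) -> None:
        self.urns_by_type: Dict[str, Set[str]] = {}

    def add_checkpoint_urn(self, type: str, urn: str) -> None:
        # Record an entity emitted during the current run.
        self.urns_by_type.setdefault(type, set()).add(urn)

    def get_urns_not_in(
        self, type: str, other: "SketchCheckpointState"
    ) -> Iterable[str]:
        # Urns present in this (older) state but absent from the other
        # (newer) state are considered stale.
        return self.urns_by_type.get(type, set()) - other.urns_by_type.get(
            type, set()
        )
```

A run that emits fewer entities than the previous run would then yield the missing urns from `get_urns_not_in`, and those become candidates for soft deletion.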
### 3. Modifying the SourceReport
The report class of the source should inherit from `StaleEntityRemovalSourceReport`, whose definition is shown below.

```python
from typing import List
from dataclasses import dataclass, field
from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionReport


@dataclass
class StaleEntityRemovalSourceReport(StatefulIngestionReport):
    soft_deleted_stale_entities: List[str] = field(default_factory=list)

    def report_stale_entity_soft_deleted(self, urn: str) -> None:
        self.soft_deleted_stale_entities.append(urn)
```

### 4. Modifying the Source
The source must instantiate the `StaleEntityRemovalHandler`, add each entity emitted during the run to the checkpoint state, and, at the end of its `get_workunits` method, emit the stale metadata removal workunits:

```python
# NOTE: Populating the current state happens during the execution of this code.
# ...
# Clean up stale entities at the end
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()
```
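The end-of-run cleanup performed by `gen_removed_entity_workunits` boils down to: diff the previous run's entities against the current run's, and emit a soft-delete status for each stale urn. A standalone sketch of that logic (the function name and record shape below are illustrative, not DataHub's):

```python
from typing import Dict, Iterable, Set


def gen_soft_delete_records(
    last_run_urns: Set[str], current_run_urns: Set[str]
) -> Iterable[Dict[str, object]]:
    # Entities seen in the last run but not in this run are stale:
    # mark them as removed (soft delete).
    for urn in sorted(last_run_urns - current_run_urns):
        yield {"urn": urn, "status": {"removed": True}}
```

In the real handler, each such record would be wrapped in a metadata workunit and also reported via the source report's soft-deleted entity list.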
## Adding Redundant Run Elimination to a Source
This use-case applies to sources that drive ingestion by querying logs over a configured time window (such
as Snowflake usage, BigQuery usage, etc.). Such queries are typically expensive and long-running. To add redundant
run elimination to a new source, preventing expensive re-runs over the same time range (potentially due to a user
error or a scheduler malfunction), the following steps
are required.
1. Update the `SourceConfig`
2. Update the `SourceReport`
3. Modify the `Source` to
1. Instantiate the RedundantRunSkipHandler object.
2. Check if the current run should be skipped.
3. Update the state for the current run(start & end times).
The [datahub.ingestion.source.state.redundant_run_skip_handler](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py)
module provides the supporting infrastructure required for all the steps described above.
NOTE: The handler currently uses a simple state,
the [BaseUsageCheckpointState](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/usage_common_state.py),
across all sources it supports (unlike the StaleEntityRemovalHandler).
### 1. Modifying the SourceConfig
The `SourceConfig` must inherit from the [StatefulRedundantRunSkipConfig](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py#L23) class.
Examples:
1. Snowflake Usage
```python
from datahub.ingestion.source.state.redundant_run_skip_handler import (
StatefulRedundantRunSkipConfig,
)
class SnowflakeStatefulIngestionConfig(StatefulRedundantRunSkipConfig):
pass
```
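For either use-case, stateful ingestion must also be enabled in the ingestion recipe. A sketch is shown below; the pipeline name and source-specific settings are illustrative, and this assumes a DataHub server is available as the default checkpointing state provider:

```yaml
# A pipeline_name is required so that state can be associated across runs.
pipeline_name: my_snowflake_usage_pipeline
source:
  type: snowflake-usage
  config:
    # ... connection settings elided ...
    stateful_ingestion:
      enabled: true
```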
### 2. Modifying the SourceReport
The `SourceReport` must inherit from the [StatefulIngestionReport](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py#L102) class.
Examples:
1. Snowflake Usage
```python
@dataclass
class SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport):
    # <members specific to the snowflake usage report>
```
### 3. Modifying the Source
The source must inherit from `StatefulIngestionSourceBase`.
#### 3.1 Instantiate RedundantRunSkipHandler in the `__init__` method of the source.
The source should instantiate an instance of the `RedundantRunSkipHandler` in its `__init__` method.
Examples:
Snowflake Usage
```python
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantRunSkipHandler,
)


class SnowflakeUsageSource(StatefulIngestionSourceBase):
    def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext):
        super().__init__(config, ctx)
        # Create and register the redundant-run skip handler for this source.
        self.redundant_run_skip_handler = RedundantRunSkipHandler(
            source=self,
            config=self.config,
            pipeline_name=self.ctx.pipeline_name,
            run_id=self.ctx.run_id,
        )
```
#### 3.2 Checking if the current run should be skipped.
The source can check whether the current run should be skipped by calling the `should_skip_this_run` method of the `RedundantRunSkipHandler`. This should be done from the `get_workunits` method, before doing any other work.
#### 3.3 Updating the state for the current run.
If the run has not been skipped, the source should use the `update_state` method of the `RedundantRunSkipHandler` to record the current run's state (its start & end times). This can also be done from the `get_workunits` method.
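The skip-and-update logic can be sketched standalone in plain Python (a toy stand-in, not the actual `RedundantRunSkipHandler` API): skip the run when the requested window starts at or before the end of the last successfully recorded window, and record the new window otherwise:

```python
from typing import Optional, Tuple


class SketchRunSkipState:
    """Toy stand-in for the handler's state: the last run's time window."""

    def __init__(self) -> None:
        self.last_window: Optional[Tuple[int, int]] = None  # (start, end) millis

    def should_skip_this_run(self, cur_start_time_millis: int) -> bool:
        # A re-run whose window begins before (or at) the end of the last
        # successful run's window would re-query the same logs: skip it.
        return (
            self.last_window is not None
            and cur_start_time_millis <= self.last_window[1]
        )

    def update_state(self, start_time_millis: int, end_time_millis: int) -> None:
        # Record the window of the current (non-skipped) run.
        self.last_window = (start_time_millis, end_time_millis)
```

In a source's `get_workunits`, the check would run first and cause an early return; `update_state` would only be called for runs that were not skipped.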