# Adding Stateful Ingestion to a Source
Currently, DataHub supports the [Stale Metadata Removal](./stateful.md#stale-entity-removal) and
the [Redundant Run Elimination](./stateful.md#redundant-run-elimination) use-cases on top of the more generic stateful ingestion
capability available to sources. This document describes how to add support for these two use-cases to new sources.
## Adding Stale Metadata Removal to a Source
Adding the stale metadata removal use-case to a new source involves modifying the source config, source report, and the source itself.
For a full example of all the required changes, see [Adding stale metadata removal to the MongoDB source](https://github.com/datahub-project/datahub/pull/9118).
The [datahub.ingestion.source.state.stale_entity_removal_handler](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py) module provides the supporting infrastructure for all of these steps
and substantially simplifies the implementation on the source side. Below is a detailed explanation of each step,
with examples.
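At its core, stale metadata removal is a set difference: entities recorded in the previous run's checkpoint that the current run no longer emits are considered stale and are soft-deleted. The plain-Python sketch below models only that comparison; `find_stale_urns` is a hypothetical helper, and the real handler additionally loads and saves the checkpoint state, which is not shown here.

```python
from typing import Iterable, List, Set


def find_stale_urns(previous_urns: Iterable[str], current_urns: Iterable[str]) -> List[str]:
    """Return URNs recorded by the previous run that the current run did not emit.

    Hypothetical helper that models the idea behind stale-entity removal;
    it is not the StaleEntityRemovalHandler implementation.
    """
    current: Set[str] = set(current_urns)
    # Sorted for deterministic output; no ordering guarantee of the real handler is implied.
    return sorted(set(previous_urns) - current)


previous = [
    "urn:li:dataset:(urn:li:dataPlatform:mongodb,db.users,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:mongodb,db.orders,PROD)",
]
current = ["urn:li:dataset:(urn:li:dataPlatform:mongodb,db.users,PROD)"]

for urn in find_stale_urns(previous, current):
    # DataHub soft-deletes stale entities by marking them as removed via the Status aspect.
    print(f"soft-delete {urn}")
```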
### 1. Modify the source config
The source's config must inherit from `StatefulIngestionConfigBase`, and should declare a field named `stateful_ingestion` of type `Optional[StatefulStaleMetadataRemovalConfig]`.
Example:
```python
from typing import Optional

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)


class MySourceConfig(StatefulIngestionConfigBase):
    # ...< other config params > ...
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
```
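Once the config declares `stateful_ingestion`, users turn the feature on from the recipe. The sketch below shows the relevant part of a recipe as a Python dict, assuming the `enabled` and `remove_stale_metadata` flags of `StatefulStaleMetadataRemovalConfig`; the `my-source` type and the `check_stateful_recipe` helper are hypothetical. Stateful ingestion also needs a stable top-level `pipeline_name`, because checkpoints are looked up per pipeline.

```python
from typing import Any, Dict

recipe: Dict[str, Any] = {
    # Checkpoints are stored per pipeline, so this name must stay the same across runs.
    "pipeline_name": "my_source_prod",
    "source": {
        "type": "my-source",  # hypothetical source type
        "config": {
            "stateful_ingestion": {
                "enabled": True,
                "remove_stale_metadata": True,
            },
        },
    },
}


def check_stateful_recipe(recipe: Dict[str, Any]) -> None:
    """Hypothetical pre-flight check; DataHub performs its own validation."""
    stateful = recipe["source"]["config"].get("stateful_ingestion") or {}
    if stateful.get("enabled") and not recipe.get("pipeline_name"):
        raise ValueError("stateful ingestion requires a top-level pipeline_name")


check_stateful_recipe(recipe)
```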
### 2. Modify the source report
The report class of the source should inherit from `StaleEntityRemovalSourceReport` instead of `SourceReport`.
```python
from dataclasses import dataclass

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalSourceReport,
)


@dataclass
class MySourceReport(StaleEntityRemovalSourceReport):
    # < other fields here >
    pass
```
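Report classes are dataclasses, so any fields a subclass adds must have default values whenever the base class's fields do (DataHub's report base classes generally define defaults); otherwise Python raises a `TypeError` when the class is defined. The stdlib-only sketch below uses a hypothetical `BaseReport` in place of `StaleEntityRemovalSourceReport`; all field and method names are illustrative only.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class BaseReport:
    """Hypothetical stand-in for StaleEntityRemovalSourceReport; every field has a default."""

    soft_deleted_entities: List[str] = field(default_factory=list)


@dataclass
class MySourceReport(BaseReport):
    # New fields also need defaults, because they follow defaulted base fields.
    collections_scanned: int = 0
    # Mutable defaults must use default_factory; a bare `= []` raises ValueError.
    dropped: List[str] = field(default_factory=list)

    def report_dropped(self, name: str) -> None:
        self.dropped.append(name)


report = MySourceReport()
report.collections_scanned += 1
report.report_dropped("db.tmp")
```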
### 3. Modify the source
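1. The source must inherit from `StatefulIngestionSourceBase` instead of `Source`.
2. The source should override the `get_workunit_processors` method, adding the processor provided by `StaleEntityRemovalHandler` to the processors inherited from the base class.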
```python
from typing import List, Optional

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalHandler,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionSourceBase,
)


class MySource(StatefulIngestionSourceBase):
    def __init__(self, config: MySourceConfig, ctx: PipelineContext):
        super().__init__(config, ctx)
        self.config = config
        self.report = MySourceReport()
        # other initialization code here

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        return [
            *super().get_workunit_processors(),
            # Tracks emitted entities and soft-deletes those missing from this run.
            StaleEntityRemovalHandler.create(
                self, self.config, self.ctx
            ).workunit_processor,
        ]

    # other methods here
```
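The `get_workunit_processors` hook returns a list of processors, each of which takes a workunit stream and returns a new one; they are applied in order to the source's output. The pure-Python sketch below models that chaining, with strings standing in for `MetadataWorkUnit` objects; `apply_processors` and `UrnRecorder` are hypothetical names. It assumes, as the `Optional` in the return type suggests, that `None` entries are skipped. The real stale-entity processor also emits soft-delete workunits for stale entities once the stream is exhausted; that part is omitted here.

```python
from typing import Callable, Iterable, Iterator, List, Optional, Set

# Hypothetical simplification: a workunit is represented by its URN string.
Processor = Callable[[Iterable[str]], Iterable[str]]


def apply_processors(
    processors: List[Optional[Processor]], stream: Iterable[str]
) -> Iterable[str]:
    # Each processor wraps the stream produced by the previous one; None entries are skipped.
    for processor in processors:
        if processor is not None:
            stream = processor(stream)
    return stream


class UrnRecorder:
    """Hypothetical model of a stale-entity processor: passes workunits through
    unchanged while remembering every URN it has seen."""

    def __init__(self) -> None:
        self.seen: Set[str] = set()

    def process(self, stream: Iterable[str]) -> Iterator[str]:
        for urn in stream:
            self.seen.add(urn)
            yield urn


recorder = UrnRecorder()
# Processors are lazy generators: nothing is recorded until the stream is consumed.
out = list(apply_processors([None, recorder.process], ["urn:a", "urn:b"]))
```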
## Adding Redundant Run Elimination to a Source
This use-case applies to sources that drive ingestion by querying logs over a time range specified in the config (such
as Snowflake usage, BigQuery usage, etc.). These sources typically run expensive, long-running queries. To add redundant
run elimination to a new source, and so prevent expensive reruns over the same time range (for example, due to a user
error or a scheduler malfunction), the following steps are required.
1. Update the `SourceConfig`.
2. Update the `SourceReport`.
3. Modify the `Source` to:
   1. Instantiate the `RedundantRunSkipHandler` object.
   2. Check whether the current run should be skipped.
   3. Update the state for the current run (start and end times).

The [datahub.ingestion.source.state.redundant_run_skip_handler](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py)
module provides the supporting infrastructure required for all the steps described above.
NOTE: Unlike the `StaleEntityRemovalHandler`, this handler currently uses the same simple state,
[BaseUsageCheckpointState](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/usage_common_state.py),
for all the sources it supports.
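In the examples later in this section, `should_skip_this_run` receives only the current start time, so the skip decision has to be made against the time window stored by the last successful run. The model below assumes that a run is redundant when it starts at or before the end of the previously recorded window; `WindowState` and `should_skip` are hypothetical names, and the real handler's exact comparison may differ.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WindowState:
    """Hypothetical stand-in for the checkpointed state: the time window
    (epoch milliseconds) covered by the last successful run."""

    start_time_millis: int
    end_time_millis: int


def should_skip(last_state: Optional[WindowState], cur_start_time_millis: int) -> bool:
    # No checkpoint yet (first run, or the state was reset): never skip.
    if last_state is None:
        return False
    # Assumed rule: a run starting at or before the end of the last processed
    # window would re-query logs that were already ingested.
    return cur_start_time_millis <= last_state.end_time_millis


last = WindowState(start_time_millis=0, end_time_millis=86_400_000)  # one day
print(should_skip(last, 3_600_000))   # True: starts inside the last window
print(should_skip(last, 86_400_001))  # False: starts after it
print(should_skip(None, 0))           # False: nothing recorded yet
```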
### 1. Modifying the SourceConfig
The `SourceConfig` must inherit from the [StatefulRedundantRunSkipConfig](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py#L23) class.
Examples:
1. Snowflake Usage
```python
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    StatefulRedundantRunSkipConfig,
)


class SnowflakeStatefulIngestionConfig(StatefulRedundantRunSkipConfig):
    pass
```
### 2. Modifying the SourceReport
The `SourceReport` must inherit from the [StatefulIngestionReport](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py#L102) class.
Examples:
1. Snowflake Usage
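```python
from dataclasses import dataclass
from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionReport

@dataclass
class SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport):
    # < members specific to snowflake usage report >
    pass
```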
### 3. Modifying the Source
The source must inherit from `StatefulIngestionSourceBase`.
#### 3.1 Instantiate RedundantRunSkipHandler in the `__init__` method of the source.
The source should create a `RedundantRunSkipHandler` instance in its `__init__` method.
Examples:
1. Snowflake Usage
```python
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantRunSkipHandler,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionSourceBase,
)


class SnowflakeUsageSource(StatefulIngestionSourceBase):
    def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext):
        super().__init__(config, ctx)
        self.config: SnowflakeUsageConfig = config
        self.report: SnowflakeUsageReport = SnowflakeUsageReport()
        # Create and register the stateful ingestion use-case handlers.
        self.redundant_run_skip_handler = RedundantRunSkipHandler(
            source=self,
            config=self.config,
            pipeline_name=self.ctx.pipeline_name,
            run_id=self.ctx.run_id,
        )
```
#### 3.2 Checking if the current run should be skipped.
A source can check whether the current run should be skipped by calling the `should_skip_this_run` method of `RedundantRunSkipHandler`. This should be done in the `get_workunits` method, before doing any other work.
Example code:
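```python
from typing import Iterable

from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.utilities.time import datetime_to_ts_millis


def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    # Skip a redundant run
    if self.redundant_run_skip_handler.should_skip_this_run(
        cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
    ):
        return
    # Generate the workunits.
    # < code for generating the workunits >
```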
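The `datetime_to_ts_millis` call is there because the handler's argument is an epoch timestamp in milliseconds, while the config's `start_time` is a `datetime`. The stdlib-only sketch below approximates that conversion; `to_ts_millis` is a hypothetical name, and we assume DataHub's helper behaves the same way. Note that `datetime.timestamp()` interprets a naive `datetime` in the machine's local timezone, so timezone-aware values give reproducible results.

```python
from datetime import datetime, timedelta, timezone


def to_ts_millis(dt: datetime) -> int:
    """Convert a datetime to epoch milliseconds.

    Hypothetical stand-in for datetime_to_ts_millis. Naive datetimes are
    treated as local time by datetime.timestamp(), so prefer aware values.
    """
    return int(round(dt.timestamp() * 1000))


epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
print(to_ts_millis(epoch))                                       # 0
print(to_ts_millis(epoch + timedelta(seconds=1.5)))              # 1500
print(to_ts_millis(datetime(2022, 9, 21, tzinfo=timezone.utc)))  # 1663718400000
```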
#### 3.3 Updating the state for the current run.
If the run has not been skipped, the source should record the current run's state using the `update_state` method of `RedundantRunSkipHandler`. This is typically done in `get_workunits`, after all the workunits have been generated.
Example code:
```python
from typing import Iterable

from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.utilities.time import datetime_to_ts_millis


def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    # Skip a redundant run
    if self.redundant_run_skip_handler.should_skip_this_run(
        cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
    ):
        return

    # Generate the workunits.
    # < code for generating the workunits >

    # Update checkpoint state for this run.
    self.redundant_run_skip_handler.update_state(
        start_time_millis=datetime_to_ts_millis(self.config.start_time),
        end_time_millis=datetime_to_ts_millis(self.config.end_time),
    )
```
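When `get_workunits` yields its workunits, it is a generator, so the `update_state` call placed after the generation code only runs once the caller has consumed the whole stream; a run that stops part-way does not record its window. The plain-Python sketch below demonstrates this; `InMemorySkipHandler` is a hypothetical stand-in that keeps state in memory and uses an assumed skip rule, not the real handler's logic.

```python
from typing import Iterator, List, Optional, Tuple


class InMemorySkipHandler:
    """Hypothetical stand-in for RedundantRunSkipHandler that keeps state in memory."""

    def __init__(self) -> None:
        self.window: Optional[Tuple[int, int]] = None

    def should_skip_this_run(self, cur_start_time_millis: int) -> bool:
        # Assumed rule: skip if this run starts inside the last recorded window.
        return self.window is not None and cur_start_time_millis <= self.window[1]

    def update_state(self, start_time_millis: int, end_time_millis: int) -> None:
        self.window = (start_time_millis, end_time_millis)


def get_workunits(
    handler: InMemorySkipHandler, start: int, end: int, items: List[str]
) -> Iterator[str]:
    if handler.should_skip_this_run(cur_start_time_millis=start):
        return
    yield from items
    # Reached only after the consumer has pulled every workunit.
    handler.update_state(start_time_millis=start, end_time_millis=end)


handler = InMemorySkipHandler()
first = list(get_workunits(handler, 0, 100, ["wu1", "wu2"]))  # window recorded as (0, 100)
second = list(get_workunits(handler, 50, 150, ["wu3"]))       # redundant run, yields nothing

partial = InMemorySkipHandler()
stream = get_workunits(partial, 0, 100, ["wu1", "wu2"])
next(stream)  # consumer stops after one workunit; partial.window stays None
```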