2022-09-21 09:02:50 -07:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
# Adding Stateful Ingestion to a Source

Currently, DataHub supports the [Stale Metadata Removal](./stateful.md#stale-entity-removal) and
the [Redundant Run Elimination](./stateful.md#redundant-run-elimination) use-cases on top of the more generic stateful ingestion
capability available for the sources. This document describes how to add support for these two use-cases to new sources.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
## Adding Stale Metadata Removal to a Source

Adding the stale metadata removal use-case to a new source involves modifying the source config, the source report, and the source itself.

For a full example of all changes required, see [Adding stale metadata removal to the MongoDB source](https://github.com/datahub-project/datahub/pull/9118).

The [datahub.ingestion.source.state.stale_entity_removal_handler](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py) module provides the supporting infrastructure for all the steps described
above and substantially simplifies the implementation on the source side. Below is a detailed explanation of each of these
steps along with examples.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
### 1. Modify the source config

The source's config must inherit from `StatefulIngestionConfigBase`, and should declare a field named `stateful_ingestion` of type `Optional[StatefulStaleMetadataRemovalConfig]`.

Example:
										
									 
								 
							 
							
								
							 
							
								 
							
							
```python
from typing import Optional

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StatefulStaleMetadataRemovalConfig,
    StatefulIngestionConfigBase,
)


class MySourceConfig(StatefulIngestionConfigBase):
    # ...<other config params>...

    # Optional, so recipes without a stateful_ingestion block still validate.
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
```
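To make the effect of this config field concrete, here is a minimal sketch of a recipe, written as a Python dict, that would switch the feature on for a hypothetical source type `my-source`. The keys `enabled`, `remove_stale_metadata`, and `pipeline_name` reflect my understanding of DataHub's stateful ingestion options and should be checked against your DataHub version; the helper `stateful_ingestion_problems` is purely illustrative and not part of DataHub.

```python
from typing import Any, Dict, List

# Hypothetical recipe for a source type called "my-source".
# Key names are assumptions to verify against the DataHub docs.
recipe: Dict[str, Any] = {
    "pipeline_name": "my_source_prod",  # assumed: state is keyed by pipeline name
    "source": {
        "type": "my-source",  # hypothetical source type
        "config": {
            "stateful_ingestion": {
                "enabled": True,
                "remove_stale_metadata": True,
            },
        },
    },
}


def stateful_ingestion_problems(recipe: Dict[str, Any]) -> List[str]:
    """Illustrative pre-flight check; returns reasons state would not be used."""
    problems: List[str] = []
    config = recipe.get("source", {}).get("config", {})
    stateful = config.get("stateful_ingestion") or {}
    if not stateful.get("enabled", False):
        problems.append("stateful_ingestion.enabled is not true")
    if not recipe.get("pipeline_name"):
        problems.append("pipeline_name is missing")
    return problems
```

Calling `stateful_ingestion_problems(recipe)` on the recipe above returns an empty list, while a recipe without the `stateful_ingestion` block or without a `pipeline_name` reports the missing pieces.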
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
### 2. Modify the source report

The report class of the source should inherit from `StaleEntityRemovalSourceReport` instead of `SourceReport`.
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```python
from dataclasses import dataclass

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalSourceReport,
)


@dataclass
class MySourceReport(StaleEntityRemovalSourceReport):
    # <other fields here>
    pass
```
							 
						 
					
						
							
								
									
										
										
										
### 3. Modify the source

1. The source must inherit from `StatefulIngestionSourceBase` instead of `Source`.
2. The source should contain a custom `get_workunit_processors` method.
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```python
from typing import List, Optional

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionSourceBase
from datahub.ingestion.source.state.stale_entity_removal_handler import StaleEntityRemovalHandler


class MySource(StatefulIngestionSourceBase):
    def __init__(self, config: MySourceConfig, ctx: PipelineContext):
        super().__init__(config, ctx)

        self.config = config
        self.report = MySourceReport()

        # other initialization code here

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        return [
            # Keep the processors registered by the base class.
            *super().get_workunit_processors(),
            # Add the stale entity removal processor.
            StaleEntityRemovalHandler.create(
                self, self.config, self.ctx
            ).workunit_processor,
        ]

    # other methods here
```
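To give an intuition for what a workunit processor of this kind does, the following is a toy sketch and not DataHub's implementation. It assumes a processor is a callable that wraps a stream of items, and it uses plain URN strings in place of real workunits. `ToyStaleEntityTracker`, `apply_processors`, and the `soft-delete:` marker are hypothetical names introduced only for illustration.

```python
from typing import Callable, Iterable, Iterator, List, Optional, Set

# Assumed shape of a processor: takes a stream and returns a (wrapped) stream.
Processor = Callable[[Iterable[str]], Iterable[str]]


class ToyStaleEntityTracker:
    """Toy stand-in for a stale-entity handler; not DataHub's implementation."""

    def __init__(self, previous_urns: Set[str]) -> None:
        self.previous_urns = previous_urns   # URNs saved by the previous run
        self.current_urns: Set[str] = set()  # URNs seen during this run

    def workunit_processor(self, stream: Iterable[str]) -> Iterator[str]:
        # Pass every item through unchanged while recording its URN.
        for urn in stream:
            self.current_urns.add(urn)
            yield urn
        # Once the stream is exhausted, anything seen last time but not this
        # time is treated as stale and emitted as a removal marker.
        for stale in sorted(self.previous_urns - self.current_urns):
            yield f"soft-delete:{stale}"


def apply_processors(
    stream: Iterable[str], processors: List[Optional[Processor]]
) -> List[str]:
    # Chain the processors in order, skipping None entries.
    for processor in processors:
        if processor is not None:
            stream = processor(stream)
    return list(stream)


tracker = ToyStaleEntityTracker(previous_urns={"urn:a", "urn:b", "urn:c"})
output = apply_processors(["urn:a", "urn:c", "urn:d"], [None, tracker.workunit_processor])
print(output)  # ['urn:a', 'urn:c', 'urn:d', 'soft-delete:urn:b']
```

The point of the design is that the source keeps emitting workunits as usual; the processor observes the stream from the outside and appends the removals at the end, which is why registering it in `get_workunit_processors` is the only source-side change needed.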
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
## Adding Redundant Run Elimination to a Source

This use-case applies to sources that drive ingestion by querying logs over a duration specified in the config (such
as Snowflake usage, BigQuery usage, etc.). Such ingestion typically involves expensive and long-running queries. To add redundant
run elimination to a new source and prevent expensive reruns for the same time range (potentially due to a user
error or a scheduler malfunction), the following steps are required.

1. Update the `SourceConfig`.
2. Update the `SourceReport`.
3. Modify the `Source` to:
   1. Instantiate the `RedundantRunSkipHandler` object.
   2. Check if the current run should be skipped.
   3. Update the state for the current run (start & end times).

The [datahub.ingestion.source.state.redundant_run_skip_handler](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py)
module provides the supporting infrastructure required for all the steps described above.

NOTE: The handler currently uses a simple state,
the [BaseUsageCheckpointState](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/usage_common_state.py),
across all sources it supports (unlike the `StaleEntityRemovalHandler`).
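The three source-side steps listed above (instantiate a handler, check whether to skip, update the state) can be sketched in plain Python. This is an illustration under stated assumptions, not the `RedundantRunSkipHandler` API: `RunWindowState` and `ToyRedundantRunSkipper` are hypothetical names, and the skip rule used here (skip when the stored window fully covers the requested one) is an assumption chosen for simplicity.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class RunWindowState:
    """Toy checkpoint state: the time window covered by the last successful run."""

    begin: datetime
    end: datetime


class ToyRedundantRunSkipper:
    """Illustrative only; not the RedundantRunSkipHandler API."""

    def __init__(self, last_state: Optional[RunWindowState] = None) -> None:
        # Step 1: the handler is created with whatever state the last run saved.
        self.last_state = last_state

    def should_skip(self, start: datetime, end: datetime) -> bool:
        # Step 2: skip when a previous run already covered the requested window
        # (assumed rule; the real handler may decide differently).
        if self.last_state is None:
            return False
        return self.last_state.begin <= start and end <= self.last_state.end

    def update_state(self, start: datetime, end: datetime) -> None:
        # Step 3: remember the window of the run that just completed.
        self.last_state = RunWindowState(begin=start, end=end)


day1 = datetime(2023, 1, 1, tzinfo=timezone.utc)
day2 = datetime(2023, 1, 2, tzinfo=timezone.utc)

skipper = ToyRedundantRunSkipper()
if not skipper.should_skip(day1, day2):
    # ... the expensive usage queries would run here ...
    skipper.update_state(day1, day2)
```

After this first run, a second call to `skipper.should_skip(day1, day2)` returns `True`, so an accidental rerun over the same window would not repeat the expensive queries.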
							 
						 
					
						
							
								
									
										
										
										
### 1. Modifying the SourceConfig

The `SourceConfig` must inherit from the [StatefulRedundantRunSkipConfig](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py#L23) class.

Examples:

1. Snowflake Usage
										
									 
								 
							 
							
								
							 
							
								 
							
							
```python
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    StatefulRedundantRunSkipConfig,
)


class SnowflakeStatefulIngestionConfig(StatefulRedundantRunSkipConfig):
    pass
```
							 
						 
					
						
							
								
									
										
										
										
### 2. Modifying the SourceReport

The `SourceReport` must inherit from the [StatefulIngestionReport](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py#L102) class.

Examples:

1. Snowflake Usage
										
									 
								 
							 
							
								
							 
							
								 
							
							
```python
from dataclasses import dataclass

@dataclass
class SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport):
    # <members specific to snowflake usage report>
    pass
```
							 
						 
					
						
							
								
									
										
										
										
### 3. Modifying the Source

The source must inherit from `StatefulIngestionSourceBase`.

#### 3.1 Instantiate RedundantRunSkipHandler in the `__init__` method of the source.

The source should create an instance of the `RedundantRunSkipHandler` in its `__init__` method.

Examples:

1. Snowflake Usage
										
									 
								 
							 
							
								
							 
							
								 
							
							
```python
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantRunSkipHandler,
)


class SnowflakeUsageSource(StatefulIngestionSourceBase):
    def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext):
        super().__init__(config, ctx)
        self.config: SnowflakeUsageConfig = config
        self.report: SnowflakeUsageReport = SnowflakeUsageReport()
        # Create and register the stateful ingestion use-case handlers.
        self.redundant_run_skip_handler = RedundantRunSkipHandler(
            source=self,
            config=self.config,
            pipeline_name=self.ctx.pipeline_name,
            run_id=self.ctx.run_id,
        )
```
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
							 
							
								 
							
							
								#### 3.2 Checking if the current run should be skipped.
  
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
							 
							
								 
							
							
A source can check whether the current run should be skipped by calling the `should_skip_this_run` method of `RedundantRunSkipHandler`. This should be done in the `get_workunits` method, before doing any other work.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								Example code:
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
							 
							
								 
							
							
								```python
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Skip a redundant run
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if self.redundant_run_skip_handler.should_skip_this_run(
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        ):
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            return
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Generate the workunits.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								```
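The early `return` above relies on `get_workunits` being a generator: returning before the first `yield` simply produces an empty iterable, so the pipeline receives no workunits for a skipped run. A minimal, standalone sketch of that behaviour follows. The `should_skip` flag and the string workunits are stand-ins for illustration only and are not part of the DataHub API.

```python
from typing import Iterator, List


def get_workunits(should_skip: bool) -> Iterator[str]:
    # Stand-in for the handler's should_skip_this_run(...) check.
    if should_skip:
        return  # Ends the generator without yielding anything.
    # Stand-in for the real workunit generation.
    yield "workunit-1"
    yield "workunit-2"


skipped: List[str] = list(get_workunits(should_skip=True))
generated: List[str] = list(get_workunits(should_skip=False))
print(skipped)    # []
print(generated)  # ['workunit-1', 'workunit-2']
```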
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
							 
							
								 
							
							
								#### 3.3 Updating the state for the current run.
  
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
							 
							
								 
							
							
If the run has not been skipped, the source should call the `update_state` method of `RedundantRunSkipHandler` to record the current run's state. This can be done in `get_workunits`, after the workunits have been generated.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								Example code:
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
							 
							
								 
							
							
								```python
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Skip a redundant run
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if self.redundant_run_skip_handler.should_skip_this_run(
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								            cur_start_time_millis=self.config.start_time
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
							 
							
								 
							
							
								        ):
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            return
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
							 
							
								 
							
							
								        # Generate the workunits.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # <code for generating the workunits>
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Update checkpoint state for this run.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self.redundant_run_skip_handler.update_state(
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								            start_time_millis=self.config.start_time,
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            end_time_millis=self.config.end_time,
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
							 
							
								 
							
							
								        )
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								```
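
To see how the skip check and the state update interact across runs, the following self-contained sketch replaces the real handler with a toy in-memory one. `ToySkipHandler` and `ToySource` are hypothetical names, and the skip rule used here (skip when the requested start time is not later than the last recorded end time) is an assumption made for illustration; the actual `RedundantRunSkipHandler` persists its state through the checkpointing provider and may apply a different rule.

```python
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class ToySkipHandler:
    """Hypothetical in-memory stand-in for RedundantRunSkipHandler."""

    last_end_time_millis: Optional[int] = None

    def should_skip_this_run(self, cur_start_time_millis: int) -> bool:
        # Assumed rule: skip if a previous run already covered this start time.
        return (
            self.last_end_time_millis is not None
            and cur_start_time_millis <= self.last_end_time_millis
        )

    def update_state(self, start_time_millis: int, end_time_millis: int) -> None:
        # Remember how far the last completed run got.
        self.last_end_time_millis = end_time_millis


@dataclass
class ToySource:
    """Hypothetical source that follows the skip-then-update pattern."""

    handler: ToySkipHandler
    start_time_millis: int
    end_time_millis: int

    def get_workunits(self) -> Iterator[str]:
        # Skip a redundant run before doing any other work.
        if self.handler.should_skip_this_run(
            cur_start_time_millis=self.start_time_millis
        ):
            return
        # Generate the workunits (a single placeholder string here).
        yield f"usage[{self.start_time_millis}, {self.end_time_millis}]"
        # Update the state only after the workunits have been produced.
        self.handler.update_state(
            start_time_millis=self.start_time_millis,
            end_time_millis=self.end_time_millis,
        )


handler = ToySkipHandler()
first_run = list(ToySource(handler, 0, 1000).get_workunits())
repeat_run = list(ToySource(handler, 0, 1000).get_workunits())
next_window = list(ToySource(handler, 1001, 2000).get_workunits())
print(first_run)    # ['usage[0, 1000]']
print(repeat_run)   # []
print(next_window)  # ['usage[1001, 2000]']
```

Because `get_workunits` is a generator, the `update_state` call at the end runs only once the caller has consumed every workunit. In this sketch, a run that is abandoned midway therefore leaves the recorded state unchanged.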