	Adding Stateful Ingestion to a Source
Currently, DataHub supports the Stale Metadata Removal and Redundant Run Elimination use-cases on top of the more generic stateful ingestion capability available to sources. This document describes how to add support for these two use-cases to new sources.
Adding Stale Metadata Removal to a Source
Adding the stale metadata removal use-case to a new source involves modifying the source config, source report, and the source itself.
For a full example of all the required changes, see Adding stale metadata removal to the MongoDB source.
The datahub.ingestion.source.state.stale_entity_removal_handler module provides the supporting infrastructure for all of these steps and substantially simplifies the implementation on the source side. Below is a detailed explanation of each step, along with examples.
1. Modify the source config
The source's config must inherit from StatefulIngestionConfigBase, and should declare a field named stateful_ingestion of type Optional[StatefulStaleMetadataRemovalConfig].
Example:
from typing import Optional

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)

class MySourceConfig(StatefulIngestionConfigBase):
    # ...<other config params>...
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
2. Modify the source report
The report class of the source should inherit from StaleEntityRemovalSourceReport instead of SourceReport.
from dataclasses import dataclass

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalSourceReport,
)

@dataclass
class MySourceReport(StaleEntityRemovalSourceReport):
    # <other fields here>
    pass
3. Modify the source
- The source must inherit from StatefulIngestionSourceBase instead of Source.
- The source should define a custom get_workunit_processors method.
from typing import List, Optional

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalHandler,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionSourceBase,
)

class MySource(StatefulIngestionSourceBase):
    def __init__(self, config: MySourceConfig, ctx: PipelineContext):
        super().__init__(config, ctx)
        self.config = config
        self.report = MySourceReport()
        # ...other initialization code here...

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        return [
            *super().get_workunit_processors(),
            # The handler soft-deletes entities seen in the previous run but not in this one.
            StaleEntityRemovalHandler.create(
                self, self.config, self.ctx
            ).workunit_processor,
        ]

    # ...other methods here...
Adding Redundant Run Elimination to a Source
This use-case applies to sources that drive ingestion by querying logs over a time window specified in the config (such as Snowflake usage, BigQuery usage, etc.), which typically involves expensive, long-running queries. To prevent expensive re-runs over the same time range (potentially due to a user error or a scheduler malfunction), adding redundant run elimination to a new source requires the following steps.
- Update the SourceConfig.
- Update the SourceReport.
- Modify the Source to
  - Instantiate the RedundantRunSkipHandler object.
  - Check if the current run should be skipped.
  - Update the state for the current run (start & end times).
The datahub.ingestion.source.state.redundant_run_skip_handler module provides the supporting infrastructure required for all the steps described above.
NOTE: The handler currently uses a simple state, the BaseUsageCheckpointState, across all sources it supports (unlike the StaleEntityRemovalHandler).
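For reference, this shared state is essentially just the time window of the last successful run. The sketch below illustrates its rough shape; the base class and field names here are an assumption based on the handler's millisecond-timestamp parameters and may differ slightly from the actual implementation.
import pydantic
from datahub.ingestion.source.state.checkpoint import CheckpointStateBase

class BaseUsageCheckpointState(CheckpointStateBase):
    # Time window (epoch milliseconds) covered by the last successful run.
    begin_timestamp_millis: pydantic.PositiveInt
    end_timestamp_millis: pydantic.PositiveInt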
1. Modifying the SourceConfig
The SourceConfig must inherit from the StatefulRedundantRunSkipConfig class.
Examples:
- Snowflake Usage
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    StatefulRedundantRunSkipConfig,
)
class SnowflakeStatefulIngestionConfig(StatefulRedundantRunSkipConfig):
    pass
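This class is then typically referenced from the source's top-level config via the stateful_ingestion field. Below is a minimal sketch, assuming a simplified config that inherits only from StatefulIngestionConfigBase (the real SnowflakeUsageConfig has additional base classes and fields).
from typing import Optional

from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)

class SnowflakeUsageConfig(StatefulIngestionConfigBase):
    # ...<other usage config params>...
    stateful_ingestion: Optional[SnowflakeStatefulIngestionConfig] = None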
2. Modifying the SourceReport
The SourceReport must inherit from the StatefulIngestionReport class.
Examples:
- Snowflake Usage
@dataclass
class SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport):
    # <members specific to snowflake usage report>
    pass
3. Modifying the Source
The source must inherit from StatefulIngestionSourceBase.
3.1 Instantiate RedundantRunSkipHandler in the __init__ method of the source.
The source should create a RedundantRunSkipHandler instance in its __init__ method.
Examples:
Snowflake Usage
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantRunSkipHandler,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionSourceBase,
)

class SnowflakeUsageSource(StatefulIngestionSourceBase):
    def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext):
        super().__init__(config, ctx)
        self.config: SnowflakeUsageConfig = config
        self.report: SnowflakeUsageReport = SnowflakeUsageReport()
        # Create and register the stateful ingestion use-case handlers.
        self.redundant_run_skip_handler = RedundantRunSkipHandler(
            source=self,
            config=self.config,
            pipeline_name=self.ctx.pipeline_name,
            run_id=self.ctx.run_id,
        )
3.2 Checking if the current run should be skipped.
Sources can check whether the current run should be skipped using the should_skip_this_run method of RedundantRunSkipHandler. This should be done from the get_workunits method, before doing any other work.
Example code:
    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        # Skip a redundant run
        if self.redundant_run_skip_handler.should_skip_this_run(
            cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
        ):
            return
        # Generate the workunits.
3.3 Updating the state for the current run.
If the run has not been skipped, the source should use the update_state method of RedundantRunSkipHandler to record the current run's state (start and end times). This is typically done in get_workunits after the workunits have been generated.
Example code:
    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        # Skip a redundant run
        if self.redundant_run_skip_handler.should_skip_this_run(
            cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
        ):
            return
        # Generate the workunits.
        # <code for generating the workunits>
        # Update the checkpoint state for this run.
        self.redundant_run_skip_handler.update_state(
            start_time_millis=datetime_to_ts_millis(self.config.start_time),
            end_time_millis=datetime_to_ts_millis(self.config.end_time),
        )
