From 2031bd4de12d0e42974fb46e1839145dd86cb40e Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 28 Nov 2023 18:31:56 -0500 Subject: [PATCH] docs(ingest): update docs on adding stateful ingestion (#9327) --- .../add_stateful_ingestion_to_source.md | 201 ++++++------------ 1 file changed, 68 insertions(+), 133 deletions(-) diff --git a/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source.md b/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source.md index 9e39d24fb8..a152697988 100644 --- a/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source.md +++ b/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source.md @@ -5,160 +5,75 @@ the [Redunant Run Elimination](./stateful.md#redundant-run-elimination) use-case capability available for the sources. This document describes how to add support for these two use-cases to new sources. ## Adding Stale Metadata Removal to a Source -Adding the stale metadata removal use-case to a new source involves -1. Defining the new checkpoint state that stores the list of entities emitted from a specific ingestion run. -2. Modifying the `SourceConfig` associated with the source to use a custom `stateful_ingestion` config param. -3. Modifying the `SourceReport` associated with the source to include soft-deleted entities in the report. -4. Modifying the `Source` to - 1. Instantiate the StaleEntityRemovalHandler object - 2. Add entities from the current run to the state object - 3. Emit stale metadata removal workunits + +Adding the stale metadata removal use-case to a new source involves modifying the source config, source report, and the source itself. + +For a full example of all changes required: [Adding stale metadata removal to the MongoDB source](https://github.com/datahub-project/datahub/pull/9118). 
The [datahub.ingestion.source.state.stale_entity_removal_handler](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py) module provides the supporting infrastructure for all the steps described above and substantially simplifies the implementation on the source side. Below is a detailed explanation of each of these steps along with examples. -### 1. Defining the checkpoint state for the source. -The checkpoint state class is responsible for tracking the entities emitted from each ingestion run. If none of the existing states do not meet the needs of the new source, a new checkpoint state must be created. The state must -inherit from the `StaleEntityCheckpointStateBase` abstract class shown below, and implement each of the abstract methods. -```python -class StaleEntityCheckpointStateBase(CheckpointStateBase, ABC, Generic[Derived]): - """ - Defines the abstract interface for the checkpoint states that are used for stale entity removal. - Examples include sql_common state for tracking table and & view urns, - dbt that tracks node & assertion urns, kafka state tracking topic urns. - """ - - @classmethod - @abstractmethod - def get_supported_types(cls) -> List[str]: - pass - - @abstractmethod - def add_checkpoint_urn(self, type: str, urn: str) -> None: - """ - Adds an urn into the list used for tracking the type. - :param type: The type of the urn such as a 'table', 'view', - 'node', 'topic', 'assertion' that the concrete sub-class understands. - :param urn: The urn string - :return: None. - """ - pass - - @abstractmethod - def get_urns_not_in( - self, type: str, other_checkpoint_state: Derived - ) -> Iterable[str]: - """ - Gets the urns present in this checkpoint but not the other_checkpoint for the given type. - :param type: The type of the urn such as a 'table', 'view', - 'node', 'topic', 'assertion' that the concrete sub-class understands. 
- :param other_checkpoint_state: the checkpoint state to compute the urn set difference against. - :return: an iterable to the set of urns present in this checkpoing state but not in the other_checkpoint. - """ - pass -``` - -Examples: -* [BaseSQLAlchemyCheckpointState](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/sql_common_state.py#L17) - -### 2. Modifying the SourceConfig +### 1. Modify the source config The source's config must inherit from `StatefulIngestionConfigBase`, and should declare a field named `stateful_ingestion` of type `Optional[StatefulStaleMetadataRemovalConfig]`. -Examples: -- The `KafkaSourceConfig` +Example: + ```python -from typing import List, Optional -import pydantic -from datahub.ingestion.source.state.stale_entity_removal_handler import StatefulStaleMetadataRemovalConfig -from datahub.ingestion.source.state.stateful_ingestion_base import ( +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StatefulStaleMetadataRemovalConfig, StatefulIngestionConfigBase, ) -class KafkaSourceConfig(StatefulIngestionConfigBase): +class MySourceConfig(StatefulIngestionConfigBase): # ...... stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None ``` -### 3. Modifying the SourceReport -The report class of the source should inherit from `StaleEntityRemovalSourceReport` whose definition is shown below. -```python -from typing import List -from dataclasses import dataclass, field -from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionReport -@dataclass -class StaleEntityRemovalSourceReport(StatefulIngestionReport): - soft_deleted_stale_entities: List[str] = field(default_factory=list) +### 2. 
Modify the source report - def report_stale_entity_soft_deleted(self, urn: str) -> None: - self.soft_deleted_stale_entities.append(urn) +The report class of the source should inherit from `StaleEntityRemovalSourceReport` instead of `SourceReport`. + +```python +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StaleEntityRemovalSourceReport, +) + +@dataclass +class MySourceReport(StaleEntityRemovalSourceReport): + # + pass ``` -Examples: -* The `KafkaSourceReport` -```python -from dataclasses import dataclass -from datahub.ingestion.source.state.stale_entity_removal_handler import StaleEntityRemovalSourceReport -@dataclass -class KafkaSourceReport(StaleEntityRemovalSourceReport): - # - # Create and register the stateful ingestion stale entity removal handler. - self.stale_entity_removal_handler = StaleEntityRemovalHandler( - source=self, - config=self.source_config, - state_type_class=KafkaCheckpointState, - pipeline_name=self.ctx.pipeline_name, - run_id=self.ctx.run_id, - ) -``` -#### 4.2 Adding entities from current run to the state object. -Use the `add_entity_to_state` method of the `StaleEntityRemovalHandler`. -Examples: -```python -# Kafka -self.stale_entity_removal_handler.add_entity_to_state( - type="topic", - urn=topic_urn,) +class MySource(StatefulIngestionSourceBase): + def __init__(self, config: MySourceConfig, ctx: PipelineContext): + super().__init__(config, ctx) -# DBT -self.stale_entity_removal_handler.add_entity_to_state( - type="dataset", - urn=node_datahub_urn -) -self.stale_entity_removal_handler.add_entity_to_state( - type="assertion", - urn=node_datahub_urn, -) -``` + self.config = config + self.report = MySourceReport() -#### 4.3 Emitting soft-delete workunits associated with the stale entities. -```python -def get_workunits(self) -> Iterable[MetadataWorkUnit]: - # - # Emit the rest of the workunits for the source. - # NOTE: Populating the current state happens during the execution of this code. - # ... 
- - # Clean up stale entities at the end - yield from self.stale_entity_removal_handler.gen_removed_entity_workunits() + # other initialization code here + + def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: + return [ + *super().get_workunit_processors(), + StaleEntityRemovalHandler.create( + self, self.config, self.ctx + ).workunit_processor, + ] + + # other methods here ``` ## Adding Redundant Run Elimination to a Source @@ -168,12 +83,13 @@ as snowflake usage, bigquery usage etc.). It typically involves expensive and lo run elimination to a new source to prevent the expensive reruns for the same time range(potentially due to a user error or a scheduler malfunction), the following steps are required. + 1. Update the `SourceConfig` 2. Update the `SourceReport` -3. Modify the `Source` to - 1. Instantiate the RedundantRunSkipHandler object. - 2. Check if the current run should be skipped. - 3. Update the state for the current run(start & end times). +3. Modify the `Source` to + 1. Instantiate the RedundantRunSkipHandler object. + 2. Check if the current run should be skipped. + 3. Update the state for the current run(start & end times). The [datahub.ingestion.source.state.redundant_run_skip_handler](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py) modules provides the supporting infrastructure required for all the steps described above. @@ -181,11 +97,15 @@ modules provides the supporting infrastructure required for all the steps descri NOTE: The handler currently uses a simple state, the [BaseUsageCheckpointState](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/usage_common_state.py), across all sources it supports (unlike the StaleEntityRemovalHandler). + ### 1. 
Modifying the SourceConfig + The `SourceConfig` must inherit from the [StatefulRedundantRunSkipConfig](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py#L23) class. Examples: + 1. Snowflake Usage + ```python from datahub.ingestion.source.state.redundant_run_skip_handler import ( StatefulRedundantRunSkipConfig, @@ -193,27 +113,36 @@ from datahub.ingestion.source.state.redundant_run_skip_handler import ( class SnowflakeStatefulIngestionConfig(StatefulRedundantRunSkipConfig): pass ``` + ### 2. Modifying the SourceReport + The `SourceReport` must inherit from the [StatefulIngestionReport](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py#L102) class. Examples: + 1. Snowflake Usage + ```python @dataclass class SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport): # ``` + ### 3. Modifying the Source + The source must inherit from `StatefulIngestionSourceBase`. + #### 3.1 Instantiate RedundantRunSkipHandler in the `__init__` method of the source. + The source should instantiate an instance of the `RedundantRunSkipHandler` in its `__init__` method. Examples: Snowflake Usage + ```python from datahub.ingestion.source.state.redundant_run_skip_handler import ( RedundantRunSkipHandler, ) class SnowflakeUsageSource(StatefulIngestionSourceBase): - + def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext): super(SnowflakeUsageSource, self).__init__(config, ctx) self.config: SnowflakeUsageConfig = config @@ -226,10 +155,13 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase): run_id=self.ctx.run_id, ) ``` + #### 3.2 Checking if the current run should be skipped. + The sources can query if the current run should be skipped using `should_skip_this_run` method of `RedundantRunSkipHandler`. This should done from the `get_workunits` method, before doing any other work. 
Example code: + ```python def get_workunits(self) -> Iterable[MetadataWorkUnit]: # Skip a redundant run @@ -239,10 +171,13 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: return # Generate the workunits. ``` + #### 3.3 Updating the state for the current run. + The source should use the `update_state` method of `RedundantRunSkipHandler` to update the current run's state if the run has not been skipped. This step can be performed in the `get_workunits` if the run has not been skipped. Example code: + ```python def get_workunits(self) -> Iterable[MetadataWorkUnit]: # Skip a redundant run @@ -250,7 +185,7 @@ Example code: cur_start_time_millis=self.config.start_time ): return - + # Generate the workunits. # # Update checkpoint state for this run. @@ -258,4 +193,4 @@ Example code: start_time_millis=self.config.start_time, end_time_millis=self.config.end_time, ) -``` \ No newline at end of file +```