mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-03 14:16:28 +00:00
docs(ingest): update docs on adding stateful ingestion (#9327)
This commit is contained in:
parent
3d7962cf17
commit
2031bd4de1
@ -5,160 +5,75 @@ the [Redundant Run Elimination](./stateful.md#redundant-run-elimination) use-case
|
||||
capability available for the sources. This document describes how to add support for these two use-cases to new sources.
|
||||
|
||||
## Adding Stale Metadata Removal to a Source
|
||||
Adding the stale metadata removal use-case to a new source involves
|
||||
1. Defining the new checkpoint state that stores the list of entities emitted from a specific ingestion run.
|
||||
2. Modifying the `SourceConfig` associated with the source to use a custom `stateful_ingestion` config param.
|
||||
3. Modifying the `SourceReport` associated with the source to include soft-deleted entities in the report.
|
||||
4. Modifying the `Source` to
|
||||
1. Instantiate the StaleEntityRemovalHandler object
|
||||
2. Add entities from the current run to the state object
|
||||
3. Emit stale metadata removal workunits
|
||||
|
||||
Adding the stale metadata removal use-case to a new source involves modifying the source config, source report, and the source itself.
|
||||
|
||||
For a full example of all changes required: [Adding stale metadata removal to the MongoDB source](https://github.com/datahub-project/datahub/pull/9118).
|
||||
|
||||
The [datahub.ingestion.source.state.stale_entity_removal_handler](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py) module provides the supporting infrastructure for all the steps described
|
||||
above and substantially simplifies the implementation on the source side. Below is a detailed explanation of each of these
|
||||
steps along with examples.
|
||||
|
||||
### 1. Defining the checkpoint state for the source.
|
||||
The checkpoint state class is responsible for tracking the entities emitted from each ingestion run. If none of the existing states meet the needs of the new source, a new checkpoint state must be created. The state must
|
||||
inherit from the `StaleEntityCheckpointStateBase` abstract class shown below, and implement each of the abstract methods.
|
||||
```python
|
||||
class StaleEntityCheckpointStateBase(CheckpointStateBase, ABC, Generic[Derived]):
|
||||
"""
|
||||
Defines the abstract interface for the checkpoint states that are used for stale entity removal.
|
||||
Examples include sql_common state for tracking table & view urns,
|
||||
dbt that tracks node & assertion urns, kafka state tracking topic urns.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def get_supported_types(cls) -> List[str]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def add_checkpoint_urn(self, type: str, urn: str) -> None:
|
||||
"""
|
||||
Adds an urn into the list used for tracking the type.
|
||||
:param type: The type of the urn such as a 'table', 'view',
|
||||
'node', 'topic', 'assertion' that the concrete sub-class understands.
|
||||
:param urn: The urn string
|
||||
:return: None.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_urns_not_in(
|
||||
self, type: str, other_checkpoint_state: Derived
|
||||
) -> Iterable[str]:
|
||||
"""
|
||||
Gets the urns present in this checkpoint but not the other_checkpoint for the given type.
|
||||
:param type: The type of the urn such as a 'table', 'view',
|
||||
'node', 'topic', 'assertion' that the concrete sub-class understands.
|
||||
:param other_checkpoint_state: the checkpoint state to compute the urn set difference against.
|
||||
:return: an iterable over the set of urns present in this checkpoint state but not in the other_checkpoint.
|
||||
"""
|
||||
pass
|
||||
```
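
As a rough illustration of what a concrete state has to do, the sketch below implements the same three methods with plain Python sets. It is a minimal sketch, not DataHub code: `CheckpointStateSketch` and `TopicCheckpointStateSketch` are hypothetical stand-ins that do not inherit from `CheckpointStateBase`, and the `urns_by_type` attribute is an assumption made for this example.

```python
from abc import ABC, abstractmethod
from typing import Dict, Iterable, List, Set


class CheckpointStateSketch(ABC):
    """Plain-Python stand-in mirroring the three abstract methods above."""

    @classmethod
    @abstractmethod
    def get_supported_types(cls) -> List[str]:
        ...

    @abstractmethod
    def add_checkpoint_urn(self, type: str, urn: str) -> None:
        ...

    @abstractmethod
    def get_urns_not_in(
        self, type: str, other_checkpoint_state: "CheckpointStateSketch"
    ) -> Iterable[str]:
        ...


class TopicCheckpointStateSketch(CheckpointStateSketch):
    """Hypothetical state that tracks a single entity type, 'topic'."""

    def __init__(self) -> None:
        # One set of urns per supported type (assumed layout for this sketch).
        self.urns_by_type: Dict[str, Set[str]] = {
            t: set() for t in self.get_supported_types()
        }

    @classmethod
    def get_supported_types(cls) -> List[str]:
        return ["topic"]

    def add_checkpoint_urn(self, type: str, urn: str) -> None:
        if type not in self.urns_by_type:
            raise ValueError(f"Unsupported entity type: {type}")
        self.urns_by_type[type].add(urn)

    def get_urns_not_in(
        self, type: str, other_checkpoint_state: "TopicCheckpointStateSketch"
    ) -> Iterable[str]:
        # Set difference: urns recorded here but absent from the other state.
        return sorted(
            self.urns_by_type[type] - other_checkpoint_state.urns_by_type[type]
        )


ORDERS = "urn:li:dataset:(urn:li:dataPlatform:kafka,orders,PROD)"
PAYMENTS = "urn:li:dataset:(urn:li:dataPlatform:kafka,payments,PROD)"

previous_run = TopicCheckpointStateSketch()
previous_run.add_checkpoint_urn("topic", ORDERS)
previous_run.add_checkpoint_urn("topic", PAYMENTS)

current_run = TopicCheckpointStateSketch()
current_run.add_checkpoint_urn("topic", ORDERS)

# Urns seen in the previous run but not in the current one are stale.
stale_urns = list(previous_run.get_urns_not_in("topic", current_run))
```

Calling `get_urns_not_in` on the previous run's state, with the current run's state as the argument, is what yields the stale candidates; reversing the two would instead list newly added urns.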
|
||||
|
||||
Examples:
|
||||
* [BaseSQLAlchemyCheckpointState](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/sql_common_state.py#L17)
|
||||
|
||||
### 2. Modifying the SourceConfig
|
||||
### 1. Modify the source config
|
||||
|
||||
The source's config must inherit from `StatefulIngestionConfigBase`, and should declare a field named `stateful_ingestion` of type `Optional[StatefulStaleMetadataRemovalConfig]`.
|
||||
|
||||
Examples:
|
||||
- The `KafkaSourceConfig`
|
||||
Example:
|
||||
|
||||
```python
|
||||
from typing import List, Optional
|
||||
import pydantic
|
||||
from datahub.ingestion.source.state.stale_entity_removal_handler import StatefulStaleMetadataRemovalConfig
|
||||
from datahub.ingestion.source.state.stateful_ingestion_base import (
|
||||
from datahub.ingestion.source.state.stale_entity_removal_handler import (
|
||||
StatefulStaleMetadataRemovalConfig,
|
||||
StatefulIngestionConfigBase,
|
||||
)
|
||||
|
||||
class KafkaSourceConfig(StatefulIngestionConfigBase):
|
||||
class MySourceConfig(StatefulIngestionConfigBase):
|
||||
# ...<other config params>...
|
||||
|
||||
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
|
||||
```
|
||||
|
||||
### 3. Modifying the SourceReport
|
||||
The report class of the source should inherit from `StaleEntityRemovalSourceReport` whose definition is shown below.
|
||||
```python
|
||||
from typing import List
|
||||
from dataclasses import dataclass, field
|
||||
from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionReport
|
||||
@dataclass
|
||||
class StaleEntityRemovalSourceReport(StatefulIngestionReport):
|
||||
soft_deleted_stale_entities: List[str] = field(default_factory=list)
|
||||
### 2. Modify the source report
|
||||
|
||||
def report_stale_entity_soft_deleted(self, urn: str) -> None:
|
||||
self.soft_deleted_stale_entities.append(urn)
|
||||
The report class of the source should inherit from `StaleEntityRemovalSourceReport` instead of `SourceReport`.
|
||||
|
||||
```python
|
||||
from dataclasses import dataclass

from datahub.ingestion.source.state.stale_entity_removal_handler import (
|
||||
StaleEntityRemovalSourceReport,
|
||||
)
|
||||
|
||||
@dataclass
|
||||
class MySourceReport(StaleEntityRemovalSourceReport):
|
||||
# <other fields here>
|
||||
pass
|
||||
```
|
||||
|
||||
Examples:
|
||||
* The `KafkaSourceReport`
|
||||
```python
|
||||
from dataclasses import dataclass
|
||||
from datahub.ingestion.source.state.stale_entity_removal_handler import StaleEntityRemovalSourceReport
|
||||
@dataclass
|
||||
class KafkaSourceReport(StaleEntityRemovalSourceReport):
|
||||
# <rest of kafka source report specific impl>
|
||||
```
|
||||
### 3. Modify the source
|
||||
|
||||
### 4. Modifying the Source
|
||||
The source must inherit from `StatefulIngestionSourceBase`.
|
||||
1. The source must inherit from `StatefulIngestionSourceBase` instead of `Source`.
|
||||
2. The source should contain a custom `get_workunit_processors` method.
|
||||
|
||||
#### 4.1 Instantiate StaleEntityRemovalHandler in the `__init__` method of the source.
|
||||
|
||||
Examples:
|
||||
1. The `KafkaSource`
|
||||
```python
|
||||
from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionSourceBase
|
||||
from datahub.ingestion.source.state.stale_entity_removal_handler import StaleEntityRemovalHandler
|
||||
class KafkaSource(StatefulIngestionSourceBase):
|
||||
def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
|
||||
# <Rest of KafkaSource initialization>
|
||||
# Create and register the stateful ingestion stale entity removal handler.
|
||||
self.stale_entity_removal_handler = StaleEntityRemovalHandler(
|
||||
source=self,
|
||||
config=self.source_config,
|
||||
state_type_class=KafkaCheckpointState,
|
||||
pipeline_name=self.ctx.pipeline_name,
|
||||
run_id=self.ctx.run_id,
|
||||
)
|
||||
```
|
||||
#### 4.2 Adding entities from current run to the state object.
|
||||
Use the `add_entity_to_state` method of the `StaleEntityRemovalHandler`.
|
||||
|
||||
Examples:
|
||||
```python
|
||||
# Kafka
|
||||
self.stale_entity_removal_handler.add_entity_to_state(
|
||||
type="topic",
|
||||
urn=topic_urn,
)
|
||||
class MySource(StatefulIngestionSourceBase):
|
||||
def __init__(self, config: MySourceConfig, ctx: PipelineContext):
|
||||
super().__init__(config, ctx)
|
||||
|
||||
# DBT
|
||||
self.stale_entity_removal_handler.add_entity_to_state(
|
||||
type="dataset",
|
||||
urn=node_datahub_urn
|
||||
)
|
||||
self.stale_entity_removal_handler.add_entity_to_state(
|
||||
type="assertion",
|
||||
urn=node_datahub_urn,
|
||||
)
|
||||
```
|
||||
self.config = config
|
||||
self.report = MySourceReport()
|
||||
|
||||
#### 4.3 Emitting soft-delete workunits associated with the stale entities.
|
||||
```python
|
||||
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
|
||||
#
|
||||
# Emit the rest of the workunits for the source.
|
||||
# NOTE: Populating the current state happens during the execution of this code.
|
||||
# ...
|
||||
|
||||
# Clean up stale entities at the end
|
||||
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()
|
||||
# other initialization code here
|
||||
|
||||
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
|
||||
return [
|
||||
*super().get_workunit_processors(),
|
||||
StaleEntityRemovalHandler.create(
|
||||
self, self.config, self.ctx
|
||||
).workunit_processor,
|
||||
]
|
||||
|
||||
# other methods here
|
||||
```
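
Conceptually, a workunit processor wraps the stream of workunits: it records each entity that passes through and, once the stream is exhausted, emits soft-deletes for entities that were present in the previous run but not in this one. The following is a minimal sketch of that idea using plain Python stand-ins, not DataHub's implementation. The `WorkUnit` tuple, `make_stale_removal_processor`, and `run_pipeline` are hypothetical names introduced only for illustration.

```python
from typing import Callable, Iterable, Iterator, List, Set, Tuple

# Hypothetical stand-in for a workunit: (urn, is_soft_delete).
WorkUnit = Tuple[str, bool]
Processor = Callable[[Iterable[WorkUnit]], Iterator[WorkUnit]]


def make_stale_removal_processor(
    previous_urns: Set[str], seen_urns: Set[str]
) -> Processor:
    """Build a processor that tracks emitted urns and appends soft-deletes."""

    def processor(stream: Iterable[WorkUnit]) -> Iterator[WorkUnit]:
        for urn, is_soft_delete in stream:
            # Record every entity emitted during the current run.
            seen_urns.add(urn)
            yield (urn, is_soft_delete)
        # After the source is done, anything not seen this run is stale.
        for urn in sorted(previous_urns - seen_urns):
            yield (urn, True)

    return processor


def run_pipeline(
    stream: Iterable[WorkUnit], processors: List[Processor]
) -> List[WorkUnit]:
    """Chain the processors over the stream, in order, and collect the output."""
    for processor in processors:
        stream = processor(stream)
    return list(stream)


previous_urns = {"urn:a", "urn:b", "urn:c"}
seen_urns: Set[str] = set()
emitted = run_pipeline(
    [("urn:a", False), ("urn:c", False)],
    [make_stale_removal_processor(previous_urns, seen_urns)],
)
```

Because the processor sees every workunit, the source itself does not need to register entities one by one or emit the removal workunits at the end of its own generator.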
|
||||
|
||||
## Adding Redundant Run Elimination to a Source
|
||||
@ -168,12 +83,13 @@ as snowflake usage, bigquery usage etc.). It typically involves expensive and lo
|
||||
run elimination to a new source to prevent the expensive reruns for the same time range (potentially due to a user
|
||||
error or a scheduler malfunction), the following steps
|
||||
are required.
|
||||
|
||||
1. Update the `SourceConfig`
|
||||
2. Update the `SourceReport`
|
||||
3. Modify the `Source` to
|
||||
1. Instantiate the RedundantRunSkipHandler object.
|
||||
2. Check if the current run should be skipped.
|
||||
3. Update the state for the current run (start & end times).
|
||||
|
||||
The [datahub.ingestion.source.state.redundant_run_skip_handler](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py)
|
||||
module provides the supporting infrastructure required for all the steps described above.
|
||||
@ -181,11 +97,15 @@ modules provides the supporting infrastructure required for all the steps descri
|
||||
NOTE: The handler currently uses a simple state,
|
||||
the [BaseUsageCheckpointState](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/usage_common_state.py),
|
||||
across all sources it supports (unlike the StaleEntityRemovalHandler).
|
||||
|
||||
### 1. Modifying the SourceConfig
|
||||
|
||||
The `SourceConfig` must inherit from the [StatefulRedundantRunSkipConfig](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py#L23) class.
|
||||
|
||||
Examples:
|
||||
|
||||
1. Snowflake Usage
|
||||
|
||||
```python
|
||||
from datahub.ingestion.source.state.redundant_run_skip_handler import (
|
||||
StatefulRedundantRunSkipConfig,
|
||||
@ -193,27 +113,36 @@ from datahub.ingestion.source.state.redundant_run_skip_handler import (
|
||||
class SnowflakeStatefulIngestionConfig(StatefulRedundantRunSkipConfig):
|
||||
pass
|
||||
```
|
||||
|
||||
### 2. Modifying the SourceReport
|
||||
|
||||
The `SourceReport` must inherit from the [StatefulIngestionReport](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py#L102) class.
|
||||
Examples:
|
||||
|
||||
1. Snowflake Usage
|
||||
|
||||
```python
|
||||
@dataclass
|
||||
class SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport):
|
||||
# <members specific to snowflake usage report>
|
||||
```
|
||||
|
||||
### 3. Modifying the Source
|
||||
|
||||
The source must inherit from `StatefulIngestionSourceBase`.
|
||||
|
||||
#### 3.1 Instantiate RedundantRunSkipHandler in the `__init__` method of the source.
|
||||
|
||||
The source should create an instance of the `RedundantRunSkipHandler` in its `__init__` method.
|
||||
Examples:
|
||||
Snowflake Usage
|
||||
|
||||
```python
|
||||
from datahub.ingestion.source.state.redundant_run_skip_handler import (
|
||||
RedundantRunSkipHandler,
|
||||
)
|
||||
class SnowflakeUsageSource(StatefulIngestionSourceBase):
|
||||
|
||||
|
||||
def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext):
|
||||
super(SnowflakeUsageSource, self).__init__(config, ctx)
|
||||
self.config: SnowflakeUsageConfig = config
|
||||
@ -226,10 +155,13 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
|
||||
run_id=self.ctx.run_id,
|
||||
)
|
||||
```
|
||||
|
||||
#### 3.2 Checking if the current run should be skipped.
|
||||
|
||||
A source can check whether the current run should be skipped using the `should_skip_this_run` method of `RedundantRunSkipHandler`. This should be done from the `get_workunits` method, before doing any other work.
|
||||
|
||||
Example code:
|
||||
|
||||
```python
|
||||
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
|
||||
# Skip a redundant run
|
||||
@ -239,10 +171,13 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
|
||||
return
|
||||
# Generate the workunits.
|
||||
```
|
||||
|
||||
#### 3.3 Updating the state for the current run.
|
||||
|
||||
The source should use the `update_state` method of `RedundantRunSkipHandler` to update the current run's state. This step can be performed in `get_workunits`, after the workunits have been generated, and only if the run has not been skipped.
|
||||
|
||||
Example code:
|
||||
|
||||
```python
|
||||
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
|
||||
# Skip a redundant run
|
||||
@ -250,7 +185,7 @@ Example code:
|
||||
cur_start_time_millis=self.config.start_time
|
||||
):
|
||||
return
|
||||
|
||||
|
||||
# Generate the workunits.
|
||||
# <code for generating the workunits>
|
||||
# Update checkpoint state for this run.
|
||||
@ -258,4 +193,4 @@ Example code:
|
||||
start_time_millis=self.config.start_time,
|
||||
end_time_millis=self.config.end_time,
|
||||
)
|
||||
```
|
||||
```
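
To make the interplay between the skip check and the state update concrete, here is a minimal, self-contained sketch. It does not use the DataHub classes: `TimeWindowState` and `RunSkipSketch` are hypothetical stand-ins, and the rule "skip when the requested start time falls inside the window covered by the last recorded run" is an assumption made for this example rather than a statement of how `RedundantRunSkipHandler` decides.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TimeWindowState:
    """Hypothetical checkpoint state: the time window the last run covered."""

    begin_timestamp_millis: int
    end_timestamp_millis: int


class RunSkipSketch:
    """Stand-in handler that keeps the last run's window in memory."""

    def __init__(self, last_state: Optional[TimeWindowState] = None) -> None:
        self.last_state = last_state

    def should_skip_this_run(self, cur_start_time_millis: int) -> bool:
        # Nothing recorded yet, so this cannot be a redundant run.
        if self.last_state is None:
            return False
        # Assumed rule: redundant if the start time lies in the last window.
        return (
            self.last_state.begin_timestamp_millis
            <= cur_start_time_millis
            <= self.last_state.end_timestamp_millis
        )

    def update_state(self, start_time_millis: int, end_time_millis: int) -> None:
        # Record the window covered by the run that has just completed.
        self.last_state = TimeWindowState(start_time_millis, end_time_millis)


handler = RunSkipSketch()

first_run_skipped = handler.should_skip_this_run(cur_start_time_millis=1_000)
handler.update_state(start_time_millis=1_000, end_time_millis=2_000)

# Re-running the same window (for example, a scheduler misfire) is skipped.
rerun_skipped = handler.should_skip_this_run(cur_start_time_millis=1_000)

# A later window is not covered by the recorded state and runs normally.
next_run_skipped = handler.should_skip_this_run(cur_start_time_millis=3_000)
```

The ordering matters: the state is updated only after a run has done its work, so a run that fails midway leaves the old window in place and the same range can be retried.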
|
||||
|
||||