fix(ingest): only auto-enable stateful ingestion if pipeline name is set (#10075)

This commit is contained in:
Harshal Sheth 2024-03-18 13:59:01 -07:00 committed by GitHub
parent 3a4bdef44a
commit 104e78776d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 8 additions and 4 deletions

View File

@ -20,7 +20,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
### Breaking Changes
- #9934 - Stateful ingestion is now enabled by default if datahub-rest sink is used or if a `datahub_api` is specified. It will still be disabled by default when any other sink type is used.
- #9934 and #10075 - Stateful ingestion is now enabled by default if a `pipeline_name` is set and either a datahub-rest sink is used or a `datahub_api` is specified. It will still be disabled by default when any other sink type is used or when no pipeline name is set.
- #10002 - The `DataHubGraph` client no longer makes a request to the backend during initialization. If you want to preserve the old behavior, call `graph.test_connection()` after constructing the client.
### Potential Downtime

View File

@ -53,7 +53,7 @@ class StatefulIngestionConfig(ConfigModel):
enabled: bool = Field(
default=False,
description="Whether or not to enable stateful ingest. "
"Default: True if datahub-rest sink is used or if a `datahub_api` is specified, otherwise False",
"Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
)
max_checkpoint_state_size: pydantic.PositiveInt = Field(
default=2**24, # 16 MB
@ -233,9 +233,13 @@ class StateProviderWrapper:
IngestionCheckpointingProviderBase
] = None
if self.stateful_ingestion_config is None and self.ctx.graph:
if (
self.stateful_ingestion_config is None
and self.ctx.graph
and self.ctx.pipeline_name
):
logger.info(
"Stateful ingestion got enabled by default, as datahub-rest sink is used or `datahub_api` is specified"
"Stateful ingestion will be automatically enabled, as datahub-rest sink is used or `datahub_api` is specified"
)
self.stateful_ingestion_config = StatefulIngestionConfig(
enabled=True,