From 104e78776dfb9cd3b9a10063aadb54dcf14d65d1 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 18 Mar 2024 13:59:01 -0700 Subject: [PATCH] fix(ingest): only auto-enable stateful ingestion if pipeline name is set (#10075) --- docs/how/updating-datahub.md | 2 +- .../ingestion/source/state/stateful_ingestion_base.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 9d46fe606f..cc8de2b541 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -20,7 +20,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ### Breaking Changes -- #9934 - Stateful ingestion is now enabled by default if datahub-rest sink is used or if a `datahub_api` is specified. It will still be disabled by default when any other sink type is used. +- #9934 and #10075 - Stateful ingestion is now enabled by default if a `pipeline_name` is set and either a datahub-rest sink or `datahub_api` is specified. It will still be disabled by default when any other sink type is used or if there is no pipeline name set. - #10002 - The `DataHubGraph` client no longer makes a request to the backend during initialization. If you want to preserve the old behavior, call `graph.test_connection()` after constructing the client. ### Potential Downtime diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index 521f8f5ee0..4e9e1425a9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -53,7 +53,7 @@ class StatefulIngestionConfig(ConfigModel): enabled: bool = Field( default=False, description="Whether or not to enable stateful ingest. " - "Default: True if datahub-rest sink is used or if a `datahub_api` is specified, otherwise False", + "Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False", ) max_checkpoint_state_size: pydantic.PositiveInt = Field( default=2**24, # 16 MB @@ -233,9 +233,13 @@ class StateProviderWrapper: IngestionCheckpointingProviderBase ] = None - if self.stateful_ingestion_config is None and self.ctx.graph: + if ( + self.stateful_ingestion_config is None + and self.ctx.graph + and self.ctx.pipeline_name + ): logger.info( - "Stateful ingestion got enabled by default, as datahub-rest sink is used or `datahub_api` is specified" + "Stateful ingestion will be automatically enabled, as datahub-rest sink is used or `datahub_api` is specified" ) self.stateful_ingestion_config = StatefulIngestionConfig( enabled=True,