FIX #16284 - Toggle if we want to raise workflow errors (#20969)

* FIX #16284 - Toggle if we want to raise workflow errors

* schema

* schema

* move prop

* fix

* move prop

* improve error handling

* Update openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Add the `Raise on Error` option to the ingestion schedule step

* Revert "Update openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java"

This reverts commit 985b73513a59695c6bb39ad41c2d273bbf4e5d22.

* Update the tests

* Fix sonar issue

---------

Co-authored-by: Aniket Katkar <aniketkatkar97@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Pere Miquel Brull 2025-04-29 08:19:13 +02:00
parent 8e871eac1f
commit 10b408db75
64 changed files with 380 additions and 121 deletions

View File

@ -16,6 +16,7 @@ import sys
import traceback
from pathlib import Path
from metadata.cli.common import execute_workflow
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
@ -46,7 +47,4 @@ def run_classification(config_path: Path) -> None:
)
sys.exit(1)
workflow.execute()
workflow.stop()
workflow.print_status()
workflow.raise_from_status()
execute_workflow(workflow=workflow, config_dict=config_dict)

View File

@ -0,0 +1,26 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Handle workflow execution
"""
from typing import Any, Dict
from metadata.workflow.base import BaseWorkflow
def execute_workflow(workflow: BaseWorkflow, config_dict: Dict[str, Any]) -> None:
"""Execute the workflow and raise if needed"""
workflow.execute()
workflow.print_status()
workflow.stop()
if config_dict.get("workflowConfig", {}).get("raiseOnError", True):
workflow.raise_from_status()
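
For context, a minimal sketch of how a CLI caller might drive this helper. The config fragment below is hypothetical (a real run also needs a full `source` section); only the keys relevant to the new flag are shown:

```python
from metadata.cli.common import execute_workflow
from metadata.workflow.metadata import MetadataWorkflow

# Hypothetical config fragment: raiseOnError defaults to True, so
# execute_workflow() only skips raise_from_status() when the flag is
# explicitly set to False.
config_dict = {
    # "source": ...  (service-specific source config required in practice)
    "workflowConfig": {
        "loggerLevel": "INFO",
        "raiseOnError": False,
        "openMetadataServerConfig": {"hostPort": "http://localhost:8585/api"},
    },
}

workflow = MetadataWorkflow.create(config_dict)  # needs the full config to succeed
execute_workflow(workflow=workflow, config_dict=config_dict)
```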

View File

@ -16,6 +16,7 @@ import sys
import traceback
from pathlib import Path
from metadata.cli.common import execute_workflow
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
@ -48,7 +49,4 @@ def run_test(config_path: Path) -> None:
)
sys.exit(1)
workflow.execute()
workflow.stop()
workflow.print_status()
workflow.raise_from_status()
execute_workflow(workflow=workflow, config_dict=workflow_config_dict)

View File

@ -16,6 +16,7 @@ import sys
import traceback
from pathlib import Path
from metadata.cli.common import execute_workflow
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
@ -46,7 +47,4 @@ def run_ingest(config_path: Path) -> None:
)
sys.exit(1)
workflow.execute()
workflow.stop()
workflow.print_status()
workflow.raise_from_status()
execute_workflow(workflow=workflow, config_dict=config_dict)

View File

@ -16,6 +16,7 @@ import sys
import traceback
from pathlib import Path
from metadata.cli.common import execute_workflow
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
@ -48,7 +49,4 @@ def run_profiler(config_path: Path) -> None:
)
sys.exit(1)
workflow.execute()
workflow.stop()
workflow.print_status()
workflow.raise_from_status()
execute_workflow(workflow=workflow, config_dict=workflow_config_dict)

View File

@ -16,6 +16,7 @@ import sys
import traceback
from pathlib import Path
from metadata.cli.common import execute_workflow
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
@ -44,7 +45,4 @@ def run_usage(config_path: Path) -> None:
WorkflowInitErrorHandler.print_init_error(exc, config_dict, PipelineType.usage)
sys.exit(1)
workflow.execute()
workflow.stop()
workflow.print_status()
workflow.raise_from_status()
execute_workflow(workflow=workflow, config_dict=config_dict)

View File

@ -219,7 +219,14 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
if self.workflow_config.successThreshold <= self.calculate_success() < 100:
pipeline_state = PipelineState.partialSuccess
# Any unhandled exception breaking the workflow should update the status
# If there's any steps that should raise a failed status,
# raise it here to set the pipeline status as failed
try:
self.raise_from_status_internal()
except WorkflowExecutionError:
pipeline_state = PipelineState.failed
# Any unhandled exception should blow up the execution
except Exception as err:
pipeline_state = PipelineState.failed
raise err
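
The division of labor after this change: `raise_from_status_internal()` is always consulted so the pipeline status gets recorded, while the `raiseOnError`-gated `raise_from_status()` call decides whether the calling process itself fails. A self-contained sketch of the pattern, with stand-in types:

```python
# Stand-in for metadata.config.common.WorkflowExecutionError.
class WorkflowExecutionError(Exception):
    """Raised when recorded step failures cross the failure threshold."""

def finalize(raise_from_status_internal) -> str:
    """Map the workflow outcome to a pipeline state, mirroring the hunk above."""
    pipeline_state = "success"
    try:
        raise_from_status_internal()
    except WorkflowExecutionError:
        # Expected failure: record it, but leave raising to the caller,
        # which checks raiseOnError before calling raise_from_status().
        pipeline_state = "failed"
    except Exception:
        # Unexpected failure: record it AND propagate.
        pipeline_state = "failed"
        raise
    return pipeline_state
```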

View File

@ -17,7 +17,6 @@ from datetime import datetime
from enum import Enum
from typing import Optional, Tuple
from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
PipelineState,
@ -129,13 +128,8 @@ class WorkflowStatusMixin:
def raise_from_status(self, raise_warnings=False):
"""
Method to raise error if failed execution
and updating Ingestion Pipeline Status
"""
try:
self.raise_from_status_internal(raise_warnings)
except WorkflowExecutionError as err:
self.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def result_status(self) -> WorkflowResultStatus:
"""

View File

@ -18,6 +18,7 @@ from openmetadata_managed_apis.utils.logger import set_operator_logger
from openmetadata_managed_apis.workflows.ingestion.common import (
build_dag,
build_workflow_config_property,
execute_workflow,
)
from metadata.generated.schema.entity.applications.configuration.applicationConfig import (
@ -36,7 +37,9 @@ from metadata.generated.schema.metadataIngestion.applicationPipeline import (
from metadata.workflow.application import ApplicationWorkflow
def application_workflow(workflow_config: OpenMetadataApplicationConfig):
def application_workflow(
workflow_config: OpenMetadataApplicationConfig,
):
"""
Task that creates and runs the ingestion workflow.
@ -52,11 +55,7 @@ def application_workflow(workflow_config: OpenMetadataApplicationConfig):
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = ApplicationWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
execute_workflow(workflow, workflow_config)
def build_application_workflow_config(

View File

@ -15,7 +15,11 @@ import json
from airflow import DAG
from openmetadata_managed_apis.utils.logger import set_operator_logger
from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source
from openmetadata_managed_apis.workflows.ingestion.common import (
build_dag,
build_source,
execute_workflow,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
@ -30,7 +34,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.workflow.classification import AutoClassificationWorkflow
def auto_classification_workflow(workflow_config: OpenMetadataWorkflowConfig):
def auto_classification_workflow(
workflow_config: OpenMetadataWorkflowConfig,
):
"""
Task that creates and runs the auto classification workflow.
@ -46,11 +52,7 @@ def auto_classification_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = AutoClassificationWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
execute_workflow(workflow, workflow_config)
def build_auto_classification_workflow_config(

View File

@ -37,6 +37,7 @@ from metadata.generated.schema.metadataIngestion.application import (
from metadata.generated.schema.type.basic import Timestamp, Uuid
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.workflow.base import BaseWorkflow
# pylint: disable=ungrouped-imports
try:
@ -192,6 +193,19 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
)
def execute_workflow(
workflow: BaseWorkflow, workflow_config: OpenMetadataWorkflowConfig
) -> None:
"""
Execute the workflow and handle the status
"""
workflow.execute()
workflow.print_status()
workflow.stop()
if workflow_config.workflowConfig.raiseOnError:
workflow.raise_from_status()
def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
"""
Task that creates and runs the ingestion workflow.
@ -208,11 +222,7 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = MetadataWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
execute_workflow(workflow, workflow_config)
def build_workflow_config_property(
@ -225,6 +235,7 @@ def build_workflow_config_property(
"""
return WorkflowConfig(
loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO,
raiseOnError=ingestion_pipeline.raiseOnError,
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
)
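
With this, the flag set on the ingestion pipeline entity flows into the `WorkflowConfig` each DAG task receives. A sketch with hypothetical values (the real ones come from the pipeline entity):

```python
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    LogLevels,
    WorkflowConfig,
)

# Hypothetical values standing in for the ingestion pipeline's fields.
workflow_config = WorkflowConfig(
    loggerLevel=LogLevels.INFO,
    raiseOnError=False,
    openMetadataServerConfig=OpenMetadataConnection(
        hostPort="http://localhost:8585/api"
    ),
)

# execute_workflow(workflow, workflow_config) then skips
# workflow.raise_from_status(), so the Airflow task can finish successfully
# even when the run recorded processing errors.
```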
@ -371,7 +382,9 @@ def build_dag(
CustomPythonOperator(
task_id=task_name,
python_callable=workflow_fn,
op_kwargs={"workflow_config": workflow_config},
op_kwargs={
"workflow_config": workflow_config,
},
# There's no need to retry if we have had an error. Wait until the next schedule or manual rerun.
retries=ingestion_pipeline.airflowConfig.retries or 0,
# each DAG will call its own OpenMetadataWorkflowConfig

View File

@ -42,7 +42,7 @@ def build_dbt_workflow_config(
"""
source = build_source(ingestion_pipeline)
source.type = f"dbt" # Mark the source as dbt
source.type = "dbt" # Mark the source as dbt
workflow_config = OpenMetadataWorkflowConfig(
source=source,

View File

@ -15,7 +15,11 @@ import json
from airflow import DAG
from openmetadata_managed_apis.utils.logger import set_operator_logger
from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source
from openmetadata_managed_apis.workflows.ingestion.common import (
build_dag,
build_source,
execute_workflow,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
@ -30,7 +34,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.workflow.profiler import ProfilerWorkflow
def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
def profiler_workflow(
workflow_config: OpenMetadataWorkflowConfig,
):
"""
Task that creates and runs the profiler workflow.
@ -46,11 +52,7 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = ProfilerWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
execute_workflow(workflow, workflow_config)
def build_profiler_workflow_config(

View File

@ -15,7 +15,11 @@ import json
from airflow import DAG
from openmetadata_managed_apis.utils.logger import set_operator_logger
from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source
from openmetadata_managed_apis.workflows.ingestion.common import (
build_dag,
build_source,
execute_workflow,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
@ -30,7 +34,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.workflow.data_quality import TestSuiteWorkflow
def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
def test_suite_workflow(
workflow_config: OpenMetadataWorkflowConfig,
):
"""
Task that creates and runs the test suite workflow.
@ -46,11 +52,7 @@ def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = TestSuiteWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
execute_workflow(workflow, workflow_config)
def build_test_suite_workflow_config(

View File

@ -20,6 +20,7 @@ from openmetadata_managed_apis.workflows.ingestion.common import (
build_dag,
build_source,
build_workflow_config_property,
execute_workflow,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
@ -34,7 +35,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.workflow.usage import UsageWorkflow
def usage_workflow(workflow_config: OpenMetadataWorkflowConfig):
def usage_workflow(
workflow_config: OpenMetadataWorkflowConfig,
):
"""
Task that creates and runs the ingestion workflow.
@ -50,11 +53,7 @@ def usage_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = UsageWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
execute_workflow(workflow, workflow_config)
def build_usage_config_from_file(

View File

@ -337,6 +337,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipeline>
updateLogLevel(original.getLoggerLevel(), updated.getLoggerLevel());
updateEnabled(original.getEnabled(), updated.getEnabled());
updateDeployed(original.getDeployed(), updated.getDeployed());
updateRaiseOnError(original.getRaiseOnError(), updated.getRaiseOnError());
}
private void updateSourceConfig() {
@ -369,6 +370,12 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
}
}
private void updateRaiseOnError(Boolean origRaiseOnError, Boolean updatedRaiseOnError) {
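// raiseOnError defaults to true in the schema, so the stored original is expected to be non-null here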
if (updatedRaiseOnError != null && !origRaiseOnError.equals(updatedRaiseOnError)) {
recordChange("raiseOnError", origRaiseOnError, updatedRaiseOnError);
}
}
private void updateEnabled(Boolean origEnabled, Boolean updatedEnabled) {
if (updatedEnabled != null && !origEnabled.equals(updatedEnabled)) {
recordChange("enabled", origEnabled, updatedEnabled);

View File

@ -26,6 +26,7 @@ public class IngestionPipelineMapper
.withOpenMetadataServerConnection(openMetadataServerConnection)
.withSourceConfig(create.getSourceConfig())
.withLoggerLevel(create.getLoggerLevel())
.withRaiseOnError(create.getRaiseOnError())
.withService(create.getService());
}
}

View File

@ -32,6 +32,11 @@
"description": "Set the logging level for the workflow.",
"$ref": "../../../metadataIngestion/workflow.json#/definitions/logLevels"
},
"raiseOnError": {
"description": "Control if we want to flag the workflow as failed if we encounter any processing errors.",
"type": "boolean",
"default": true
},
"service": {
"description": "Link to the service for which ingestion pipeline is ingesting the metadata.",
"$ref": "../../../type/entityReference.json"

View File

@ -176,6 +176,11 @@
"description": "Set the logging level for the workflow.",
"$ref": "../../../metadataIngestion/workflow.json#/definitions/logLevels"
},
"raiseOnError": {
"description": "Control if we want to flag the workflow as failed if we encounter any processing errors.",
"type": "boolean",
"default": true
},
"deployed": {
"description": "Indicates if the workflow has been successfully deployed to Airflow.",
"type": "boolean",

View File

@ -169,6 +169,11 @@
"$ref": "#/definitions/logLevels",
"default": "INFO"
},
"raiseOnError": {
"description": "Control if we want to flag the workflow as failed if we encounter any processing errors.",
"type": "boolean",
"default": true
},
"successThreshold": {
"title": "Success Threshold",
"description": "The percentage of successfully processed records that must be achieved for the pipeline to be considered successful. Otherwise, the pipeline will be marked as failed.",

View File

@ -268,6 +268,18 @@ class ServiceBaseClass {
await expect(page.locator('[data-testid="cron-type"]')).not.toBeVisible();
await expect(page.locator('#root\\/raiseOnError')).toHaveAttribute(
'aria-checked',
'true'
);
await page.click('#root\\/raiseOnError');
await expect(page.locator('#root\\/raiseOnError')).toHaveAttribute(
'aria-checked',
'false'
);
const deployPipelinePromise = page.waitForRequest(
`/api/v1/services/ingestionPipelines/deploy/**`
);

View File

@ -118,3 +118,8 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -386,3 +386,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -54,3 +54,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -150,3 +150,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -133,3 +133,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -51,3 +51,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -57,3 +57,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -63,3 +63,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -35,3 +35,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -28,3 +28,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -51,3 +51,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -79,3 +79,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -45,3 +45,9 @@ $$section
Times to retry the workflow in case it ends with a failure.
$$
$$section
### Raise on Error $(id="raiseOnError")
If enabled, the workflow raises an exception and is flagged as failed when it encounters processing errors; if disabled, errors are only logged.
$$

View File

@ -44,6 +44,7 @@ export type TestSuiteIngestionDataType = {
testCases?: string[];
name?: string;
selectAllTestCases?: boolean;
raiseOnError?: boolean;
};
export interface AddTestSuitePipelineProps {

View File

@ -124,6 +124,7 @@ const TestSuiteIngestion: React.FC<TestSuiteIngestionProps> = ({
testCases,
name: ingestionPipeline?.displayName,
selectAllTestCases: !isEmpty(ingestionPipeline) && isUndefined(testCases),
raiseOnError: ingestionPipeline?.raiseOnError ?? true,
};
}, [ingestionPipeline]);
@ -173,6 +174,7 @@ const TestSuiteIngestion: React.FC<TestSuiteIngestionProps> = ({
name: generateUUID(),
loggerLevel: data.enableDebugLog ? LogLevels.Debug : LogLevels.Info,
pipelineType: PipelineType.TestSuite,
raiseOnError: data.raiseOnError ?? true,
service: {
id: testSuite.id ?? '',
type: camelCase(PipelineType.TestSuite),
@ -210,6 +212,7 @@ const TestSuiteIngestion: React.FC<TestSuiteIngestionProps> = ({
...ingestionPipeline?.airflowConfig,
scheduleInterval: data.cron,
},
raiseOnError: data.raiseOnError ?? true,
loggerLevel: data.enableDebugLog ? LogLevels.Debug : LogLevels.Info,
sourceConfig: {
...ingestionPipeline?.sourceConfig,

View File

@ -50,6 +50,10 @@ jest.mock('react-router-dom', () => ({
useHistory: jest.fn().mockImplementation(() => mockUseHistory),
}));
jest.mock('../../../../utils/SchedularUtils', () => ({
getRaiseOnErrorFormField: jest.fn().mockReturnValue({}),
}));
const mockProps: AddTestSuitePipelineProps = {
isLoading: false,
onSubmit: jest.fn(),

View File

@ -27,6 +27,7 @@ import {
FormItemLayout,
} from '../../../../interface/FormUtils.interface';
import { generateFormFields } from '../../../../utils/formUtils';
import { getRaiseOnErrorFormField } from '../../../../utils/SchedularUtils';
import { escapeESReservedCharacters } from '../../../../utils/StringsUtils';
import ScheduleInterval from '../../../Settings/Services/AddIngestion/Steps/ScheduleInterval';
import { WorkflowExtraConfig } from '../../../Settings/Services/AddIngestion/Steps/ScheduleInterval.interface';
@ -105,8 +106,14 @@ const AddTestSuitePipeline = ({
const onFinish = (
values: WorkflowExtraConfig & TestSuiteIngestionDataType
) => {
const { cron, enableDebugLog, testCases, name, selectAllTestCases } =
values;
const {
cron,
enableDebugLog,
testCases,
name,
selectAllTestCases,
raiseOnError,
} = values;
onSubmit({
cron,
enableDebugLog,
@ -115,6 +122,7 @@ const AddTestSuitePipeline = ({
testCases: testCases?.map((testCase: TestCase | string) =>
isString(testCase) ? testCase : testCase.name
),
raiseOnError,
});
};
@ -130,6 +138,8 @@ const AddTestSuitePipeline = ({
}
};
const raiseOnErrorFormField = useMemo(() => getRaiseOnErrorFormField(), []);
return (
<Form.Provider onFormChange={handleFromChange}>
<ScheduleInterval
@ -151,6 +161,8 @@ const AddTestSuitePipeline = ({
}
onBack={onCancel ?? handleCancelBtn}
onDeploy={onFinish}>
<Col span={24}>{generateFormFields([raiseOnErrorFormField])}</Col>
<Col span={24}>
<Row className="add-test-case-container" gutter={[0, 16]}>
<Col span={24}>{generateFormFields(testCaseFormFields)}</Col>
{!selectAllTestCases && (
@ -182,6 +194,7 @@ const AddTestSuitePipeline = ({
</Col>
)}
</Row>
</Col>
</ScheduleInterval>
</Form.Provider>
);

View File

@ -11,7 +11,7 @@
* limitations under the License.
*/
import { Form, Input, Typography } from 'antd';
import { Col, Form, Input, Typography } from 'antd';
import { isEmpty, isUndefined, omit, trim } from 'lodash';
import React, { useMemo, useState } from 'react';
import { useTranslation } from 'react-i18next';
@ -29,12 +29,16 @@ import { IngestionPipeline } from '../../../../generated/entity/services/ingesti
import { useApplicationStore } from '../../../../hooks/useApplicationStore';
import { useFqn } from '../../../../hooks/useFqn';
import { IngestionWorkflowData } from '../../../../interface/service.interface';
import { generateFormFields } from '../../../../utils/formUtils';
import {
getDefaultFilterPropertyValues,
getSuccessMessage,
} from '../../../../utils/IngestionUtils';
import { cleanWorkFlowData } from '../../../../utils/IngestionWorkflowUtils';
import { getScheduleOptionsFromSchedules } from '../../../../utils/SchedularUtils';
import {
getRaiseOnErrorFormField,
getScheduleOptionsFromSchedules,
} from '../../../../utils/SchedularUtils';
import { getIngestionName } from '../../../../utils/ServiceUtils';
import { generateUUID } from '../../../../utils/StringsUtils';
import SuccessScreen from '../../../common/SuccessScreen/SuccessScreen';
@ -108,6 +112,7 @@ const AddIngestion = ({
displayName:
data?.displayName ?? getIngestionName(serviceData.name, pipelineType),
enableDebugLog: data?.loggerLevel === LogLevels.Debug,
raiseOnError: data?.raiseOnError ?? true,
})
);
@ -167,6 +172,7 @@ const AddIngestion = ({
name = '',
enableDebugLog,
displayName,
raiseOnError,
...rest
} = workflowData ?? {};
const ingestionName = trim(name);
@ -183,6 +189,7 @@ const AddIngestion = ({
startDate: date,
retries: extraData.retries,
},
raiseOnError: extraData.raiseOnError ?? true,
loggerLevel: enableDebugLog ? LogLevels.Debug : LogLevels.Info,
name: ingestionName,
displayName: displayName,
@ -234,6 +241,7 @@ const AddIngestion = ({
scheduleInterval: extraData.cron,
retries: extraData.retries,
},
raiseOnError: extraData.raiseOnError ?? true,
displayName: workflowData?.displayName,
loggerLevel: workflowData?.enableDebugLog
? LogLevels.Debug
@ -242,8 +250,12 @@ const AddIngestion = ({
config: {
// clean the data to remove empty fields
...cleanWorkFlowData(
omit(workflowData, ['name', 'enableDebugLog', 'displayName']) ??
{}
omit(workflowData, [
'name',
'enableDebugLog',
'displayName',
'raiseOnError',
]) ?? {}
),
},
},
@ -286,6 +298,11 @@ const AddIngestion = ({
}
};
const raiseOnErrorFormField = useMemo(
() => getRaiseOnErrorFormField(onFocus),
[onFocus]
);
return (
<div data-testid="add-ingestion-container">
<Typography.Title className="font-normal" level={5}>
@ -324,11 +341,15 @@ const AddIngestion = ({
defaultSchedule={DEFAULT_SCHEDULE_CRON_DAILY}
disabled={pipelineType === PipelineType.DataInsight}
includePeriodOptions={periodOptions}
initialData={{ cron: data?.airflowConfig.scheduleInterval }}
initialData={{
cron: data?.airflowConfig.scheduleInterval,
raiseOnError: data?.raiseOnError ?? true,
}}
isEditMode={isEditMode}
status={saveState}
onBack={() => handlePrev(1)}
onDeploy={handleScheduleIntervalDeployClick}>
<Col span={24}>
<Form.Item
colon={false}
initialValue={retries}
@ -340,6 +361,8 @@ const AddIngestion = ({
onFocus={() => onFocus('root/retries')}
/>
</Form.Item>
</Col>
<Col span={24}>{generateFormFields([raiseOnErrorFormField])}</Col>
</ScheduleInterval>
)}

View File

@ -61,6 +61,11 @@ jest.mock('../Ingestion/IngestionWorkflowForm/IngestionWorkflowForm', () => {
return jest.fn().mockImplementation(() => <div>Ingestion workflow form</div>);
});
jest.mock('../../../../utils/SchedularUtils', () => ({
getScheduleOptionsFromSchedules: jest.fn().mockReturnValue([]),
getRaiseOnErrorFormField: jest.fn().mockReturnValue({}),
}));
describe('Test AddIngestion component', () => {
it('AddIngestion component should render', async () => {
const { container } = render(<AddIngestion {...mockAddIngestionProps} />);

View File

@ -49,6 +49,7 @@ export interface WorkflowExtraConfig {
export interface IngestionExtraConfig {
retries?: number;
raiseOnError?: boolean;
}
export interface Combination {

View File

@ -394,7 +394,7 @@ const ScheduleInterval = <T,>({
<Col span={24}>{generateFormFields(formFields)}</Col>
)}
{children && <Col span={24}>{children}</Col>}
{children}
{showActionButtons && (
<Col className="d-flex justify-end" span={24}>

View File

@ -15,14 +15,16 @@ import i18next from 'i18next';
import { StepperStepType } from 'Models';
import { PipelineType } from '../generated/entity/services/ingestionPipelines/ingestionPipeline';
const { t } = i18next;
export const STEPS_FOR_ADD_INGESTION: Array<StepperStepType> = [
{
name: i18next.t('label.configure-entity', {
entity: i18next.t('label.ingestion'),
name: t('label.configure-entity', {
entity: t('label.ingestion'),
}),
step: 1,
},
{ name: i18next.t('label.schedule-interval'), step: 2 },
{ name: t('label.schedule-interval'), step: 2 },
];
export const INGESTION_ACTION_TYPE = {

View File

@ -40,6 +40,10 @@ export interface CreateIngestionPipeline {
*/
owners?: EntityReference[];
pipelineType: PipelineType;
/**
* Control whether the workflow should be flagged as failed when any processing errors are encountered.
*/
raiseOnError?: boolean;
/**
* Link to the service for which ingestion pipeline is ingesting the metadata.
*/
@ -687,6 +691,10 @@ export interface Pipeline {
* like endpoints, etc., with that collection will be deleted
*/
markDeletedApiCollections?: boolean;
/**
* Optional value of the ingestion runner name responsible for running the workflow
*/
ingestionRunner?: string;
/**
* List of operations to be performed on the service
*/

View File

@ -88,6 +88,10 @@ export interface IngestionPipeline {
pipelineStatuses?: PipelineStatus;
pipelineType: PipelineType;
provider?: ProviderType;
/**
* Control whether the workflow should be flagged as failed when any processing errors are encountered.
*/
raiseOnError?: boolean;
/**
* Link to the service (such as database, messaging, storage services, etc. for which this
* ingestion pipeline ingests the metadata from.
@ -1247,6 +1251,10 @@ export interface Pipeline {
* like endpoints, etc., with that collection will be deleted
*/
markDeletedApiCollections?: boolean;
/**
* Optional value of the ingestion runner name responsible for running the workflow
*/
ingestionRunner?: string;
/**
* List of operations to be performed on the service
*/

View File

@ -4332,6 +4332,10 @@ export interface Pipeline {
* like endpoints, etc., with that collection will be deleted
*/
markDeletedApiCollections?: boolean;
/**
* Optional value of the ingestion runner name responsible for running the workflow
*/
ingestionRunner?: string;
/**
* List of operations to be performed on the service
*/
@ -5412,6 +5416,10 @@ export interface WorkflowConfig {
config?: { [key: string]: any };
loggerLevel?: LogLevels;
openMetadataServerConfig: OpenMetadataConnection;
/**
* Control whether the workflow should be flagged as failed when any processing errors are encountered.
*/
raiseOnError?: boolean;
/**
* The percentage of successfully processed records that must be achieved for the pipeline
* to be considered successful. Otherwise, the pipeline will be marked as failed.

View File

@ -122,6 +122,7 @@ export type IngestionWorkflowData = Pipeline & {
name: string;
enableDebugLog?: boolean;
displayName?: string;
raiseOnError?: boolean;
};
export interface IngestionWorkflowFormProps {

View File

@ -1054,6 +1054,7 @@
"query-plural": "Abfragen",
"query-used-in": "Verwendete Abfragen in",
"queued": "Eingereiht",
"raise-on-error": "Fehler beim Ausführen anzeigen",
"range": "Bereich",
"range-condition": "Bereichsbedingung",
"re-assign": "Reassign",

View File

@ -1054,6 +1054,7 @@
"query-plural": "Queries",
"query-used-in": "Query Used In",
"queued": "Queued",
"raise-on-error": "Raise on Error",
"range": "Range",
"range-condition": "Range Condition",
"re-assign": "Reassign",

View File

@ -1054,6 +1054,7 @@
"query-plural": "Consultas",
"query-used-in": "Consulta usada en",
"queued": "Queued",
"raise-on-error": "Mostrar error al ejecutar",
"range": "Rango",
"range-condition": "Condición de Rango",
"re-assign": "Reasignar",

View File

@ -1054,6 +1054,7 @@
"query-plural": "Requêtes",
"query-used-in": "Requêtes Utilisées dans",
"queued": "Queued",
"raise-on-error": "Afficher l'erreur lors de l'exécution",
"range": "Amplitude",
"range-condition": "Condition de Plage",
"re-assign": "Réaffecter",

View File

@ -1054,6 +1054,7 @@
"query-plural": "Consultas",
"query-used-in": "Consulta utilizada en",
"queued": "Queued",
"raise-on-error": "Mostrar erro ao executar",
"range": "Intervalo",
"range-condition": "Condición de Rango",
"re-assign": "Reasignar",

View File

@ -1054,6 +1054,7 @@
"query-plural": "שאילתות",
"query-used-in": "שאילתה שמשמשת ב",
"queued": "Queued",
"raise-on-error": "הצג שגיאה בעת ביצוע",
"range": "טווח",
"range-condition": "תנאי טווח",
"re-assign": "Reassign",

View File

@ -1054,6 +1054,7 @@
"query-plural": "クエリ",
"query-used-in": "Query Used In",
"queued": "Queued",
"raise-on-error": "エラー時に表示",
"range": "Range",
"range-condition": "範囲条件",
"re-assign": "Reassign",

View File

@ -1054,6 +1054,7 @@
"query-plural": "쿼리들",
"query-used-in": "사용된 쿼리",
"queued": "Queued",
"raise-on-error": "오류 발생 시 표시",
"range": "범위",
"range-condition": "Range Condition",
"re-assign": "재할당",

View File

@ -1054,6 +1054,7 @@
"query-plural": "क्वेरी",
"query-used-in": "क्वेरी वापरले",
"queued": "Queued",
"raise-on-error": "त्रुटी आल्यास दर्शवा",
"range": "श्रेणी",
"range-condition": "श्रेणी अट",
"re-assign": "पुन्हा नियुक्त करा",

View File

@ -1054,6 +1054,7 @@
"query-plural": "Query's",
"query-used-in": "Query gebruikt in",
"queued": "Queued",
"raise-on-error": "Fout bij uitvoeren weergeven",
"range": "Bereik",
"range-condition": "Bereikvoorwaarde",
"re-assign": "Opnieuw toewijzen",

View File

@ -1054,6 +1054,7 @@
"query-plural": "پرس و جوها",
"query-used-in": "پرس و جو استفاده شده در",
"queued": "Queued",
"raise-on-error": "نمایش در صورت خطا",
"range": "محدوده",
"range-condition": "شرط محدوده",
"re-assign": "دوباره تخصیص دهید",

View File

@ -1054,6 +1054,7 @@
"query-plural": "Consultas",
"query-used-in": "Consulta Usada em",
"queued": "Queued",
"raise-on-error": "Mostrar erro ao executar",
"range": "Faixa",
"range-condition": "Condição de Intervalo",
"re-assign": "Reassign",

View File

@ -1054,6 +1054,7 @@
"query-plural": "Consultas",
"query-used-in": "Consulta Usada em",
"queued": "Queued",
"raise-on-error": "Mostrar erro ao executar",
"range": "Faixa",
"range-condition": "Condição de Intervalo",
"re-assign": "Reassign",

View File

@ -1054,6 +1054,7 @@
"query-plural": "Запросы",
"query-used-in": "Запрос, используемый в",
"queued": "Queued",
"raise-on-error": "Показывать ошибку при выполнении",
"range": "Диапазон",
"range-condition": "Условие Диапазона",
"re-assign": "Reassign",

View File

@ -1054,6 +1054,7 @@
"query-plural": "การสอบถาม",
"query-used-in": "การสอบถามที่ใช้ใน",
"queued": "Queued",
"raise-on-error": "ยกเลิกการทำงาน",
"range": "ช่วง",
"range-condition": "เงื่อนไขช่วง",
"re-assign": "มอบหมายใหม่",

View File

@ -1054,6 +1054,7 @@
"query-plural": "查询",
"query-used-in": "查询语句被使用在",
"queued": "排队中",
"raise-on-error": "出错时提示",
"range": "范围",
"range-condition": "范围条件",
"re-assign": "重新分配",

View File

@ -31,6 +31,7 @@ import {
DEFAULT_SCHEDULE_CRON_WEEKLY,
} from '../constants/Schedular.constants';
import { CronTypes } from '../enums/Schedular.enum';
import { FieldTypes, FormItemLayout } from '../interface/FormUtils.interface';
export const getScheduleOptionsFromSchedules = (
scheduleOptions: string[]
@ -330,3 +331,22 @@ export const cronValidator = async (_: RuleObject, value: string) => {
return Promise.resolve();
};
export const getRaiseOnErrorFormField = (
onFocus?: (fieldName: string) => void
) => {
return {
name: 'raiseOnError',
label: t('label.raise-on-error'),
type: FieldTypes.SWITCH,
required: false,
formItemProps: {
valuePropName: 'checked',
},
props: {
onFocus: () => onFocus?.('raiseOnError'),
},
formItemLayout: FormItemLayout.HORIZONTAL,
id: 'root/raiseOnError',
};
};