Fix #10344 - Use ingestionPipeline retries when deploying (#12926)

This commit is contained in:
Pere Miquel Brull 2023-08-23 11:22:54 +02:00 committed by GitHub
parent cc0653adb1
commit 4862b0df28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 150 additions and 17 deletions

View File

@ -52,3 +52,8 @@ CREATE TABLE IF NOT EXISTS search_index_entity (
PRIMARY KEY (id),
UNIQUE (fqnHash)
);
-- We were hardcoding retries to 0. Since we are now using the IngestionPipeline to set them, keep existing ones to 0.
-- JSON_REPLACE only overwrites values at paths that already exist, and the WHERE clause limits the
-- update to rows where JSON_EXTRACT finds a value at '$.airflowConfig.retries'.
UPDATE ingestion_pipeline_entity
SET json = JSON_REPLACE(json, '$.airflowConfig.retries', 0)
WHERE JSON_EXTRACT(json, '$.airflowConfig.retries') IS NOT NULL;

View File

@ -52,3 +52,7 @@ CREATE TABLE IF NOT EXISTS search_index_entity (
PRIMARY KEY (id),
UNIQUE (fqnHash)
);
-- We were hardcoding retries to 0. Since we are now using the IngestionPipeline to set them, keep existing ones to 0.
-- The jsonb_set path is a text array with one element per nesting level: '{airflowConfig,retries}'
-- addresses the "retries" key inside "airflowConfig". Writing it as '{airflowConfig.retries}' would
-- instead create a top-level key literally named "airflowConfig.retries" and leave the real value untouched.
UPDATE ingestion_pipeline_entity
SET json = jsonb_set(json::jsonb, '{airflowConfig,retries}', '0', true);

View File

@ -353,7 +353,7 @@ def build_dag(
python_callable=workflow_fn,
op_kwargs={"workflow_config": workflow_config},
# There's no need to retry if we have had an error. Wait until the next schedule or manual rerun.
retries=0,
retries=ingestion_pipeline.airflowConfig.retries or 0,
# each DAG will call its own OpenMetadataWorkflowConfig
on_failure_callback=partial(send_failed_status_callback, workflow_config),
# Add tag and ownership to easily identify DAGs generated by OM

View File

@ -74,7 +74,7 @@
"retries": {
"description": "Retry pipeline in case of failure.",
"type": "integer",
"default": 3
"default": 0
},
"retryDelay": {
"description": "Delay between retries in seconds.",

View File

@ -28,6 +28,7 @@ const RETRY_TIMES = 4;
const BASE_WAIT_TIME = 20000;
const ADMIN = 'admin';
const RETRIES_COUNT = 4;
const TEAM_TYPES = ['BusinessUnit', 'Department', 'Division', 'Group'];
@ -168,7 +169,7 @@ export const handleIngestionRetry = (
checkSuccessState();
};
export const scheduleIngestion = () => {
export const scheduleIngestion = (hasRetryCount = true) => {
interceptURL(
'POST',
'/api/v1/services/ingestionPipelines',
@ -187,6 +188,11 @@ export const scheduleIngestion = () => {
// Schedule & Deploy
cy.get('[data-testid="cron-type"]').should('be.visible').click();
cy.get('.ant-select-item-option-content').contains('Hour').click();
if (hasRetryCount) {
cy.get('#retries').scrollIntoView().clear().type(RETRIES_COUNT);
}
cy.get('[data-testid="deploy-button"]').should('be.visible').click();
verifyResponseStatusCode('@createIngestionPipelines', 201);

View File

@ -134,7 +134,7 @@ describe('Data Quality and Profiler should work properly', () => {
.should('be.visible')
.click();
scheduleIngestion();
scheduleIngestion(false);
cy.wait('@deployIngestion').then(() => {
cy.get('[data-testid="view-service-button"]')
@ -181,7 +181,7 @@ describe('Data Quality and Profiler should work properly', () => {
.scrollIntoView()
.should('be.visible');
cy.get('[data-testid="add-ingestion-button"]').should('be.visible').click();
scheduleIngestion();
scheduleIngestion(false);
cy.get('[data-testid="success-line"]')
.scrollIntoView()

View File

@ -92,3 +92,10 @@ $$section
Optional configuration to soft delete `dashboards` in OpenMetadata if the source `dashboards` are deleted. After deleting, all the associated entities like lineage, etc., with that `dashboard` will be deleted.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -349,4 +349,10 @@ $$section
### Query Parsing Timeout Limit $(id="parsingTimeoutLimit")
Specify the timeout limit for parsing the sql queries to perform the lineage analysis.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -47,4 +47,10 @@ $$section
### Query Parsing Timeout Limit $(id="parsingTimeoutLimit")
Specify the timeout limit for parsing the sql queries to perform the lineage analysis.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -126,4 +126,10 @@ $$section
### View Definition Parsing Timeout Limit $(id="viewParsingTimeoutLimit")
Specify the timeout limit for parsing the view definition sql queries to perform the lineage analysis.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -103,3 +103,9 @@ Set the `Auto Tag PII` toggle to control whether to automatically tag columns th
If `Ingest Sample Data` is enabled, OpenMetadata will leverage machine learning to infer which column may contain PII sensitive data. If disabled, OpenMetadata will infer this information from the column name.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -44,4 +44,10 @@ For example: `query_text not ilike '--- metabase query %'`
Check out [this](https://docs.open-metadata.org/connectors/ingestion/workflows/usage/filter-query-set) document for further examples on filter conditions.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -38,3 +38,9 @@ $$section
Optional configuration to soft delete `topics` in OpenMetadata if the source `topics` are deleted. After deleting, all the associated entities like lineage, etc., with that `topic` will be deleted.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -56,4 +56,10 @@ $$section
### Verify Certificates $(id="verifyCerts")
This indicates whether to verify certificates when using SSL connection to Elasticsearch. It's ignored by default and is set to true. Ensure that you send the certificates in the property `CA Certificates`.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -28,4 +28,10 @@ $$section
### Recreate Index $(id="recreateIndex")
If enabled, this option will delete the existing indexes and create them again.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -9,3 +9,9 @@ $$section
Set the `Enable Debug Log` toggle to set the logging level of the process to debug. You can check these logs in the Ingestion tab of the service and dig deeper into any errors you might find.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -32,3 +32,9 @@ $$section
Optional configuration to soft delete ML Models in OpenMetadata if the source models are deleted. After deleting, all the associated entities like lineage, etc., with that ML Model will be deleted.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -54,3 +54,9 @@ $$section
Optional configuration to soft delete `pipelines` in OpenMetadata if the source `pipelines` are deleted. After deleting, all the associated entities like lineage, etc., with that `pipeline` will be deleted.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -26,3 +26,9 @@ $$section
Set the `Enable Debug Log` toggle to set the logging level of the process to debug. You can check these logs in the Ingestion tab of the service and dig deeper into any errors you might find.
$$
$$section
### Number of Retries $(id="retries")
Times to retry the workflow in case it ends with a failure.
$$

View File

@ -11,7 +11,7 @@
* limitations under the License.
*/
import { Typography } from 'antd';
import { Form, Input, Typography } from 'antd';
import IngestionWorkflowForm from 'components/IngestionWorkflowForm/IngestionWorkflowForm';
import { LOADING_STATE } from 'enums/common.enum';
import { isUndefined, omit, trim } from 'lodash';
@ -35,7 +35,10 @@ import {
import SuccessScreen from '../common/success-screen/SuccessScreen';
import IngestionStepper from '../IngestionStepper/IngestionStepper.component';
import DeployIngestionLoaderModal from '../Modals/DeployIngestionLoaderModal/DeployIngestionLoaderModal';
import { AddIngestionProps } from './IngestionWorkflow.interface';
import {
AddIngestionProps,
WorkflowExtraConfig,
} from './IngestionWorkflow.interface';
import ScheduleInterval from './Steps/ScheduleInterval';
const AddIngestion = ({
@ -69,11 +72,12 @@ const AddIngestion = ({
getIngestionFrequency(pipelineType)
);
const { sourceConfig, ingestionName } = useMemo(
const { sourceConfig, ingestionName, retries } = useMemo(
() => ({
sourceConfig: data?.sourceConfig.config,
ingestionName:
data?.name ?? getIngestionName(serviceData.name, pipelineType),
retries: data?.airflowConfig.retries ?? 0,
}),
[data, pipelineType, serviceData]
);
@ -116,7 +120,7 @@ const AddIngestion = ({
handleNext(2);
};
const createNewIngestion = () => {
const createNewIngestion = (extraData: WorkflowExtraConfig) => {
const { name = '', enableDebugLog, ...rest } = workflowData ?? {};
const ingestionName = trim(name);
setSaveState(LOADING_STATE.WAITING);
@ -130,6 +134,7 @@ const AddIngestion = ({
airflowConfig: {
scheduleInterval,
startDate: date,
retries: extraData.retries,
},
loggerLevel: enableDebugLog ? LogLevels.Debug : LogLevels.Info,
name: ingestionName,
@ -168,13 +173,14 @@ const AddIngestion = ({
}
};
const updateIngestion = () => {
const updateIngestion = (extraData: WorkflowExtraConfig) => {
if (data) {
const updatedData: IngestionPipeline = {
...data,
airflowConfig: {
...data.airflowConfig,
scheduleInterval,
retries: extraData.retries,
},
loggerLevel: workflowData?.enableDebugLog
? LogLevels.Debug
@ -213,11 +219,13 @@ const AddIngestion = ({
});
};
const handleScheduleIntervalDeployClick = () => {
const handleScheduleIntervalDeployClick = (
extraData: WorkflowExtraConfig
) => {
if (status === FormSubmitType.ADD) {
createNewIngestion();
createNewIngestion(extraData);
} else {
updateIngestion();
updateIngestion(extraData);
}
};
@ -288,8 +296,20 @@ const AddIngestion = ({
}
onBack={() => handlePrev(1)}
onChange={(data) => setScheduleInterval(data)}
onDeploy={handleScheduleIntervalDeployClick}
/>
onDeploy={handleScheduleIntervalDeployClick}>
<Form.Item
className="m-t-xs"
colon={false}
initialValue={retries}
label={t('label.number-of-retries')}
name="retries">
<Input
min={0}
type="number"
onFocus={() => onFocus('root/retries')}
/>
</Form.Item>
</ScheduleInterval>
)}
{activeIngestionStep > 2 && handleViewServiceClick && (

View File

@ -12,6 +12,7 @@
*/
import { LoadingState, ServicesUpdateRequest } from 'Models';
import { ReactNode } from 'react';
import { FormSubmitType } from '../../enums/form.enum';
import { ServiceCategory } from '../../enums/service.enum';
import { CreateIngestionPipeline } from '../../generated/api/services/ingestionPipelines/createIngestionPipeline';
@ -56,7 +57,12 @@ export type ScheduleIntervalProps = {
scheduleInterval: string;
includePeriodOptions?: string[];
submitButtonLabel: string;
children?: ReactNode;
disabledCronChange?: boolean;
onBack: () => void;
onDeploy: () => void;
onDeploy: (values: WorkflowExtraConfig) => void;
};
export interface WorkflowExtraConfig {
retries: number;
}

View File

@ -28,11 +28,15 @@ const ScheduleInterval = ({
scheduleInterval,
status,
submitButtonLabel,
children,
}: ScheduleIntervalProps) => {
const { t } = useTranslation();
return (
<Form data-testid="schedule-intervel-container" onFinish={onDeploy}>
<Form
data-testid="schedule-intervel-container"
layout="vertical"
onFinish={onDeploy}>
<CronEditor
disabledCronChange={disabledCronChange}
includePeriodOptions={includePeriodOptions}
@ -40,6 +44,8 @@ const ScheduleInterval = ({
onChange={onChange}
/>
{children}
<Col className="d-flex justify-end mt-4" span={24}>
<Button
className="m-r-xs"

View File

@ -612,6 +612,7 @@
"november": "November",
"null": "Null",
"number-of-object-plural": "Number of Objects",
"number-of-retries": "Number of Retries",
"number-of-rows": "Number of rows",
"object-plural": "Objects",
"october": "October",

View File

@ -612,6 +612,7 @@
"november": "Noviembre",
"null": "Nulo",
"number-of-object-plural": "Número de objetos",
"number-of-retries": "Number of Retries",
"number-of-rows": "Número de filas",
"object-plural": "Objects",
"october": "Octubre",

View File

@ -612,6 +612,7 @@
"november": "Novembre",
"null": "Null",
"number-of-object-plural": "Nombre d'Objects",
"number-of-retries": "Number of Retries",
"number-of-rows": "Nombre de lignes",
"object-plural": "Objects",
"october": "Octobre",

View File

@ -612,6 +612,7 @@
"november": "11月",
"null": "Null",
"number-of-object-plural": "Number of Objects",
"number-of-retries": "Number of Retries",
"number-of-rows": "行数",
"object-plural": "Objects",
"october": "10月",

View File

@ -612,6 +612,7 @@
"november": "Novembro",
"null": "Nulo",
"number-of-object-plural": "Number of Objects",
"number-of-retries": "Number of Retries",
"number-of-rows": "Número de linhas",
"object-plural": "Objects",
"october": "Outubro",

View File

@ -612,6 +612,7 @@
"november": "Ноябрь",
"null": "Пустые значения",
"number-of-object-plural": "Количество объектов",
"number-of-retries": "Number of Retries",
"number-of-rows": "Количество строк",
"object-plural": "Объекты",
"october": "Октябрь",

View File

@ -612,6 +612,7 @@
"november": "十一月",
"null": "Null",
"number-of-object-plural": "对象个数",
"number-of-retries": "Number of Retries",
"number-of-rows": "行数",
"object-plural": "Objects",
"october": "十月",