From c24bc870d06722ca0d0c52f364dbb50207847780 Mon Sep 17 00:00:00 2001 From: IceS2 Date: Thu, 12 Dec 2024 10:01:39 +0100 Subject: [PATCH] Parametrizing Flowable configurations and adding a way to handle history cleaning (#18990) --- .../governance/workflows/WorkflowHandler.java | 72 +++++++++++++++---- .../service/jdbi3/CollectionDAO.java | 2 + .../service/jdbi3/SystemRepository.java | 19 +++++ .../resources/settings/SettingsCache.java | 17 +++++ .../configuration/workflowSettings.json | 59 +++++++++++++++ .../json/schema/settings/settings.json | 6 +- 6 files changed, 162 insertions(+), 13 deletions(-) create mode 100644 openmetadata-spec/src/main/resources/json/schema/configuration/workflowSettings.json diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java index ae3ecab0ac3..899c4b7f9c1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java @@ -2,6 +2,7 @@ package org.openmetadata.service.governance.workflows; import static org.openmetadata.service.governance.workflows.elements.TriggerFactory.getTriggerWorkflowId; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -14,6 +15,7 @@ import org.flowable.common.engine.api.FlowableObjectNotFoundException; import org.flowable.engine.HistoryService; import org.flowable.engine.ProcessEngine; import org.flowable.engine.ProcessEngineConfiguration; +import org.flowable.engine.ProcessEngines; import org.flowable.engine.RepositoryService; import org.flowable.engine.RuntimeService; import org.flowable.engine.TaskService; @@ -23,44 +25,82 @@ import org.flowable.engine.repository.ProcessDefinition; import org.flowable.engine.runtime.Execution; import org.flowable.engine.runtime.ProcessInstance; import org.flowable.task.api.Task; +import org.openmetadata.schema.configuration.WorkflowSettings; +import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.exception.UnhandledServerException; +import org.openmetadata.service.jdbi3.SystemRepository; import org.openmetadata.service.jdbi3.locator.ConnectionType; @Slf4j public class WorkflowHandler { - private final RepositoryService repositoryService; - private final RuntimeService runtimeService; - private final TaskService taskService; - private final HistoryService historyService; + private ProcessEngine processEngine; + private RepositoryService repositoryService; + private RuntimeService runtimeService; + private TaskService taskService; + private HistoryService historyService; private static WorkflowHandler instance; private static volatile boolean initialized = false; private WorkflowHandler(OpenMetadataApplicationConfig config) { ProcessEngineConfiguration processEngineConfiguration = new StandaloneProcessEngineConfiguration() - .setAsyncExecutorActivate(true) - .setAsyncExecutorCorePoolSize(50) - .setAsyncExecutorMaxPoolSize(100) - .setAsyncExecutorThreadPoolQueueSize(1000) - .setAsyncExecutorMaxAsyncJobsDuePerAcquisition(20) .setJdbcUrl(config.getDataSourceFactory().getUrl()) .setJdbcUsername(config.getDataSourceFactory().getUser()) .setJdbcPassword(config.getDataSourceFactory().getPassword()) .setJdbcDriver(config.getDataSourceFactory().getDriverClass()) .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE); - // Add Global Failure Listener - processEngineConfiguration.setEventListeners(List.of(new WorkflowFailureListener())); - if (ConnectionType.MYSQL.label.equals(config.getDataSourceFactory().getDriverClass())) { processEngineConfiguration.setDatabaseType(ProcessEngineConfiguration.DATABASE_TYPE_MYSQL); } else { processEngineConfiguration.setDatabaseType(ProcessEngineConfiguration.DATABASE_TYPE_POSTGRES); } + initializeNewProcessEngine(processEngineConfiguration); + } + + public void initializeNewProcessEngine( + ProcessEngineConfiguration currentProcessEngineConfiguration) { + ProcessEngines.destroy(); + SystemRepository systemRepository = Entity.getSystemRepository(); + WorkflowSettings workflowSettings = systemRepository.getWorkflowSettings(); + + StandaloneProcessEngineConfiguration processEngineConfiguration = + new StandaloneProcessEngineConfiguration(); + + // Setting Database Configuration + processEngineConfiguration + .setJdbcUrl(currentProcessEngineConfiguration.getJdbcUrl()) + .setJdbcUsername(currentProcessEngineConfiguration.getJdbcUsername()) + .setJdbcPassword(currentProcessEngineConfiguration.getJdbcPassword()) + .setJdbcDriver(currentProcessEngineConfiguration.getJdbcDriver()) + .setDatabaseType(currentProcessEngineConfiguration.getDatabaseType()) + .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE); + + // Setting Async Executor Configuration + processEngineConfiguration + .setAsyncExecutorActivate(true) + .setAsyncExecutorCorePoolSize(workflowSettings.getExecutorConfiguration().getCorePoolSize()) + .setAsyncExecutorMaxPoolSize(workflowSettings.getExecutorConfiguration().getMaxPoolSize()) + .setAsyncExecutorThreadPoolQueueSize( + workflowSettings.getExecutorConfiguration().getQueueSize()) + .setAsyncExecutorMaxAsyncJobsDuePerAcquisition( + workflowSettings.getExecutorConfiguration().getTasksDuePerAcquisition()); + + // Setting History CleanUp + processEngineConfiguration + .setEnableHistoryCleaning(true) + .setCleanInstancesEndedAfter( + Duration.ofDays( + workflowSettings.getHistoryCleanUpConfiguration().getCleanAfterNumberOfDays())); + + // Add Global Failure Listener + processEngineConfiguration.setEventListeners(List.of(new WorkflowFailureListener())); + ProcessEngine processEngine = processEngineConfiguration.buildProcessEngine(); + this.processEngine = processEngine; this.repositoryService = processEngine.getRepositoryService(); this.runtimeService = processEngine.getRuntimeService(); this.taskService = processEngine.getTaskService(); @@ -81,6 +121,14 @@ public class WorkflowHandler { throw new UnhandledServerException("WorkflowHandler is not initialized."); } + public ProcessEngineConfiguration getProcessEngineConfiguration() { + if (processEngine != null) { + return processEngine.getProcessEngineConfiguration(); + } else { + return null; + } + } + public void deploy(Workflow workflow) { BpmnXMLConverter bpmnXMLConverter = new BpmnXMLConverter(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index a5938ea9f4a..809a53e5d6b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -67,6 +67,7 @@ import org.openmetadata.schema.auth.PersonalAccessToken; import org.openmetadata.schema.auth.RefreshToken; import org.openmetadata.schema.auth.TokenType; import org.openmetadata.schema.configuration.AssetCertificationSettings; +import org.openmetadata.schema.configuration.WorkflowSettings; import org.openmetadata.schema.dataInsight.DataInsightChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.kpi.Kpi; @@ -5156,6 +5157,7 @@ public interface CollectionDAO { case SEARCH_SETTINGS -> JsonUtils.readValue(json, SearchSettings.class); case ASSET_CERTIFICATION_SETTINGS -> JsonUtils.readValue( json, AssetCertificationSettings.class); + case WORKFLOW_SETTINGS -> JsonUtils.readValue(json, WorkflowSettings.class); case LINEAGE_SETTINGS -> JsonUtils.readValue(json, LineageSettings.class); default -> throw new IllegalArgumentException("Invalid Settings Type " + configType); }; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SystemRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SystemRepository.java index d08b2ca6c33..b854c39a6ee 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SystemRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SystemRepository.java @@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.api.configuration.UiThemePreference; import org.openmetadata.schema.configuration.AssetCertificationSettings; +import org.openmetadata.schema.configuration.WorkflowSettings; import org.openmetadata.schema.email.SmtpSettings; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse; import org.openmetadata.schema.security.client.OpenMetadataJWTClientConfig; @@ -32,6 +33,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.exception.CustomExceptionMessage; import org.openmetadata.service.fernet.Fernet; +import org.openmetadata.service.governance.workflows.WorkflowHandler; import org.openmetadata.service.jdbi3.CollectionDAO.SystemDAO; import org.openmetadata.service.migration.MigrationValidationClient; import org.openmetadata.service.resources.settings.SettingsCache; @@ -119,6 +121,15 @@ public class SystemRepository { .orElse(null); } + public WorkflowSettings getWorkflowSettings() { + Optional oWorkflowSettings = + Optional.ofNullable(getConfigWithKey(SettingsType.WORKFLOW_SETTINGS.value())); + + return oWorkflowSettings + .map(settings -> (WorkflowSettings) settings.getConfigValue()) + .orElse(null); + } + public Settings getEmailConfigInternal() { try { Settings setting = dao.getConfigWithKey(SettingsType.EMAIL_CONFIGURATION.value()); @@ -209,6 +220,13 @@ public class SystemRepository { return (new RestUtil.PutResponse<>(Response.Status.OK, original, ENTITY_UPDATED)).toResponse(); } + private void postUpdate(SettingsType settingsType) { + if (settingsType == SettingsType.WORKFLOW_SETTINGS) { + WorkflowHandler workflowHandler = WorkflowHandler.getInstance(); + workflowHandler.initializeNewProcessEngine(workflowHandler.getProcessEngineConfiguration()); + } + } + public void updateSetting(Settings setting) { try { if (setting.getConfigType() == SettingsType.EMAIL_CONFIGURATION) { @@ -235,6 +253,7 @@ public class SystemRepository { setting.getConfigType().toString(), JsonUtils.pojoToJson(setting.getConfigValue())); // Invalidate Cache SettingsCache.invalidateSettings(setting.getConfigType().value()); + postUpdate(setting.getConfigType()); } catch (Exception ex) { LOG.error("Failing in Updating Setting.", ex); throw new CustomExceptionMessage( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/settings/SettingsCache.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/settings/SettingsCache.java index a6785ddb461..16532c563ce 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/settings/SettingsCache.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/settings/SettingsCache.java @@ -19,6 +19,7 @@ import static org.openmetadata.schema.settings.SettingsType.EMAIL_CONFIGURATION; import static org.openmetadata.schema.settings.SettingsType.LINEAGE_SETTINGS; import static org.openmetadata.schema.settings.SettingsType.LOGIN_CONFIGURATION; import static org.openmetadata.schema.settings.SettingsType.SEARCH_SETTINGS; +import static org.openmetadata.schema.settings.SettingsType.WORKFLOW_SETTINGS; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -35,6 +36,9 @@ import org.openmetadata.schema.api.lineage.LineageLayer; import org.openmetadata.schema.api.lineage.LineageSettings; import org.openmetadata.schema.api.search.SearchSettings; import org.openmetadata.schema.configuration.AssetCertificationSettings; +import org.openmetadata.schema.configuration.ExecutorConfiguration; +import org.openmetadata.schema.configuration.HistoryCleanUpConfiguration; +import org.openmetadata.schema.configuration.WorkflowSettings; import org.openmetadata.schema.email.SmtpSettings; import org.openmetadata.schema.settings.Settings; import org.openmetadata.schema.settings.SettingsType; @@ -144,6 +148,19 @@ public class SettingsCache { systemRepository.createNewSetting(setting); } + // Initialise Workflow Settings + Settings workflowSettings = systemRepository.getConfigWithKey(WORKFLOW_SETTINGS.toString()); + if (workflowSettings == null) { + Settings setting = + new Settings() + .withConfigType(WORKFLOW_SETTINGS) + .withConfigValue( + new WorkflowSettings() + .withExecutorConfiguration(new ExecutorConfiguration()) + .withHistoryCleanUpConfiguration(new HistoryCleanUpConfiguration())); + systemRepository.createNewSetting(setting); + } + Settings lineageSettings = systemRepository.getConfigWithKey(LINEAGE_SETTINGS.toString()); if (lineageSettings == null) { // Only in case a config doesn't exist in DB we insert it diff --git a/openmetadata-spec/src/main/resources/json/schema/configuration/workflowSettings.json b/openmetadata-spec/src/main/resources/json/schema/configuration/workflowSettings.json new file mode 100644 index 00000000000..fe2969cb88d --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/configuration/workflowSettings.json @@ -0,0 +1,59 @@ +{ + "$id": "https://open-metadata.org/schema/configuration/workflowSettings.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "WorkflowSettings", + "description": "This schema defines the Workflow Settings.", + "type": "object", + "javaType": "org.openmetadata.schema.configuration.WorkflowSettings", + "definitions": { + "executorConfiguration": { + "type": "object", + "properties": { + "corePoolSize": { + "type": "integer", + "default": 50, + "description": "Default worker Pool Size. The Workflow Executor by default has this amount of workers." + }, + "maxPoolSize": { + "type": "integer", + "default": 100, + "description": "Maximum worker Pool Size. The Workflow Executor could grow up to this number of workers." + }, + "queueSize": { + "type": "integer", + "default": 1000, + "description": "Amount of Tasks that can be queued to be picked up by the Workflow Executor." + }, + "tasksDuePerAcquisition": { + "type": "integer", + "default": 20, + "description": "The amount of Tasks that the Workflow Executor is able to pick up each time it looks for more." + } + }, + "additionalProperties": false + }, + "historyCleanUpConfiguration": { + "type": "object", + "properties": { + "cleanAfterNumberOfDays": { + "type": "integer", + "default": 7, + "description": "Cleans the Workflow Task that were finished, after given number of days." + } + }, + "additionalProperties": false + } + }, + "properties": { + "executorConfiguration": { + "$ref": "#/definitions/executorConfiguration", + "description": "Used to set up the Workflow Executor Settings." + }, + "historyCleanUpConfiguration": { + "$ref": "#/definitions/historyCleanUpConfiguration", + "description": "Used to set up the History CleanUp Settings." + } + }, + "required": ["allowedClassification", "validityPeriod"], + "additionalProperties": false +} \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/settings/settings.json b/openmetadata-spec/src/main/resources/json/schema/settings/settings.json index 0811f5e8fc8..5a98c9d9552 100644 --- a/openmetadata-spec/src/main/resources/json/schema/settings/settings.json +++ b/openmetadata-spec/src/main/resources/json/schema/settings/settings.json @@ -32,7 +32,8 @@ "profilerConfiguration", "searchSettings", "assetCertificationSettings", - "lineageSettings" + "lineageSettings", + "workflowSettings" ] } }, @@ -84,6 +85,9 @@ }, { "$ref": "../configuration/lineageSettings.json" + }, + { + "$ref": "../configuration/workflowSettings.json" } ] }