Parametrizing Flowable configurations and adding a way to handle history cleaning (#18990)

This commit is contained in:
IceS2 2024-12-12 10:01:39 +01:00 committed by GitHub
parent 01fb9d1087
commit c24bc870d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 162 additions and 13 deletions

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.governance.workflows;
import static org.openmetadata.service.governance.workflows.elements.TriggerFactory.getTriggerWorkflowId; import static org.openmetadata.service.governance.workflows.elements.TriggerFactory.getTriggerWorkflowId;
import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -14,6 +15,7 @@ import org.flowable.common.engine.api.FlowableObjectNotFoundException;
import org.flowable.engine.HistoryService; import org.flowable.engine.HistoryService;
import org.flowable.engine.ProcessEngine; import org.flowable.engine.ProcessEngine;
import org.flowable.engine.ProcessEngineConfiguration; import org.flowable.engine.ProcessEngineConfiguration;
import org.flowable.engine.ProcessEngines;
import org.flowable.engine.RepositoryService; import org.flowable.engine.RepositoryService;
import org.flowable.engine.RuntimeService; import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService; import org.flowable.engine.TaskService;
@ -23,44 +25,82 @@ import org.flowable.engine.repository.ProcessDefinition;
import org.flowable.engine.runtime.Execution; import org.flowable.engine.runtime.Execution;
import org.flowable.engine.runtime.ProcessInstance; import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.task.api.Task; import org.flowable.task.api.Task;
import org.openmetadata.schema.configuration.WorkflowSettings;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.exception.UnhandledServerException; import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.jdbi3.SystemRepository;
import org.openmetadata.service.jdbi3.locator.ConnectionType; import org.openmetadata.service.jdbi3.locator.ConnectionType;
@Slf4j @Slf4j
public class WorkflowHandler { public class WorkflowHandler {
private final RepositoryService repositoryService; private ProcessEngine processEngine;
private final RuntimeService runtimeService; private RepositoryService repositoryService;
private final TaskService taskService; private RuntimeService runtimeService;
private final HistoryService historyService; private TaskService taskService;
private HistoryService historyService;
private static WorkflowHandler instance; private static WorkflowHandler instance;
private static volatile boolean initialized = false; private static volatile boolean initialized = false;
private WorkflowHandler(OpenMetadataApplicationConfig config) { private WorkflowHandler(OpenMetadataApplicationConfig config) {
ProcessEngineConfiguration processEngineConfiguration = ProcessEngineConfiguration processEngineConfiguration =
new StandaloneProcessEngineConfiguration() new StandaloneProcessEngineConfiguration()
.setAsyncExecutorActivate(true)
.setAsyncExecutorCorePoolSize(50)
.setAsyncExecutorMaxPoolSize(100)
.setAsyncExecutorThreadPoolQueueSize(1000)
.setAsyncExecutorMaxAsyncJobsDuePerAcquisition(20)
.setJdbcUrl(config.getDataSourceFactory().getUrl()) .setJdbcUrl(config.getDataSourceFactory().getUrl())
.setJdbcUsername(config.getDataSourceFactory().getUser()) .setJdbcUsername(config.getDataSourceFactory().getUser())
.setJdbcPassword(config.getDataSourceFactory().getPassword()) .setJdbcPassword(config.getDataSourceFactory().getPassword())
.setJdbcDriver(config.getDataSourceFactory().getDriverClass()) .setJdbcDriver(config.getDataSourceFactory().getDriverClass())
.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE); .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE);
// Add Global Failure Listener
processEngineConfiguration.setEventListeners(List.of(new WorkflowFailureListener()));
if (ConnectionType.MYSQL.label.equals(config.getDataSourceFactory().getDriverClass())) { if (ConnectionType.MYSQL.label.equals(config.getDataSourceFactory().getDriverClass())) {
processEngineConfiguration.setDatabaseType(ProcessEngineConfiguration.DATABASE_TYPE_MYSQL); processEngineConfiguration.setDatabaseType(ProcessEngineConfiguration.DATABASE_TYPE_MYSQL);
} else { } else {
processEngineConfiguration.setDatabaseType(ProcessEngineConfiguration.DATABASE_TYPE_POSTGRES); processEngineConfiguration.setDatabaseType(ProcessEngineConfiguration.DATABASE_TYPE_POSTGRES);
} }
initializeNewProcessEngine(processEngineConfiguration);
}
public void initializeNewProcessEngine(
ProcessEngineConfiguration currentProcessEngineConfiguration) {
ProcessEngines.destroy();
SystemRepository systemRepository = Entity.getSystemRepository();
WorkflowSettings workflowSettings = systemRepository.getWorkflowSettings();
StandaloneProcessEngineConfiguration processEngineConfiguration =
new StandaloneProcessEngineConfiguration();
// Setting Database Configuration
processEngineConfiguration
.setJdbcUrl(currentProcessEngineConfiguration.getJdbcUrl())
.setJdbcUsername(currentProcessEngineConfiguration.getJdbcUsername())
.setJdbcPassword(currentProcessEngineConfiguration.getJdbcPassword())
.setJdbcDriver(currentProcessEngineConfiguration.getJdbcDriver())
.setDatabaseType(currentProcessEngineConfiguration.getDatabaseType())
.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE);
// Setting Async Executor Configuration
processEngineConfiguration
.setAsyncExecutorActivate(true)
.setAsyncExecutorCorePoolSize(workflowSettings.getExecutorConfiguration().getCorePoolSize())
.setAsyncExecutorMaxPoolSize(workflowSettings.getExecutorConfiguration().getMaxPoolSize())
.setAsyncExecutorThreadPoolQueueSize(
workflowSettings.getExecutorConfiguration().getQueueSize())
.setAsyncExecutorMaxAsyncJobsDuePerAcquisition(
workflowSettings.getExecutorConfiguration().getTasksDuePerAcquisition());
// Setting History CleanUp
processEngineConfiguration
.setEnableHistoryCleaning(true)
.setCleanInstancesEndedAfter(
Duration.ofDays(
workflowSettings.getHistoryCleanUpConfiguration().getCleanAfterNumberOfDays()));
// Add Global Failure Listener
processEngineConfiguration.setEventListeners(List.of(new WorkflowFailureListener()));
ProcessEngine processEngine = processEngineConfiguration.buildProcessEngine(); ProcessEngine processEngine = processEngineConfiguration.buildProcessEngine();
this.processEngine = processEngine;
this.repositoryService = processEngine.getRepositoryService(); this.repositoryService = processEngine.getRepositoryService();
this.runtimeService = processEngine.getRuntimeService(); this.runtimeService = processEngine.getRuntimeService();
this.taskService = processEngine.getTaskService(); this.taskService = processEngine.getTaskService();
@ -81,6 +121,14 @@ public class WorkflowHandler {
throw new UnhandledServerException("WorkflowHandler is not initialized."); throw new UnhandledServerException("WorkflowHandler is not initialized.");
} }
public ProcessEngineConfiguration getProcessEngineConfiguration() {
if (processEngine != null) {
return processEngine.getProcessEngineConfiguration();
} else {
return null;
}
}
public void deploy(Workflow workflow) { public void deploy(Workflow workflow) {
BpmnXMLConverter bpmnXMLConverter = new BpmnXMLConverter(); BpmnXMLConverter bpmnXMLConverter = new BpmnXMLConverter();

View File

@ -67,6 +67,7 @@ import org.openmetadata.schema.auth.PersonalAccessToken;
import org.openmetadata.schema.auth.RefreshToken; import org.openmetadata.schema.auth.RefreshToken;
import org.openmetadata.schema.auth.TokenType; import org.openmetadata.schema.auth.TokenType;
import org.openmetadata.schema.configuration.AssetCertificationSettings; import org.openmetadata.schema.configuration.AssetCertificationSettings;
import org.openmetadata.schema.configuration.WorkflowSettings;
import org.openmetadata.schema.dataInsight.DataInsightChart; import org.openmetadata.schema.dataInsight.DataInsightChart;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
import org.openmetadata.schema.dataInsight.kpi.Kpi; import org.openmetadata.schema.dataInsight.kpi.Kpi;
@ -5156,6 +5157,7 @@ public interface CollectionDAO {
case SEARCH_SETTINGS -> JsonUtils.readValue(json, SearchSettings.class); case SEARCH_SETTINGS -> JsonUtils.readValue(json, SearchSettings.class);
case ASSET_CERTIFICATION_SETTINGS -> JsonUtils.readValue( case ASSET_CERTIFICATION_SETTINGS -> JsonUtils.readValue(
json, AssetCertificationSettings.class); json, AssetCertificationSettings.class);
case WORKFLOW_SETTINGS -> JsonUtils.readValue(json, WorkflowSettings.class);
case LINEAGE_SETTINGS -> JsonUtils.readValue(json, LineageSettings.class); case LINEAGE_SETTINGS -> JsonUtils.readValue(json, LineageSettings.class);
default -> throw new IllegalArgumentException("Invalid Settings Type " + configType); default -> throw new IllegalArgumentException("Invalid Settings Type " + configType);
}; };

View File

@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction; import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.api.configuration.UiThemePreference; import org.openmetadata.api.configuration.UiThemePreference;
import org.openmetadata.schema.configuration.AssetCertificationSettings; import org.openmetadata.schema.configuration.AssetCertificationSettings;
import org.openmetadata.schema.configuration.WorkflowSettings;
import org.openmetadata.schema.email.SmtpSettings; import org.openmetadata.schema.email.SmtpSettings;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.schema.security.client.OpenMetadataJWTClientConfig; import org.openmetadata.schema.security.client.OpenMetadataJWTClientConfig;
@ -32,6 +33,7 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.exception.CustomExceptionMessage; import org.openmetadata.service.exception.CustomExceptionMessage;
import org.openmetadata.service.fernet.Fernet; import org.openmetadata.service.fernet.Fernet;
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.jdbi3.CollectionDAO.SystemDAO; import org.openmetadata.service.jdbi3.CollectionDAO.SystemDAO;
import org.openmetadata.service.migration.MigrationValidationClient; import org.openmetadata.service.migration.MigrationValidationClient;
import org.openmetadata.service.resources.settings.SettingsCache; import org.openmetadata.service.resources.settings.SettingsCache;
@ -119,6 +121,15 @@ public class SystemRepository {
.orElse(null); .orElse(null);
} }
public WorkflowSettings getWorkflowSettings() {
Optional<Settings> oWorkflowSettings =
Optional.ofNullable(getConfigWithKey(SettingsType.WORKFLOW_SETTINGS.value()));
return oWorkflowSettings
.map(settings -> (WorkflowSettings) settings.getConfigValue())
.orElse(null);
}
public Settings getEmailConfigInternal() { public Settings getEmailConfigInternal() {
try { try {
Settings setting = dao.getConfigWithKey(SettingsType.EMAIL_CONFIGURATION.value()); Settings setting = dao.getConfigWithKey(SettingsType.EMAIL_CONFIGURATION.value());
@ -209,6 +220,13 @@ public class SystemRepository {
return (new RestUtil.PutResponse<>(Response.Status.OK, original, ENTITY_UPDATED)).toResponse(); return (new RestUtil.PutResponse<>(Response.Status.OK, original, ENTITY_UPDATED)).toResponse();
} }
private void postUpdate(SettingsType settingsType) {
if (settingsType == SettingsType.WORKFLOW_SETTINGS) {
WorkflowHandler workflowHandler = WorkflowHandler.getInstance();
workflowHandler.initializeNewProcessEngine(workflowHandler.getProcessEngineConfiguration());
}
}
public void updateSetting(Settings setting) { public void updateSetting(Settings setting) {
try { try {
if (setting.getConfigType() == SettingsType.EMAIL_CONFIGURATION) { if (setting.getConfigType() == SettingsType.EMAIL_CONFIGURATION) {
@ -235,6 +253,7 @@ public class SystemRepository {
setting.getConfigType().toString(), JsonUtils.pojoToJson(setting.getConfigValue())); setting.getConfigType().toString(), JsonUtils.pojoToJson(setting.getConfigValue()));
// Invalidate Cache // Invalidate Cache
SettingsCache.invalidateSettings(setting.getConfigType().value()); SettingsCache.invalidateSettings(setting.getConfigType().value());
postUpdate(setting.getConfigType());
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Failing in Updating Setting.", ex); LOG.error("Failing in Updating Setting.", ex);
throw new CustomExceptionMessage( throw new CustomExceptionMessage(

View File

@ -19,6 +19,7 @@ import static org.openmetadata.schema.settings.SettingsType.EMAIL_CONFIGURATION;
import static org.openmetadata.schema.settings.SettingsType.LINEAGE_SETTINGS; import static org.openmetadata.schema.settings.SettingsType.LINEAGE_SETTINGS;
import static org.openmetadata.schema.settings.SettingsType.LOGIN_CONFIGURATION; import static org.openmetadata.schema.settings.SettingsType.LOGIN_CONFIGURATION;
import static org.openmetadata.schema.settings.SettingsType.SEARCH_SETTINGS; import static org.openmetadata.schema.settings.SettingsType.SEARCH_SETTINGS;
import static org.openmetadata.schema.settings.SettingsType.WORKFLOW_SETTINGS;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
@ -35,6 +36,9 @@ import org.openmetadata.schema.api.lineage.LineageLayer;
import org.openmetadata.schema.api.lineage.LineageSettings; import org.openmetadata.schema.api.lineage.LineageSettings;
import org.openmetadata.schema.api.search.SearchSettings; import org.openmetadata.schema.api.search.SearchSettings;
import org.openmetadata.schema.configuration.AssetCertificationSettings; import org.openmetadata.schema.configuration.AssetCertificationSettings;
import org.openmetadata.schema.configuration.ExecutorConfiguration;
import org.openmetadata.schema.configuration.HistoryCleanUpConfiguration;
import org.openmetadata.schema.configuration.WorkflowSettings;
import org.openmetadata.schema.email.SmtpSettings; import org.openmetadata.schema.email.SmtpSettings;
import org.openmetadata.schema.settings.Settings; import org.openmetadata.schema.settings.Settings;
import org.openmetadata.schema.settings.SettingsType; import org.openmetadata.schema.settings.SettingsType;
@ -144,6 +148,19 @@ public class SettingsCache {
systemRepository.createNewSetting(setting); systemRepository.createNewSetting(setting);
} }
// Initialise Workflow Settings
Settings workflowSettings = systemRepository.getConfigWithKey(WORKFLOW_SETTINGS.toString());
if (workflowSettings == null) {
Settings setting =
new Settings()
.withConfigType(WORKFLOW_SETTINGS)
.withConfigValue(
new WorkflowSettings()
.withExecutorConfiguration(new ExecutorConfiguration())
.withHistoryCleanUpConfiguration(new HistoryCleanUpConfiguration()));
systemRepository.createNewSetting(setting);
}
Settings lineageSettings = systemRepository.getConfigWithKey(LINEAGE_SETTINGS.toString()); Settings lineageSettings = systemRepository.getConfigWithKey(LINEAGE_SETTINGS.toString());
if (lineageSettings == null) { if (lineageSettings == null) {
// Only in case a config doesn't exist in DB we insert it // Only in case a config doesn't exist in DB we insert it

View File

@ -0,0 +1,59 @@
{
"$id": "https://open-metadata.org/schema/configuration/workflowSettings.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "WorkflowSettings",
"description": "This schema defines the Workflow Settings.",
"type": "object",
"javaType": "org.openmetadata.schema.configuration.WorkflowSettings",
"definitions": {
"executorConfiguration": {
"type": "object",
"properties": {
"corePoolSize": {
"type": "integer",
"default": 50,
"description": "Default worker Pool Size. The Workflow Executor by default has this amount of workers."
},
"maxPoolSize": {
"type": "integer",
"default": 100,
"description": "Maximum worker Pool Size. The Workflow Executor could grow up to this number of workers."
},
"queueSize": {
"type": "integer",
"default": 1000,
"description": "Amount of Tasks that can be queued to be picked up by the Workflow Executor."
},
"tasksDuePerAcquisition": {
"type": "integer",
"default": 20,
"description": "The amount of Tasks that the Workflow Executor is able to pick up each time it looks for more."
}
},
"additionalProperties": false
},
"historyCleanUpConfiguration": {
"type": "object",
"properties": {
"cleanAfterNumberOfDays": {
"type": "integer",
"default": 7,
"description": "Cleans the Workflow Task that were finished, after given number of days."
}
},
"additionalProperties": false
}
},
"properties": {
"executorConfiguration": {
"$ref": "#/definitions/executorConfiguration",
"description": "Used to set up the Workflow Executor Settings."
},
"historyCleanUpConfiguration": {
"$ref": "#/definitions/historyCleanUpConfiguration",
"description": "Used to set up the History CleanUp Settings."
}
},
"required": ["allowedClassification", "validityPeriod"],
"additionalProperties": false
}

View File

@ -32,7 +32,8 @@
"profilerConfiguration", "profilerConfiguration",
"searchSettings", "searchSettings",
"assetCertificationSettings", "assetCertificationSettings",
"lineageSettings" "lineageSettings",
"workflowSettings"
] ]
} }
}, },
@ -84,6 +85,9 @@
}, },
{ {
"$ref": "../configuration/lineageSettings.json" "$ref": "../configuration/lineageSettings.json"
},
{
"$ref": "../configuration/workflowSettings.json"
} }
] ]
} }