Improve Workflow Logs and Reduce Concurrency (#21345)

(cherry picked from commit 04bfb7b85d7457210da2e0fd47364b1ecc2b4c2a)
This commit is contained in:
IceS2 2025-05-21 16:22:28 +02:00 committed by OpenMetadata Release Bot
parent b2658160e1
commit 78075352e9
8 changed files with 77 additions and 10 deletions

View File

@ -33,3 +33,13 @@ SET json = JSON_SET(
true
)
WHERE name = 'AutoPilotApplication';
-- Update workflow settings with default values if present
UPDATE openmetadata_settings
SET json = JSON_SET(
json,
'$.executorConfiguration.corePoolSize', 10,
'$.executorConfiguration.maxPoolSize', 20,
'$.executorConfiguration.jobLockTimeInMillis', 1296000000
)
WHERE configType = 'workflowSettings';

View File

@ -39,3 +39,23 @@ SET json = jsonb_set(
'true'
)
where name = 'AutoPilotApplication';
-- Update workflow settings with default values if present
UPDATE openmetadata_settings
SET json = jsonb_set(
jsonb_set(
jsonb_set(
json,
'{executorConfiguration,corePoolSize}',
'10',
true
),
'{executorConfiguration,maxPoolSize}',
'20',
true
),
'{executorConfiguration,jobLockTimeInMillis}',
'1296000000',
true
)
WHERE configType = 'workflowSettings';

View File

@ -216,6 +216,7 @@ public class WorkflowHandler {
public ProcessInstance triggerByKey(
String processDefinitionKey, String businessKey, Map<String, Object> variables) {
RuntimeService runtimeService = processEngine.getRuntimeService();
LOG.debug("[GovernanceWorkflows] '{}' triggered with '{}'", processDefinitionKey, variables);
return runtimeService.startProcessInstanceByKey(processDefinitionKey, businessKey, variables);
}

View File

@ -155,16 +155,24 @@ public class CreateIngestionPipelineImpl {
pipelineType, JsonUtils.getMap(service.getConnection().getConfig()))) {
LOG.debug(
String.format(
"Service '%s' does not support Ingestion Pipeline of type '%s'",
service.getDisplayName(), pipelineType));
"[GovernanceWorkflows] Service '%s' does not support Ingestion Pipeline of type '%s'",
service.getName(), pipelineType));
return new CreateIngestionPipelineResult(null, true);
}
LOG.info(
"[GovernanceWorkflows] Creating '{}' Agent for '{}'",
pipelineType.value(),
service.getName());
boolean wasSuccessful = true;
IngestionPipeline ingestionPipeline = getOrCreateIngestionPipeline(pipelineType, service);
if (deploy) {
LOG.info(
"[GovernanceWorkflows] Deploying '{}' for '{}'",
ingestionPipeline.getDisplayName(),
service.getName());
wasSuccessful = deployPipeline(pipelineServiceClient, ingestionPipeline, service);
if (wasSuccessful) {
// Mark the pipeline as deployed
@ -172,6 +180,11 @@ public class CreateIngestionPipelineImpl {
IngestionPipelineRepository repository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
repository.createOrUpdate(null, ingestionPipeline, ingestionPipeline.getUpdatedBy());
} else {
LOG.warn(
"[GovernanceWorkflows] '{}' deployment failed for '{}'",
pipelineType.value(),
service.getName());
}
}

View File

@ -4,6 +4,7 @@ import static org.openmetadata.service.util.EntityUtil.Fields.EMPTY_FIELDS;
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;
@ -15,6 +16,7 @@ import org.openmetadata.service.exception.IngestionPipelineDeploymentException;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
@Slf4j
public class RunIngestionPipelineImpl {
private final PipelineServiceClientInterface pipelineServiceClient;
@ -50,10 +52,21 @@ public class RunIngestionPipelineImpl {
long startTime = System.currentTimeMillis();
long timeoutMillis = timeoutSeconds * 1000;
LOG.info(
"[GovernanceWorkflows] '{}' running for '{}'",
ingestionPipeline.getDisplayName(),
ingestionPipeline.getService().getName());
runIngestionPipeline(ingestionPipeline);
if (waitForCompletion) {
wasSuccessful = waitForCompletion(repository, ingestionPipeline, startTime, timeoutMillis);
if (!wasSuccessful) {
LOG.warn(
"[GovernanceWorkflows] '{}' failed for '{}'",
ingestionPipeline.getDisplayName(),
ingestionPipeline.getService().getName());
}
}
return wasSuccessful;
}

View File

@ -49,6 +49,7 @@ public class RunAppImpl {
boolean waitForCompletion,
long timeoutSeconds,
MessageParser.EntityLink entityLink) {
boolean wasSuccessful = true;
ServiceEntityInterface service = Entity.getEntity(entityLink, "owners", Include.NON_DELETED);
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
@ -58,27 +59,36 @@ public class RunAppImpl {
appRepository.getByName(null, appName, new EntityUtil.Fields(Set.of("bot", "pipelines")));
} catch (EntityNotFoundException ex) {
LOG.warn(String.format("App: '%s' is not Installed. Skipping", appName));
return true;
return wasSuccessful;
}
if (!validateAppShouldRun(app, service)) {
return true;
return wasSuccessful;
}
long startTime = System.currentTimeMillis();
long timeoutMillis = timeoutSeconds * 1000;
Map<String, Object> config = getConfig(app, service);
LOG.info(
"[GovernanceWorkflows] '{}' running for '{}'", app.getDisplayName(), service.getName());
if (app.getAppType().equals(AppType.Internal)) {
return runApp(appRepository, app, config, waitForCompletion, startTime, timeoutMillis);
wasSuccessful =
runApp(appRepository, app, config, waitForCompletion, startTime, timeoutMillis);
} else {
App updatedApp = JsonUtils.deepCopy(app, App.class);
updatedApp.setAppConfiguration(config);
boolean result =
wasSuccessful =
runApp(pipelineServiceClient, updatedApp, waitForCompletion, startTime, timeoutMillis);
deployIngestionPipeline(pipelineServiceClient, app);
return result;
}
if (!wasSuccessful) {
LOG.warn(
"[GovernanceWorkflows] '{}' failed for '{}'", app.getDisplayName(), service.getName());
}
return wasSuccessful;
}
private boolean validateAppShouldRun(App app, ServiceEntityInterface service) {

View File

@ -5,7 +5,7 @@ import org.flowable.bpmn.model.ParallelGateway;
public class ParallelGatewayBuilder extends FlowableElementBuilder<ParallelGatewayBuilder> {
private boolean async = true;
private boolean exclusive = true;
private boolean exclusive = false;
public ParallelGatewayBuilder setAsync(boolean async) {
this.async = async;

View File

@ -11,12 +11,12 @@
"properties": {
"corePoolSize": {
"type": "integer",
"default": 50,
"default": 10,
"description": "Default worker Pool Size. The Workflow Executor by default has this amount of workers."
},
"maxPoolSize": {
"type": "integer",
"default": 100,
"default": 20,
"description": "Maximum worker Pool Size. The Workflow Executor could grow up to this number of workers."
},
"queueSize": {