From 78075352e97169d71e7ecd1463c072f219184950 Mon Sep 17 00:00:00 2001 From: IceS2 Date: Wed, 21 May 2025 16:22:28 +0200 Subject: [PATCH] Improve Workflow Logs and Reduce Concurrency (#21345) (cherry picked from commit 04bfb7b85d7457210da2e0fd47364b1ecc2b4c2a) --- .../native/1.7.1/mysql/schemaChanges.sql | 10 ++++++++++ .../native/1.7.1/postgres/schemaChanges.sql | 20 +++++++++++++++++++ .../governance/workflows/WorkflowHandler.java | 1 + .../CreateIngestionPipelineImpl.java | 17 ++++++++++++++-- .../RunIngestionPipelineImpl.java | 13 ++++++++++++ .../automatedTask/runApp/RunAppImpl.java | 20 ++++++++++++++----- .../builders/ParallelGatewayBuilder.java | 2 +- .../configuration/workflowSettings.json | 4 ++-- 8 files changed, 77 insertions(+), 10 deletions(-) diff --git a/bootstrap/sql/migrations/native/1.7.1/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.7.1/mysql/schemaChanges.sql index 02f72c1bda0..190e307cc61 100644 --- a/bootstrap/sql/migrations/native/1.7.1/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.7.1/mysql/schemaChanges.sql @@ -33,3 +33,13 @@ SET json = JSON_SET( true ) WHERE name = 'AutoPilotApplication'; + +-- Update workflow settings with default values if present +UPDATE openmetadata_settings +SET json = JSON_SET( + json, + '$.executorConfiguration.corePoolSize', 10, + '$.executorConfiguration.maxPoolSize', 20, + '$.executorConfiguration.jobLockTimeInMillis', 1296000000 +) +WHERE configType = 'workflowSettings'; diff --git a/bootstrap/sql/migrations/native/1.7.1/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.7.1/postgres/schemaChanges.sql index f2bd776c59d..0f5ace2f751 100644 --- a/bootstrap/sql/migrations/native/1.7.1/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.7.1/postgres/schemaChanges.sql @@ -39,3 +39,23 @@ SET json = jsonb_set( 'true' ) where name = 'AutoPilotApplication'; + +-- Update workflow settings with default values if present +UPDATE openmetadata_settings +SET json = jsonb_set( + jsonb_set( + jsonb_set( + json, + '{executorConfiguration,corePoolSize}', + '10', + true + ), + '{executorConfiguration,maxPoolSize}', + '20', + true + ), + '{executorConfiguration,jobLockTimeInMillis}', + '1296000000', + true + ) +WHERE configType = 'workflowSettings'; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java index 7dc6d3ebfba..504f353bead 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java @@ -216,6 +216,7 @@ public class WorkflowHandler { public ProcessInstance triggerByKey( String processDefinitionKey, String businessKey, Map variables) { RuntimeService runtimeService = processEngine.getRuntimeService(); + LOG.debug("[GovernanceWorkflows] '{}' triggered with '{}'", processDefinitionKey, variables); return runtimeService.startProcessInstanceByKey(processDefinitionKey, businessKey, variables); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java index 9bdf5f9fb9d..a9b63db3da6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java @@ -155,16 +155,24 @@ public class CreateIngestionPipelineImpl { pipelineType, JsonUtils.getMap(service.getConnection().getConfig()))) { LOG.debug( String.format( - "Service '%s' does not support Ingestion Pipeline of type '%s'", - service.getDisplayName(), pipelineType)); + "[GovernanceWorkflows] Service '%s' does not support Ingestion Pipeline of type '%s'", + service.getName(), pipelineType)); return new CreateIngestionPipelineResult(null, true); } + LOG.info( + "[GovernanceWorkflows] Creating '{}' Agent for '{}'", + pipelineType.value(), + service.getName()); boolean wasSuccessful = true; IngestionPipeline ingestionPipeline = getOrCreateIngestionPipeline(pipelineType, service); if (deploy) { + LOG.info( + "[GovernanceWorkflows] Deploying '{}' for '{}'", + ingestionPipeline.getDisplayName(), + service.getName()); wasSuccessful = deployPipeline(pipelineServiceClient, ingestionPipeline, service); if (wasSuccessful) { // Mark the pipeline as deployed @@ -172,6 +180,11 @@ public class CreateIngestionPipelineImpl { IngestionPipelineRepository repository = (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); repository.createOrUpdate(null, ingestionPipeline, ingestionPipeline.getUpdatedBy()); + } else { + LOG.warn( + "[GovernanceWorkflows] '{}' deployment failed for '{}'", + pipelineType.value(), + service.getName()); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/RunIngestionPipelineImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/RunIngestionPipelineImpl.java index 35f9f8639e8..ee4c61c2a1a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/RunIngestionPipelineImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/RunIngestionPipelineImpl.java @@ -4,6 +4,7 @@ import static org.openmetadata.service.util.EntityUtil.Fields.EMPTY_FIELDS; import java.util.List; import java.util.UUID; +import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType; @@ -15,6 +16,7 @@ import org.openmetadata.service.exception.IngestionPipelineDeploymentException; import org.openmetadata.service.jdbi3.IngestionPipelineRepository; import org.openmetadata.service.util.OpenMetadataConnectionBuilder; +@Slf4j public class RunIngestionPipelineImpl { private final PipelineServiceClientInterface pipelineServiceClient; @@ -50,10 +52,21 @@ public class RunIngestionPipelineImpl { long startTime = System.currentTimeMillis(); long timeoutMillis = timeoutSeconds * 1000; + LOG.info( + "[GovernanceWorkflows] '{}' running for '{}'", + ingestionPipeline.getDisplayName(), + ingestionPipeline.getService().getName()); runIngestionPipeline(ingestionPipeline); if (waitForCompletion) { wasSuccessful = waitForCompletion(repository, ingestionPipeline, startTime, timeoutMillis); + + if (!wasSuccessful) { + LOG.warn( + "[GovernanceWorkflows] '{}' failed for '{}'", + ingestionPipeline.getDisplayName(), + ingestionPipeline.getService().getName()); + } } return wasSuccessful; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java index 652a8ef8380..22fc7669b3c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java @@ -49,6 +49,7 @@ public class RunAppImpl { boolean waitForCompletion, long timeoutSeconds, MessageParser.EntityLink entityLink) { + boolean wasSuccessful = true; ServiceEntityInterface service = Entity.getEntity(entityLink, "owners", Include.NON_DELETED); AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); @@ -58,27 +59,36 @@ public class RunAppImpl { appRepository.getByName(null, appName, new EntityUtil.Fields(Set.of("bot", "pipelines"))); } catch (EntityNotFoundException ex) { LOG.warn(String.format("App: '%s' is not Installed. Skipping", appName)); - return true; + return wasSuccessful; } if (!validateAppShouldRun(app, service)) { - return true; + return wasSuccessful; } long startTime = System.currentTimeMillis(); long timeoutMillis = timeoutSeconds * 1000; Map config = getConfig(app, service); + + LOG.info( + "[GovernanceWorkflows] '{}' running for '{}'", app.getDisplayName(), service.getName()); if (app.getAppType().equals(AppType.Internal)) { - return runApp(appRepository, app, config, waitForCompletion, startTime, timeoutMillis); + wasSuccessful = + runApp(appRepository, app, config, waitForCompletion, startTime, timeoutMillis); } else { App updatedApp = JsonUtils.deepCopy(app, App.class); updatedApp.setAppConfiguration(config); - boolean result = + wasSuccessful = runApp(pipelineServiceClient, updatedApp, waitForCompletion, startTime, timeoutMillis); deployIngestionPipeline(pipelineServiceClient, app); - return result; } + + if (!wasSuccessful) { + LOG.warn( + "[GovernanceWorkflows] '{}' failed for '{}'", app.getDisplayName(), service.getName()); + } + return wasSuccessful; } private boolean validateAppShouldRun(App app, ServiceEntityInterface service) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/ParallelGatewayBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/ParallelGatewayBuilder.java index 3721b972f40..1cd161f0dd9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/ParallelGatewayBuilder.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/ParallelGatewayBuilder.java @@ -5,7 +5,7 @@ import org.flowable.bpmn.model.ParallelGateway; public class ParallelGatewayBuilder extends FlowableElementBuilder { private boolean async = true; - private boolean exclusive = true; + private boolean exclusive = false; public ParallelGatewayBuilder setAsync(boolean async) { this.async = async; diff --git a/openmetadata-spec/src/main/resources/json/schema/configuration/workflowSettings.json b/openmetadata-spec/src/main/resources/json/schema/configuration/workflowSettings.json index eda1572fa86..3eec22751ba 100644 --- a/openmetadata-spec/src/main/resources/json/schema/configuration/workflowSettings.json +++ b/openmetadata-spec/src/main/resources/json/schema/configuration/workflowSettings.json @@ -11,12 +11,12 @@ "properties": { "corePoolSize": { "type": "integer", - "default": 50, + "default": 10, "description": "Default worker Pool Size. The Workflow Executor by default has this amount of workers." }, "maxPoolSize": { "type": "integer", - "default": 100, + "default": 20, "description": "Maximum worker Pool Size. The Workflow Executor could grow up to this number of workers." }, "queueSize": {