From a3a26ec29afa03f6e2ad807e2c24cc66d43da4c8 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji <81347100+yan-3005@users.noreply.github.com> Date: Fri, 15 Aug 2025 16:01:34 +0530 Subject: [PATCH] Fix: Flowable and Governance Workflow Error Logging for easy identification of errors and resolutions (#22940) --- ...flowInstanceExecutionIdSetterListener.java | 46 +++-- .../workflows/WorkflowInstanceListener.java | 113 ++++++++++-- .../WorkflowInstanceStageListener.java | 169 ++++++++++++++++-- .../workflows/flowable/BaseDelegate.java | 28 ++- 4 files changed, 317 insertions(+), 39 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceExecutionIdSetterListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceExecutionIdSetterListener.java index 0106f2e276c..993abba8444 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceExecutionIdSetterListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceExecutionIdSetterListener.java @@ -14,24 +14,50 @@ import org.flowable.engine.delegate.JavaDelegate; public class WorkflowInstanceExecutionIdSetterListener implements JavaDelegate { @Override public void execute(DelegateExecution execution) { + String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); + String processInstanceId = execution.getProcessInstanceId(); + + // CRITICAL: Always set the execution ID first - this is mandatory for stage tracking + UUID workflowInstanceExecutionId = UUID.randomUUID(); + execution.setVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE, workflowInstanceExecutionId); + LOG.info( + "[WORKFLOW_EXEC_ID_SET] Workflow: {}, ProcessInstance: {}, ExecutionId: {} - Execution ID initialized", + workflowName, + processInstanceId, + workflowInstanceExecutionId); + WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution); try { - String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); String relatedEntity = (String) varHandler.getNamespacedVariable(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE); - LOG.debug( - String.format( - "New Execution for Workflow '%s'. Related Entity: '%s'", - workflowName, relatedEntity)); - UUID workflowInstanceExecutionId = UUID.randomUUID(); - execution.setVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE, workflowInstanceExecutionId); + if (relatedEntity == null || relatedEntity.isEmpty()) { + LOG.error( + "[WORKFLOW_MISSING_ENTITY] Workflow: {}, ProcessInstance: {}, ExecutionId: {} - RELATED_ENTITY variable is null/empty. Workflow will likely fail.", + workflowName, + processInstanceId, + workflowInstanceExecutionId); + execution.setVariable(Workflow.FAILURE_VARIABLE, true); + execution.setVariable("startupError", "Missing required variable: relatedEntity"); + } else { + LOG.info( + "[WORKFLOW_EXEC_STARTED] Workflow: {}, ProcessInstance: {}, ExecutionId: {}, RelatedEntity: {} - Workflow execution initialized successfully", + workflowName, + processInstanceId, + workflowInstanceExecutionId, + relatedEntity); + } } catch (Exception exc) { LOG.error( - String.format( - "[%s] Failed due to: %s ", - getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()), + "[WORKFLOW_INIT_ERROR] Workflow: {}, ProcessInstance: {}, ExecutionId: {} - Failed to retrieve relatedEntity variable. Error: {}", + workflowName, + processInstanceId, + workflowInstanceExecutionId, + exc.getMessage(), exc); + // Set failure indicator but don't prevent workflow from starting + execution.setVariable(Workflow.FAILURE_VARIABLE, true); + execution.setVariable("startupError", "Failed to get relatedEntity: " + exc.getMessage()); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java index 261afe62ab8..5de9038cbaf 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java @@ -13,25 +13,78 @@ import org.openmetadata.service.jdbi3.WorkflowInstanceRepository; public class WorkflowInstanceListener implements JavaDelegate { @Override public void execute(DelegateExecution execution) { + String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); + String processInstanceId = execution.getProcessInstanceId(); + String eventName = execution.getEventName(); + + WorkflowInstanceRepository workflowInstanceRepository = null; try { - WorkflowInstanceRepository workflowInstanceRepository = + workflowInstanceRepository = (WorkflowInstanceRepository) Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE); - switch (execution.getEventName()) { - case "start" -> addWorkflowInstance(execution, workflowInstanceRepository); - case "end" -> updateWorkflowInstance(execution, workflowInstanceRepository); + switch (eventName) { + case "start" -> { + LOG.info( + "[WORKFLOW_INSTANCE_START] Workflow: {}, ProcessInstance: {} - Creating workflow instance record", + workflowName, + processInstanceId); + addWorkflowInstance(execution, workflowInstanceRepository); + } + case "end" -> { + LOG.info( + "[WORKFLOW_INSTANCE_END] Workflow: {}, ProcessInstance: {} - Updating workflow instance status", + workflowName, + processInstanceId); + updateWorkflowInstance(execution, workflowInstanceRepository); + } default -> LOG.debug( - String.format( - "WorkflowStageUpdaterListener does not support listening for the event: '%s'", - execution.getEventName())); + "[WORKFLOW_INSTANCE_EVENT] Workflow: {}, ProcessInstance: {} - Unsupported event: {}", + workflowName, + processInstanceId, + eventName); } } catch (Exception exc) { LOG.error( - String.format( - "[%s] Failed due to: %s ", - getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()), + "[WORKFLOW_INSTANCE_ERROR] Workflow: {}, ProcessInstance: {}, Event: {} - Failed to process workflow instance. Error: {}", + workflowName, + processInstanceId, + eventName, + exc.getMessage(), exc); + + // CRITICAL: Even on failure, we must record the state in the database + if ("end".equals(eventName) && workflowInstanceRepository != null) { + try { + String businessKey = execution.getProcessInstanceBusinessKey(); + if (businessKey != null && !businessKey.isEmpty()) { + UUID workflowInstanceId = UUID.fromString(businessKey); + java.util.Map errorVariables = new java.util.HashMap<>(); + errorVariables.put("status", "FAILURE"); + errorVariables.put("error", exc.getMessage()); + errorVariables.put("errorClass", exc.getClass().getSimpleName()); + workflowInstanceRepository.updateWorkflowInstance( + workflowInstanceId, System.currentTimeMillis(), errorVariables); + LOG.warn( + "[WORKFLOW_INSTANCE_FAILED] Workflow: {}, ProcessInstance: {}, InstanceId: {} - Workflow marked as FAILED in database", + workflowName, + processInstanceId, + workflowInstanceId); + } else { + LOG.error( + "[WORKFLOW_INSTANCE_NO_KEY] Workflow: {}, ProcessInstance: {} - Cannot update workflow status, business key is missing", + workflowName, + processInstanceId); + } + } catch (Exception updateExc) { + LOG.error( + "[WORKFLOW_INSTANCE_DB_ERROR] Workflow: {}, ProcessInstance: {} - Failed to record workflow failure in database. Error: {}", + workflowName, + processInstanceId, + updateExc.getMessage(), + updateExc); + } + } } } @@ -54,9 +107,11 @@ public class WorkflowInstanceListener implements JavaDelegate { workflowInstanceId, System.currentTimeMillis(), execution.getVariables()); - LOG.debug( - String.format( - "Workflow '%s' Triggered. Instance: '%s'", workflowDefinitionName, workflowInstanceId)); + LOG.info( + "[WORKFLOW_INSTANCE_CREATED] Workflow: {}, InstanceId: {}, ProcessInstance: {} - Workflow instance record created successfully", + workflowDefinitionName, + workflowInstanceId, + execution.getProcessInstanceId()); } private void updateWorkflowInstance( @@ -65,11 +120,35 @@ public class WorkflowInstanceListener implements JavaDelegate { getMainWorkflowDefinitionNameFromTrigger( getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())); UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); + + // Capture all variables including any failure indicators + java.util.Map variables = new java.util.HashMap<>(execution.getVariables()); + + // Determine final status based on what happened during execution + String status = "FINISHED"; // Default + if (Boolean.TRUE.equals(variables.get(Workflow.FAILURE_VARIABLE))) { + status = "FAILURE"; + } else if (variables.containsKey(Workflow.EXCEPTION_VARIABLE)) { + status = "EXCEPTION"; + } + variables.put("status", status); + workflowInstanceRepository.updateWorkflowInstance( - workflowInstanceId, System.currentTimeMillis(), execution.getVariables()); - LOG.debug( - String.format( - "Workflow '%s' Finished. Instance: '%s'", workflowDefinitionName, workflowInstanceId)); + workflowInstanceId, System.currentTimeMillis(), variables); + + if ("FAILURE".equals(status) || "EXCEPTION".equals(status)) { + LOG.warn( + "[WORKFLOW_INSTANCE_COMPLETED_WITH_ERRORS] Workflow: {}, InstanceId: {}, Status: {} - Workflow completed with errors", + workflowDefinitionName, + workflowInstanceId, + status); + } else { + LOG.info( + "[WORKFLOW_INSTANCE_COMPLETED] Workflow: {}, InstanceId: {}, Status: {} - Workflow completed successfully", + workflowDefinitionName, + workflowInstanceId, + status); + } } private String getMainWorkflowDefinitionNameFromTrigger(String triggerWorkflowDefinitionName) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java index 336e671677b..cc86312ef8e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java @@ -17,26 +17,113 @@ import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository; public class WorkflowInstanceStageListener implements JavaDelegate { @Override public void execute(DelegateExecution execution) { + String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); + String processInstanceId = execution.getProcessInstanceId(); + String eventName = execution.getEventName(); + String currentActivity = execution.getCurrentActivityId(); + WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution); try { WorkflowInstanceStateRepository workflowInstanceStateRepository = (WorkflowInstanceStateRepository) Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); - switch (execution.getEventName()) { - case "start" -> addNewStage(varHandler, execution, workflowInstanceStateRepository); - case "end" -> updateStage(varHandler, execution, workflowInstanceStateRepository); + switch (eventName) { + case "start" -> { + LOG.info( + "[STAGE_START] Workflow: {}, ProcessInstance: {}, Activity: {} - Creating new stage record", + workflowName, + processInstanceId, + currentActivity); + addNewStage(varHandler, execution, workflowInstanceStateRepository); + } + case "end" -> { + LOG.info( + "[STAGE_END] Workflow: {}, ProcessInstance: {}, Activity: {} - Updating stage completion", + workflowName, + processInstanceId, + currentActivity); + updateStage(varHandler, execution, workflowInstanceStateRepository); + } default -> LOG.debug( - String.format( - "WorkflowStageUpdaterListener does not support listening for the event: '%s'", - execution.getEventName())); + "[STAGE_EVENT] Workflow: {}, ProcessInstance: {}, Activity: {} - Unsupported event: {}", + workflowName, + processInstanceId, + currentActivity, + eventName); } } catch (Exception exc) { LOG.error( - String.format( - "[%s] Failed due to: %s ", - getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()), + "[STAGE_ERROR] Workflow: {}, ProcessInstance: {}, Activity: {}, Event: {} - Stage processing failed. Error: {}", + workflowName, + processInstanceId, + currentActivity, + eventName, + exc.getMessage(), exc); + + // CRITICAL: Record the stage failure in the database even if processing failed + if ("end".equals(eventName)) { + try { + WorkflowInstanceStateRepository workflowInstanceStateRepository = + (WorkflowInstanceStateRepository) + Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); + + String businessKey = execution.getProcessInstanceBusinessKey(); + if (businessKey != null && !businessKey.isEmpty()) { + UUID workflowInstanceId = UUID.fromString(businessKey); + UUID executionId = + (UUID) execution.getVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE); + if (executionId == null) { + // Use Flowable's execution ID as fallback + executionId = UUID.nameUUIDFromBytes(execution.getId().getBytes()); + LOG.warn( + "[STAGE_FALLBACK_ID] Workflow: {}, ProcessInstance: {} - Using fallback execution ID: {}", + workflowName, + processInstanceId, + executionId); + } + + String stage = Optional.ofNullable(currentActivity).orElse(workflowName); + + // Create a failed stage record + UUID stageId = + workflowInstanceStateRepository.addNewStageToInstance( + stage + "_failed", + executionId, + workflowInstanceId, + workflowName, + System.currentTimeMillis()); + + java.util.Map failureData = new java.util.HashMap<>(); + failureData.put("status", "FAILED"); + failureData.put("error", exc.getMessage()); + failureData.put("errorClass", exc.getClass().getSimpleName()); + + workflowInstanceStateRepository.updateStage( + stageId, System.currentTimeMillis(), failureData); + + LOG.warn( + "[STAGE_FAILED_RECORDED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Failed stage recorded in database", + workflowName, + processInstanceId, + stage, + stageId); + } else { + LOG.error( + "[STAGE_NO_BUSINESS_KEY] Workflow: {}, ProcessInstance: {} - Cannot record stage failure, business key is missing", + workflowName, + processInstanceId); + } + } catch (Exception recordExc) { + LOG.error( + "[STAGE_DB_ERROR] Workflow: {}, ProcessInstance: {} - Failed to record stage failure in database. Error: {}", + workflowName, + processInstanceId, + recordExc.getMessage(), + recordExc); + } + } } } @@ -47,9 +134,42 @@ public class WorkflowInstanceStageListener implements JavaDelegate { execution.removeTransientVariable(FAILURE_VARIABLE); String workflowDefinitionName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); - UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); + String processInstanceId = execution.getProcessInstanceId(); + + // Check business key first - critical for stage tracking + String businessKey = execution.getProcessInstanceBusinessKey(); + if (businessKey == null || businessKey.isEmpty()) { + LOG.error( + "[STAGE_MISSING_KEY] Workflow: {}, ProcessInstance: {} - Business key is missing for stage creation", + workflowDefinitionName, + processInstanceId); + throw new IllegalStateException( + String.format( + "Business key is missing for stage creation in workflow: %s", + workflowDefinitionName)); + } + UUID workflowInstanceId = UUID.fromString(businessKey); + + // Get or create workflow instance execution ID UUID workflowInstanceExecutionId = (UUID) execution.getVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE); + if (workflowInstanceExecutionId == null) { + // This should have been set by WorkflowInstanceExecutionIdSetterListener + LOG.error( + "[STAGE_MISSING_EXEC_ID] Workflow: {}, ProcessInstance: {}, InstanceId: {} - Workflow instance execution ID is null", + workflowDefinitionName, + processInstanceId, + workflowInstanceId); + // Use Flowable's execution ID as fallback + workflowInstanceExecutionId = UUID.nameUUIDFromBytes(execution.getId().getBytes()); + execution.setVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE, workflowInstanceExecutionId); + LOG.warn( + "[STAGE_CREATED_FALLBACK] Workflow: {}, ProcessInstance: {} - Created fallback execution ID: {}", + workflowDefinitionName, + processInstanceId, + workflowInstanceExecutionId); + } + String stage = Optional.ofNullable(execution.getCurrentActivityId()).orElse(workflowDefinitionName); UUID workflowInstanceStateId = @@ -60,15 +180,44 @@ public class WorkflowInstanceStageListener implements JavaDelegate { workflowDefinitionName, System.currentTimeMillis()); varHandler.setNodeVariable(STAGE_INSTANCE_STATE_ID_VARIABLE, workflowInstanceStateId); + LOG.info( + "[STAGE_CREATED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Stage record created successfully", + workflowDefinitionName, + processInstanceId, + stage, + workflowInstanceStateId); } private void updateStage( WorkflowVariableHandler varHandler, DelegateExecution execution, WorkflowInstanceStateRepository workflowInstanceStateRepository) { + String workflowDefinitionName = + getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); + String processInstanceId = execution.getProcessInstanceId(); + String stage = + Optional.ofNullable(execution.getCurrentActivityId()).orElse(workflowDefinitionName); + UUID workflowInstanceStateId = (UUID) varHandler.getNodeVariable(STAGE_INSTANCE_STATE_ID_VARIABLE); + + if (workflowInstanceStateId == null) { + LOG.error( + "[STAGE_UPDATE_NO_ID] Workflow: {}, ProcessInstance: {}, Stage: {} - Cannot update stage, state ID is null", + workflowDefinitionName, + processInstanceId, + stage); + return; + } + workflowInstanceStateRepository.updateStage( workflowInstanceStateId, System.currentTimeMillis(), execution.getVariables()); + + LOG.info( + "[STAGE_UPDATED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Stage completion recorded", + workflowDefinitionName, + processInstanceId, + stage, + workflowInstanceStateId); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/BaseDelegate.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/BaseDelegate.java index 031d32049a5..ca87a584ece 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/BaseDelegate.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/BaseDelegate.java @@ -26,16 +26,40 @@ public abstract class BaseDelegate implements JavaDelegate { @Override public void execute(DelegateExecution execution) { + String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); + String processInstanceId = execution.getProcessInstanceId(); + String activityId = execution.getCurrentActivityId(); + String delegateClass = this.getClass().getSimpleName(); + varHandler = new WorkflowVariableHandler(execution); try { inputNamespaceMap = JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class); configMap = JsonUtils.readOrConvertValue(configMapExpr.getValue(execution), Map.class); + + LOG.debug( + "[DELEGATE_EXECUTE] Workflow: {}, ProcessInstance: {}, Activity: {}, Delegate: {} - Starting delegate execution", + workflowName, + processInstanceId, + activityId, + delegateClass); + innerExecute(execution); + + LOG.debug( + "[DELEGATE_SUCCESS] Workflow: {}, ProcessInstance: {}, Activity: {}, Delegate: {} - Delegate execution completed", + workflowName, + processInstanceId, + activityId, + delegateClass); } catch (Exception exc) { LOG.error( - String.format( - "[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())), + "[DELEGATE_ERROR] Workflow: {}, ProcessInstance: {}, Activity: {}, Delegate: {} - Delegate execution failed. Error: {}", + workflowName, + processInstanceId, + activityId, + delegateClass, + exc.getMessage(), exc); varHandler.setGlobalVariable(EXCEPTION_VARIABLE, exc.toString()); throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());