Fix: Flowable and Governance Workflow Error Logging for easy identification of errors and resolutions (#22940)

This commit is contained in:
Ram Narayan Balaji 2025-08-15 16:01:34 +05:30 committed by GitHub
parent 63309e6454
commit a3a26ec29a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 317 additions and 39 deletions

View File

@ -14,24 +14,50 @@ import org.flowable.engine.delegate.JavaDelegate;
public class WorkflowInstanceExecutionIdSetterListener implements JavaDelegate {
@Override
public void execute(DelegateExecution execution) {
String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String processInstanceId = execution.getProcessInstanceId();
// CRITICAL: Always set the execution ID first - this is mandatory for stage tracking
UUID workflowInstanceExecutionId = UUID.randomUUID();
execution.setVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE, workflowInstanceExecutionId);
LOG.info(
"[WORKFLOW_EXEC_ID_SET] Workflow: {}, ProcessInstance: {}, ExecutionId: {} - Execution ID initialized",
workflowName,
processInstanceId,
workflowInstanceExecutionId);
WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution);
try {
String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String relatedEntity =
(String) varHandler.getNamespacedVariable(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE);
LOG.debug(
String.format(
"New Execution for Workflow '%s'. Related Entity: '%s'",
workflowName, relatedEntity));
UUID workflowInstanceExecutionId = UUID.randomUUID();
execution.setVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE, workflowInstanceExecutionId);
if (relatedEntity == null || relatedEntity.isEmpty()) {
LOG.error(
"[WORKFLOW_MISSING_ENTITY] Workflow: {}, ProcessInstance: {}, ExecutionId: {} - RELATED_ENTITY variable is null/empty. Workflow will likely fail.",
workflowName,
processInstanceId,
workflowInstanceExecutionId);
execution.setVariable(Workflow.FAILURE_VARIABLE, true);
execution.setVariable("startupError", "Missing required variable: relatedEntity");
} else {
LOG.info(
"[WORKFLOW_EXEC_STARTED] Workflow: {}, ProcessInstance: {}, ExecutionId: {}, RelatedEntity: {} - Workflow execution initialized successfully",
workflowName,
processInstanceId,
workflowInstanceExecutionId,
relatedEntity);
}
} catch (Exception exc) {
LOG.error(
String.format(
"[%s] Failed due to: %s ",
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()),
"[WORKFLOW_INIT_ERROR] Workflow: {}, ProcessInstance: {}, ExecutionId: {} - Failed to retrieve relatedEntity variable. Error: {}",
workflowName,
processInstanceId,
workflowInstanceExecutionId,
exc.getMessage(),
exc);
// Set failure indicator but don't prevent workflow from starting
execution.setVariable(Workflow.FAILURE_VARIABLE, true);
execution.setVariable("startupError", "Failed to get relatedEntity: " + exc.getMessage());
}
}
}

View File

@ -13,25 +13,78 @@ import org.openmetadata.service.jdbi3.WorkflowInstanceRepository;
public class WorkflowInstanceListener implements JavaDelegate {
@Override
public void execute(DelegateExecution execution) {
String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String processInstanceId = execution.getProcessInstanceId();
String eventName = execution.getEventName();
WorkflowInstanceRepository workflowInstanceRepository = null;
try {
WorkflowInstanceRepository workflowInstanceRepository =
workflowInstanceRepository =
(WorkflowInstanceRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);
switch (execution.getEventName()) {
case "start" -> addWorkflowInstance(execution, workflowInstanceRepository);
case "end" -> updateWorkflowInstance(execution, workflowInstanceRepository);
switch (eventName) {
case "start" -> {
LOG.info(
"[WORKFLOW_INSTANCE_START] Workflow: {}, ProcessInstance: {} - Creating workflow instance record",
workflowName,
processInstanceId);
addWorkflowInstance(execution, workflowInstanceRepository);
}
case "end" -> {
LOG.info(
"[WORKFLOW_INSTANCE_END] Workflow: {}, ProcessInstance: {} - Updating workflow instance status",
workflowName,
processInstanceId);
updateWorkflowInstance(execution, workflowInstanceRepository);
}
default -> LOG.debug(
String.format(
"WorkflowStageUpdaterListener does not support listening for the event: '%s'",
execution.getEventName()));
"[WORKFLOW_INSTANCE_EVENT] Workflow: {}, ProcessInstance: {} - Unsupported event: {}",
workflowName,
processInstanceId,
eventName);
}
} catch (Exception exc) {
LOG.error(
String.format(
"[%s] Failed due to: %s ",
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()),
"[WORKFLOW_INSTANCE_ERROR] Workflow: {}, ProcessInstance: {}, Event: {} - Failed to process workflow instance. Error: {}",
workflowName,
processInstanceId,
eventName,
exc.getMessage(),
exc);
// CRITICAL: Even on failure, we must record the state in the database
if ("end".equals(eventName) && workflowInstanceRepository != null) {
try {
String businessKey = execution.getProcessInstanceBusinessKey();
if (businessKey != null && !businessKey.isEmpty()) {
UUID workflowInstanceId = UUID.fromString(businessKey);
java.util.Map<String, Object> errorVariables = new java.util.HashMap<>();
errorVariables.put("status", "FAILURE");
errorVariables.put("error", exc.getMessage());
errorVariables.put("errorClass", exc.getClass().getSimpleName());
workflowInstanceRepository.updateWorkflowInstance(
workflowInstanceId, System.currentTimeMillis(), errorVariables);
LOG.warn(
"[WORKFLOW_INSTANCE_FAILED] Workflow: {}, ProcessInstance: {}, InstanceId: {} - Workflow marked as FAILED in database",
workflowName,
processInstanceId,
workflowInstanceId);
} else {
LOG.error(
"[WORKFLOW_INSTANCE_NO_KEY] Workflow: {}, ProcessInstance: {} - Cannot update workflow status, business key is missing",
workflowName,
processInstanceId);
}
} catch (Exception updateExc) {
LOG.error(
"[WORKFLOW_INSTANCE_DB_ERROR] Workflow: {}, ProcessInstance: {} - Failed to record workflow failure in database. Error: {}",
workflowName,
processInstanceId,
updateExc.getMessage(),
updateExc);
}
}
}
}
@ -54,9 +107,11 @@ public class WorkflowInstanceListener implements JavaDelegate {
workflowInstanceId,
System.currentTimeMillis(),
execution.getVariables());
LOG.debug(
String.format(
"Workflow '%s' Triggered. Instance: '%s'", workflowDefinitionName, workflowInstanceId));
LOG.info(
"[WORKFLOW_INSTANCE_CREATED] Workflow: {}, InstanceId: {}, ProcessInstance: {} - Workflow instance record created successfully",
workflowDefinitionName,
workflowInstanceId,
execution.getProcessInstanceId());
}
private void updateWorkflowInstance(
@ -65,11 +120,35 @@ public class WorkflowInstanceListener implements JavaDelegate {
getMainWorkflowDefinitionNameFromTrigger(
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()));
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());
// Capture all variables including any failure indicators
java.util.Map<String, Object> variables = new java.util.HashMap<>(execution.getVariables());
// Determine final status based on what happened during execution
String status = "FINISHED"; // Default
if (Boolean.TRUE.equals(variables.get(Workflow.FAILURE_VARIABLE))) {
status = "FAILURE";
} else if (variables.containsKey(Workflow.EXCEPTION_VARIABLE)) {
status = "EXCEPTION";
}
variables.put("status", status);
workflowInstanceRepository.updateWorkflowInstance(
workflowInstanceId, System.currentTimeMillis(), execution.getVariables());
LOG.debug(
String.format(
"Workflow '%s' Finished. Instance: '%s'", workflowDefinitionName, workflowInstanceId));
workflowInstanceId, System.currentTimeMillis(), variables);
if ("FAILURE".equals(status) || "EXCEPTION".equals(status)) {
LOG.warn(
"[WORKFLOW_INSTANCE_COMPLETED_WITH_ERRORS] Workflow: {}, InstanceId: {}, Status: {} - Workflow completed with errors",
workflowDefinitionName,
workflowInstanceId,
status);
} else {
LOG.info(
"[WORKFLOW_INSTANCE_COMPLETED] Workflow: {}, InstanceId: {}, Status: {} - Workflow completed successfully",
workflowDefinitionName,
workflowInstanceId,
status);
}
}
private String getMainWorkflowDefinitionNameFromTrigger(String triggerWorkflowDefinitionName) {

View File

@ -17,26 +17,113 @@ import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;
public class WorkflowInstanceStageListener implements JavaDelegate {
@Override
public void execute(DelegateExecution execution) {
String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String processInstanceId = execution.getProcessInstanceId();
String eventName = execution.getEventName();
String currentActivity = execution.getCurrentActivityId();
WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution);
try {
WorkflowInstanceStateRepository workflowInstanceStateRepository =
(WorkflowInstanceStateRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
switch (execution.getEventName()) {
case "start" -> addNewStage(varHandler, execution, workflowInstanceStateRepository);
case "end" -> updateStage(varHandler, execution, workflowInstanceStateRepository);
switch (eventName) {
case "start" -> {
LOG.info(
"[STAGE_START] Workflow: {}, ProcessInstance: {}, Activity: {} - Creating new stage record",
workflowName,
processInstanceId,
currentActivity);
addNewStage(varHandler, execution, workflowInstanceStateRepository);
}
case "end" -> {
LOG.info(
"[STAGE_END] Workflow: {}, ProcessInstance: {}, Activity: {} - Updating stage completion",
workflowName,
processInstanceId,
currentActivity);
updateStage(varHandler, execution, workflowInstanceStateRepository);
}
default -> LOG.debug(
String.format(
"WorkflowStageUpdaterListener does not support listening for the event: '%s'",
execution.getEventName()));
"[STAGE_EVENT] Workflow: {}, ProcessInstance: {}, Activity: {} - Unsupported event: {}",
workflowName,
processInstanceId,
currentActivity,
eventName);
}
} catch (Exception exc) {
LOG.error(
String.format(
"[%s] Failed due to: %s ",
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()),
"[STAGE_ERROR] Workflow: {}, ProcessInstance: {}, Activity: {}, Event: {} - Stage processing failed. Error: {}",
workflowName,
processInstanceId,
currentActivity,
eventName,
exc.getMessage(),
exc);
// CRITICAL: Record the stage failure in the database even if processing failed
if ("end".equals(eventName)) {
try {
WorkflowInstanceStateRepository workflowInstanceStateRepository =
(WorkflowInstanceStateRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
String businessKey = execution.getProcessInstanceBusinessKey();
if (businessKey != null && !businessKey.isEmpty()) {
UUID workflowInstanceId = UUID.fromString(businessKey);
UUID executionId =
(UUID) execution.getVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE);
if (executionId == null) {
// Use Flowable's execution ID as fallback
executionId = UUID.nameUUIDFromBytes(execution.getId().getBytes());
LOG.warn(
"[STAGE_FALLBACK_ID] Workflow: {}, ProcessInstance: {} - Using fallback execution ID: {}",
workflowName,
processInstanceId,
executionId);
}
String stage = Optional.ofNullable(currentActivity).orElse(workflowName);
// Create a failed stage record
UUID stageId =
workflowInstanceStateRepository.addNewStageToInstance(
stage + "_failed",
executionId,
workflowInstanceId,
workflowName,
System.currentTimeMillis());
java.util.Map<String, Object> failureData = new java.util.HashMap<>();
failureData.put("status", "FAILED");
failureData.put("error", exc.getMessage());
failureData.put("errorClass", exc.getClass().getSimpleName());
workflowInstanceStateRepository.updateStage(
stageId, System.currentTimeMillis(), failureData);
LOG.warn(
"[STAGE_FAILED_RECORDED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Failed stage recorded in database",
workflowName,
processInstanceId,
stage,
stageId);
} else {
LOG.error(
"[STAGE_NO_BUSINESS_KEY] Workflow: {}, ProcessInstance: {} - Cannot record stage failure, business key is missing",
workflowName,
processInstanceId);
}
} catch (Exception recordExc) {
LOG.error(
"[STAGE_DB_ERROR] Workflow: {}, ProcessInstance: {} - Failed to record stage failure in database. Error: {}",
workflowName,
processInstanceId,
recordExc.getMessage(),
recordExc);
}
}
}
}
@ -47,9 +134,42 @@ public class WorkflowInstanceStageListener implements JavaDelegate {
execution.removeTransientVariable(FAILURE_VARIABLE);
String workflowDefinitionName =
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());
String processInstanceId = execution.getProcessInstanceId();
// Check business key first - critical for stage tracking
String businessKey = execution.getProcessInstanceBusinessKey();
if (businessKey == null || businessKey.isEmpty()) {
LOG.error(
"[STAGE_MISSING_KEY] Workflow: {}, ProcessInstance: {} - Business key is missing for stage creation",
workflowDefinitionName,
processInstanceId);
throw new IllegalStateException(
String.format(
"Business key is missing for stage creation in workflow: %s",
workflowDefinitionName));
}
UUID workflowInstanceId = UUID.fromString(businessKey);
// Get or create workflow instance execution ID
UUID workflowInstanceExecutionId =
(UUID) execution.getVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE);
if (workflowInstanceExecutionId == null) {
// This should have been set by WorkflowInstanceExecutionIdSetterListener
LOG.error(
"[STAGE_MISSING_EXEC_ID] Workflow: {}, ProcessInstance: {}, InstanceId: {} - Workflow instance execution ID is null",
workflowDefinitionName,
processInstanceId,
workflowInstanceId);
// Use Flowable's execution ID as fallback
workflowInstanceExecutionId = UUID.nameUUIDFromBytes(execution.getId().getBytes());
execution.setVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE, workflowInstanceExecutionId);
LOG.warn(
"[STAGE_CREATED_FALLBACK] Workflow: {}, ProcessInstance: {} - Created fallback execution ID: {}",
workflowDefinitionName,
processInstanceId,
workflowInstanceExecutionId);
}
String stage =
Optional.ofNullable(execution.getCurrentActivityId()).orElse(workflowDefinitionName);
UUID workflowInstanceStateId =
@ -60,15 +180,44 @@ public class WorkflowInstanceStageListener implements JavaDelegate {
workflowDefinitionName,
System.currentTimeMillis());
varHandler.setNodeVariable(STAGE_INSTANCE_STATE_ID_VARIABLE, workflowInstanceStateId);
LOG.info(
"[STAGE_CREATED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Stage record created successfully",
workflowDefinitionName,
processInstanceId,
stage,
workflowInstanceStateId);
}
private void updateStage(
WorkflowVariableHandler varHandler,
DelegateExecution execution,
WorkflowInstanceStateRepository workflowInstanceStateRepository) {
String workflowDefinitionName =
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String processInstanceId = execution.getProcessInstanceId();
String stage =
Optional.ofNullable(execution.getCurrentActivityId()).orElse(workflowDefinitionName);
UUID workflowInstanceStateId =
(UUID) varHandler.getNodeVariable(STAGE_INSTANCE_STATE_ID_VARIABLE);
if (workflowInstanceStateId == null) {
LOG.error(
"[STAGE_UPDATE_NO_ID] Workflow: {}, ProcessInstance: {}, Stage: {} - Cannot update stage, state ID is null",
workflowDefinitionName,
processInstanceId,
stage);
return;
}
workflowInstanceStateRepository.updateStage(
workflowInstanceStateId, System.currentTimeMillis(), execution.getVariables());
LOG.info(
"[STAGE_UPDATED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Stage completion recorded",
workflowDefinitionName,
processInstanceId,
stage,
workflowInstanceStateId);
}
}

View File

@ -26,16 +26,40 @@ public abstract class BaseDelegate implements JavaDelegate {
@Override
public void execute(DelegateExecution execution) {
String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String processInstanceId = execution.getProcessInstanceId();
String activityId = execution.getCurrentActivityId();
String delegateClass = this.getClass().getSimpleName();
varHandler = new WorkflowVariableHandler(execution);
try {
inputNamespaceMap =
JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class);
configMap = JsonUtils.readOrConvertValue(configMapExpr.getValue(execution), Map.class);
LOG.debug(
"[DELEGATE_EXECUTE] Workflow: {}, ProcessInstance: {}, Activity: {}, Delegate: {} - Starting delegate execution",
workflowName,
processInstanceId,
activityId,
delegateClass);
innerExecute(execution);
LOG.debug(
"[DELEGATE_SUCCESS] Workflow: {}, ProcessInstance: {}, Activity: {}, Delegate: {} - Delegate execution completed",
workflowName,
processInstanceId,
activityId,
delegateClass);
} catch (Exception exc) {
LOG.error(
String.format(
"[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())),
"[DELEGATE_ERROR] Workflow: {}, ProcessInstance: {}, Activity: {}, Delegate: {} - Delegate execution failed. Error: {}",
workflowName,
processInstanceId,
activityId,
delegateClass,
exc.getMessage(),
exc);
varHandler.setGlobalVariable(EXCEPTION_VARIABLE, exc.toString());
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());