Enum for UserTask Termination Messages, Removed WorkflowUtils to use WorkflowHandler's terminateDuplicate Instances, Approval and rejecters list in the variables, using namespaced variables for updatedBy

This commit is contained in:
Ram Narayan Balaji 2025-09-08 20:20:15 +05:30
parent f8690179e2
commit 3d5a5f1c19
10 changed files with 122 additions and 268 deletions

View File

@ -47,6 +47,7 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.governance.workflows.elements.nodes.userTask.UserTaskType;
import org.openmetadata.service.governance.workflows.flowable.sql.SqlMapper;
import org.openmetadata.service.governance.workflows.flowable.sql.UnlockExecutionSql;
import org.openmetadata.service.governance.workflows.flowable.sql.UnlockJobSql;
@ -238,7 +239,7 @@ public class WorkflowHandler {
public ProcessInstance triggerByKey(
String processDefinitionKey, String businessKey, Map<String, Object> variables) {
RuntimeService runtimeService = processEngine.getRuntimeService();
LOG.info(
LOG.debug(
"[WorkflowTrigger] START: processKey='{}' businessKey='{}' variables={}",
processDefinitionKey,
businessKey,
@ -246,7 +247,7 @@ public class WorkflowHandler {
try {
ProcessInstance instance =
runtimeService.startProcessInstanceByKey(processDefinitionKey, businessKey, variables);
LOG.info(
LOG.debug(
"[WorkflowTrigger] SUCCESS: processKey='{}' instanceId='{}' businessKey='{}'",
processDefinitionKey,
instance.getId(),
@ -394,12 +395,12 @@ public class WorkflowHandler {
public void resolveTask(UUID customTaskId, Map<String, Object> variables) {
TaskService taskService = processEngine.getTaskService();
LOG.info("[WorkflowTask] RESOLVE: customTaskId='{}' variables={}", customTaskId, variables);
LOG.debug("[WorkflowTask] RESOLVE: customTaskId='{}' variables={}", customTaskId, variables);
try {
Optional<Task> oTask = Optional.ofNullable(getTaskFromCustomTaskId(customTaskId));
if (oTask.isPresent()) {
Task task = oTask.get();
LOG.info(
LOG.debug(
"[WorkflowTask] Found task: flowableTaskId='{}' processInstanceId='{}' name='{}'",
task.getId(),
task.getProcessInstanceId(),
@ -416,11 +417,11 @@ public class WorkflowHandler {
boolean taskCompleted =
handleMultiApproval(task, variables, approvalThreshold, rejectionThreshold);
if (taskCompleted) {
LOG.info(
LOG.debug(
"[WorkflowTask] SUCCESS: Multi-approval task '{}' completed with threshold met",
customTaskId);
} else {
LOG.info(
LOG.debug(
"[WorkflowTask] SUCCESS: Multi-approval task '{}' recorded vote, waiting for more votes",
customTaskId);
// Update the Thread entity to remove the task from the current voter's feed
@ -431,18 +432,18 @@ public class WorkflowHandler {
Optional.ofNullable(variables)
.ifPresentOrElse(
variablesValue -> {
LOG.info(
LOG.debug(
"[WorkflowTask] Completing with variables: taskId='{}' vars={}",
task.getId(),
variablesValue);
taskService.complete(task.getId(), variablesValue);
},
() -> {
LOG.info(
LOG.debug(
"[WorkflowTask] Completing without variables: taskId='{}'", task.getId());
taskService.complete(task.getId());
});
LOG.info("[WorkflowTask] SUCCESS: Task '{}' resolved", customTaskId);
LOG.debug("[WorkflowTask] SUCCESS: Task '{}' resolved", customTaskId);
}
} else {
LOG.warn("[WorkflowTask] NOT_FOUND: No Flowable task for customTaskId='{}'", customTaskId);
@ -554,7 +555,7 @@ public class WorkflowHandler {
if (!votedUsers.contains(currentUser)) {
votedUsers.add(currentUser);
taskService.setVariable(flowableTask.getId(), "votedUsers", votedUsers);
LOG.info(
LOG.debug(
"[WorkflowTask] Added user '{}' to voted users list for Flowable task", currentUser);
}
@ -564,7 +565,7 @@ public class WorkflowHandler {
String currentAssignee = flowableTask.getAssignee();
if (currentUser.equals(currentAssignee)) {
taskService.unclaim(flowableTask.getId());
LOG.info(
LOG.debug(
"[WorkflowTask] Unclaimed Flowable task '{}' from user '{}'",
flowableTask.getId(),
currentUser);
@ -572,7 +573,7 @@ public class WorkflowHandler {
// Remove from candidate users if present
taskService.deleteCandidateUser(flowableTask.getId(), currentUser);
LOG.info(
LOG.debug(
"[WorkflowTask] Removed user '{}' from candidate users for Flowable task '{}'",
currentUser,
flowableTask.getId());
@ -611,18 +612,6 @@ public class WorkflowHandler {
return null;
}
private Integer extractTaskIdFromCustomTaskId(UUID customTaskId) {
// The customTaskId might contain the integer task ID
// This is a fallback approach if direct UUID matching doesn't work
try {
// Check if we can extract an integer ID from somewhere
// This depends on how the task ID is generated and stored
return null; // Placeholder - actual implementation would depend on ID generation logic
} catch (Exception e) {
return null;
}
}
private boolean handleMultiApproval(
Task task,
Map<String, Object> variables,
@ -638,79 +627,53 @@ public class WorkflowHandler {
rejectionThreshold = 1;
}
// Get current approval tracking variables
@SuppressWarnings("unchecked")
List<Map<String, Object>> approvals =
(List<Map<String, Object>>) taskService.getVariable(task.getId(), "approvals");
if (approvals == null) {
approvals = new ArrayList<>();
List<String> approversList =
(List<String>) taskService.getVariable(task.getId(), "approversList");
if (approversList == null) {
approversList = new ArrayList<>();
}
Integer approvalCount = (Integer) taskService.getVariable(task.getId(), "approvalCount");
if (approvalCount == null) {
approvalCount = 0;
}
Integer rejectionCount = (Integer) taskService.getVariable(task.getId(), "rejectionCount");
if (rejectionCount == null) {
rejectionCount = 0;
@SuppressWarnings("unchecked")
List<String> rejectersList =
(List<String>) taskService.getVariable(task.getId(), "rejectersList");
if (rejectersList == null) {
rejectersList = new ArrayList<>();
}
// Get the current user and approval decision
// Variables might be namespaced, so check both namespaced and global versions
String currentUser = (String) variables.get("updatedBy");
if (currentUser == null) {
// Try to find namespaced updatedBy (e.g., "ApproveGlossaryTerm_updatedBy")
for (String key : variables.keySet()) {
if (key.endsWith("_updatedBy")) {
currentUser = (String) variables.get(key);
break;
}
}
}
String nodeName = getParentActivityId(task.getExecutionId());
String updatedByVariable = getNamespacedVariableName(nodeName, "updatedBy");
String resultVariable = getNamespacedVariableName(nodeName, "result");
String currentUser = (String) variables.get(updatedByVariable);
Boolean approved = (Boolean) variables.get(resultVariable);
Boolean approved = (Boolean) variables.get("result");
if (approved == null) {
// Try to find namespaced result (e.g., "ApproveGlossaryTerm_result")
for (String key : variables.keySet()) {
if (key.endsWith("_result")) {
approved = (Boolean) variables.get(key);
break;
}
}
}
// Make variables final for lambda expression
final String finalCurrentUser = currentUser;
final Boolean finalApproved = approved;
// Check if this user has already voted
boolean alreadyVoted =
approvals.stream()
.anyMatch(a -> finalCurrentUser != null && finalCurrentUser.equals(a.get("user")));
if (alreadyVoted) {
LOG.warn("[MultiApproval] User '{}' has already voted on this task", finalCurrentUser);
if (currentUser == null || approved == null) {
LOG.warn(
"[MultiApproval] Cannot process approval - missing required variables. "
+ "updatedBy: {}, result: {}. Task remains open.",
currentUser,
approved);
// DON'T complete the task, DON'T increment counts
return false;
}
// Record the approval/rejection
Map<String, Object> approval = new HashMap<>();
approval.put("user", finalCurrentUser);
approval.put("approved", finalApproved);
approval.put("timestamp", System.currentTimeMillis());
approvals.add(approval);
if (Boolean.TRUE.equals(finalApproved)) {
approvalCount++;
} else {
rejectionCount++;
if (approversList.contains(currentUser) || rejectersList.contains(currentUser)) {
LOG.warn("[MultiApproval] User '{}' has already voted on this task", currentUser);
return false;
}
// Update task variables
taskService.setVariable(task.getId(), "approvals", approvals);
taskService.setVariable(task.getId(), "approvalCount", approvalCount);
taskService.setVariable(task.getId(), "rejectionCount", rejectionCount);
if (approved) {
approversList.add(currentUser);
} else {
rejectersList.add(currentUser);
}
taskService.setVariable(task.getId(), "approversList", approversList);
taskService.setVariable(task.getId(), "rejectersList", rejectersList);
int approvalCount = approversList.size();
int rejectionCount = rejectersList.size();
LOG.debug(
"[MultiApproval] Task '{}' - Approvals: {}/{}, Rejections: {}/{}",
@ -727,14 +690,7 @@ public class WorkflowHandler {
rejectionCount,
rejectionThreshold);
// Set the final result - need to check if result is namespaced
String resultKey = "result";
for (String key : variables.keySet()) {
if (key.endsWith("_result")) {
resultKey = key;
break;
}
}
variables.put(resultKey, false);
variables.put(resultVariable, false);
taskService.complete(task.getId(), variables);
return true;
}
@ -746,14 +702,7 @@ public class WorkflowHandler {
approvalCount,
approvalThreshold);
// Set the final result - need to check if result is namespaced
String resultKey = "result";
for (String key : variables.keySet()) {
if (key.endsWith("_result")) {
resultKey = key;
break;
}
}
variables.put(resultKey, true);
variables.put(resultVariable, true);
taskService.complete(task.getId(), variables);
return true;
}
@ -888,12 +837,8 @@ public class WorkflowHandler {
LOG.debug(
"Extracted subprocess ID: '{}' from task key '{}'", subProcessId, taskDefinitionKey);
// Try both possible termination message patterns
// UserApprovalTask uses: subProcessId_terminateProcess
// DetailedUserApprovalTask uses: subProcessId_terminateDetailedProcess
String[] messagePatterns = {
subProcessId + "_terminateProcess", subProcessId + "_terminateDetailedProcess"
};
// Get all possible termination message patterns from the enum
List<String> messagePatterns = UserTaskType.getAllTerminationPatterns(subProcessId);
for (String messageName : messagePatterns) {
LOG.debug("Checking for message subscription: {}", messageName);

View File

@ -1,132 +0,0 @@
package org.openmetadata.service.governance.workflows;
import static org.openmetadata.service.governance.workflows.elements.TriggerFactory.getMainWorkflowDefinitionNameFromTrigger;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.runtime.ProcessInstance;
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
import org.openmetadata.schema.governance.workflows.WorkflowInstance;
import org.openmetadata.schema.utils.ResultList;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;
import org.openmetadata.service.util.EntityUtil;
@Slf4j
public class WorkflowUtils {
/**
* Terminates conflicting workflow instances of a specific type for an entity.
* Orchestrates both Flowable termination and OpenMetadata database updates.
*/
public static void terminateConflictingInstances(
String triggerWorkflowDefinitionKey, String entityLink, String currentProcessInstanceId) {
try {
// Convert trigger name to main workflow name (e.g., "GlossaryTermApprovalWorkflowTrigger"
// "GlossaryTermApprovalWorkflow")
String mainWorkflowDefinitionName =
getMainWorkflowDefinitionNameFromTrigger(triggerWorkflowDefinitionKey);
WorkflowInstanceRepository workflowInstanceRepository =
(WorkflowInstanceRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);
WorkflowInstanceStateRepository workflowInstanceStateRepository =
(WorkflowInstanceStateRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
// Query for running instances of this workflow type for this entity
ListFilter filter = new ListFilter(null);
filter.addQueryParam("entityLink", entityLink);
long endTs = System.currentTimeMillis();
long startTs = endTs - (7L * 24 * 60 * 60 * 1000);
ResultList<WorkflowInstance> allInstances =
workflowInstanceRepository.list(null, startTs, endTs, 100, filter, false);
// Filter to running instances of this specific workflow type
List<WorkflowInstance> candidateInstances =
allInstances.getData().stream()
.filter(
instance -> WorkflowInstance.WorkflowStatus.RUNNING.equals(instance.getStatus()))
.filter(
instance -> {
try {
WorkflowDefinitionRepository repo =
(WorkflowDefinitionRepository)
Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION);
var def =
repo.get(
null,
instance.getWorkflowDefinitionId(),
EntityUtil.Fields.EMPTY_FIELDS);
return mainWorkflowDefinitionName.equals(def.getName());
} catch (Exception e) {
return false;
}
})
.toList();
// Check if there are multiple instances using Flowable's API
RuntimeService runtimeService = WorkflowHandler.getInstance().getRuntimeService();
List<ProcessInstance> runningProcessInstances =
runtimeService
.createProcessInstanceQuery()
.processDefinitionKey(mainWorkflowDefinitionName)
.list();
// Find instances to terminate (exclude the current one by process instance ID)
List<WorkflowInstance> conflictingInstances =
candidateInstances.stream()
.filter(
instance -> {
// Check if this WorkflowInstance corresponds to a different process instance
return runningProcessInstances.stream()
.filter(pi -> !pi.getId().equals(currentProcessInstanceId))
.anyMatch(pi -> pi.getBusinessKey().equals(instance.getId().toString()));
})
.toList();
if (conflictingInstances.isEmpty()) {
LOG.debug("No conflicting instances found to terminate for {}", mainWorkflowDefinitionName);
return;
}
// Execute everything in a single transaction for atomicity
Entity.getJdbi()
.inTransaction(
TransactionIsolationLevel.READ_COMMITTED,
handle -> {
try {
// 1. Terminate Flowable processes using existing handler method
WorkflowHandler.getInstance().terminateWorkflow(mainWorkflowDefinitionName);
// 2. Mark OpenMetadata instances and states as FAILURE
for (WorkflowInstance instance : conflictingInstances) {
workflowInstanceStateRepository.markInstanceStatesAsFailed(
instance.getId(), "Terminated due to conflicting workflow instance");
workflowInstanceRepository.markInstanceAsFailed(
instance.getId(), "Terminated due to conflicting workflow instance");
}
return null;
} catch (Exception e) {
LOG.error(
"Failed to terminate conflicting instances in transaction: {}",
e.getMessage());
throw e;
}
});
LOG.info(
"Terminated {} conflicting instances of {} for entity {}",
conflictingInstances.size(),
mainWorkflowDefinitionName,
entityLink);
} catch (Exception e) {
LOG.warn("Failed to terminate conflicting instances: {}", e.getMessage());
}
}
}

View File

@ -73,8 +73,8 @@ public class CheckEntityAttributesTask implements NodeInterface {
private ServiceTask getCheckEntityAttributesServiceTask(
String subProcessId, String rules, String inputNamespaceMap) {
LOG.info("CheckEntityAttributesTask: rules = {}", rules);
LOG.info("CheckEntityAttributesTask: inputNamespaceMap = {}", inputNamespaceMap);
LOG.debug("CheckEntityAttributesTask: rules = {}", rules);
LOG.debug("CheckEntityAttributesTask: inputNamespaceMap = {}", inputNamespaceMap);
FieldExtension rulesExpr =
new FieldExtensionBuilder().fieldName("rulesExpr").fieldValue(rules).build();
FieldExtension inputNamespaceMapExpr =

View File

@ -197,10 +197,10 @@ public class ChangeReviewTask implements NodeInterface {
}
private BoundaryEvent getTerminationEvent(String subProcessId) {
// Use a consistent format that matches what getFlowableElementId produces
// This ensures uniqueness per node within the workflow definition
// ChangeReviewTask uses terminateChangeReviewProcess to distinguish from UserApprovalTask
String uniqueMessageName = getFlowableElementId(subProcessId, "terminateChangeReviewProcess");
// Use the enum to get the termination message suffix for this task type
String uniqueMessageName =
getFlowableElementId(
subProcessId, UserTaskType.CHANGE_REVIEW_TASK.getTerminationMessageSuffix());
Message terminationMessage = new Message();
terminationMessage.setId(uniqueMessageName);

View File

@ -195,10 +195,10 @@ public class UserApprovalTask implements NodeInterface {
}
private BoundaryEvent getTerminationEvent(String subProcessId) {
// Use a consistent format that matches what getFlowableElementId produces
// This ensures uniqueness per node within the workflow definition
// Format: subProcessId.terminateProcess (where subProcessId is the node name)
String uniqueMessageName = getFlowableElementId(subProcessId, "terminateProcess");
// Use the enum to get the termination message suffix for this task type
String uniqueMessageName =
getFlowableElementId(
subProcessId, UserTaskType.USER_APPROVAL_TASK.getTerminationMessageSuffix());
Message terminationMessage = new Message();
terminationMessage.setId(uniqueMessageName);

View File

@ -0,0 +1,39 @@
package org.openmetadata.service.governance.workflows.elements.nodes.userTask;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public enum UserTaskType {
USER_APPROVAL_TASK("terminateProcess"),
CHANGE_REVIEW_TASK("terminateChangeReviewProcess");
private final String terminationMessageSuffix;
UserTaskType(String terminationMessageSuffix) {
this.terminationMessageSuffix = terminationMessageSuffix;
}
public String getTerminationMessageSuffix() {
return terminationMessageSuffix;
}
public String getTerminationMessageName(String subProcessId) {
return subProcessId + "_" + terminationMessageSuffix;
}
public static List<String> getAllTerminationPatterns(String subProcessId) {
return Arrays.stream(values())
.map(type -> type.getTerminationMessageName(subProcessId))
.collect(Collectors.toList());
}
public static String findTerminationPattern(String messageName) {
for (UserTaskType type : values()) {
if (messageName.endsWith("_" + type.getTerminationMessageSuffix())) {
return type.getTerminationMessageSuffix();
}
}
return null;
}
}

View File

@ -4,7 +4,6 @@ import static org.openmetadata.service.governance.workflows.WorkflowHandler.getP
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.BpmnError;
import org.flowable.engine.delegate.TaskListener;
@ -34,18 +33,21 @@ public class ApprovalTaskCompletionValidator implements TaskListener {
return; // Single approval, allow completion
}
// Get current vote counts
// Get current vote lists and calculate counts
@SuppressWarnings("unchecked")
List<Map<String, Object>> approvals =
(List<Map<String, Object>>) delegateTask.getVariable("approvals");
if (approvals == null) {
approvals = new ArrayList<>();
List<String> approversList = (List<String>) delegateTask.getVariable("approversList");
if (approversList == null) {
approversList = new ArrayList<>();
}
Integer approvalCount = (Integer) delegateTask.getVariable("approvalCount");
Integer rejectionCount = (Integer) delegateTask.getVariable("rejectionCount");
if (approvalCount == null) approvalCount = 0;
if (rejectionCount == null) rejectionCount = 0;
@SuppressWarnings("unchecked")
List<String> rejectersList = (List<String>) delegateTask.getVariable("rejectersList");
if (rejectersList == null) {
rejectersList = new ArrayList<>();
}
int approvalCount = approversList.size();
int rejectionCount = rejectersList.size();
// Check if thresholds are met
boolean approvalThresholdMet = approvalCount >= approvalThreshold;

View File

@ -79,9 +79,9 @@ public class CreateApprovalTaskImpl implements TaskListener {
// Store approval and rejection thresholds and initialize approval tracking in task variables
delegateTask.setVariable("approvalThreshold", approvalThreshold);
delegateTask.setVariable("rejectionThreshold", rejectionThreshold);
delegateTask.setVariable("approvals", new ArrayList<Map<String, Object>>());
delegateTask.setVariable("approvalCount", 0);
delegateTask.setVariable("rejectionCount", 0);
// Use separate lists for approvers and rejecters - simpler and cleaner
delegateTask.setVariable("approversList", new ArrayList<String>());
delegateTask.setVariable("rejectersList", new ArrayList<String>());
} catch (Exception exc) {
LOG.error(
String.format(

View File

@ -82,9 +82,9 @@ public class CreateChangeReviewTaskImpl implements TaskListener {
// Store approval and rejection thresholds and initialize approval tracking in task variables
delegateTask.setVariable("approvalThreshold", approvalThreshold);
delegateTask.setVariable("rejectionThreshold", rejectionThreshold);
delegateTask.setVariable("approvals", new ArrayList<Map<String, Object>>());
delegateTask.setVariable("approvalCount", 0);
delegateTask.setVariable("rejectionCount", 0);
// Use separate lists for approvers and rejecters - simpler and cleaner
delegateTask.setVariable("approversList", new ArrayList<String>());
delegateTask.setVariable("rejectersList", new ArrayList<String>());
} catch (Exception exc) {
LOG.error(
String.format(

View File

@ -16,7 +16,6 @@ import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.governance.workflows.WorkflowUtils;
import org.openmetadata.service.governance.workflows.WorkflowVariableHandler;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.rules.RuleEngine;
@ -47,8 +46,9 @@ public class FilterEntityImpl implements JavaDelegate {
String triggerWorkflowDefinitionKey =
WorkflowHandler.getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String currentProcessInstanceId = execution.getProcessInstanceId();
WorkflowUtils.terminateConflictingInstances(
triggerWorkflowDefinitionKey, entityLinkStr, currentProcessInstanceId);
WorkflowHandler.getInstance()
.terminateDuplicateInstances(
triggerWorkflowDefinitionKey, entityLinkStr, currentProcessInstanceId);
}
log.debug("Trigger Glossary Term Approval Workflow: {}", passesFilter);