Fix transaction handling; fix id truncation via a DB migration; fix the feed repository for the multi-reviewer pattern; address Copilot review comments

This commit is contained in:
Ram Narayan Balaji 2025-08-21 15:07:57 +05:30
parent d86b41ffb4
commit dd67fb59b3
13 changed files with 159 additions and 92 deletions

View File

@ -0,0 +1,16 @@
-- Migration (MySQL): widen Flowable's ACT_RU_EVENT_SUBSCR.ACTIVITY_ID_ column
-- to support longer user-defined workflow node names.
-- Safe because we are only increasing the VARCHAR size, not changing the data
-- type, stored values, or constraints. Flowable 7.0.1 has no hard-coded
-- assumption about this field's width.
-- ACT_RU_EVENT_SUBSCR is the main bottleneck with its 64-char limit.
-- NOTE(review): MySQL's MODIFY re-defines the entire column, so any NOT NULL,
-- DEFAULT, collation, or column COMMENT present on ACTIVITY_ID_ in the
-- deployed schema must be restated here or it will be dropped -- verify
-- against the actual Flowable DDL before running.
ALTER TABLE ACT_RU_EVENT_SUBSCR MODIFY ACTIVITY_ID_ varchar(255);
-- Other tables that reference activity IDs, for consistency:
-- ACT_RU_EXECUTION.ACT_ID_ is already varchar(255), so it needs no change.
-- History tables may also need widening if history is enabled:
-- ALTER TABLE ACT_HI_ACTINST MODIFY ACT_ID_ varchar(255);
-- ALTER TABLE ACT_HI_DETAIL MODIFY ACT_INST_ID_ varchar(255);
-- Always back up the database before running this migration.
-- This change is forward-compatible but should be re-checked on future
-- Flowable upgrades.

View File

@ -0,0 +1,16 @@
-- Migration (PostgreSQL): widen Flowable's ACT_RU_EVENT_SUBSCR.ACTIVITY_ID_
-- column to support longer user-defined workflow node names.
-- Safe because we are only increasing the VARCHAR size, not changing the data
-- type, stored values, or constraints. Flowable 7.0.1 has no hard-coded
-- assumption about this field's width.
-- ACT_RU_EVENT_SUBSCR is the main bottleneck with its 64-char limit.
-- Unlike MySQL's MODIFY, PostgreSQL's ALTER COLUMN ... TYPE changes only the
-- type and preserves existing NOT NULL / DEFAULT constraints, so nothing
-- needs restating here.
ALTER TABLE ACT_RU_EVENT_SUBSCR ALTER COLUMN ACTIVITY_ID_ TYPE varchar(255);
-- Other tables that reference activity IDs, for consistency:
-- ACT_RU_EXECUTION.ACT_ID_ is already varchar(255), so it needs no change.
-- History tables may also need widening if history is enabled:
-- ALTER TABLE ACT_HI_ACTINST ALTER COLUMN ACT_ID_ TYPE varchar(255);
-- ALTER TABLE ACT_HI_DETAIL ALTER COLUMN ACT_INST_ID_ TYPE varchar(255);
-- Always back up the database before running this migration.
-- This change is forward-compatible but should be re-checked on future
-- Flowable upgrades.

View File

@ -1,10 +1,12 @@
package org.openmetadata.service.governance.workflows; package org.openmetadata.service.governance.workflows;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition; import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.service.governance.workflows.flowable.MainWorkflow; import org.openmetadata.service.governance.workflows.flowable.MainWorkflow;
import org.openmetadata.service.governance.workflows.flowable.TriggerWorkflow; import org.openmetadata.service.governance.workflows.flowable.TriggerWorkflow;
@Slf4j
@Getter @Getter
public class Workflow { public class Workflow {
public static final String INGESTION_PIPELINE_ID_VARIABLE = "ingestionPipelineId"; public static final String INGESTION_PIPELINE_ID_VARIABLE = "ingestionPipelineId";
@ -30,17 +32,38 @@ public class Workflow {
public static String getFlowableElementId(String parentName, String elementName) { public static String getFlowableElementId(String parentName, String elementName) {
String fullId = String.format("%s.%s", parentName, elementName); String fullId = String.format("%s.%s", parentName, elementName);
// Flowable has a limit on ACTIVITY_ID_ column (64 chars in MySQL)
// We need to ensure our IDs don't exceed this limit // After migration 1.10.0, ACTIVITY_ID_ supports up to 255 chars
if (fullId.length() > 60) { // But we still want to keep IDs reasonable for debugging
// For long IDs, create a short but unique identifier if (fullId.length() <= 250) {
String hash = Integer.toHexString(fullId.hashCode());
String safeId = elementName.length() > 20 ? elementName.substring(0, 20) : elementName;
return safeId + "_" + hash;
}
return fullId; return fullId;
} }
// For extremely long IDs (user-defined nodes can have any name),
// create a truncated but still meaningful ID
String truncated =
truncateWithMeaning(parentName, 100) + "." + truncateWithMeaning(elementName, 140);
LOG.warn(
"Truncated workflow element ID from {} chars to {}: {} -> {}",
fullId.length(),
truncated.length(),
fullId,
truncated);
return truncated;
}
/**
 * Truncates {@code text} to at most {@code maxLength} characters while preserving
 * both the beginning and the end of the string for readability, joined by "...".
 *
 * <p>For an over-long input and {@code maxLength >= 3}, the result is exactly
 * {@code maxLength} characters: roughly 2/3 of the budget from the start of the
 * string and the remainder from its end.
 *
 * @param text the text to truncate (must be non-null)
 * @param maxLength maximum length of the returned string
 * @return {@code text} unchanged when it already fits, otherwise a truncated form
 */
private static String truncateWithMeaning(String text, int maxLength) {
  if (text.length() <= maxLength) {
    return text;
  }
  // Guard: with fewer than 3 chars of budget there is no room for "...";
  // fall back to a plain head truncation. Without this guard, keepEnd goes
  // negative and text.substring(text.length() - keepEnd) throws
  // StringIndexOutOfBoundsException.
  if (maxLength < 3) {
    return text.substring(0, Math.max(0, maxLength));
  }
  int budget = maxLength - 3; // chars available after reserving "..."
  int keepStart = budget * 2 / 3; // keep 2/3 of the budget at the start
  int keepEnd = budget - keepStart; // rest of the budget at the end
  return text.substring(0, keepStart) + "..." + text.substring(text.length() - keepEnd);
}
public static String getResultFromBoolean(boolean result) { public static String getResultFromBoolean(boolean result) {
return result ? SUCCESSFUL_RESULT : FAILURE_RESULT; return result ? SUCCESSFUL_RESULT : FAILURE_RESULT;
} }

View File

@ -172,7 +172,6 @@ public class WorkflowHandler {
// Deploy Main Workflow // Deploy Main Workflow
byte[] bpmnMainWorkflowBytes = byte[] bpmnMainWorkflowBytes =
bpmnXMLConverter.convertToXML(workflow.getMainWorkflow().getModel()); bpmnXMLConverter.convertToXML(workflow.getMainWorkflow().getModel());
LOG.info("[Deploy] Deploying main workflow: {}", workflow.getMainWorkflow().getWorkflowName());
repositoryService repositoryService
.createDeployment() .createDeployment()
.addBytes( .addBytes(
@ -184,9 +183,6 @@ public class WorkflowHandler {
// Deploy Trigger Workflow // Deploy Trigger Workflow
byte[] bpmnTriggerWorkflowBytes = byte[] bpmnTriggerWorkflowBytes =
bpmnXMLConverter.convertToXML(workflow.getTriggerWorkflow().getModel()); bpmnXMLConverter.convertToXML(workflow.getTriggerWorkflow().getModel());
LOG.info(
"[Deploy] Deploying trigger workflow: {}", workflow.getTriggerWorkflow().getWorkflowName());
try {
repositoryService repositoryService
.createDeployment() .createDeployment()
.addBytes( .addBytes(
@ -195,16 +191,6 @@ public class WorkflowHandler {
bpmnTriggerWorkflowBytes) bpmnTriggerWorkflowBytes)
.name(workflow.getTriggerWorkflow().getWorkflowName()) .name(workflow.getTriggerWorkflow().getWorkflowName())
.deploy(); .deploy();
LOG.info(
"[Deploy] Successfully deployed trigger workflow: {}",
workflow.getTriggerWorkflow().getWorkflowName());
} catch (Exception e) {
LOG.error(
"[Deploy] Failed to deploy trigger workflow: {}",
workflow.getTriggerWorkflow().getWorkflowName(),
e);
throw e;
}
} }
public boolean isDeployed(WorkflowDefinition wf) { public boolean isDeployed(WorkflowDefinition wf) {
@ -506,7 +492,7 @@ public class WorkflowHandler {
taskService.setVariable(task.getId(), "approvalCount", approvalCount); taskService.setVariable(task.getId(), "approvalCount", approvalCount);
taskService.setVariable(task.getId(), "rejectionCount", rejectionCount); taskService.setVariable(task.getId(), "rejectionCount", rejectionCount);
LOG.info( LOG.debug(
"[MultiApproval] Task '{}' - Approvals: {}/{}, Rejections: {}/{}", "[MultiApproval] Task '{}' - Approvals: {}/{}, Rejections: {}/{}",
task.getId(), task.getId(),
approvalCount, approvalCount,
@ -516,7 +502,7 @@ public class WorkflowHandler {
// Check if rejection threshold is met (rejection takes precedence) // Check if rejection threshold is met (rejection takes precedence)
if (rejectionCount >= rejectionThreshold) { if (rejectionCount >= rejectionThreshold) {
LOG.info( LOG.debug(
"[MultiApproval] Rejection threshold met ({}/{}), rejecting task", "[MultiApproval] Rejection threshold met ({}/{}), rejecting task",
rejectionCount, rejectionCount,
rejectionThreshold); rejectionThreshold);
@ -528,7 +514,7 @@ public class WorkflowHandler {
// Check if approval threshold is met // Check if approval threshold is met
if (approvalCount >= approvalThreshold) { if (approvalCount >= approvalThreshold) {
LOG.info( LOG.debug(
"[MultiApproval] Approval threshold met ({}/{}), approving task", "[MultiApproval] Approval threshold met ({}/{}), approving task",
approvalCount, approvalCount,
approvalThreshold); approvalThreshold);
@ -539,7 +525,7 @@ public class WorkflowHandler {
} }
// Task remains open for more votes // Task remains open for more votes
LOG.info( LOG.debug(
"[MultiApproval] Task '{}' remains open. Need {} more approvals or {} more rejections", "[MultiApproval] Task '{}' remains open. Need {} more approvals or {} more rejections",
task.getId(), task.getId(),
approvalThreshold - approvalCount, approvalThreshold - approvalCount,
@ -557,6 +543,30 @@ public class WorkflowHandler {
} }
} }
/**
 * Checks whether a Flowable task was deployed with multi-approval support.
 *
 * <p>Workflows deployed with the multi-approval feature set an
 * {@code approvalThreshold} variable on their user tasks; legacy workflows do
 * not. Presence of that variable is therefore used as the feature marker.
 *
 * @param customTaskId the OpenMetadata task id mapped to a Flowable task
 * @return true if the underlying Flowable task carries an
 *     {@code approvalThreshold} variable; false if the task is missing, is a
 *     legacy deployment, or the lookup fails
 */
public boolean hasMultiApprovalSupport(UUID customTaskId) {
  try {
    Task task = getTaskFromCustomTaskId(customTaskId);
    if (task == null) {
      return false;
    }
    TaskService taskService = processEngine.getTaskService();
    // This variable is only present in workflows deployed with multi-approval support.
    Object approvalThreshold = taskService.getVariable(task.getId(), "approvalThreshold");
    return approvalThreshold != null;
  } catch (Exception e) {
    // Best-effort probe: treat any lookup failure as "no multi-approval".
    // Pass the throwable as the last SLF4J argument so the stack trace is
    // retained instead of being reduced to e.getMessage().
    LOG.debug("Error checking multi-approval support for task {}", customTaskId, e);
    return false;
  }
}
public void terminateTaskProcessInstance(UUID customTaskId, String reason) { public void terminateTaskProcessInstance(UUID customTaskId, String reason) {
TaskService taskService = processEngine.getTaskService(); TaskService taskService = processEngine.getTaskService();
RuntimeService runtimeService = processEngine.getRuntimeService(); RuntimeService runtimeService = processEngine.getRuntimeService();

View File

@ -10,18 +10,17 @@ import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository;
import org.openmetadata.service.util.RestUtil.PutResponse; import org.openmetadata.service.util.RestUtil.PutResponse;
/** /**
* WorkflowTransactionManager provides atomic operations for workflow definitions * WorkflowTransactionManager provides coordinated operations for workflow definitions.
* across both OpenMetadata and Flowable databases.
* *
* IMPORTANT DESIGN PRINCIPLES: * REALITY CHECK:
* 1. This should ONLY be used at the API/Resource layer, NOT in repository methods * - We CANNOT have true atomic transactions across OpenMetadata and Flowable databases
* 2. This should NEVER be called during seed data initialization * - We use compensating transactions pattern: try to clean up on failures
* 3. This manages the TOP-LEVEL transaction for workflow operations * - This is "best effort" coordination, not true 2PC (two-phase commit)
* *
* The problem we're solving: * USAGE:
* - We need atomic operations across TWO databases (OpenMetadata and Flowable) * - Use at API/Resource layer only, NOT in repositories
* - If either operation fails, both should rollback * - Skip during seed data initialization (WorkflowHandler not initialized)
* - But we must NOT interfere with seed data loading which has its own transaction management * - Accepts that some edge cases may leave orphaned deployments in Flowable
*/ */
@Slf4j @Slf4j
public class WorkflowTransactionManager { public class WorkflowTransactionManager {
@ -35,9 +34,8 @@ public class WorkflowTransactionManager {
WorkflowDefinitionRepository repository = WorkflowDefinitionRepository repository =
(WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION); (WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION);
// Validate the workflow BEFORE starting any transaction // Pre-validate by creating Workflow object (constructor will throw if invalid)
Workflow workflow = new Workflow(entity); Workflow workflow = new Workflow(entity);
// validateWorkflow(workflow);
// Start a NEW transaction at the API level // Start a NEW transaction at the API level
// This is the TOP-LEVEL transaction for this operation // This is the TOP-LEVEL transaction for this operation
@ -75,9 +73,8 @@ public class WorkflowTransactionManager {
WorkflowDefinitionRepository repository = WorkflowDefinitionRepository repository =
(WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION); (WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION);
// Validate the updated workflow BEFORE starting transaction // Pre-validate the updated workflow
Workflow updatedWorkflow = new Workflow(updated); Workflow updatedWorkflow = new Workflow(updated);
// validateWorkflow(updatedWorkflow);
// Start a NEW transaction at the API level // Start a NEW transaction at the API level
Jdbi jdbi = Entity.getJdbi(); Jdbi jdbi = Entity.getJdbi();
@ -135,31 +132,6 @@ public class WorkflowTransactionManager {
}); });
} }
/** // Removed deployToFlowableFirst - the postCreate/postUpdate hooks handle deployment
* Validate a workflow definition with Flowable. // We accept that we cannot have true atomic transactions across two databases
* This is done BEFORE starting any transaction.
*/
// private static void validateWorkflow(Workflow workflow) {
// if (!WorkflowHandler.isInitialized()) {
// throw new UnhandledServerException("WorkflowHandler is not initialized");
// }
//
// try {
// BpmnXMLConverter converter = new BpmnXMLConverter();
// String mainBpmnXml = new String(converter.convertToXML(workflow.getMainModel()));
// String triggerBpmnXml = new String(converter.convertToXML(workflow.getTriggerModel()));
//
// boolean valid =
// WorkflowHandler.getInstance().validateWorkflowDefinition(mainBpmnXml)
// && WorkflowHandler.getInstance().validateWorkflowDefinition(triggerBpmnXml);
//
// if (!valid) {
// throw new UnhandledServerException(
// "Invalid workflow definition: Failed Flowable validation");
// }
// } catch (Exception e) {
// LOG.error("Error validating workflow", e);
// throw new UnhandledServerException("Failed to validate workflow: " + e.getMessage(), e);
// }
// }
} }

View File

@ -46,7 +46,6 @@ public class CheckEntityAttributesTask implements NodeInterface {
nodeDefinition.getInputNamespaceMap() != null nodeDefinition.getInputNamespaceMap() != null
? nodeDefinition.getInputNamespaceMap() ? nodeDefinition.getInputNamespaceMap()
: new HashMap<>())); : new HashMap<>()));
;
EndEvent endEvent = EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build(); new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build();

View File

@ -297,8 +297,12 @@ public class ConditionalSetEntityAttributeImpl implements JavaDelegate {
} }
if (fieldValue == null || fieldValue.isEmpty()) { if (fieldValue == null || fieldValue.isEmpty()) {
// Remove item with matching property // Remove items where the property value is null or empty
arrayList.removeIf(item -> fieldValue.equals(item.get(propertyName))); arrayList.removeIf(
item -> {
Object value = item.get(propertyName);
return value == null || (value instanceof String && ((String) value).isEmpty());
});
} else { } else {
// Find existing item or create new one // Find existing item or create new one
Map<String, Object> targetItem = Map<String, Object> targetItem =

View File

@ -17,11 +17,10 @@ import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity; import org.openmetadata.service.Entity;
import org.openmetadata.service.governance.workflows.Workflow; import org.openmetadata.service.governance.workflows.Workflow;
import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.EntityRepository;
import org.springframework.beans.factory.annotation.Autowired;
@Slf4j @Slf4j
public class RollbackEntityImpl implements JavaDelegate { public class RollbackEntityImpl implements JavaDelegate {
@Autowired private Workflow workflow; private Workflow workflow;
@Override @Override
public void execute(DelegateExecution execution) { public void execute(DelegateExecution execution) {

View File

@ -423,11 +423,11 @@ public class SetEntityAttributeImpl implements JavaDelegate {
} }
if (fieldValue == null || fieldValue.isEmpty()) { if (fieldValue == null || fieldValue.isEmpty()) {
// Remove item with matching property // Remove items where the property value is null or empty
arrayList.removeIf( arrayList.removeIf(
item -> { item -> {
assert fieldValue != null; Object value = item.get(propertyName);
return fieldValue.equals(item.get(propertyName)); return value == null || (value instanceof String && ((String) value).isEmpty());
}); });
} else { } else {
// Find existing item or create new one // Find existing item or create new one

View File

@ -25,8 +25,26 @@ public class FetchEntitiesImpl implements JavaDelegate {
@Override @Override
public void execute(DelegateExecution execution) { public void execute(DelegateExecution execution) {
List<String> entityTypes = // Handle both legacy entityType (string) and new entityTypes (array) for backward compatibility
JsonUtils.readOrConvertValue(entityTypesExpr.getValue(execution), List.class); Object entityTypesValue = entityTypesExpr.getValue(execution);
List<String> entityTypes;
if (entityTypesValue instanceof List) {
entityTypes = (List<String>) entityTypesValue;
} else if (entityTypesValue instanceof String) {
// Try to parse as JSON array, fallback to single string
String strValue = (String) entityTypesValue;
if (strValue.trim().startsWith("[") && strValue.trim().endsWith("]")) {
entityTypes = JsonUtils.readOrConvertValue(strValue, List.class);
} else {
// Legacy single entityType
entityTypes = new ArrayList<>();
entityTypes.add(strValue);
}
} else {
// Fallback: try to convert to List
entityTypes = JsonUtils.readOrConvertValue(entityTypesValue, List.class);
}
String searchFilter = String searchFilter =
Optional.ofNullable(searchFilterExpr) Optional.ofNullable(searchFilterExpr)
.map(expr -> (String) expr.getValue(execution)) .map(expr -> (String) expr.getValue(execution))

View File

@ -218,6 +218,18 @@ public class FeedRepository {
@SuppressWarnings("unused") @SuppressWarnings("unused")
protected void closeTask(String user, CloseTask closeTask) {} protected void closeTask(String user, CloseTask closeTask) {}
/**
 * Whether this task participates in the multi-approval voting flow.
 *
 * <p>Delegates to {@link WorkflowHandler#hasMultiApprovalSupport}, which checks
 * for the approval-threshold variable set on tasks deployed with the
 * multi-approval feature. Legacy workflows (deployed before the feature) do not
 * carry that variable and therefore return false.
 */
public boolean supportsMultiApproval() {
  UUID flowableTaskId = threadContext.getThread().getId();
  return WorkflowHandler.getInstance().hasMultiApprovalSupport(flowableTaskId);
}
protected final TaskType getTaskType() { protected final TaskType getTaskType() {
return threadContext.getThread().getTask().getType(); return threadContext.getThread().getTask().getType();
} }
@ -386,9 +398,8 @@ public class FeedRepository {
String origJson = JsonUtils.pojoToJson(aboutEntity); String origJson = JsonUtils.pojoToJson(aboutEntity);
EntityInterface updatedEntity = taskWorkflow.performTask(user, resolveTask); EntityInterface updatedEntity = taskWorkflow.performTask(user, resolveTask);
// For approval tasks, check if the task is actually completed // For tasks that support multi-approval, check if the task is actually completed
// (multi-approval tasks may still be open after a single approval) if (taskWorkflow.supportsMultiApproval()) {
if (taskWorkflow instanceof GlossaryTermRepository.ApprovalTaskWorkflow) {
// Check if the workflow task is still open // Check if the workflow task is still open
UUID taskId = threadContext.getThread().getId(); UUID taskId = threadContext.getThread().getId();
boolean isTaskStillOpen = WorkflowHandler.getInstance().isTaskStillOpen(taskId); boolean isTaskStillOpen = WorkflowHandler.getInstance().isTaskStillOpen(taskId);
@ -396,7 +407,6 @@ public class FeedRepository {
if (isTaskStillOpen) { if (isTaskStillOpen) {
// Task is still open, waiting for more approvals // Task is still open, waiting for more approvals
// Don't close the task or apply patches yet // Don't close the task or apply patches yet
LOG.info("Task {} is still open, waiting for more approvals", taskId);
return; return;
} }
} }

View File

@ -23,7 +23,7 @@
"type": "object", "type": "object",
"properties": { "properties": {
"entityType": { "entityType": {
"description": "[DEPRECATED] Single Entity Type for which it should be triggered. Use entityTypes for multiple types.", "description": "Deprecated: Single entity type for which workflow should be triggered. Use 'entityTypes' for multiple types.",
"type": "string" "type": "string"
}, },
"entityTypes": { "entityTypes": {
@ -37,7 +37,7 @@
}, },
"events": { "events": {
"title": "Events", "title": "Events",
"descriptions": "Select the events that should trigger this workflow", "description": "Select the events that should trigger this workflow",
"type": "array", "type": "array",
"items": { "items": {
"$ref": "#/definitions/event" "$ref": "#/definitions/event"

View File

@ -20,7 +20,7 @@
}, },
"entityType": { "entityType": {
"title": "Entity Type", "title": "Entity Type",
"description": "[DEPRECATED] Single Entity Type for which it should be triggered. Use entityTypes for multiple types.", "description": "Deprecated: Single entity type for which workflow should be triggered. Use 'entityTypes' for multiple types.",
"type": "string" "type": "string"
}, },
"entityTypes": { "entityTypes": {