feat(governance): Implement transactional custom workflows - improved

This commit introduces a robust, transactional, and extensible framework for custom governance workflows in OpenMetadata.

Key features and improvements include:

Transactional Workflow Management: A new WorkflowTransactionManager ensures atomic operations for creating, updating, and deleting workflow definitions, maintaining consistency between the OpenMetadata database and the Flowable engine.

Safe ID Encoding: Implemented a WorkflowIdEncoder to generate safe, Base64-encoded, and collision-resistant IDs for Flowable processes, preventing errors from ID truncation.

Rollback and Deprecation Tasks:

Added RollbackEntityTask to revert entities to their last approved state.

Introduced DeprecateStaleEntityTask for automated lifecycle management of stale assets.

Enhanced Workflow Engine:

Improved WorkflowHandler to validate workflow definitions before deployment.

Added new custom functions to the rule engine for checking entity update timestamps and calculating field completeness scores.

CI/CD and Build Improvements:

Updated the CI Dockerfile with a multi-stage build and refined dependency installation.

Modified POM files to include necessary dependencies for new features.
This commit is contained in:
Ram Narayan Balaji 2025-08-20 19:39:11 +05:30
parent 8d37e4ab0c
commit e1473cee79
15 changed files with 1043 additions and 30 deletions

View File

@ -25,6 +25,7 @@ import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration;
import org.flowable.engine.repository.Deployment;
import org.flowable.engine.repository.ProcessDefinition;
import org.flowable.engine.runtime.Execution;
import org.flowable.engine.runtime.ProcessInstance;
@ -278,6 +279,25 @@ public class WorkflowHandler {
.singleResult();
}
public boolean validateWorkflowDefinition(String workflowDefinition) {
try {
RepositoryService repositoryService = processEngine.getRepositoryService();
Deployment deployment =
repositoryService
.createDeployment()
.addString("test-workflow.bpmn20.xml", workflowDefinition)
.name("validation-test-" + System.currentTimeMillis())
.deploy();
repositoryService.deleteDeployment(deployment.getId(), true);
return true;
} catch (Exception e) {
LOG.error("Workflow definition validation failed: {}", e.getMessage());
return false;
}
}
public Map<String, Object> transformToNodeVariables(
UUID customTaskId, Map<String, Object> variables) {
Map<String, Object> namespacedVariables = null;

View File

@ -0,0 +1,111 @@
package org.openmetadata.service.governance.workflows;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WorkflowIdEncoder {
private static final int MAX_FLOWABLE_ID_LENGTH = 255;
private static final int SAFE_ID_LENGTH = 200;
private static final Map<String, String> ID_CACHE = new ConcurrentHashMap<>();
private static final Map<String, String> REVERSE_CACHE = new ConcurrentHashMap<>();
private WorkflowIdEncoder() {}
public static String encodeId(String originalId) {
if (originalId == null || originalId.isEmpty()) {
return originalId;
}
return ID_CACHE.computeIfAbsent(
originalId,
id -> {
String encoded =
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(id.getBytes(StandardCharsets.UTF_8))
.replace("-", "_")
.replace("+", "_");
if (encoded.length() > SAFE_ID_LENGTH) {
String hash = generateHash(id);
String prefix = encoded.substring(0, 100);
encoded = prefix + "_" + hash;
LOG.debug("Long ID encoded with hash: original={}, encoded={}", id, encoded);
}
REVERSE_CACHE.put(encoded, id);
return encoded;
});
}
public static String decodeId(String encodedId) {
if (encodedId == null || encodedId.isEmpty()) {
return encodedId;
}
String cached = REVERSE_CACHE.get(encodedId);
if (cached != null) {
return cached;
}
if (encodedId.contains("_") && encodedId.length() > 100) {
LOG.warn("Cannot decode hashed ID: {}", encodedId);
return encodedId;
}
try {
String decoded =
new String(
Base64.getUrlDecoder().decode(encodedId.replace("_", "+").replace("_", "-")),
StandardCharsets.UTF_8);
REVERSE_CACHE.put(encodedId, decoded);
ID_CACHE.put(decoded, encodedId);
return decoded;
} catch (Exception e) {
LOG.error("Failed to decode ID: {}", encodedId, e);
return encodedId;
}
}
public static String generateSafeFlowableId(String... parts) {
String combined = String.join("-", parts);
String encoded = encodeId(combined);
if (encoded.length() > MAX_FLOWABLE_ID_LENGTH) {
String hash = generateHash(combined);
String safeId = "id_" + hash;
ID_CACHE.put(combined, safeId);
REVERSE_CACHE.put(safeId, combined);
LOG.debug("Generated safe Flowable ID: original={}, safe={}", combined, safeId);
return safeId;
}
return encoded;
}
private static String generateHash(String input) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
String base64Hash =
Base64.getUrlEncoder().withoutPadding().encodeToString(hash).substring(0, 32);
return base64Hash.replace("-", "").replace("_", "").replace("+", "").replace("/", "");
} catch (NoSuchAlgorithmException e) {
LOG.error("SHA-256 algorithm not available", e);
return input.hashCode() + "";
}
}
public static void clearCache() {
ID_CACHE.clear();
REVERSE_CACHE.clear();
}
}

View File

@ -0,0 +1,167 @@
package org.openmetadata.service.governance.workflows;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.converter.BpmnXMLConverter;
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository;
@Slf4j
public class WorkflowTransactionManager {
/**
* Atomically stores a workflow definition in OpenMetadata and deploys it to Flowable.
* If either operation fails, both are rolled back.
*
* @param entity The workflow definition to store and deploy
* @param update Whether this is an update operation
* @return The stored and deployed workflow definition
*/
public WorkflowDefinition storeAndDeployWorkflowDefinition(
WorkflowDefinition entity, boolean update) {
WorkflowDefinitionRepository repository =
(WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION);
// First validate the workflow definition with Flowable before storing
Workflow workflow = new Workflow(entity);
BpmnXMLConverter converter = new BpmnXMLConverter();
String mainBpmnXml = new String(converter.convertToXML(workflow.getMainModel()));
String triggerBpmnXml = new String(converter.convertToXML(workflow.getTriggerModel()));
if (!WorkflowHandler.getInstance().validateWorkflowDefinition(mainBpmnXml)
|| !WorkflowHandler.getInstance().validateWorkflowDefinition(triggerBpmnXml)) {
throw new UnhandledServerException("Invalid workflow definition: Failed Flowable validation");
}
// Use transaction to store the entity
Entity.getJdbi()
.useTransaction(
TransactionIsolationLevel.READ_COMMITTED,
handle -> {
try {
// Store the entity in OpenMetadata DB using the parent storeEntity method
repository.storeEntityInternal(entity, update);
// Delete any existing workflow with the same name for clean replacement
try {
WorkflowHandler.getInstance().deleteWorkflowDefinition(entity);
} catch (Exception e) {
// Ignore if doesn't exist
}
// Deploy to Flowable
WorkflowHandler.getInstance().deploy(workflow);
LOG.info(
"Successfully stored and deployed workflow definition: {}", entity.getName());
} catch (Exception e) {
LOG.error(
"Failed to store and deploy workflow definition: {}", entity.getName(), e);
// Rollback will happen automatically due to exception in transaction
throw new UnhandledServerException(
"Failed to store and deploy workflow definition: " + e.getMessage());
}
});
return entity;
}
/**
* Atomically updates a workflow definition in OpenMetadata and redeploys it to Flowable.
* If either operation fails, both are rolled back.
*
* @param original The original workflow definition
* @param updated The updated workflow definition
* @return The updated and redeployed workflow definition
*/
public WorkflowDefinition updateAndRedeployWorkflowDefinition(
WorkflowDefinition original, WorkflowDefinition updated) {
WorkflowDefinitionRepository repository =
(WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION);
// First validate the updated workflow definition
Workflow updatedWorkflow = new Workflow(updated);
BpmnXMLConverter converter = new BpmnXMLConverter();
String mainBpmnXml = new String(converter.convertToXML(updatedWorkflow.getMainModel()));
String triggerBpmnXml = new String(converter.convertToXML(updatedWorkflow.getTriggerModel()));
if (!WorkflowHandler.getInstance().validateWorkflowDefinition(mainBpmnXml)
|| !WorkflowHandler.getInstance().validateWorkflowDefinition(triggerBpmnXml)) {
throw new UnhandledServerException(
"Invalid updated workflow definition: Failed Flowable validation");
}
// Use transaction to update the entity
Entity.getJdbi()
.useTransaction(
TransactionIsolationLevel.READ_COMMITTED,
handle -> {
try {
// Delete old deployment from Flowable first
WorkflowHandler.getInstance().deleteWorkflowDefinition(original);
// Update the entity in OpenMetadata DB using the parent storeEntity method
repository.storeEntityInternal(updated, true);
// Deploy updated version to Flowable
WorkflowHandler.getInstance().deploy(updatedWorkflow);
LOG.info(
"Successfully updated and redeployed workflow definition: {}",
updated.getName());
} catch (Exception e) {
LOG.error(
"Failed to update and redeploy workflow definition: {}", updated.getName(), e);
// Try to restore original deployment
try {
WorkflowHandler.getInstance().deploy(new Workflow(original));
LOG.info("Restored original workflow deployment for: {}", original.getName());
} catch (Exception restoreEx) {
LOG.error(
"Failed to restore original workflow deployment: {}",
original.getName(),
restoreEx);
}
throw new UnhandledServerException(
"Failed to update and redeploy workflow definition: " + e.getMessage());
}
});
return updated;
}
/**
* Atomically deletes a workflow definition from OpenMetadata and undeploys it from Flowable.
*
* @param entity The workflow definition to delete
*/
public void deleteWorkflowDefinition(WorkflowDefinition entity) {
WorkflowDefinitionRepository repository =
(WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION);
Entity.getJdbi()
.useTransaction(
TransactionIsolationLevel.READ_COMMITTED,
handle -> {
try {
// Delete from Flowable first
WorkflowHandler.getInstance().deleteWorkflowDefinition(entity);
// Delete from OpenMetadata DB
repository.delete("admin", entity.getId(), false, false);
LOG.info("Successfully deleted workflow definition: {}", entity.getName());
} catch (Exception e) {
LOG.error("Failed to delete workflow definition: {}", entity.getName(), e);
throw new UnhandledServerException(
"Failed to delete workflow definition: " + e.getMessage());
}
});
}
}

View File

@ -6,6 +6,7 @@ import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinit
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.ConditionalSetEntityAttributeTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CreateAndRunIngestionPipelineTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RollbackEntityTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityAttributeTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTaskDefinition;
@ -16,6 +17,7 @@ import org.openmetadata.schema.governance.workflows.elements.nodes.startEvent.St
import org.openmetadata.schema.governance.workflows.elements.nodes.userTask.UserApprovalTaskDefinition;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.ConditionalSetEntityAttributeTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.RollbackEntityTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityAttributeTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTask;
@ -47,6 +49,8 @@ public class NodeFactory {
case CREATE_AND_RUN_INGESTION_PIPELINE_TASK -> new CreateAndRunIngestionPipelineTask(
(CreateAndRunIngestionPipelineTaskDefinition) nodeDefinition, config);
case RUN_APP_TASK -> new RunAppTask((RunAppTaskDefinition) nodeDefinition, config);
case ROLLBACK_ENTITY_TASK -> new RollbackEntityTask(
(RollbackEntityTaskDefinition) nodeDefinition, config);
case PARALLEL_GATEWAY -> new ParallelGateway(
(ParallelGatewayDefinition) nodeDefinition, config);
};

View File

@ -0,0 +1,77 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask;
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;
import java.util.List;
import lombok.Getter;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.FieldExtension;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.ServiceTask;
import org.openmetadata.schema.governance.workflows.WorkflowConfiguration;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RollbackEntityTaskDefinition;
import org.openmetadata.service.governance.workflows.elements.NodeInterface;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.RollbackEntityImpl;
import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
@Getter
public class RollbackEntityTask implements NodeInterface {
private final String nodeId;
private final ServiceTask serviceTask;
private BoundaryEvent runtimeExceptionBoundaryEvent;
private final RollbackEntityTaskDefinition config;
public RollbackEntityTask(
RollbackEntityTaskDefinition nodeDefinition, WorkflowConfiguration workflowConfig) {
this.config = nodeDefinition;
this.nodeId = getFlowableElementId(nodeDefinition.getName(), "rollbackTask");
List<FieldExtension> extensions =
List.of(
new FieldExtensionBuilder()
.fieldName(RELATED_ENTITY_VARIABLE)
.fieldValue(RELATED_ENTITY_VARIABLE)
.build(),
new FieldExtensionBuilder()
.fieldName(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE)
.fieldValue(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE)
.build(),
new FieldExtensionBuilder()
.fieldName("rollbackToStatus")
.fieldValue(
nodeDefinition.getConfig() != null
&& nodeDefinition.getConfig().getRollbackToStatus() != null
? nodeDefinition.getConfig().getRollbackToStatus()
: "Approved")
.build());
ServiceTaskBuilder builder =
new ServiceTaskBuilder().id(this.nodeId).implementation(RollbackEntityImpl.class.getName());
for (FieldExtension extension : extensions) {
builder.addFieldExtension(extension);
}
serviceTask = builder.build();
}
@Override
public void addToWorkflow(BpmnModel model, Process process) {
process.addFlowElement(serviceTask);
runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(serviceTask, false);
runtimeExceptionBoundaryEvent.setId(
getFlowableElementId(serviceTask.getId(), WORKFLOW_RUNTIME_EXCEPTION));
process.addFlowElement(runtimeExceptionBoundaryEvent);
}
@Override
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
return runtimeExceptionBoundaryEvent;
}
}

View File

@ -52,12 +52,20 @@ public class CheckEntityAttributesImpl implements JavaDelegate {
private Boolean checkAttributes(MessageParser.EntityLink entityLink, String rules) {
EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL);
boolean result;
try {
result = (boolean) RuleEngine.getInstance().apply(rules, JsonUtils.getMap(entity));
Object result = RuleEngine.getInstance().apply(rules, JsonUtils.getMap(entity));
// Handle both boolean and numeric results for scoring scenarios
if (result instanceof Number) {
double score = ((Number) result).doubleValue();
// For numeric results, consider >= 50 as success (configurable threshold)
return score >= 50.0;
}
// Default boolean handling
return Boolean.TRUE.equals(result);
} catch (Exception e) {
throw new RuntimeException(e);
}
return result;
}
}

View File

@ -0,0 +1,197 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl;
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.GlossaryTerm;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.governance.workflows.Workflow;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.springframework.beans.factory.annotation.Autowired;
@Slf4j
public class RollbackEntityImpl implements JavaDelegate {
@Autowired private Workflow workflow;
@Override
public void execute(DelegateExecution execution) {
try {
String workflowInstanceExecutionId =
(String) execution.getVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE);
Map<String, Object> relatedEntityMap =
(Map<String, Object>) execution.getVariable(RELATED_ENTITY_VARIABLE);
if (relatedEntityMap == null) {
throw new IllegalArgumentException("Related entity variable is null");
}
EntityReference entityReference =
JsonUtils.convertValue(relatedEntityMap, EntityReference.class);
String entityType = entityReference.getType();
UUID entityId = entityReference.getId();
LOG.info(
"[RollbackEntity] Rolling back entity: {} ({}), Workflow Instance: {}",
entityReference.getName(),
entityId,
workflowInstanceExecutionId);
EntityRepository<?> repository = Entity.getEntityRepository(entityType);
EntityInterface currentEntity =
repository.get(null, entityId, repository.getFields("*"), Include.ALL, false);
String rollbackToStatus = (String) execution.getVariable("rollbackToStatus");
if (rollbackToStatus == null || rollbackToStatus.isEmpty()) {
rollbackToStatus = "Approved";
}
Double previousVersion =
getPreviousApprovedVersion(currentEntity, repository, rollbackToStatus);
if (previousVersion == null) {
LOG.warn(
"[RollbackEntity] No previous approved version found for entity: {} ({})",
entityReference.getName(),
entityId);
return;
}
EntityInterface previousEntity = repository.getVersion(entityId, previousVersion.toString());
LOG.info(
"[RollbackEntity] Rolling back entity {} from version {} to version {}",
entityReference.getName(),
currentEntity.getVersion(),
previousVersion);
restoreToPreviousVersion(repository, currentEntity, previousEntity);
// Store rollback information in execution variables
execution.setVariable("rollbackAction", "rollback");
execution.setVariable("rollbackFromVersion", currentEntity.getVersion());
execution.setVariable("rollbackToVersion", previousVersion);
execution.setVariable("rollbackEntityId", entityId.toString());
execution.setVariable("rollbackEntityType", entityType);
LOG.info(
"[RollbackEntity] Successfully rolled back entity: {} ({}) to version {}",
entityReference.getName(),
entityId,
previousVersion);
} catch (Exception e) {
LOG.error("[RollbackEntity] Error during entity rollback: {}", e.getMessage(), e);
throw new RuntimeException("Failed to rollback entity", e);
}
}
private Double getPreviousApprovedVersion(
EntityInterface entity, EntityRepository<?> repository, String rollbackToStatus) {
try {
UUID entityId = entity.getId();
// Get entity history using listVersions method
EntityHistory history = repository.listVersions(entityId);
// Current version
Double currentVersion = entity.getVersion();
// Look through versions to find the most recent approved one before current
Double previousApprovedVersion = null;
for (Object versionObj : history.getVersions()) {
try {
// The versions list contains JSON strings, not Maps
String versionJson;
if (versionObj instanceof String) {
versionJson = (String) versionObj;
} else {
// Fallback: convert to JSON if it's not already a string
versionJson = JsonUtils.pojoToJson(versionObj);
}
// Parse just the version number from the JSON
EntityInterface versionEntity = JsonUtils.readValue(versionJson, entity.getClass());
Double versionNumber = versionEntity.getVersion();
// Skip current and later versions
if (versionNumber >= currentVersion) {
continue;
}
// Get this version's full entity using getVersion
EntityInterface fullVersionEntity =
repository.getVersion(entityId, versionNumber.toString());
// Check if it's approved (for GlossaryTerm, check status field)
if (isApprovedVersion(fullVersionEntity, rollbackToStatus)) {
previousApprovedVersion = versionNumber;
break; // Found the most recent approved version
}
} catch (Exception e) {
LOG.warn("Could not parse version: {}", e.getMessage());
continue;
}
}
return previousApprovedVersion;
} catch (Exception e) {
LOG.error("Error finding previous approved version", e);
return null;
}
}
private boolean isApprovedVersion(EntityInterface entity, String targetStatus) {
// For GlossaryTerm, check the status field
if (entity instanceof GlossaryTerm) {
GlossaryTerm glossaryTerm = (GlossaryTerm) entity;
return targetStatus.equalsIgnoreCase(glossaryTerm.getStatus().value());
}
// For other entities, we'd need to check their status when that field is added
// For now, just return the previous version as "approved"
return true;
}
private void restoreToPreviousVersion(
EntityRepository<?> repository,
EntityInterface currentEntity,
EntityInterface previousEntity) {
try {
// Get current entity using getVersion (same loading method as previous)
currentEntity =
repository.getVersion(currentEntity.getId(), currentEntity.getVersion().toString());
// Get previous entity using getVersion (already loaded this way)
// previousEntity is already from getVersion in the calling method
// Now both loaded the same way - create PATCH
String currentJson = JsonUtils.pojoToJson(currentEntity);
String previousJson = JsonUtils.pojoToJson(previousEntity);
jakarta.json.JsonPatch patch = JsonUtils.getJsonPatch(currentJson, previousJson);
// Apply PATCH using FQN (not ID)
String user = "governance-bot"; // System user for rollback operations
repository.patch(null, currentEntity.getFullyQualifiedName(), user, patch);
LOG.info(
"[RollbackEntity] Successfully applied rollback patch for entity: {} ({})",
currentEntity.getName(),
currentEntity.getId());
} catch (Exception e) {
LOG.error("[RollbackEntity] Failed to restore entity to previous version", e);
throw new RuntimeException("Failed to restore entity to previous version", e);
}
}
}

View File

@ -197,7 +197,7 @@ public class SetEntityAttributeImpl implements JavaDelegate {
* @param fieldValue The value to set in the specified field
*
* @throws RuntimeException if JSON processing fails or entity repository is not found
* @see #setNestedField(Map, String, String) for nested field handling details
* @see #setNestedField(Map, String, Object) for nested field handling details
*/
private void setEntityField(
EntityInterface entity, String entityType, String user, String fieldName, String fieldValue) {
@ -210,24 +210,68 @@ public class SetEntityAttributeImpl implements JavaDelegate {
// Step 3: Convert copy to map for generic field manipulation
Map<String, Object> entityMap = JsonUtils.getMap(entityCopy);
// Step 4: Set the field value in the map - supports nested fields with dot notation
setNestedField(entityMap, fieldName, fieldValue);
// Step 4: Parse field value - could be JSON object/array or simple string
Object parsedValue = parseFieldValue(fieldValue);
// Step 5: Convert the modified map back to entity
// Step 5: Set the field value in the map - supports nested fields with dot notation
setNestedField(entityMap, fieldName, parsedValue);
// Step 6: Convert the modified map back to entity
String modifiedJson = JsonUtils.pojoToJson(entityMap);
EntityInterface modifiedEntity = JsonUtils.readValue(modifiedJson, entity.getClass());
// Step 6: Get the updated JSON from the modified entity
// Step 7: Get the updated JSON from the modified entity
String updatedJson = JsonUtils.pojoToJson(modifiedEntity);
// Step 7: Create patch from original to updated
// Step 8: Create patch from original to updated
JsonPatch patch = JsonUtils.getJsonPatch(originalJson, updatedJson);
// Step 8: Apply patch using the non-deprecated repository method
// Step 9: Apply patch using the non-deprecated repository method
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
entityRepository.patch(null, entity.getId(), user, patch, null);
}
/**
* Parses field value from string to appropriate object type.
* Handles JSON objects, arrays, booleans, numbers, and plain strings.
*
* @param fieldValue The string value to parse
* @return Parsed object (Map, List, Boolean, Number, or String)
*/
private Object parseFieldValue(String fieldValue) {
if (fieldValue == null || fieldValue.isEmpty()) {
return null;
}
// Try to parse as JSON object or array
if ((fieldValue.startsWith("{") && fieldValue.endsWith("}"))
|| (fieldValue.startsWith("[") && fieldValue.endsWith("]"))) {
try {
return JsonUtils.readValue(fieldValue, Object.class);
} catch (Exception e) {
// Not valid JSON, treat as string
}
}
// Try to parse as boolean
if ("true".equalsIgnoreCase(fieldValue) || "false".equalsIgnoreCase(fieldValue)) {
return Boolean.parseBoolean(fieldValue);
}
// Try to parse as number
try {
if (fieldValue.contains(".")) {
return Double.parseDouble(fieldValue);
} else {
return Long.parseLong(fieldValue);
}
} catch (NumberFormatException e) {
// Not a number, return as string
}
return fieldValue;
}
/**
* Sets a field value in a nested map structure using dot notation path navigation.
*
@ -297,10 +341,21 @@ public class SetEntityAttributeImpl implements JavaDelegate {
* @throws ClassCastException if an intermediate value is not a Map when expected
*/
@SuppressWarnings("unchecked")
private void setNestedField(Map<String, Object> map, String fieldName, String fieldValue) {
private void setNestedField(Map<String, Object> map, String fieldName, Object fieldValue) {
// Handle special array patterns intelligently
if (isSmartArrayPattern(fieldName)) {
handleSmartArrayField(map, fieldName, fieldValue);
if (isSmartArrayPattern(fieldName) && fieldValue instanceof String) {
handleSmartArrayField(map, fieldName, (String) fieldValue);
return;
}
// Direct array replacement (when fieldValue is already a List)
if ((fieldName.equals("tags") || fieldName.equals("owners") || fieldName.equals("reviewers"))
&& (fieldValue instanceof List || fieldValue == null)) {
if (fieldValue == null) {
map.remove(fieldName);
} else {
map.put(fieldName, fieldValue);
}
return;
}
@ -322,9 +377,9 @@ public class SetEntityAttributeImpl implements JavaDelegate {
}
}
// Set the final value or remove if null/empty
// Set the final value or remove if null
String finalKey = parts[parts.length - 1];
if (fieldValue == null || fieldValue.isEmpty()) {
if (fieldValue == null || (fieldValue instanceof String && ((String) fieldValue).isEmpty())) {
currentMap.remove(finalKey);
} else {
currentMap.put(finalKey, fieldValue);

View File

@ -31,6 +31,7 @@ import org.openmetadata.schema.governance.workflows.elements.triggers.Config;
import org.openmetadata.schema.governance.workflows.elements.triggers.Event;
import org.openmetadata.schema.governance.workflows.elements.triggers.EventBasedEntityTriggerDefinition;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.governance.workflows.WorkflowIdEncoder;
import org.openmetadata.service.governance.workflows.elements.TriggerInterface;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.FilterEntityImpl;
import org.openmetadata.service.governance.workflows.flowable.builders.CallActivityBuilder;
@ -39,7 +40,6 @@ import org.openmetadata.service.governance.workflows.flowable.builders.FieldExte
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.SignalBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder;
import org.openmetadata.service.util.EntityUtil;
@Slf4j
public class EventBasedEntityTrigger implements TriggerInterface {
@ -135,16 +135,10 @@ public class EventBasedEntityTrigger implements TriggerInterface {
SignalEventDefinition signalEventDefinition = new SignalEventDefinition();
signalEventDefinition.setSignalRef(signal.getId());
// Not to exceed the maximum length - hash with 32 chars, then if length exceeds 60,
// truncate
// Use safe ID encoding instead of truncation
String startEventId =
"id_"
+ getFlowableElementId(
EntityUtil.hash(workflowTriggerId),
String.format("%s-%s-start", entityType, eventId));
if (startEventId.length() > 60) {
startEventId = startEventId.substring(0, 60); // final safeguard
}
WorkflowIdEncoder.generateSafeFlowableId(
workflowTriggerId, entityType, eventId, "start");
StartEvent startEvent = new StartEventBuilder().id(startEventId).build();

View File

@ -12,10 +12,11 @@ import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.schema.governance.workflows.elements.EdgeDefinition;
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.change.ChangeSource;
import org.openmetadata.service.Entity;
import org.openmetadata.service.governance.workflows.Workflow;
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.governance.workflows.WorkflowTransactionManager;
import org.openmetadata.service.resources.governance.WorkflowDefinitionResource;
import org.openmetadata.service.util.EntityUtil;
@ -39,12 +40,12 @@ public class WorkflowDefinitionRepository extends EntityRepository<WorkflowDefin
@Override
protected void postCreate(WorkflowDefinition entity) {
WorkflowHandler.getInstance().deploy(new Workflow(entity));
// Handled in storeEntity via WorkflowTransactionManager
}
@Override
protected void postUpdate(WorkflowDefinition original, WorkflowDefinition updated) {
WorkflowHandler.getInstance().deploy(new Workflow(updated));
// Handled in storeEntity via WorkflowTransactionManager
}
@Override
@ -133,8 +134,26 @@ public class WorkflowDefinitionRepository extends EntityRepository<WorkflowDefin
@Override
@Transaction
public void storeEntity(WorkflowDefinition entity, boolean update) {
// Properly use WorkflowTransactionManager for atomic operations
WorkflowTransactionManager transactionManager = new WorkflowTransactionManager();
if (update) {
// For updates, find the original to pass to transaction manager
WorkflowDefinition original = find(entity.getId(), Include.ALL);
transactionManager.updateAndRedeployWorkflowDefinition(original, entity);
} else {
// For new workflows
transactionManager.storeAndDeployWorkflowDefinition(entity, false);
}
}
/**
* Internal method to store entity without triggering workflow deployment.
* This is called from WorkflowTransactionManager to avoid circular dependency.
*/
public void storeEntityInternal(WorkflowDefinition entity, boolean update) {
// Store the entity directly without triggering workflow deployment
store(entity, update);
WorkflowHandler.getInstance().deploy(new Workflow(entity));
}
@Override

View File

@ -30,16 +30,22 @@ import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import jakarta.ws.rs.core.UriInfo;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.data.RestoreEntity;
import org.openmetadata.schema.api.governance.CreateWorkflowDefinition;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.governance.workflows.Workflow;
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository;
import org.openmetadata.service.limits.Limits;
@ -57,6 +63,7 @@ import org.openmetadata.service.util.ResultList;
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "governanceWorkflows")
@Slf4j
public class WorkflowDefinitionResource
extends EntityResource<WorkflowDefinition, WorkflowDefinitionRepository> {
public static final String COLLECTION_PATH = "v1/governance/workflowDefinitions/";
@ -541,4 +548,144 @@ public class WorkflowDefinitionResource
return Response.status(Response.Status.NOT_FOUND).entity(fqn).build();
}
}
// TEST API - REMOVE BEFORE PRODUCTION
@POST
@Path("/test/rollback/{entityType}/{entityId}")
@Operation(
operationId = "testRollbackEntity",
summary = "Test rollback entity to previous version",
description =
"Tests rolling back an entity to its previous approved version. REMOVE BEFORE PRODUCTION.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Rollback test result",
content = @Content(mediaType = "application/json"))
})
public Response testRollbackEntity(
@Context SecurityContext securityContext,
@Parameter(description = "Entity type (e.g., table, dashboard)") @PathParam("entityType")
String entityType,
@Parameter(description = "Entity UUID") @PathParam("entityId") UUID entityId,
@Parameter(description = "Target version to rollback to (optional)")
@QueryParam("targetVersion")
String targetVersion) {
try {
EntityRepository<?> entityRepo = Entity.getEntityRepository(entityType);
EntityInterface currentEntity =
entityRepo.get(null, entityId, entityRepo.getFields("*"), Include.ALL, false);
if (currentEntity == null) {
return Response.status(Response.Status.NOT_FOUND)
.entity(Map.of("error", "Entity not found", "entityId", entityId))
.build();
}
EntityHistory history = entityRepo.listVersions(entityId);
Double versionToRestore = null;
if (targetVersion != null && !targetVersion.isEmpty()) {
versionToRestore = Double.parseDouble(targetVersion);
} else {
// Find previous version automatically
Double currentVersion = currentEntity.getVersion();
for (Object versionObj : history.getVersions()) {
try {
// The versions list contains JSON strings, not Maps
String versionJson;
if (versionObj instanceof String) {
versionJson = (String) versionObj;
} else {
// Fallback: convert to JSON if it's not already a string
versionJson = org.openmetadata.schema.utils.JsonUtils.pojoToJson(versionObj);
}
// Parse the JSON to get the entity
EntityInterface versionEntity =
org.openmetadata.schema.utils.JsonUtils.readValue(
versionJson, currentEntity.getClass());
Double versionNumber = versionEntity.getVersion();
if (versionNumber != null && versionNumber < currentVersion) {
versionToRestore = versionNumber;
break; // Get the most recent previous version
}
} catch (Exception e) {
// Skip this version if we can't parse it
LOG.warn("Could not parse version object: {}", e.getMessage());
continue;
}
}
}
if (versionToRestore == null) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(
Map.of(
"error", "No previous version found",
"currentVersion", currentEntity.getVersion(),
"availableVersions", history.getVersions()))
.build();
}
String userName = securityContext.getUserPrincipal().getName();
// Put
// currentEntity = entityRepo.get(null, entityId, entityRepo.getFields("*"), Include.ALL,
// false);
// EntityInterface previousEntity = entityRepo.getVersion(entityId,
// versionToRestore.toString());
// @SuppressWarnings("unchecked")
// EntityRepository<EntityInterface> typedRepo = (EntityRepository<EntityInterface>)
// entityRepo;
// previousEntity.setUpdatedBy(userName);
// previousEntity.setUpdatedAt(System.currentTimeMillis());
// org.openmetadata.service.util.RestUtil.PutResponse<EntityInterface> putResponse =
// typedRepo.update(null, currentEntity, previousEntity, userName);
// Get current entity using getVersion (not get with fields)
currentEntity = entityRepo.getVersion(entityId, currentEntity.getVersion().toString());
// Get previous entity using getVersion
EntityInterface previousEntity = entityRepo.getVersion(entityId, versionToRestore.toString());
// Now both loaded the same way - create PATCH
String currentJson = JsonUtils.pojoToJson(currentEntity);
String previousJson = JsonUtils.pojoToJson(previousEntity);
JsonPatch patch = JsonUtils.getJsonPatch(currentJson, previousJson);
// Apply PATCH
entityRepo.patch(null, currentEntity.getFullyQualifiedName(), userName, patch);
Map<String, Object> result = new HashMap<>();
result.put("status", "success");
result.put("entityId", entityId);
result.put("entityType", entityType);
result.put("rolledBackFrom", currentEntity.getVersion());
result.put("rolledBackTo", versionToRestore);
// result.put("newVersion", patch.getEntity().getVersion());
result.put("entityName", currentEntity.getName());
result.put(
"message",
String.format(
"Successfully rolled back %s from version %.1f to %.1f",
currentEntity.getName(), currentEntity.getVersion(), versionToRestore));
return Response.ok(result).build();
} catch (Exception e) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(
Map.of(
"error",
"Rollback failed",
"message",
e.getMessage(),
"entityId",
entityId,
"entityType",
entityType))
.build();
}
}
}

View File

@ -3,6 +3,7 @@ package org.openmetadata.service.rules;
import io.github.jamsesso.jsonlogic.ast.JsonLogicArray;
import io.github.jamsesso.jsonlogic.evaluator.JsonLogicEvaluationException;
import io.github.jamsesso.jsonlogic.evaluator.JsonLogicEvaluator;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.NotNull;
@ -59,4 +60,98 @@ public class JsonLogicUtils {
return false;
}
public static @NotNull Object evaluateIsUpdatedBefore(
JsonLogicEvaluator evaluator, JsonLogicArray arguments, Object data)
throws JsonLogicEvaluationException {
if (arguments.size() != 1) return false;
Object timestampObj = evaluator.evaluate(arguments.getFirst(), data);
if (timestampObj == null) return false;
// Get updatedAt from entity data
if (!(data instanceof Map<?, ?> entityMap)) return false;
Object updatedAtObj = entityMap.get("updatedAt");
if (updatedAtObj == null) return false;
long updatedAt;
if (updatedAtObj instanceof Long) {
updatedAt = (Long) updatedAtObj;
} else if (updatedAtObj instanceof Number) {
updatedAt = ((Number) updatedAtObj).longValue();
} else {
return false;
}
long timestamp = ((Number) timestampObj).longValue();
return updatedAt < timestamp;
}
public static @NotNull Object evaluateIsUpdatedAfter(
JsonLogicEvaluator evaluator, JsonLogicArray arguments, Object data)
throws JsonLogicEvaluationException {
if (arguments.size() != 1) return false;
Object timestampObj = evaluator.evaluate(arguments.getFirst(), data);
if (timestampObj == null) return false;
// Get updatedAt from entity data
if (!(data instanceof Map<?, ?> entityMap)) return false;
Object updatedAtObj = entityMap.get("updatedAt");
if (updatedAtObj == null) return false;
long updatedAt;
if (updatedAtObj instanceof Long) {
updatedAt = (Long) updatedAtObj;
} else if (updatedAtObj instanceof Number) {
updatedAt = ((Number) updatedAtObj).longValue();
} else {
return false;
}
long timestamp = ((Number) timestampObj).longValue();
return updatedAt > timestamp;
}
public static @NotNull Object evaluateFieldCompleteness(
JsonLogicEvaluator evaluator, JsonLogicArray arguments, Object data)
throws JsonLogicEvaluationException {
if (arguments.isEmpty()) return 0.0;
// Get the list of field names to check
List<String> fields = new ArrayList<>();
for (int i = 0; i < arguments.size(); i++) {
Object arg = evaluator.evaluate(arguments.get(i), data);
if (arg instanceof String) {
fields.add((String) arg);
}
}
if (fields.isEmpty()) return 0.0;
// Check if data is a Map (entity)
if (!(data instanceof Map<?, ?> entityMap)) return 0.0;
// Count non-empty fields
long filledCount = 0;
for (String field : fields) {
Object value = entityMap.get(field);
if (value != null) {
// Check if the value is non-empty based on its type
if (value instanceof String && !((String) value).trim().isEmpty()) {
filledCount++;
} else if (value instanceof List && !((List<?>) value).isEmpty()) {
filledCount++;
} else if (value instanceof Map && !((Map<?, ?>) value).isEmpty()) {
filledCount++;
} else if (!(value instanceof String || value instanceof List || value instanceof Map)) {
// For other types (numbers, booleans), non-null means filled
filledCount++;
}
}
}
// Return percentage as a number (0-100)
return (filledCount * 100.0) / fields.size();
}
}

View File

@ -22,7 +22,10 @@ public class LogicOps {
public enum CustomLogicOps {
LENGTH("length"),
IS_REVIEWER("isReviewer"),
IS_OWNER("isOwner");
IS_OWNER("isOwner"),
IS_UPDATED_BEFORE("isUpdatedBefore"),
IS_UPDATED_AFTER("isUpdatedAfter"),
FIELD_COMPLETENESS("fieldCompleteness");
public final String key;
@ -79,6 +82,54 @@ public class LogicOps {
return evaluateUserInRole(evaluator, arguments, data, "owners");
}
});
// {"isUpdatedBefore": 1609459200000} - Check if entity was updated before timestamp
jsonLogic.addOperation(
new JsonLogicExpression() {
@Override
public String key() {
return CustomLogicOps.IS_UPDATED_BEFORE.key;
}
@Override
public Object evaluate(
JsonLogicEvaluator evaluator, JsonLogicArray arguments, Object data)
throws JsonLogicEvaluationException {
return JsonLogicUtils.evaluateIsUpdatedBefore(evaluator, arguments, data);
}
});
// {"isUpdatedAfter": 1609459200000} - Check if entity was updated after timestamp
jsonLogic.addOperation(
new JsonLogicExpression() {
@Override
public String key() {
return CustomLogicOps.IS_UPDATED_AFTER.key;
}
@Override
public Object evaluate(
JsonLogicEvaluator evaluator, JsonLogicArray arguments, Object data)
throws JsonLogicEvaluationException {
return JsonLogicUtils.evaluateIsUpdatedAfter(evaluator, arguments, data);
}
});
// {"fieldCompleteness": ["field1", "field2", "field3"]} - Returns % of non-empty fields
jsonLogic.addOperation(
new JsonLogicExpression() {
@Override
public String key() {
return CustomLogicOps.FIELD_COMPLETENESS.key;
}
@Override
public Object evaluate(
JsonLogicEvaluator evaluator, JsonLogicArray arguments, Object data)
throws JsonLogicEvaluationException {
return JsonLogicUtils.evaluateFieldCompleteness(evaluator, arguments, data);
}
});
}
/**

View File

@ -16,6 +16,7 @@
"startEvent",
"createAndRunIngestionPipelineTask",
"runAppTask",
"rollbackEntityTask",
"parallelGateway"
]
}

View File

@ -0,0 +1,67 @@
{
"$id": "https://open-metadata.org/schema/governance/workflows/elements/nodes/automatedTask/rollbackEntityTask.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "RollbackEntityTaskDefinition",
"description": "Rolls back an entity to its previous approved version.",
"javaInterfaces": [
"org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface"
],
"javaType": "org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RollbackEntityTaskDefinition",
"type": "object",
"properties": {
"type": {
"type": "string",
"default": "automatedTask"
},
"subType": {
"type": "string",
"default": "rollbackEntityTask"
},
"name": {
"title": "Name",
"description": "Name that identifies this Node.",
"$ref": "../../../../../type/basic.json#/definitions/entityName"
},
"displayName": {
"title": "Display Name",
"description": "Display Name that identifies this Node.",
"type": "string"
},
"description": {
"title": "Description",
"description": "Description of the Node.",
"$ref": "../../../../../type/basic.json#/definitions/markdown"
},
"config": {
"title": "Node Configuration",
"type": "object",
"properties": {
"rollbackToStatus": {
"title": "Rollback to Status",
"description": "The status to look for when finding the previous approved version",
"type": "string",
"default": "Approved"
}
},
"additionalProperties": false
},
"input": {
"type": "array",
"items": { "type": "string" },
"default": ["relatedEntity"],
"additionalItems": false,
"minItems": 1,
"maxItems": 1
},
"output": {
"type": "array",
"items": { "type": "string" },
"default": ["rollbackVersion"],
"additionalItems": false,
"minItems": 1,
"maxItems": 1
}
},
"required": ["name"],
"additionalProperties": false
}