Adding DataCompleteness Task Node, Flowable Debug logs

Ram Narayan Balaji 2025-08-20 21:14:05 +05:30
parent e1473cee79
commit cdd144b504
15 changed files with 2653 additions and 93 deletions

File diff suppressed because it is too large

View File

@@ -0,0 +1,98 @@
# OpenMetadata Governance Workflows - Architecture & Implementation Guide
## Table of Contents
1. [Architecture Overview](#architecture-overview)
2. [Core Components](#core-components)
3. [Implementation Details](#implementation-details)
4. [Workflow Triggers](#workflow-triggers)
5. [Workflow Nodes Catalog](#workflow-nodes-catalog)
6. [Custom Workflow Examples](#custom-workflow-examples)
7. [Best Practices](#best-practices)
1. **Non-blocking**: Workflows must NOT block entity operations (glossary term creation, etc.)
2. **BPMN-driven flow**: Flowable's BPMN engine controls the flow based on conditions - we don't override this
3. **Complete audit trail**: Every execution attempt must be recorded, even failures
4. **No ID generation**: Use Flowable's execution IDs, not random UUIDs
## Solution Implementation
### 1. WorkflowInstanceListener
- **On Exception in execute()**: Still record the workflow state with FAILURE status
- **Use Flowable's execution ID**: Convert `execution.getId()` to UUID for tracking
- **Always persist state**: Even on failure, write to `workflow_instance_time_series`
### 2. WorkflowInstanceStageListener
- **On Exception**: Create a failed stage record so there's an audit trail
- **Use deterministic IDs**: `UUID.nameUUIDFromBytes(execution.getId().getBytes())`
- **Don't block flow**: Log and record, but let Flowable continue per the BPMN definition (see the listener sketch below)
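Both listeners follow the same recording pattern. A minimal sketch, assuming Flowable's `ExecutionListener` API; the class name and the `recordState` helper are illustrative stand-ins for the repository calls:
```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.ExecutionListener;

public class FailureRecordingListenerSketch implements ExecutionListener {
  @Override
  public void notify(DelegateExecution execution) {
    // Deterministic ID derived from Flowable's execution ID - no random UUIDs
    UUID instanceId =
        UUID.nameUUIDFromBytes(execution.getId().getBytes(StandardCharsets.UTF_8));
    try {
      recordState(instanceId, Map.of("status", "FINISHED"));
    } catch (Exception exc) {
      // Still write the state so the database reflects the failure, and never
      // rethrow: the entity operation that triggered the workflow must not block
      recordState(
          instanceId, Map.of("status", "FAILURE", "error", String.valueOf(exc.getMessage())));
    }
  }

  // Hypothetical stand-in for the workflow_instance_time_series repository write
  private void recordState(UUID instanceId, Map<String, String> state) {}
}
```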
### 3. BaseDelegate (Task Implementations)
- **Throw BpmnError**: This is CORRECT - allows boundary events to handle failures
- **Set exception variable**: For downstream stages to check
- **Let BPMN decide**: The workflow definition controls whether to continue or fail (see the delegate sketch below)
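A sketch of that contract, assuming Flowable's `JavaDelegate` API; the class name, the `runTask` hook, and the `"exception"` variable name are illustrative, while the `workflowRuntimeException` error code matches the one used by the task nodes:
```java
import org.apache.commons.lang.exception.ExceptionUtils;
import org.flowable.engine.delegate.BpmnError;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;

public abstract class BaseDelegateSketch implements JavaDelegate {
  @Override
  public void execute(DelegateExecution execution) {
    try {
      runTask(execution);
    } catch (Exception exc) {
      // Expose the failure to downstream stages (variable name is illustrative)
      execution.setVariable("exception", ExceptionUtils.getStackTrace(exc));
      // BpmnError is caught by the node's error boundary event; the BPMN
      // definition, not this Java code, decides whether the workflow continues
      throw new BpmnError("workflowRuntimeException", exc.getMessage());
    }
  }

  // Concrete task implementations do their actual work here
  protected abstract void runTask(DelegateExecution execution) throws Exception;
}
```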
### 4. WorkflowFailureListener
- **Keep as-is**: `isFailOnException() = false` is correct - don't block entity operations
- **Purpose**: Global monitoring, not flow control (a minimal sketch follows below)
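A minimal sketch of that contract, assuming Flowable 6's `FlowableEventListener` interface (class name illustrative):
```java
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;

public class FailureListenerSketch implements FlowableEventListener {
  @Override
  public void onEvent(FlowableEvent event) {
    // Global monitoring only, e.g. log JOB_EXECUTION_FAILURE events
  }

  @Override
  public boolean isFailOnException() {
    return false; // a listener error must never block the entity operation
  }

  @Override
  public boolean isFireOnTransactionLifecycleEvent() {
    return false;
  }

  @Override
  public String getOnTransaction() {
    return null;
  }
}
```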
## How It Works
1. **Task fails** → BaseDelegate throws BpmnError
2. **BPMN handles** → Boundary events catch the error; the workflow continues or fails per the definition (see the boundary-event sketch below)
3. **Listeners record** → Even on exception, state is persisted to the database
4. **No silent failures** → The database always has the true state
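Step 2 works because each task node attaches an error boundary event to its service task. A sketch using Flowable's BPMN model API, mirroring the wiring the task nodes use (IDs illustrative):
```java
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.ErrorEventDefinition;
import org.flowable.bpmn.model.ServiceTask;

final class BoundaryEventSketch {
  static BoundaryEvent attachRuntimeExceptionBoundary(ServiceTask serviceTask) {
    BoundaryEvent boundaryEvent = new BoundaryEvent();
    boundaryEvent.setId(serviceTask.getId() + ".runtimeExceptionBoundaryEvent");
    // Anchor the event to the task whose failures it should catch
    boundaryEvent.setAttachedToRefId(serviceTask.getId());
    ErrorEventDefinition errorEventDefinition = new ErrorEventDefinition();
    // Must match the error code thrown as BpmnError by the task delegate
    errorEventDefinition.setErrorCode("workflowRuntimeException");
    boundaryEvent.addEventDefinition(errorEventDefinition);
    return boundaryEvent;
  }
}
```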
## Key Changes Made
```java
// WorkflowInstanceListener - Always record state
catch (Exception exc) {
LOG.error(...);
// Still write to DB even on failure
if ("end".equals(execution.getEventName())) {
workflowInstanceRepository.updateWorkflowInstance(
workflowInstanceId,
System.currentTimeMillis(),
Map.of("status", "FAILURE", "error", exc.getMessage())
);
}
}
// WorkflowInstanceStageListener - Create failure records
catch (Exception exc) {
LOG.error(...);
// Create a failed stage record for audit
if ("end".equals(execution.getEventName())) {
UUID stageId = workflowInstanceStateRepository.addNewStageToInstance(
stage + "_failed", ...
);
workflowInstanceStateRepository.updateStage(
stageId,
System.currentTimeMillis(),
Map.of("status", "FAILED", "error", exc.getMessage())
);
}
}
```
## What We DON'T Do
1. **Don't generate random UUIDs** - Use Flowable's execution IDs
2. **Don't skip stages** - Let BPMN control flow
3. **Don't throw from listeners** - Would block entity operations
4. **Don't override BPMN decisions** - Respect workflow definitions
## Testing
1. Remove `relatedEntity` from the glossary workflow trigger to force a workflow failure
2. Check the database: the instance should show FAILURE status, not FINISHED
3. Check the stages: failed stage records should be present
4. The entity (glossary term) should still be created
## Result
- Workflows fail gracefully per BPMN definition
- Database always reflects true state
- No silent failures
- Entity operations never blocked
- Complete audit trail maintained

View File

@@ -7,6 +7,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.Process;
@@ -22,6 +23,7 @@ import org.openmetadata.service.governance.workflows.elements.TriggerFactory;
import org.openmetadata.service.governance.workflows.elements.TriggerInterface;
import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent;
@Slf4j
@Getter
public class Workflow {
public static final String INGESTION_PIPELINE_ID_VARIABLE = "ingestionPipelineId";
@@ -43,14 +45,27 @@ public class Workflow {
public static final String GLOBAL_NAMESPACE = "global";
public Workflow(WorkflowDefinition workflowDefinition) {
LOG.info(
"[WorkflowBuild] START: Creating workflow '{}' with {} nodes, {} edges",
workflowDefinition.getFullyQualifiedName(),
workflowDefinition.getNodes() != null ? workflowDefinition.getNodes().size() : 0,
workflowDefinition.getEdges() != null ? workflowDefinition.getEdges().size() : 0);
// Build Trigger
LOG.debug(
"[WorkflowBuild] Creating trigger: type='{}'",
workflowDefinition.getTrigger() != null
? workflowDefinition.getTrigger().getType()
: "none");
this.triggerModel = new BpmnModel();
triggerModel.setTargetNamespace("");
TriggerInterface trigger = TriggerFactory.createTrigger(workflowDefinition);
trigger.addToWorkflow(triggerModel);
this.triggerWorkflowName = trigger.getTriggerWorkflowId();
LOG.debug("[WorkflowBuild] Trigger created: id='{}'", triggerWorkflowName);
// Build Main Workflow
LOG.debug("[WorkflowBuild] Validating workflow graph");
new WorkflowGraph(workflowDefinition).validate();
this.mainModel = new BpmnModel();
@@ -67,30 +82,55 @@
// Add Nodes
for (WorkflowNodeDefinitionInterface nodeDefinitionObj : workflowDefinition.getNodes()) {
LOG.debug(
"[WorkflowBuild] Adding node: name='{}' type='{}' outputs={}",
nodeDefinitionObj.getName(),
nodeDefinitionObj.getType(),
nodeDefinitionObj.getOutput());
NodeInterface node =
NodeFactory.createNode(nodeDefinitionObj, workflowDefinition.getConfig());
node.addToWorkflow(mainModel, process);
Optional.ofNullable(node.getRuntimeExceptionBoundaryEvent())
.ifPresent(runtimeExceptionBoundaryEvents::add);
.ifPresent(
event -> {
LOG.debug(
"[WorkflowBuild] Added boundary event for node '{}'",
nodeDefinitionObj.getName());
runtimeExceptionBoundaryEvents.add(event);
});
}
// Add Edges
for (EdgeDefinition edgeDefinition : workflowDefinition.getEdges()) {
LOG.debug(
"[WorkflowBuild] Processing edge: from='{}' to='{}' condition='{}'",
edgeDefinition.getFrom(),
edgeDefinition.getTo(),
edgeDefinition.getCondition());
Edge edge = new Edge(edgeDefinition);
edge.addToWorkflow(mainModel, process);
}
// Configure Exception Flow
configureRuntimeExceptionFlow(process, runtimeExceptionBoundaryEvents);
LOG.info(
"[WorkflowBuild] SUCCESS: Workflow '{}' built with trigger '{}'",
mainWorkflowName,
triggerWorkflowName);
}
private void configureRuntimeExceptionFlow(
Process process, List<BoundaryEvent> runtimeExceptionBoundaryEvents) {
EndEvent errorEndEvent = new EndEvent("Error");
process.addFlowElement(errorEndEvent.getEndEvent());
LOG.debug(
"[WorkflowBuild] Configuring error flow for {} boundary events",
runtimeExceptionBoundaryEvents.size());
for (BoundaryEvent event : runtimeExceptionBoundaryEvents) {
process.addFlowElement(new SequenceFlow(event.getId(), errorEndEvent.getEndEvent().getId()));
LOG.debug("[WorkflowBuild] Added error flow: boundaryEvent='{}' -> errorEnd", event.getId());
}
}

View File

@@ -11,7 +11,7 @@ public class WorkflowFailureListener implements FlowableEventListener {
@Override
public void onEvent(FlowableEvent event) {
if (FlowableEngineEventType.JOB_EXECUTION_FAILURE.equals(event.getType())) {
LOG.error("Workflow Failed: " + event);
LOG.error("[WorkflowFailure] JOB_EXECUTION_FAILURE: {}", event);
}
}

View File

@@ -222,8 +222,29 @@ public class WorkflowHandler {
public ProcessInstance triggerByKey(
String processDefinitionKey, String businessKey, Map<String, Object> variables) {
RuntimeService runtimeService = processEngine.getRuntimeService();
LOG.debug("[GovernanceWorkflows] '{}' triggered with '{}'", processDefinitionKey, variables);
return runtimeService.startProcessInstanceByKey(processDefinitionKey, businessKey, variables);
LOG.info(
"[WorkflowTrigger] START: processKey='{}' businessKey='{}' variables={}",
processDefinitionKey,
businessKey,
variables);
try {
ProcessInstance instance =
runtimeService.startProcessInstanceByKey(processDefinitionKey, businessKey, variables);
LOG.info(
"[WorkflowTrigger] SUCCESS: processKey='{}' instanceId='{}' businessKey='{}'",
processDefinitionKey,
instance.getId(),
businessKey);
return instance;
} catch (Exception e) {
LOG.error(
"[WorkflowTrigger] FAILED: processKey='{}' businessKey='{}' error='{}'",
processDefinitionKey,
businessKey,
e.getMessage(),
e);
throw e;
}
}
public void triggerWithSignal(String signal, Map<String, Object> variables) {
@@ -300,19 +321,36 @@ public class WorkflowHandler {
public Map<String, Object> transformToNodeVariables(
UUID customTaskId, Map<String, Object> variables) {
LOG.debug(
"[WorkflowVariable] transformToNodeVariables: customTaskId='{}' inputVars={}",
customTaskId,
variables);
Map<String, Object> namespacedVariables = null;
Optional<Task> oTask = Optional.ofNullable(getTaskFromCustomTaskId(customTaskId));
if (oTask.isPresent()) {
Task task = oTask.get();
String namespace = getParentActivityId(task.getExecutionId());
LOG.debug(
"[WorkflowVariable] Found task namespace: taskId='{}' executionId='{}' namespace='{}'",
task.getId(),
task.getExecutionId(),
namespace);
namespacedVariables = new HashMap<>();
for (Map.Entry<String, Object> entry : variables.entrySet()) {
namespacedVariables.put(
getNamespacedVariableName(namespace, entry.getKey()), entry.getValue());
String namespacedVar = getNamespacedVariableName(namespace, entry.getKey());
namespacedVariables.put(namespacedVar, entry.getValue());
LOG.debug(
"[WorkflowVariable] Transformed: '{}' -> '{}' = '{}'",
entry.getKey(),
namespacedVar,
entry.getValue());
}
LOG.debug(
"[WorkflowVariable] transformToNodeVariables complete: outputVars={}",
namespacedVariables);
} else {
LOG.debug(String.format("Flowable Task for Task ID %s not found.", customTaskId));
LOG.warn("[WorkflowVariable] Task not found for customTaskId='{}'", customTaskId);
}
return namespacedVariables;
}
@@ -323,19 +361,43 @@ public class WorkflowHandler {
public void resolveTask(UUID customTaskId, Map<String, Object> variables) {
TaskService taskService = processEngine.getTaskService();
LOG.info("[WorkflowTask] RESOLVE: customTaskId='{}' variables={}", customTaskId, variables);
try {
Optional<Task> oTask = Optional.ofNullable(getTaskFromCustomTaskId(customTaskId));
if (oTask.isPresent()) {
Task task = oTask.get();
LOG.info(
"[WorkflowTask] Found task: flowableTaskId='{}' processInstanceId='{}' name='{}'",
task.getId(),
task.getProcessInstanceId(),
task.getName());
Optional.ofNullable(variables)
.ifPresentOrElse(
variablesValue -> taskService.complete(task.getId(), variablesValue),
() -> taskService.complete(task.getId()));
variablesValue -> {
LOG.info(
"[WorkflowTask] Completing with variables: taskId='{}' vars={}",
task.getId(),
variablesValue);
taskService.complete(task.getId(), variablesValue);
},
() -> {
LOG.info(
"[WorkflowTask] Completing without variables: taskId='{}'", task.getId());
taskService.complete(task.getId());
});
LOG.info("[WorkflowTask] SUCCESS: Task '{}' resolved", customTaskId);
} else {
LOG.debug(String.format("Flowable Task for Task ID %s not found.", customTaskId));
LOG.warn("[WorkflowTask] NOT_FOUND: No Flowable task for customTaskId='{}'", customTaskId);
}
} catch (FlowableObjectNotFoundException ex) {
LOG.debug(String.format("Flowable Task for Task ID %s not found.", customTaskId));
LOG.error(
"[WorkflowTask] ERROR: Flowable task not found for customTaskId='{}': {}",
customTaskId,
ex.getMessage());
} catch (Exception e) {
LOG.error(
"[WorkflowTask] ERROR: Failed to resolve task '{}': {}", customTaskId, e.getMessage(), e);
throw e;
}
}

View File

@@ -28,8 +28,20 @@ public class WorkflowVariableHandler {
public Object getNamespacedVariable(String namespace, String varName) {
String namespacedVarName = getNamespacedVariableName(namespace, varName);
if (namespacedVarName != null) {
return varScope.getVariable(namespacedVarName);
Object value = varScope.getVariable(namespacedVarName);
LOG.debug(
"[WorkflowVariable] GET: namespace='{}' varName='{}' namespacedVar='{}' value='{}' type='{}'",
namespace,
varName,
namespacedVarName,
value,
value != null ? value.getClass().getSimpleName() : "null");
return value;
} else {
LOG.debug(
"[WorkflowVariable] GET: namespace='{}' varName='{}' returned null (no namespace)",
namespace,
varName);
return null;
}
}
@@ -43,8 +55,15 @@
String namespacedVarName = getNamespacedVariableName(namespace, varName);
if (namespacedVarName != null) {
varScope.setVariable(namespacedVarName, varValue);
LOG.debug(String.format("%s variable set to %s", namespacedVarName, varValue));
LOG.debug(
"[WorkflowVariable] SET: namespace='{}' varName='{}' namespacedVar='{}' value='{}' type='{}'",
namespace,
varName,
namespacedVarName,
varValue,
varValue != null ? varValue.getClass().getSimpleName() : "null");
} else {
LOG.error("[WorkflowVariable] ERROR: Namespace is null when setting variable '{}'", varName);
throw new RuntimeException("Namespace can't be null when setting a namespaced variable.");
}
}
@@ -54,20 +73,43 @@
}
private String getNodeNamespace() {
String namespace;
if (varScope instanceof DelegateExecution) {
return Optional.ofNullable(((DelegateExecution) varScope).getParent().getCurrentActivityId())
.orElseGet(() -> ((DelegateExecution) varScope).getCurrentActivityId().split("\\.")[0]);
DelegateExecution execution = (DelegateExecution) varScope;
namespace =
Optional.ofNullable(
execution.getParent() != null
? execution.getParent().getCurrentActivityId()
: null)
.orElseGet(() -> execution.getCurrentActivityId().split("\\.")[0]);
LOG.debug(
"[WorkflowVariable] getNodeNamespace: DelegateExecution activityId='{}' namespace='{}'",
execution.getCurrentActivityId(),
namespace);
} else if (varScope instanceof DelegateTask) {
return WorkflowHandler.getInstance()
.getParentActivityId(((DelegateTask) varScope).getExecutionId());
DelegateTask task = (DelegateTask) varScope;
namespace = WorkflowHandler.getInstance().getParentActivityId(task.getExecutionId());
LOG.debug(
"[WorkflowVariable] getNodeNamespace: DelegateTask executionId='{}' namespace='{}'",
task.getExecutionId(),
namespace);
} else {
LOG.error(
"[WorkflowVariable] ERROR: Invalid varScope type: {}",
varScope != null ? varScope.getClass().getName() : "null");
throw new RuntimeException(
"varScope must be either an instance of 'DelegateExecution' or 'DelegateTask'.");
}
return namespace;
}
public void setNodeVariable(String varName, Object varValue) {
String namespace = getNodeNamespace();
LOG.debug(
"[WorkflowVariable] setNodeVariable: varName='{}' value='{}' using namespace='{}'",
varName,
varValue,
namespace);
setNamespacedVariable(namespace, varName, varValue);
}

View File

@@ -3,26 +3,45 @@ package org.openmetadata.service.governance.workflows.elements;
import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.openmetadata.common.utils.CommonUtil;
@Slf4j
public class Edge {
private final SequenceFlow edge;
public Edge(org.openmetadata.schema.governance.workflows.elements.EdgeDefinition edgeDefinition) {
SequenceFlow edge = new SequenceFlow(edgeDefinition.getFrom(), edgeDefinition.getTo());
if (!CommonUtil.nullOrEmpty(edgeDefinition.getCondition())) {
edge.setConditionExpression(
getFlowableCondition(edgeDefinition.getFrom(), edgeDefinition.getCondition()));
String conditionExpression =
getFlowableCondition(edgeDefinition.getFrom(), edgeDefinition.getCondition());
edge.setConditionExpression(conditionExpression);
LOG.debug(
"[WorkflowEdge] Created conditional edge from='{}' to='{}' with condition='{}' expression='{}'",
edgeDefinition.getFrom(),
edgeDefinition.getTo(),
edgeDefinition.getCondition(),
conditionExpression);
} else {
LOG.debug(
"[WorkflowEdge] Created unconditional edge from='{}' to='{}'",
edgeDefinition.getFrom(),
edgeDefinition.getTo());
}
this.edge = edge;
}
private String getFlowableCondition(String from, String condition) {
return String.format(
"${%s == '%s'}", getNamespacedVariableName(from, RESULT_VARIABLE), condition);
String variableName = getNamespacedVariableName(from, RESULT_VARIABLE);
String expression = String.format("${%s == '%s'}", variableName, condition);
LOG.debug(
"[WorkflowEdge] Condition expression: checking if variable '{}' equals '{}'",
variableName,
condition);
return expression;
}
public void addToWorkflow(BpmnModel model, Process process) {

View File

@@ -6,6 +6,7 @@ import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinit
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.ConditionalSetEntityAttributeTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CreateAndRunIngestionPipelineTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RollbackEntityTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityAttributeTaskDefinition;
@@ -17,6 +18,7 @@ import org.openmetadata.schema.governance.workflows.elements.nodes.startEvent.St
import org.openmetadata.schema.governance.workflows.elements.nodes.userTask.UserApprovalTaskDefinition;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.ConditionalSetEntityAttributeTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.DataCompletenessTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.RollbackEntityTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityAttributeTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTask;
@@ -51,6 +53,8 @@ public class NodeFactory {
case RUN_APP_TASK -> new RunAppTask((RunAppTaskDefinition) nodeDefinition, config);
case ROLLBACK_ENTITY_TASK -> new RollbackEntityTask(
(RollbackEntityTaskDefinition) nodeDefinition, config);
case DATA_COMPLETENESS_TASK -> new DataCompletenessTask(
(DataCompletenessTaskDefinition) nodeDefinition, config);
case PARALLEL_GATEWAY -> new ParallelGateway(
(ParallelGatewayDefinition) nodeDefinition, config);
};

View File

@@ -0,0 +1,129 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask;
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.FieldExtension;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
import org.flowable.bpmn.model.StartEvent;
import org.flowable.bpmn.model.SubProcess;
import org.openmetadata.schema.governance.workflows.WorkflowConfiguration;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.governance.workflows.elements.NodeInterface;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.DataCompletenessImpl;
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.SubProcessBuilder;
@Slf4j
public class DataCompletenessTask implements NodeInterface {
private final SubProcess subProcess;
private final BoundaryEvent runtimeExceptionBoundaryEvent;
public DataCompletenessTask(
DataCompletenessTaskDefinition nodeDefinition, WorkflowConfiguration workflowConfig) {
String subProcessId = nodeDefinition.getName();
SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build();
StartEvent startEvent =
new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build();
ServiceTask dataCompletenessTask = getDataCompletenessServiceTask(subProcessId, nodeDefinition);
EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build();
subProcess.addFlowElement(startEvent);
subProcess.addFlowElement(dataCompletenessTask);
subProcess.addFlowElement(endEvent);
subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), dataCompletenessTask.getId()));
subProcess.addFlowElement(new SequenceFlow(dataCompletenessTask.getId(), endEvent.getId()));
this.subProcess = subProcess;
this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(dataCompletenessTask);
}
private ServiceTask getDataCompletenessServiceTask(
String parentId, DataCompletenessTaskDefinition nodeDefinition) {
// Get configuration with defaults if null
var config = nodeDefinition.getConfig();
Boolean treatEmptyStringAsNull = config.getTreatEmptyStringAsNull();
Boolean treatEmptyArrayAsNull = config.getTreatEmptyArrayAsNull();
List<FieldExtension> fieldExtensions =
List.of(
new FieldExtensionBuilder()
.fieldName("fieldsToCheckExpr")
.fieldValue(JsonUtils.pojoToJson(config.getFieldsToCheck()))
.build(),
new FieldExtensionBuilder()
.fieldName("qualityBandsExpr")
.fieldValue(JsonUtils.pojoToJson(config.getQualityBands()))
.build(),
new FieldExtensionBuilder()
.fieldName("treatEmptyStringAsNullExpr")
.fieldValue(
String.valueOf(treatEmptyStringAsNull != null ? treatEmptyStringAsNull : true))
.build(),
new FieldExtensionBuilder()
.fieldName("treatEmptyArrayAsNullExpr")
.fieldValue(
String.valueOf(treatEmptyArrayAsNull != null ? treatEmptyArrayAsNull : true))
.build(),
new FieldExtensionBuilder()
.fieldName("inputNamespaceMapExpr")
.fieldValue(
JsonUtils.pojoToJson(
nodeDefinition.getInputNamespaceMap() != null
? nodeDefinition.getInputNamespaceMap()
: new java.util.HashMap<>()))
.build());
ServiceTaskBuilder builder =
new ServiceTaskBuilder()
.id(getFlowableElementId(parentId, "dataCompletenessTask"))
.implementation(DataCompletenessImpl.class.getName());
for (FieldExtension fieldExtension : fieldExtensions) {
builder.addFieldExtension(fieldExtension);
}
return builder.build();
}
@Override
public void addToWorkflow(BpmnModel model, Process process) {
process.addFlowElement(subProcess);
process.addFlowElement(runtimeExceptionBoundaryEvent);
}
@Override
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
return runtimeExceptionBoundaryEvent;
}
private BoundaryEvent getRuntimeExceptionBoundaryEvent(ServiceTask serviceTask) {
BoundaryEvent boundaryEvent = new BoundaryEvent();
boundaryEvent.setId(getFlowableElementId(serviceTask.getId(), "runtimeExceptionBoundaryEvent"));
boundaryEvent.setAttachedToRefId(serviceTask.getId());
org.flowable.bpmn.model.ErrorEventDefinition errorEventDef =
new org.flowable.bpmn.model.ErrorEventDefinition();
errorEventDef.setErrorCode("workflowRuntimeException");
boundaryEvent.addEventDefinition(errorEventDef);
return boundaryEvent;
}
}

View File

@@ -52,20 +52,12 @@ public class CheckEntityAttributesImpl implements JavaDelegate {
private Boolean checkAttributes(MessageParser.EntityLink entityLink, String rules) {
EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL);
boolean result;
try {
Object result = RuleEngine.getInstance().apply(rules, JsonUtils.getMap(entity));
// Handle both boolean and numeric results for scoring scenarios
if (result instanceof Number) {
double score = ((Number) result).doubleValue();
// For numeric results, consider >= 50 as success (configurable threshold)
return score >= 50.0;
}
// Default boolean handling
return Boolean.TRUE.equals(result);
result = (boolean) RuleEngine.getInstance().apply(rules, JsonUtils.getMap(entity));
} catch (Exception e) {
throw new RuntimeException(e);
}
return result;
}
}

View File

@@ -0,0 +1,229 @@
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl;
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.engine.delegate.BpmnError;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.governance.workflows.WorkflowVariableHandler;
import org.openmetadata.service.resources.feeds.MessageParser;
@Slf4j
public class DataCompletenessImpl implements JavaDelegate {
private Expression fieldsToCheckExpr;
private Expression qualityBandsExpr;
private Expression treatEmptyStringAsNullExpr;
private Expression treatEmptyArrayAsNullExpr;
private Expression inputNamespaceMapExpr;
@Override
public void execute(DelegateExecution execution) {
WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution);
try {
// Get configuration
Map<String, String> inputNamespaceMap =
JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class);
List<String> fieldsToCheck =
JsonUtils.readOrConvertValue(fieldsToCheckExpr.getValue(execution), List.class);
List<Map<String, Object>> qualityBandMaps =
JsonUtils.readOrConvertValue(qualityBandsExpr.getValue(execution), List.class);
List<QualityBand> qualityBands = new ArrayList<>();
for (Map<String, Object> bandMap : qualityBandMaps) {
QualityBand band = new QualityBand();
band.setName((String) bandMap.get("name"));
band.setMinimumScore(((Number) bandMap.get("minimumScore")).doubleValue());
qualityBands.add(band);
}
boolean treatEmptyStringAsNull =
Boolean.parseBoolean(treatEmptyStringAsNullExpr.getValue(execution).toString());
boolean treatEmptyArrayAsNull =
Boolean.parseBoolean(treatEmptyArrayAsNullExpr.getValue(execution).toString());
// Get the entity
MessageParser.EntityLink entityLink =
MessageParser.EntityLink.parse(
(String)
varHandler.getNamespacedVariable(
inputNamespaceMap.get(RELATED_ENTITY_VARIABLE), RELATED_ENTITY_VARIABLE));
EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL);
Map<String, Object> entityMap = JsonUtils.getMap(entity);
// Calculate completeness
DataCompletenessResult result =
calculateCompleteness(
entityMap,
fieldsToCheck,
qualityBands,
treatEmptyStringAsNull,
treatEmptyArrayAsNull);
// Set output variables
varHandler.setNodeVariable("completenessScore", result.score);
varHandler.setNodeVariable("filledFieldsCount", result.filledFieldsCount);
varHandler.setNodeVariable("totalFieldsCount", result.totalFieldsCount);
varHandler.setNodeVariable("missingFields", result.missingFields);
varHandler.setNodeVariable("filledFields", result.filledFields);
varHandler.setNodeVariable("qualityBand", result.qualityBand);
// Set result variable for edge routing (using the quality band name)
varHandler.setNodeVariable(RESULT_VARIABLE, result.qualityBand);
LOG.info(
"[WorkflowNode][DataCompleteness] EXECUTED: entity='{}' score={}% band='{}' filled={}/{}",
entityLink,
result.score,
result.qualityBand,
result.filledFieldsCount,
result.totalFieldsCount);
} catch (Exception exc) {
LOG.error(
"[{}] Data completeness check failed: ",
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()),
exc);
varHandler.setGlobalVariable(EXCEPTION_VARIABLE, ExceptionUtils.getStackTrace(exc));
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
}
}
private DataCompletenessResult calculateCompleteness(
Map<String, Object> entityMap,
List<String> fieldsToCheck,
List<QualityBand> qualityBands,
boolean treatEmptyStringAsNull,
boolean treatEmptyArrayAsNull) {
DataCompletenessResult result = new DataCompletenessResult();
result.totalFieldsCount = fieldsToCheck.size();
result.missingFields = new ArrayList<>();
result.filledFields = new ArrayList<>();
for (String fieldPath : fieldsToCheck) {
Object value = getFieldValue(entityMap, fieldPath);
if (isFieldFilled(value, treatEmptyStringAsNull, treatEmptyArrayAsNull)) {
result.filledFieldsCount++;
result.filledFields.add(fieldPath);
} else {
result.missingFields.add(fieldPath);
}
}
// Calculate percentage
result.score =
result.totalFieldsCount > 0
? (result.filledFieldsCount * 100.0) / result.totalFieldsCount
: 0.0;
// Determine quality band based on score
result.qualityBand = determineQualityBand(result.score, qualityBands);
return result;
}
private Object getFieldValue(Map<String, Object> entityMap, String fieldPath) {
// Handle nested fields with dot notation
String[] parts = fieldPath.split("\\.");
Object current = entityMap;
for (String part : parts) {
if (current == null) {
return null;
}
// Handle array notation like "columns[]"
if (part.endsWith("[]")) {
String fieldName = part.substring(0, part.length() - 2);
if (current instanceof Map) {
current = ((Map<?, ?>) current).get(fieldName);
// For arrays, check if any element exists
if (current instanceof List && !((List<?>) current).isEmpty()) {
return current; // Return the list itself if non-empty
}
}
return null;
} else if (current instanceof Map) {
current = ((Map<?, ?>) current).get(part);
} else {
return null;
}
}
return current;
}
private boolean isFieldFilled(
Object value, boolean treatEmptyStringAsNull, boolean treatEmptyArrayAsNull) {
if (value == null) {
return false;
}
if (value instanceof String) {
String str = (String) value;
return treatEmptyStringAsNull ? !str.trim().isEmpty() : true;
}
if (value instanceof List) {
List<?> list = (List<?>) value;
return treatEmptyArrayAsNull ? !list.isEmpty() : true;
}
if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
return !map.isEmpty();
}
// For other types (numbers, booleans), non-null means filled
return true;
}
private String determineQualityBand(double score, List<QualityBand> qualityBands) {
// Sort bands by minimumScore in descending order to evaluate from highest to lowest
List<QualityBand> sortedBands = new ArrayList<>(qualityBands);
sortedBands.sort(Comparator.comparingDouble(QualityBand::getMinimumScore).reversed());
// Find the matching band
for (QualityBand band : sortedBands) {
if (score >= band.getMinimumScore()) {
return band.getName();
}
}
// If no band matches (shouldn't happen if bands are configured correctly)
// Return the band with the lowest threshold
return sortedBands.isEmpty() ? "undefined" : sortedBands.getLast().getName();
}
@Data
public static class QualityBand {
private String name;
private Double minimumScore;
}
private static class DataCompletenessResult {
double score = 0.0;
int filledFieldsCount = 0;
int totalFieldsCount = 0;
List<String> missingFields;
List<String> filledFields;
String qualityBand = "undefined";
}
}

View File

@@ -3,7 +3,6 @@ package org.openmetadata.service.rules;
import io.github.jamsesso.jsonlogic.ast.JsonLogicArray;
import io.github.jamsesso.jsonlogic.evaluator.JsonLogicEvaluationException;
import io.github.jamsesso.jsonlogic.evaluator.JsonLogicEvaluator;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.NotNull;
@@ -112,46 +111,4 @@ public class JsonLogicUtils {
long timestamp = ((Number) timestampObj).longValue();
return updatedAt > timestamp;
}
public static @NotNull Object evaluateFieldCompleteness(
JsonLogicEvaluator evaluator, JsonLogicArray arguments, Object data)
throws JsonLogicEvaluationException {
if (arguments.isEmpty()) return 0.0;
// Get the list of field names to check
List<String> fields = new ArrayList<>();
for (int i = 0; i < arguments.size(); i++) {
Object arg = evaluator.evaluate(arguments.get(i), data);
if (arg instanceof String) {
fields.add((String) arg);
}
}
if (fields.isEmpty()) return 0.0;
// Check if data is a Map (entity)
if (!(data instanceof Map<?, ?> entityMap)) return 0.0;
// Count non-empty fields
long filledCount = 0;
for (String field : fields) {
Object value = entityMap.get(field);
if (value != null) {
// Check if the value is non-empty based on its type
if (value instanceof String && !((String) value).trim().isEmpty()) {
filledCount++;
} else if (value instanceof List && !((List<?>) value).isEmpty()) {
filledCount++;
} else if (value instanceof Map && !((Map<?, ?>) value).isEmpty()) {
filledCount++;
} else if (!(value instanceof String || value instanceof List || value instanceof Map)) {
// For other types (numbers, booleans), non-null means filled
filledCount++;
}
}
}
// Return percentage as a number (0-100)
return (filledCount * 100.0) / fields.size();
}
}

View File

@@ -24,8 +24,7 @@ public class LogicOps {
IS_REVIEWER("isReviewer"),
IS_OWNER("isOwner"),
IS_UPDATED_BEFORE("isUpdatedBefore"),
IS_UPDATED_AFTER("isUpdatedAfter"),
FIELD_COMPLETENESS("fieldCompleteness");
IS_UPDATED_AFTER("isUpdatedAfter");
public final String key;
@@ -114,22 +113,6 @@
return JsonLogicUtils.evaluateIsUpdatedAfter(evaluator, arguments, data);
}
});
// {"fieldCompleteness": ["field1", "field2", "field3"]} - Returns % of non-empty fields
jsonLogic.addOperation(
new JsonLogicExpression() {
@Override
public String key() {
return CustomLogicOps.FIELD_COMPLETENESS.key;
}
@Override
public Object evaluate(
JsonLogicEvaluator evaluator, JsonLogicArray arguments, Object data)
throws JsonLogicEvaluationException {
return JsonLogicUtils.evaluateFieldCompleteness(evaluator, arguments, data);
}
});
}
/**

View File

@@ -17,6 +17,7 @@
"createAndRunIngestionPipelineTask",
"runAppTask",
"rollbackEntityTask",
"dataCompletenessTask",
"parallelGateway"
]
}

View File

@@ -0,0 +1,131 @@
{
"$id": "https://open-metadata.org/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DataCompletenessTaskDefinition",
"description": "Evaluates entity data completeness based on field presence and outputs quality bands.",
"javaInterfaces": [
"org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface"
],
"javaType": "org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition",
"type": "object",
"definitions": {
"qualityBand": {
"type": "object",
"properties": {
"name": {
"title": "Band Name",
"description": "Name for this quality band (e.g., 'gold', 'excellent', 'tier1')",
"type": "string"
},
"minimumScore": {
"title": "Minimum Score",
"description": "Minimum completeness percentage for this band",
"type": "number",
"minimum": 0,
"maximum": 100
}
},
"required": ["name", "minimumScore"],
"additionalProperties": false
}
},
"properties": {
"type": {
"type": "string",
"default": "automatedTask"
},
"subType": {
"type": "string",
"default": "dataCompletenessTask"
},
"name": {
"title": "Node Name",
"description": "Unique name that identifies this node in the workflow",
"$ref": "../../../../../type/basic.json#/definitions/entityName"
},
"displayName": {
"title": "Display Name",
"description": "User-friendly display name for this node",
"type": "string"
},
"description": {
"title": "Description",
"description": "Description of what this completeness check does",
"$ref": "../../../../../type/basic.json#/definitions/markdown"
},
"config": {
"title": "Completeness Configuration",
"type": "object",
"properties": {
"fieldsToCheck": {
"title": "Fields to Check",
"description": "List of entity field paths to evaluate. Supports dot notation for nested fields (e.g., 'owner.name', 'columns[].description')",
"type": "array",
"items": {
"type": "string"
},
"minItems": 1,
"examples": ["name", "description", "owner", "tags", "columns[].description"]
},
"qualityBands": {
"title": "Quality Bands",
"description": "Define quality levels based on completeness scores. Bands are evaluated from highest to lowest score.",
"type": "array",
"items": {
"$ref": "#/definitions/qualityBand"
},
"default": [
{"name": "excellent", "minimumScore": 90},
{"name": "good", "minimumScore": 75},
{"name": "acceptable", "minimumScore": 50},
{"name": "poor", "minimumScore": 0}
],
"minItems": 1
},
"treatEmptyStringAsNull": {
"title": "Treat Empty String as Missing",
"description": "Consider empty strings ('') as missing values",
"type": "boolean",
"default": true
},
"treatEmptyArrayAsNull": {
"title": "Treat Empty Array as Missing",
"description": "Consider empty arrays ([]) as missing values",
"type": "boolean",
"default": true
}
},
"required": ["fieldsToCheck"],
"additionalProperties": false
},
"input": {
"type": "array",
"items": { "type": "string" },
"default": ["relatedEntity"],
"additionalItems": false,
"minItems": 1,
"maxItems": 1
},
"inputNamespaceMap": {
"type": "object",
"properties": {
"relatedEntity": {
"type": "string",
"default": "global"
}
},
"additionalProperties": false,
"required": ["relatedEntity"]
},
"output": {
"title": "Output Variables",
"description": "Variables this node outputs for use in subsequent nodes",
"type": "array",
"items": { "type": "string" },
"default": ["completenessScore", "qualityBand", "filledFieldsCount", "totalFieldsCount", "missingFields", "filledFields", "result"],
"additionalItems": false
}
},
"required": ["name", "config"],
"additionalProperties": false
}