mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-27 18:36:08 +00:00
MINOR: Increase visibility on flowable process (#18792)
* Add annotation to turn delete_recursive test into the last one to be executed * Improve Fetching Entities by only fetching the FQN * Cut back default batchsize to 500 * implement logging for flowable process * Implement Error Handling Flow * Remove testing code * Checking if User is Reviewer when resolving glossary task
This commit is contained in:
parent
2e9efe5b82
commit
8cade39bad
@ -1,30 +1,42 @@
|
||||
package org.openmetadata.service.governance.workflows;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.STAGE_INSTANCE_STATE_ID_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
|
||||
|
||||
import java.util.UUID;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.engine.delegate.DelegateExecution;
|
||||
import org.flowable.engine.delegate.JavaDelegate;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.jdbi3.WorkflowInstanceRepository;
|
||||
import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;
|
||||
|
||||
@Slf4j
|
||||
public class MainWorkflowTerminationListener implements JavaDelegate {
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
WorkflowInstanceStateRepository workflowInstanceStateRepository =
|
||||
(WorkflowInstanceStateRepository)
|
||||
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
|
||||
try {
|
||||
WorkflowInstanceStateRepository workflowInstanceStateRepository =
|
||||
(WorkflowInstanceStateRepository)
|
||||
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
|
||||
|
||||
UUID workflowInstanceStateId = (UUID) execution.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE);
|
||||
workflowInstanceStateRepository.updateStage(
|
||||
workflowInstanceStateId, System.currentTimeMillis(), execution.getVariables());
|
||||
UUID workflowInstanceStateId = (UUID) execution.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE);
|
||||
workflowInstanceStateRepository.updateStage(
|
||||
workflowInstanceStateId, System.currentTimeMillis(), execution.getVariables());
|
||||
|
||||
WorkflowInstanceRepository workflowInstanceRepository =
|
||||
(WorkflowInstanceRepository) Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);
|
||||
WorkflowInstanceRepository workflowInstanceRepository =
|
||||
(WorkflowInstanceRepository)
|
||||
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);
|
||||
|
||||
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());
|
||||
workflowInstanceRepository.updateWorkflowInstance(
|
||||
workflowInstanceId, System.currentTimeMillis());
|
||||
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());
|
||||
workflowInstanceRepository.updateWorkflowInstance(
|
||||
workflowInstanceId, System.currentTimeMillis(), execution.getVariables());
|
||||
} catch (Exception exc) {
|
||||
LOG.error(
|
||||
String.format(
|
||||
"[%s] Failed due to: %s ",
|
||||
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()),
|
||||
exc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,8 @@ public class Workflow {
|
||||
public static final String STAGE_INSTANCE_STATE_ID_VARIABLE = "stageInstanceStateId";
|
||||
public static final String WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE =
|
||||
"workflowInstanceExecutionId";
|
||||
public static final String WORKFLOW_RUNTIME_EXCEPTION = "workflowRuntimeException";
|
||||
public static final String EXCEPTION_VARIABLE = "exception";
|
||||
private final TriggerWorkflow triggerWorkflow;
|
||||
private final MainWorkflow mainWorkflow;
|
||||
|
||||
|
@ -0,0 +1,33 @@
|
||||
package org.openmetadata.service.governance.workflows;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
|
||||
|
||||
@Slf4j
|
||||
public class WorkflowFailureListener implements FlowableEventListener {
|
||||
|
||||
@Override
|
||||
public void onEvent(FlowableEvent event) {
|
||||
if (FlowableEngineEventType.JOB_EXECUTION_FAILURE.equals(event.getType())) {
|
||||
LOG.error("Workflow Failed: " + event);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFailOnException() {
|
||||
// Return true if the listener should fail the operation on an exception
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFireOnTransactionLifecycleEvent() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getOnTransaction() {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -50,6 +50,9 @@ public class WorkflowHandler {
|
||||
.setJdbcDriver(config.getDataSourceFactory().getDriverClass())
|
||||
.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE);
|
||||
|
||||
// Add Global Failure Listener
|
||||
processEngineConfiguration.setEventListeners(List.of(new WorkflowFailureListener()));
|
||||
|
||||
if (ConnectionType.MYSQL.label.equals(config.getDataSourceFactory().getDriverClass())) {
|
||||
processEngineConfiguration.setDatabaseType(ProcessEngineConfiguration.DATABASE_TYPE_MYSQL);
|
||||
} else {
|
||||
@ -256,4 +259,16 @@ public class WorkflowHandler {
|
||||
repositoryService.activateProcessDefinitionByKey(
|
||||
getTriggerWorkflowId(workflowName), true, null);
|
||||
}
|
||||
|
||||
public void terminateWorkflow(String workflowName) {
|
||||
runtimeService
|
||||
.createProcessInstanceQuery()
|
||||
.processDefinitionKey(getTriggerWorkflowId(workflowName))
|
||||
.list()
|
||||
.forEach(
|
||||
instance -> {
|
||||
runtimeService.deleteProcessInstance(
|
||||
instance.getId(), "Terminating all instances due to user request.");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -1,15 +1,34 @@
|
||||
package org.openmetadata.service.governance.workflows;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
|
||||
|
||||
import java.util.UUID;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.engine.delegate.DelegateExecution;
|
||||
import org.flowable.engine.delegate.JavaDelegate;
|
||||
|
||||
@Slf4j
|
||||
public class WorkflowInstanceExecutionIdSetterListener implements JavaDelegate {
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
UUID workflowInstanceExecutionId = UUID.randomUUID();
|
||||
execution.setVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE, workflowInstanceExecutionId);
|
||||
try {
|
||||
String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
|
||||
String relatedEntity = (String) execution.getVariable(RELATED_ENTITY_VARIABLE);
|
||||
LOG.debug(
|
||||
String.format(
|
||||
"New Execution for Workflow '%s'. Related Entity: '%s'",
|
||||
workflowName, relatedEntity));
|
||||
|
||||
UUID workflowInstanceExecutionId = UUID.randomUUID();
|
||||
execution.setVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE, workflowInstanceExecutionId);
|
||||
} catch (Exception exc) {
|
||||
LOG.error(
|
||||
String.format(
|
||||
"[%s] Failed due to: %s ",
|
||||
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()),
|
||||
exc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,16 +13,25 @@ import org.openmetadata.service.jdbi3.WorkflowInstanceRepository;
|
||||
public class WorkflowInstanceListener implements JavaDelegate {
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
WorkflowInstanceRepository workflowInstanceRepository =
|
||||
(WorkflowInstanceRepository) Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);
|
||||
try {
|
||||
WorkflowInstanceRepository workflowInstanceRepository =
|
||||
(WorkflowInstanceRepository)
|
||||
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);
|
||||
|
||||
switch (execution.getEventName()) {
|
||||
case "start" -> addWorkflowInstance(execution, workflowInstanceRepository);
|
||||
case "end" -> updateWorkflowInstance(execution, workflowInstanceRepository);
|
||||
default -> LOG.debug(
|
||||
switch (execution.getEventName()) {
|
||||
case "start" -> addWorkflowInstance(execution, workflowInstanceRepository);
|
||||
case "end" -> updateWorkflowInstance(execution, workflowInstanceRepository);
|
||||
default -> LOG.debug(
|
||||
String.format(
|
||||
"WorkflowStageUpdaterListener does not support listening for the event: '%s'",
|
||||
execution.getEventName()));
|
||||
}
|
||||
} catch (Exception exc) {
|
||||
LOG.error(
|
||||
String.format(
|
||||
"WorkflowStageUpdaterListener does not support listening for the event: '%s'",
|
||||
execution.getEventName()));
|
||||
"[%s] Failed due to: %s ",
|
||||
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()),
|
||||
exc);
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,13 +54,22 @@ public class WorkflowInstanceListener implements JavaDelegate {
|
||||
workflowInstanceId,
|
||||
System.currentTimeMillis(),
|
||||
execution.getVariables());
|
||||
LOG.debug(
|
||||
String.format(
|
||||
"Workflow '%s' Triggered. Instance: '%s'", workflowDefinitionName, workflowInstanceId));
|
||||
}
|
||||
|
||||
private void updateWorkflowInstance(
|
||||
DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) {
|
||||
String workflowDefinitionName =
|
||||
getMainWorkflowDefinitionNameFromTrigger(
|
||||
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()));
|
||||
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());
|
||||
workflowInstanceRepository.updateWorkflowInstance(
|
||||
workflowInstanceId, System.currentTimeMillis());
|
||||
workflowInstanceId, System.currentTimeMillis(), execution.getVariables());
|
||||
LOG.debug(
|
||||
String.format(
|
||||
"Workflow '%s' Finished. Instance: '%s'", workflowDefinitionName, workflowInstanceId));
|
||||
}
|
||||
|
||||
private String getMainWorkflowDefinitionNameFromTrigger(String triggerWorkflowDefinitionName) {
|
||||
|
@ -16,17 +16,25 @@ import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;
|
||||
public class WorkflowInstanceStageListener implements JavaDelegate {
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
WorkflowInstanceStateRepository workflowInstanceStateRepository =
|
||||
(WorkflowInstanceStateRepository)
|
||||
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
|
||||
try {
|
||||
WorkflowInstanceStateRepository workflowInstanceStateRepository =
|
||||
(WorkflowInstanceStateRepository)
|
||||
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
|
||||
|
||||
switch (execution.getEventName()) {
|
||||
case "start" -> addNewStage(execution, workflowInstanceStateRepository);
|
||||
case "end" -> updateStage(execution, workflowInstanceStateRepository);
|
||||
default -> LOG.debug(
|
||||
switch (execution.getEventName()) {
|
||||
case "start" -> addNewStage(execution, workflowInstanceStateRepository);
|
||||
case "end" -> updateStage(execution, workflowInstanceStateRepository);
|
||||
default -> LOG.debug(
|
||||
String.format(
|
||||
"WorkflowStageUpdaterListener does not support listening for the event: '%s'",
|
||||
execution.getEventName()));
|
||||
}
|
||||
} catch (Exception exc) {
|
||||
LOG.error(
|
||||
String.format(
|
||||
"WorkflowStageUpdaterListener does not support listening for the event: '%s'",
|
||||
execution.getEventName()));
|
||||
"[%s] Failed due to: %s ",
|
||||
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()),
|
||||
exc);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,9 +1,15 @@
|
||||
package org.openmetadata.service.governance.workflows.elements;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.flowable.bpmn.model.Activity;
|
||||
import org.flowable.bpmn.model.BoundaryEvent;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.bpmn.model.EndEvent;
|
||||
import org.flowable.bpmn.model.ErrorEventDefinition;
|
||||
import org.flowable.bpmn.model.FlowNode;
|
||||
import org.flowable.bpmn.model.FlowableListener;
|
||||
import org.flowable.bpmn.model.Process;
|
||||
@ -16,6 +22,10 @@ import org.openmetadata.service.governance.workflows.flowable.builders.FlowableL
|
||||
public interface NodeInterface {
|
||||
void addToWorkflow(BpmnModel model, Process process);
|
||||
|
||||
default BoundaryEvent getRuntimeExceptionBoundaryEvent() {
|
||||
return null;
|
||||
}
|
||||
|
||||
default void attachWorkflowInstanceStageListeners(FlowNode flowableNode) {
|
||||
List<String> events = List.of("start", "end");
|
||||
attachWorkflowInstanceStageListeners(flowableNode, events);
|
||||
@ -59,4 +69,17 @@ public interface NodeInterface {
|
||||
.build();
|
||||
endEvent.getExecutionListeners().add(listener);
|
||||
}
|
||||
|
||||
default BoundaryEvent getRuntimeExceptionBoundaryEvent(Activity activity) {
|
||||
ErrorEventDefinition runtimeExceptionDefinition = new ErrorEventDefinition();
|
||||
runtimeExceptionDefinition.setErrorCode(WORKFLOW_RUNTIME_EXCEPTION);
|
||||
|
||||
BoundaryEvent runtimeExceptionBoundaryEvent = new BoundaryEvent();
|
||||
runtimeExceptionBoundaryEvent.setId(
|
||||
getFlowableElementId(activity.getId(), "runtimeExceptionBoundaryEvent"));
|
||||
runtimeExceptionBoundaryEvent.addEventDefinition(runtimeExceptionDefinition);
|
||||
|
||||
runtimeExceptionBoundaryEvent.setAttachedToRef(activity);
|
||||
return runtimeExceptionBoundaryEvent;
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTa
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;
|
||||
|
||||
import org.flowable.bpmn.model.BoundaryEvent;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.bpmn.model.EndEvent;
|
||||
import org.flowable.bpmn.model.FieldExtension;
|
||||
@ -21,6 +22,7 @@ import org.openmetadata.service.governance.workflows.flowable.builders.SubProces
|
||||
|
||||
public class CheckEntityAttributesTask implements NodeInterface {
|
||||
private final SubProcess subProcess;
|
||||
private final BoundaryEvent runtimeExceptionBoundaryEvent;
|
||||
|
||||
public CheckEntityAttributesTask(CheckEntityAttributesTaskDefinition nodeDefinition) {
|
||||
String subProcessId = nodeDefinition.getName();
|
||||
@ -45,9 +47,15 @@ public class CheckEntityAttributesTask implements NodeInterface {
|
||||
|
||||
attachWorkflowInstanceStageListeners(subProcess);
|
||||
|
||||
this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
|
||||
this.subProcess = subProcess;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
|
||||
return runtimeExceptionBoundaryEvent;
|
||||
}
|
||||
|
||||
private ServiceTask getCheckEntityAttributesServiceTask(String subProcessId, String rules) {
|
||||
FieldExtension rulesExpr =
|
||||
new FieldExtensionBuilder().fieldName("rulesExpr").fieldValue(rules).build();
|
||||
@ -63,5 +71,6 @@ public class CheckEntityAttributesTask implements NodeInterface {
|
||||
|
||||
public void addToWorkflow(BpmnModel model, Process process) {
|
||||
process.addFlowElement(subProcess);
|
||||
process.addFlowElement(runtimeExceptionBoundaryEvent);
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTa
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;
|
||||
|
||||
import java.util.Optional;
|
||||
import org.flowable.bpmn.model.BoundaryEvent;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.bpmn.model.EndEvent;
|
||||
import org.flowable.bpmn.model.FieldExtension;
|
||||
@ -23,6 +24,7 @@ import org.openmetadata.service.governance.workflows.flowable.builders.SubProces
|
||||
|
||||
public class SetEntityCertificationTask implements NodeInterface {
|
||||
private final SubProcess subProcess;
|
||||
private final BoundaryEvent runtimeExceptionBoundaryEvent;
|
||||
|
||||
public SetEntityCertificationTask(SetEntityCertificationTaskDefinition nodeDefinition) {
|
||||
String subProcessId = nodeDefinition.getName();
|
||||
@ -50,9 +52,15 @@ public class SetEntityCertificationTask implements NodeInterface {
|
||||
|
||||
attachWorkflowInstanceStageListeners(subProcess);
|
||||
|
||||
this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
|
||||
this.subProcess = subProcess;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
|
||||
return runtimeExceptionBoundaryEvent;
|
||||
}
|
||||
|
||||
private ServiceTask getSetEntityCertificationServiceTask(
|
||||
String subProcessId, CertificationConfiguration.CertificationEnum certification) {
|
||||
FieldExtension certificationExpr =
|
||||
@ -75,5 +83,6 @@ public class SetEntityCertificationTask implements NodeInterface {
|
||||
|
||||
public void addToWorkflow(BpmnModel model, Process process) {
|
||||
process.addFlowElement(subProcess);
|
||||
process.addFlowElement(runtimeExceptionBoundaryEvent);
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTa
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;
|
||||
|
||||
import org.flowable.bpmn.model.BoundaryEvent;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.bpmn.model.EndEvent;
|
||||
import org.flowable.bpmn.model.FieldExtension;
|
||||
@ -21,6 +22,7 @@ import org.openmetadata.service.governance.workflows.flowable.builders.SubProces
|
||||
|
||||
public class SetGlossaryTermStatusTask implements NodeInterface {
|
||||
private final SubProcess subProcess;
|
||||
private final BoundaryEvent runtimeExceptionBoundaryEvent;
|
||||
|
||||
public SetGlossaryTermStatusTask(SetGlossaryTermStatusTaskDefinition nodeDefinition) {
|
||||
String subProcessId = nodeDefinition.getName();
|
||||
@ -46,9 +48,15 @@ public class SetGlossaryTermStatusTask implements NodeInterface {
|
||||
|
||||
attachWorkflowInstanceStageListeners(subProcess);
|
||||
|
||||
this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
|
||||
this.subProcess = subProcess;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
|
||||
return runtimeExceptionBoundaryEvent;
|
||||
}
|
||||
|
||||
private ServiceTask getSetGlossaryTermStatusServiceTask(String subProcessId, String status) {
|
||||
FieldExtension statusExpr =
|
||||
new FieldExtensionBuilder().fieldName("statusExpr").fieldValue(status).build();
|
||||
@ -64,5 +72,6 @@ public class SetGlossaryTermStatusTask implements NodeInterface {
|
||||
|
||||
public void addToWorkflow(BpmnModel model, Process process) {
|
||||
process.addFlowElement(subProcess);
|
||||
process.addFlowElement(runtimeExceptionBoundaryEvent);
|
||||
}
|
||||
}
|
||||
|
@ -1,11 +1,16 @@
|
||||
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
|
||||
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
|
||||
|
||||
import io.github.jamsesso.jsonlogic.JsonLogic;
|
||||
import io.github.jamsesso.jsonlogic.JsonLogicException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.api.delegate.Expression;
|
||||
import org.flowable.engine.delegate.BpmnError;
|
||||
import org.flowable.engine.delegate.DelegateExecution;
|
||||
import org.flowable.engine.delegate.JavaDelegate;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
@ -14,15 +19,25 @@ import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
public class CheckEntityAttributesImpl implements JavaDelegate {
|
||||
private Expression rulesExpr;
|
||||
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
String rules = (String) rulesExpr.getValue(execution);
|
||||
MessageParser.EntityLink entityLink =
|
||||
MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE));
|
||||
execution.setVariable(RESULT_VARIABLE, checkAttributes(entityLink, rules));
|
||||
try {
|
||||
String rules = (String) rulesExpr.getValue(execution);
|
||||
MessageParser.EntityLink entityLink =
|
||||
MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE));
|
||||
execution.setVariable(RESULT_VARIABLE, checkAttributes(entityLink, rules));
|
||||
} catch (Exception exc) {
|
||||
LOG.error(
|
||||
String.format(
|
||||
"[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())),
|
||||
exc);
|
||||
execution.setVariable(EXCEPTION_VARIABLE, exc.toString());
|
||||
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private Boolean checkAttributes(MessageParser.EntityLink entityLink, String rules) {
|
||||
|
@ -1,11 +1,16 @@
|
||||
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RESOLVED_BY_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
|
||||
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
|
||||
|
||||
import java.util.Optional;
|
||||
import javax.json.JsonPatch;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.api.delegate.Expression;
|
||||
import org.flowable.engine.delegate.BpmnError;
|
||||
import org.flowable.engine.delegate.DelegateExecution;
|
||||
import org.flowable.engine.delegate.JavaDelegate;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
@ -18,25 +23,35 @@ import org.openmetadata.service.resources.tags.TagLabelUtil;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
public class SetEntityCertificationImpl implements JavaDelegate {
|
||||
private Expression certificationExpr;
|
||||
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
MessageParser.EntityLink entityLink =
|
||||
MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE));
|
||||
String entityType = entityLink.getEntityType();
|
||||
EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL);
|
||||
try {
|
||||
MessageParser.EntityLink entityLink =
|
||||
MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE));
|
||||
String entityType = entityLink.getEntityType();
|
||||
EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL);
|
||||
|
||||
String certification =
|
||||
Optional.ofNullable(certificationExpr)
|
||||
.map(certificationExpr -> (String) certificationExpr.getValue(execution))
|
||||
.orElse(null);
|
||||
String user =
|
||||
Optional.ofNullable((String) execution.getVariable(RESOLVED_BY_VARIABLE))
|
||||
.orElse(entity.getUpdatedBy());
|
||||
String certification =
|
||||
Optional.ofNullable(certificationExpr)
|
||||
.map(certificationExpr -> (String) certificationExpr.getValue(execution))
|
||||
.orElse(null);
|
||||
String user =
|
||||
Optional.ofNullable((String) execution.getVariable(RESOLVED_BY_VARIABLE))
|
||||
.orElse(entity.getUpdatedBy());
|
||||
|
||||
setStatus(entity, entityType, user, certification);
|
||||
setStatus(entity, entityType, user, certification);
|
||||
} catch (Exception exc) {
|
||||
LOG.error(
|
||||
String.format(
|
||||
"[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())),
|
||||
exc);
|
||||
execution.setVariable(EXCEPTION_VARIABLE, exc.toString());
|
||||
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void setStatus(
|
||||
|
@ -1,12 +1,17 @@
|
||||
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RESOLVED_BY_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
|
||||
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import javax.json.JsonPatch;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.api.delegate.Expression;
|
||||
import org.flowable.engine.delegate.BpmnError;
|
||||
import org.flowable.engine.delegate.DelegateExecution;
|
||||
import org.flowable.engine.delegate.JavaDelegate;
|
||||
import org.openmetadata.schema.entity.data.GlossaryTerm;
|
||||
@ -16,21 +21,31 @@ import org.openmetadata.service.jdbi3.GlossaryTermRepository;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
public class SetGlossaryTermStatusImpl implements JavaDelegate {
|
||||
private Expression statusExpr;
|
||||
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
MessageParser.EntityLink entityLink =
|
||||
MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE));
|
||||
GlossaryTerm glossaryTerm = Entity.getEntity(entityLink, "*", Include.ALL);
|
||||
try {
|
||||
MessageParser.EntityLink entityLink =
|
||||
MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE));
|
||||
GlossaryTerm glossaryTerm = Entity.getEntity(entityLink, "*", Include.ALL);
|
||||
|
||||
String status = (String) statusExpr.getValue(execution);
|
||||
String user =
|
||||
Optional.ofNullable((String) execution.getVariable(RESOLVED_BY_VARIABLE))
|
||||
.orElse(glossaryTerm.getUpdatedBy());
|
||||
String status = (String) statusExpr.getValue(execution);
|
||||
String user =
|
||||
Optional.ofNullable((String) execution.getVariable(RESOLVED_BY_VARIABLE))
|
||||
.orElse(glossaryTerm.getUpdatedBy());
|
||||
|
||||
setStatus(glossaryTerm, user, status);
|
||||
setStatus(glossaryTerm, user, status);
|
||||
} catch (Exception exc) {
|
||||
LOG.error(
|
||||
String.format(
|
||||
"[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())),
|
||||
exc);
|
||||
execution.setVariable(EXCEPTION_VARIABLE, exc.toString());
|
||||
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void setStatus(GlossaryTerm glossaryTerm, String user, String status) {
|
||||
|
@ -1,14 +1,21 @@
|
||||
package org.openmetadata.service.governance.workflows.elements.nodes.endEvent;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.bpmn.model.Process;
|
||||
import org.openmetadata.schema.governance.workflows.elements.nodes.endEvent.EndEventDefinition;
|
||||
import org.openmetadata.service.governance.workflows.elements.NodeInterface;
|
||||
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
|
||||
|
||||
@Getter
|
||||
public class EndEvent implements NodeInterface {
|
||||
private final org.flowable.bpmn.model.EndEvent endEvent;
|
||||
|
||||
public EndEvent(String id) {
|
||||
this.endEvent = new EndEventBuilder().id(id).build();
|
||||
attachWorkflowInstanceStageListeners(endEvent);
|
||||
}
|
||||
|
||||
public EndEvent(EndEventDefinition nodeDefinition) {
|
||||
this.endEvent = new EndEventBuilder().id(nodeDefinition.getName()).build();
|
||||
attachWorkflowInstanceStageListeners(endEvent);
|
||||
|
@ -34,6 +34,7 @@ import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
public class UserApprovalTask implements NodeInterface {
|
||||
private final SubProcess subProcess;
|
||||
private final BoundaryEvent runtimeExceptionBoundaryEvent;
|
||||
private final List<Message> messages = new ArrayList<>();
|
||||
|
||||
public UserApprovalTask(UserApprovalTaskDefinition nodeDefinition) {
|
||||
@ -92,9 +93,15 @@ public class UserApprovalTask implements NodeInterface {
|
||||
|
||||
attachWorkflowInstanceStageListeners(subProcess);
|
||||
|
||||
this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
|
||||
this.subProcess = subProcess;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
|
||||
return runtimeExceptionBoundaryEvent;
|
||||
}
|
||||
|
||||
private ServiceTask getSetAssigneesVariableServiceTask(
|
||||
String subProcessId, FieldExtension assigneesExpr, FieldExtension assigneesVarNameExpr) {
|
||||
ServiceTask serviceTask =
|
||||
@ -146,6 +153,7 @@ public class UserApprovalTask implements NodeInterface {
|
||||
|
||||
public void addToWorkflow(BpmnModel model, Process process) {
|
||||
process.addFlowElement(subProcess);
|
||||
process.addFlowElement(runtimeExceptionBoundaryEvent);
|
||||
for (Message message : messages) {
|
||||
model.addMessage(message);
|
||||
}
|
||||
|
@ -1,12 +1,17 @@
|
||||
package org.openmetadata.service.governance.workflows.elements.nodes.userTask.impl;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.STAGE_INSTANCE_STATE_ID_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
|
||||
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.engine.delegate.BpmnError;
|
||||
import org.flowable.engine.delegate.TaskListener;
|
||||
import org.flowable.identitylink.api.IdentityLink;
|
||||
import org.flowable.task.service.delegate.DelegateTask;
|
||||
@ -27,23 +32,35 @@ import org.openmetadata.service.resources.feeds.FeedResource;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.util.WebsocketNotificationHandler;
|
||||
|
||||
@Slf4j
|
||||
public class CreateApprovalTaskImpl implements TaskListener {
|
||||
@Override
|
||||
public void notify(DelegateTask delegateTask) {
|
||||
List<EntityReference> assignees = getAssignees(delegateTask);
|
||||
MessageParser.EntityLink entityLink =
|
||||
MessageParser.EntityLink.parse((String) delegateTask.getVariable(RELATED_ENTITY_VARIABLE));
|
||||
GlossaryTerm entity = Entity.getEntity(entityLink, "*", Include.ALL);
|
||||
try {
|
||||
List<EntityReference> assignees = getAssignees(delegateTask);
|
||||
MessageParser.EntityLink entityLink =
|
||||
MessageParser.EntityLink.parse(
|
||||
(String) delegateTask.getVariable(RELATED_ENTITY_VARIABLE));
|
||||
GlossaryTerm entity = Entity.getEntity(entityLink, "*", Include.ALL);
|
||||
|
||||
Thread task = createApprovalTask(entity, assignees);
|
||||
WorkflowHandler.getInstance().setCustomTaskId(delegateTask.getId(), task.getId());
|
||||
Thread task = createApprovalTask(entity, assignees);
|
||||
WorkflowHandler.getInstance().setCustomTaskId(delegateTask.getId(), task.getId());
|
||||
|
||||
UUID workflowInstanceStateId =
|
||||
(UUID) delegateTask.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE);
|
||||
WorkflowInstanceStateRepository workflowInstanceStateRepository =
|
||||
(WorkflowInstanceStateRepository)
|
||||
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
|
||||
workflowInstanceStateRepository.updateStageWithTask(task.getId(), workflowInstanceStateId);
|
||||
UUID workflowInstanceStateId =
|
||||
(UUID) delegateTask.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE);
|
||||
WorkflowInstanceStateRepository workflowInstanceStateRepository =
|
||||
(WorkflowInstanceStateRepository)
|
||||
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
|
||||
workflowInstanceStateRepository.updateStageWithTask(task.getId(), workflowInstanceStateId);
|
||||
} catch (Exception exc) {
|
||||
LOG.error(
|
||||
String.format(
|
||||
"[%s] Failure: ",
|
||||
getProcessDefinitionKeyFromId(delegateTask.getProcessDefinitionId())),
|
||||
exc);
|
||||
delegateTask.setVariable(EXCEPTION_VARIABLE, exc.toString());
|
||||
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private List<EntityReference> getAssignees(DelegateTask delegateTask) {
|
||||
|
@ -1,12 +1,17 @@
|
||||
package org.openmetadata.service.governance.workflows.elements.nodes.userTask.impl;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
|
||||
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.api.delegate.Expression;
|
||||
import org.flowable.engine.delegate.BpmnError;
|
||||
import org.flowable.engine.delegate.DelegateExecution;
|
||||
import org.flowable.engine.delegate.JavaDelegate;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
@ -16,33 +21,44 @@ import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
public class SetApprovalAssigneesImpl implements JavaDelegate {
|
||||
private Expression assigneesExpr;
|
||||
private Expression assigneesVarNameExpr;
|
||||
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
Map<String, Object> assigneesConfig =
|
||||
JsonUtils.readOrConvertValue(assigneesExpr.getValue(execution), Map.class);
|
||||
Boolean addReviewers = (Boolean) assigneesConfig.get("addReviewers");
|
||||
Optional<List<EntityReference>> oExtraAssignees =
|
||||
Optional.ofNullable(
|
||||
JsonUtils.readOrConvertValue(assigneesConfig.get("extraAssignees"), List.class));
|
||||
try {
|
||||
Map<String, Object> assigneesConfig =
|
||||
JsonUtils.readOrConvertValue(assigneesExpr.getValue(execution), Map.class);
|
||||
Boolean addReviewers = (Boolean) assigneesConfig.get("addReviewers");
|
||||
Optional<List<EntityReference>> oExtraAssignees =
|
||||
Optional.ofNullable(
|
||||
JsonUtils.readOrConvertValue(assigneesConfig.get("extraAssignees"), List.class));
|
||||
|
||||
List<String> assignees = new ArrayList<>();
|
||||
List<String> assignees = new ArrayList<>();
|
||||
|
||||
if (addReviewers) {
|
||||
MessageParser.EntityLink entityLink =
|
||||
MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE));
|
||||
EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL);
|
||||
assignees.addAll(getEntityLinkStringFromEntityReference(entity.getReviewers()));
|
||||
if (addReviewers) {
|
||||
MessageParser.EntityLink entityLink =
|
||||
MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE));
|
||||
EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL);
|
||||
assignees.addAll(getEntityLinkStringFromEntityReference(entity.getReviewers()));
|
||||
}
|
||||
|
||||
oExtraAssignees.ifPresent(
|
||||
extraAssignees ->
|
||||
assignees.addAll(getEntityLinkStringFromEntityReference(extraAssignees)));
|
||||
|
||||
execution.setVariableLocal(
|
||||
assigneesVarNameExpr.getValue(execution).toString(), JsonUtils.pojoToJson(assignees));
|
||||
} catch (Exception exc) {
|
||||
LOG.error(
|
||||
String.format(
|
||||
"[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())),
|
||||
exc);
|
||||
execution.setVariable(EXCEPTION_VARIABLE, exc.toString());
|
||||
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
|
||||
}
|
||||
|
||||
oExtraAssignees.ifPresent(
|
||||
extraAssignees -> assignees.addAll(getEntityLinkStringFromEntityReference(extraAssignees)));
|
||||
|
||||
execution.setVariableLocal(
|
||||
assigneesVarNameExpr.getValue(execution).toString(), JsonUtils.pojoToJson(assignees));
|
||||
}
|
||||
|
||||
private List<String> getEntityLinkStringFromEntityReference(List<EntityReference> assignees) {
|
||||
|
@ -1,20 +1,37 @@
|
||||
package org.openmetadata.service.governance.workflows.elements.nodes.userTask.impl;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
|
||||
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
|
||||
|
||||
import java.util.List;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.api.delegate.Expression;
|
||||
import org.flowable.engine.delegate.BpmnError;
|
||||
import org.flowable.engine.delegate.TaskListener;
|
||||
import org.flowable.task.service.delegate.DelegateTask;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
public class SetCandidateUsersImpl implements TaskListener {
|
||||
private Expression assigneesVarNameExpr;
|
||||
|
||||
@Override
|
||||
public void notify(DelegateTask delegateTask) {
|
||||
List<String> assignees =
|
||||
JsonUtils.readOrConvertValue(
|
||||
delegateTask.getVariable(assigneesVarNameExpr.getValue(delegateTask).toString()),
|
||||
List.class);
|
||||
delegateTask.addCandidateUsers(assignees);
|
||||
try {
|
||||
List<String> assignees =
|
||||
JsonUtils.readOrConvertValue(
|
||||
delegateTask.getVariable(assigneesVarNameExpr.getValue(delegateTask).toString()),
|
||||
List.class);
|
||||
delegateTask.addCandidateUsers(assignees);
|
||||
} catch (Exception exc) {
|
||||
LOG.error(
|
||||
String.format(
|
||||
"[%s] Failure: ",
|
||||
getProcessDefinitionKeyFromId(delegateTask.getProcessDefinitionId())),
|
||||
exc);
|
||||
delegateTask.setVariable(EXCEPTION_VARIABLE, exc.toString());
|
||||
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,15 +1,19 @@
|
||||
package org.openmetadata.service.governance.workflows.elements.triggers;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import lombok.Getter;
|
||||
import org.flowable.bpmn.model.BoundaryEvent;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.bpmn.model.CallActivity;
|
||||
import org.flowable.bpmn.model.EndEvent;
|
||||
import org.flowable.bpmn.model.ErrorEventDefinition;
|
||||
import org.flowable.bpmn.model.FieldExtension;
|
||||
import org.flowable.bpmn.model.IOParameter;
|
||||
import org.flowable.bpmn.model.Process;
|
||||
@ -55,6 +59,21 @@ public class EventBasedEntityTrigger implements TriggerInterface {
|
||||
CallActivity workflowTrigger = getWorkflowTrigger(triggerWorkflowId, mainWorkflowName);
|
||||
process.addFlowElement(workflowTrigger);
|
||||
|
||||
ErrorEventDefinition runtimeExceptionDefinition = new ErrorEventDefinition();
|
||||
runtimeExceptionDefinition.setErrorCode(WORKFLOW_RUNTIME_EXCEPTION);
|
||||
|
||||
BoundaryEvent runtimeExceptionBoundaryEvent = new BoundaryEvent();
|
||||
runtimeExceptionBoundaryEvent.setId(
|
||||
getFlowableElementId(workflowTrigger.getId(), "runtimeExceptionBoundaryEvent"));
|
||||
runtimeExceptionBoundaryEvent.addEventDefinition(runtimeExceptionDefinition);
|
||||
|
||||
runtimeExceptionBoundaryEvent.setAttachedToRef(workflowTrigger);
|
||||
process.addFlowElement(runtimeExceptionBoundaryEvent);
|
||||
|
||||
EndEvent errorEndEvent =
|
||||
new EndEventBuilder().id(getFlowableElementId(triggerWorkflowId, "errorEndEvent")).build();
|
||||
process.addFlowElement(errorEndEvent);
|
||||
|
||||
EndEvent endEvent =
|
||||
new EndEventBuilder().id(getFlowableElementId(triggerWorkflowId, "endEvent")).build();
|
||||
process.addFlowElement(endEvent);
|
||||
@ -77,6 +96,8 @@ public class EventBasedEntityTrigger implements TriggerInterface {
|
||||
process.addFlowElement(filterNotPassed);
|
||||
// WorkflowTrigger -> End
|
||||
process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), endEvent.getId()));
|
||||
process.addFlowElement(
|
||||
new SequenceFlow(runtimeExceptionBoundaryEvent.getId(), errorEndEvent.getId()));
|
||||
|
||||
this.process = process;
|
||||
this.triggerWorkflowId = triggerWorkflowId;
|
||||
@ -126,7 +147,12 @@ public class EventBasedEntityTrigger implements TriggerInterface {
|
||||
inputParameter.setSource(RELATED_ENTITY_VARIABLE);
|
||||
inputParameter.setTarget(RELATED_ENTITY_VARIABLE);
|
||||
|
||||
IOParameter outputParameter = new IOParameter();
|
||||
outputParameter.setSource(EXCEPTION_VARIABLE);
|
||||
outputParameter.setTarget(EXCEPTION_VARIABLE);
|
||||
|
||||
workflowTrigger.setInParameters(List.of(inputParameter));
|
||||
workflowTrigger.setOutParameters(List.of(outputParameter));
|
||||
|
||||
return workflowTrigger;
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package org.openmetadata.service.governance.workflows.elements.triggers;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;
|
||||
|
||||
@ -121,7 +122,12 @@ public class PeriodicBatchEntityTrigger implements TriggerInterface {
|
||||
inputParameter.setSource(RELATED_ENTITY_VARIABLE);
|
||||
inputParameter.setTarget(RELATED_ENTITY_VARIABLE);
|
||||
|
||||
IOParameter outputParameter = new IOParameter();
|
||||
outputParameter.setSource(EXCEPTION_VARIABLE);
|
||||
outputParameter.setTarget(EXCEPTION_VARIABLE);
|
||||
|
||||
workflowTrigger.setInParameters(List.of(inputParameter));
|
||||
workflowTrigger.setOutParameters(List.of(outputParameter));
|
||||
workflowTrigger.setLoopCharacteristics(multiInstance);
|
||||
|
||||
return workflowTrigger;
|
||||
|
@ -1,22 +1,28 @@
|
||||
package org.openmetadata.service.governance.workflows.flowable;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import lombok.Getter;
|
||||
import org.flowable.bpmn.model.BoundaryEvent;
|
||||
import org.flowable.bpmn.model.BpmnModel;
|
||||
import org.flowable.bpmn.model.Process;
|
||||
import org.flowable.bpmn.model.SequenceFlow;
|
||||
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
|
||||
import org.openmetadata.schema.governance.workflows.elements.EdgeDefinition;
|
||||
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
|
||||
import org.openmetadata.service.governance.workflows.elements.Edge;
|
||||
import org.openmetadata.service.governance.workflows.elements.NodeFactory;
|
||||
import org.openmetadata.service.governance.workflows.elements.NodeInterface;
|
||||
import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Getter
|
||||
public class MainWorkflow {
|
||||
private final BpmnModel model;
|
||||
private final String workflowName;
|
||||
private final List<BoundaryEvent> runtimeExceptionBoundaryEvents = new ArrayList<>();
|
||||
|
||||
public MainWorkflow(WorkflowDefinition workflowDefinition) {
|
||||
BpmnModel model = new BpmnModel();
|
||||
@ -33,8 +39,12 @@ public class MainWorkflow {
|
||||
// Add Nodes
|
||||
for (Object nodeDefinitionObj :
|
||||
(List<WorkflowNodeDefinitionInterface>) workflowDefinition.getNodes()) {
|
||||
NodeFactory.createNode(JsonUtils.readOrConvertValue(nodeDefinitionObj, Map.class))
|
||||
.addToWorkflow(model, process);
|
||||
NodeInterface node =
|
||||
NodeFactory.createNode(JsonUtils.readOrConvertValue(nodeDefinitionObj, Map.class));
|
||||
node.addToWorkflow(model, process);
|
||||
|
||||
Optional.ofNullable(node.getRuntimeExceptionBoundaryEvent())
|
||||
.ifPresent(runtimeExceptionBoundaryEvents::add);
|
||||
}
|
||||
|
||||
// Add Edges
|
||||
@ -43,7 +53,18 @@ public class MainWorkflow {
|
||||
edge.addToWorkflow(model, process);
|
||||
}
|
||||
|
||||
// Configure Exception Flow
|
||||
configureRuntimeExceptionFlow(process);
|
||||
|
||||
this.model = model;
|
||||
this.workflowName = workflowName;
|
||||
}
|
||||
|
||||
private void configureRuntimeExceptionFlow(Process process) {
|
||||
EndEvent errorEndEvent = new EndEvent("Error");
|
||||
process.addFlowElement(errorEndEvent.getEndEvent());
|
||||
for (BoundaryEvent event : runtimeExceptionBoundaryEvents) {
|
||||
process.addFlowElement(new SequenceFlow(event.getId(), errorEndEvent.getEndEvent().getId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -197,6 +197,9 @@ public class FeedRepository {
|
||||
if (repository.supportsTags) {
|
||||
fieldList.add("tags");
|
||||
}
|
||||
if (repository.supportsReviewers) {
|
||||
fieldList.add("reviewers");
|
||||
}
|
||||
return String.join(",", fieldList.toArray(new String[0]));
|
||||
}
|
||||
}
|
||||
|
@ -598,6 +598,9 @@ public class GlossaryTermRepository extends EntityRepository<GlossaryTerm> {
|
||||
@Override
|
||||
public EntityInterface performTask(String user, ResolveTask resolveTask) {
|
||||
// TODO: Resolve this outside
|
||||
GlossaryTerm glossaryTerm = (GlossaryTerm) threadContext.getAboutEntity();
|
||||
checkUpdatedByReviewer(glossaryTerm, user);
|
||||
|
||||
UUID taskId = threadContext.getThread().getId();
|
||||
Map<String, Object> variables = new HashMap<>();
|
||||
variables.put(RESULT_VARIABLE, resolveTask.getNewValue().equalsIgnoreCase("approved"));
|
||||
@ -607,7 +610,6 @@ public class GlossaryTermRepository extends EntityRepository<GlossaryTerm> {
|
||||
|
||||
// TODO: performTask returns the updated Entity and the flow applies the new value.
|
||||
// This should be changed with the new Governance Workflows.
|
||||
GlossaryTerm glossaryTerm = (GlossaryTerm) threadContext.getAboutEntity();
|
||||
// glossaryTerm.setStatus(Status.APPROVED);
|
||||
return glossaryTerm;
|
||||
}
|
||||
@ -652,7 +654,7 @@ public class GlossaryTermRepository extends EntityRepository<GlossaryTerm> {
|
||||
}
|
||||
}
|
||||
|
||||
private void checkUpdatedByReviewer(GlossaryTerm term, String updatedBy) {
|
||||
public static void checkUpdatedByReviewer(GlossaryTerm term, String updatedBy) {
|
||||
// Only list of allowed reviewers can change the status from DRAFT to APPROVED
|
||||
List<EntityReference> reviewers = term.getReviewers();
|
||||
if (!nullOrEmpty(reviewers)) {
|
||||
|
@ -1,5 +1,7 @@
|
||||
package org.openmetadata.service.jdbi3;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import org.openmetadata.schema.governance.workflows.WorkflowInstance;
|
||||
@ -41,12 +43,17 @@ public class WorkflowInstanceRepository extends EntityTimeSeriesRepository<Workf
|
||||
workflowDefinitionName);
|
||||
}
|
||||
|
||||
public void updateWorkflowInstance(UUID workflowInstanceId, Long endedAt) {
|
||||
public void updateWorkflowInstance(
|
||||
UUID workflowInstanceId, Long endedAt, Map<String, Object> variables) {
|
||||
WorkflowInstance workflowInstance =
|
||||
JsonUtils.readValue(timeSeriesDao.getById(workflowInstanceId), WorkflowInstance.class);
|
||||
|
||||
workflowInstance.setEndedAt(endedAt);
|
||||
|
||||
if (variables.containsKey(EXCEPTION_VARIABLE)) {
|
||||
workflowInstance.setException(true);
|
||||
}
|
||||
|
||||
getTimeSeriesDao().update(JsonUtils.pojoToJson(workflowInstance), workflowInstanceId);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,7 @@
|
||||
package org.openmetadata.service.jdbi3;
|
||||
|
||||
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import org.openmetadata.schema.governance.workflows.Stage;
|
||||
@ -61,6 +63,10 @@ public class WorkflowInstanceStateRepository
|
||||
stage.setEndedAt(endedAt);
|
||||
stage.setVariables(variables);
|
||||
|
||||
if (variables.containsKey(EXCEPTION_VARIABLE)) {
|
||||
workflowInstanceState.setException(true);
|
||||
}
|
||||
|
||||
workflowInstanceState.setStage(stage);
|
||||
|
||||
getTimeSeriesDao().update(JsonUtils.pojoToJson(workflowInstanceState), workflowInstanceStateId);
|
||||
|
@ -30,6 +30,10 @@
|
||||
"timestamp": {
|
||||
"description": "Timestamp on which the workflow instance state was created.",
|
||||
"$ref": "../../type/basic.json#/definitions/timestamp"
|
||||
},
|
||||
"exception": {
|
||||
"description": "If the Workflow Instance has errors, 'True'. Else, 'False'.",
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"required": [],
|
||||
|
@ -54,6 +54,10 @@
|
||||
"timestamp": {
|
||||
"description": "Timestamp on which the workflow instance state was created.",
|
||||
"$ref": "../../type/basic.json#/definitions/timestamp"
|
||||
},
|
||||
"exception": {
|
||||
"description": "If the Workflow Instance has errors, 'True'. Else, 'False'.",
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"required": [],
|
||||
|
Loading…
x
Reference in New Issue
Block a user