From 8cade39bade38fcdb0affe3c5ba24aaa63f258f1 Mon Sep 17 00:00:00 2001 From: IceS2 Date: Tue, 26 Nov 2024 17:04:59 +0100 Subject: [PATCH] MINOR: Increase visibility on flowable process (#18792) * Add annotation to turn delete_recursive test into the last one to be executed * Improve Fetching Entities by only fetching the FQN * Cut back default batchsize to 500 * implement logging for flowable process * Implement Error Handling Flow * Remove testing code * Checking if User is Reviewer when resolving glossary task --- .../MainWorkflowTerminationListener.java | 34 ++++++++---- .../governance/workflows/Workflow.java | 2 + .../workflows/WorkflowFailureListener.java | 33 ++++++++++++ .../governance/workflows/WorkflowHandler.java | 15 ++++++ ...flowInstanceExecutionIdSetterListener.java | 23 +++++++- .../workflows/WorkflowInstanceListener.java | 36 +++++++++---- .../WorkflowInstanceStageListener.java | 26 ++++++---- .../workflows/elements/NodeInterface.java | 23 ++++++++ .../CheckEntityAttributesTask.java | 9 ++++ .../SetEntityCertificationTask.java | 9 ++++ .../SetGlossaryTermStatusTask.java | 9 ++++ .../impl/CheckEntityAttributesImpl.java | 23 ++++++-- .../impl/SetEntityCertificationImpl.java | 39 +++++++++----- .../impl/SetGlossaryTermStatusImpl.java | 31 ++++++++--- .../elements/nodes/endEvent/EndEvent.java | 7 +++ .../nodes/userTask/UserApprovalTask.java | 8 +++ .../userTask/impl/CreateApprovalTaskImpl.java | 41 ++++++++++----- .../impl/SetApprovalAssigneesImpl.java | 52 ++++++++++++------- .../userTask/impl/SetCandidateUsersImpl.java | 27 ++++++++-- .../triggers/EventBasedEntityTrigger.java | 26 ++++++++++ .../triggers/PeriodicBatchEntityTrigger.java | 6 +++ .../workflows/flowable/MainWorkflow.java | 25 ++++++++- .../service/jdbi3/FeedRepository.java | 3 ++ .../service/jdbi3/GlossaryTermRepository.java | 6 ++- .../jdbi3/WorkflowInstanceRepository.java | 9 +++- .../WorkflowInstanceStateRepository.java | 6 +++ .../workflows/workflowInstance.json | 4 ++ .../workflows/workflowInstanceState.json | 4 ++ 28 files changed, 441 insertions(+), 95 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/MainWorkflowTerminationListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/MainWorkflowTerminationListener.java index 9af9538935f..f5e075d97e4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/MainWorkflowTerminationListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/MainWorkflowTerminationListener.java @@ -1,30 +1,42 @@ package org.openmetadata.service.governance.workflows; import static org.openmetadata.service.governance.workflows.Workflow.STAGE_INSTANCE_STATE_ID_VARIABLE; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; import java.util.UUID; +import lombok.extern.slf4j.Slf4j; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; import org.openmetadata.service.Entity; import org.openmetadata.service.jdbi3.WorkflowInstanceRepository; import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository; +@Slf4j public class MainWorkflowTerminationListener implements JavaDelegate { @Override public void execute(DelegateExecution execution) { - WorkflowInstanceStateRepository workflowInstanceStateRepository = - (WorkflowInstanceStateRepository) - Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); + try { + WorkflowInstanceStateRepository workflowInstanceStateRepository = + (WorkflowInstanceStateRepository) + Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); - UUID workflowInstanceStateId = (UUID) execution.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE); - workflowInstanceStateRepository.updateStage( - workflowInstanceStateId, System.currentTimeMillis(), execution.getVariables()); + UUID workflowInstanceStateId = (UUID) execution.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE); + workflowInstanceStateRepository.updateStage( + workflowInstanceStateId, System.currentTimeMillis(), execution.getVariables()); - WorkflowInstanceRepository workflowInstanceRepository = - (WorkflowInstanceRepository) Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE); + WorkflowInstanceRepository workflowInstanceRepository = + (WorkflowInstanceRepository) + Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE); - UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); - workflowInstanceRepository.updateWorkflowInstance( - workflowInstanceId, System.currentTimeMillis()); + UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); + workflowInstanceRepository.updateWorkflowInstance( + workflowInstanceId, System.currentTimeMillis(), execution.getVariables()); + } catch (Exception exc) { + LOG.error( + String.format( + "[%s] Failed due to: %s ", + getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()), + exc); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java index 013d7ef43a7..3b9b437e361 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java @@ -13,6 +13,8 @@ public class Workflow { public static final String STAGE_INSTANCE_STATE_ID_VARIABLE = "stageInstanceStateId"; public static final String WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE = "workflowInstanceExecutionId"; + public static final String WORKFLOW_RUNTIME_EXCEPTION = "workflowRuntimeException"; + public static final String EXCEPTION_VARIABLE = "exception"; private final TriggerWorkflow triggerWorkflow; private final MainWorkflow mainWorkflow; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java new file mode 100644 index 00000000000..ad197d0f7e3 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java @@ -0,0 +1,33 @@ +package org.openmetadata.service.governance.workflows; + +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.common.engine.api.delegate.event.FlowableEventListener; + +@Slf4j +public class WorkflowFailureListener implements FlowableEventListener { + + @Override + public void onEvent(FlowableEvent event) { + if (FlowableEngineEventType.JOB_EXECUTION_FAILURE.equals(event.getType())) { + LOG.error("Workflow Failed: " + event); + } + } + + @Override + public boolean isFailOnException() { + // Return true if the listener should fail the operation on an exception + return false; + } + + @Override + public boolean isFireOnTransactionLifecycleEvent() { + return false; + } + + @Override + public String getOnTransaction() { + return null; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java index 48174ddbd1b..ae3ecab0ac3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java @@ -50,6 +50,9 @@ public class WorkflowHandler { .setJdbcDriver(config.getDataSourceFactory().getDriverClass()) .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE); + // Add Global Failure Listener + processEngineConfiguration.setEventListeners(List.of(new WorkflowFailureListener())); + if (ConnectionType.MYSQL.label.equals(config.getDataSourceFactory().getDriverClass())) { processEngineConfiguration.setDatabaseType(ProcessEngineConfiguration.DATABASE_TYPE_MYSQL); } else { @@ -256,4 +259,16 @@ public class WorkflowHandler { repositoryService.activateProcessDefinitionByKey( getTriggerWorkflowId(workflowName), true, null); } + + public void terminateWorkflow(String workflowName) { + runtimeService + .createProcessInstanceQuery() + .processDefinitionKey(getTriggerWorkflowId(workflowName)) + .list() + .forEach( + instance -> { + runtimeService.deleteProcessInstance( + instance.getId(), "Terminating all instances due to user request."); + }); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceExecutionIdSetterListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceExecutionIdSetterListener.java index 6cda1b8c408..9651615cd78 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceExecutionIdSetterListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceExecutionIdSetterListener.java @@ -1,15 +1,34 @@ package org.openmetadata.service.governance.workflows; +import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; import java.util.UUID; +import lombok.extern.slf4j.Slf4j; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; +@Slf4j public class WorkflowInstanceExecutionIdSetterListener implements JavaDelegate { @Override public void execute(DelegateExecution execution) { - UUID workflowInstanceExecutionId = UUID.randomUUID(); - execution.setVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE, workflowInstanceExecutionId); + try { + String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); + String relatedEntity = (String) execution.getVariable(RELATED_ENTITY_VARIABLE); + LOG.debug( + String.format( + "New Execution for Workflow '%s'. Related Entity: '%s'", + workflowName, relatedEntity)); + + UUID workflowInstanceExecutionId = UUID.randomUUID(); + execution.setVariable(WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE, workflowInstanceExecutionId); + } catch (Exception exc) { + LOG.error( + String.format( + "[%s] Failed due to: %s ", + getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()), + exc); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java index b6b1c0069ac..261afe62ab8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java @@ -13,16 +13,25 @@ import org.openmetadata.service.jdbi3.WorkflowInstanceRepository; public class WorkflowInstanceListener implements JavaDelegate { @Override public void execute(DelegateExecution execution) { - WorkflowInstanceRepository workflowInstanceRepository = - (WorkflowInstanceRepository) Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE); + try { + WorkflowInstanceRepository workflowInstanceRepository = + (WorkflowInstanceRepository) + Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE); - switch (execution.getEventName()) { - case "start" -> addWorkflowInstance(execution, workflowInstanceRepository); - case "end" -> updateWorkflowInstance(execution, workflowInstanceRepository); - default -> LOG.debug( + switch (execution.getEventName()) { + case "start" -> addWorkflowInstance(execution, workflowInstanceRepository); + case "end" -> updateWorkflowInstance(execution, workflowInstanceRepository); + default -> LOG.debug( + String.format( + "WorkflowStageUpdaterListener does not support listening for the event: '%s'", + execution.getEventName())); + } + } catch (Exception exc) { + LOG.error( String.format( - "WorkflowStageUpdaterListener does not support listening for the event: '%s'", - execution.getEventName())); + "[%s] Failed due to: %s ", + getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()), + exc); } } @@ -45,13 +54,22 @@ public class WorkflowInstanceListener implements JavaDelegate { workflowInstanceId, System.currentTimeMillis(), execution.getVariables()); + LOG.debug( + String.format( + "Workflow '%s' Triggered. Instance: '%s'", workflowDefinitionName, workflowInstanceId)); } private void updateWorkflowInstance( DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) { + String workflowDefinitionName = + getMainWorkflowDefinitionNameFromTrigger( + getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())); UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); workflowInstanceRepository.updateWorkflowInstance( - workflowInstanceId, System.currentTimeMillis()); + workflowInstanceId, System.currentTimeMillis(), execution.getVariables()); + LOG.debug( + String.format( + "Workflow '%s' Finished. Instance: '%s'", workflowDefinitionName, workflowInstanceId)); } private String getMainWorkflowDefinitionNameFromTrigger(String triggerWorkflowDefinitionName) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java index 3dd81d4d558..3730a42537b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java @@ -16,17 +16,25 @@ import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository; public class WorkflowInstanceStageListener implements JavaDelegate { @Override public void execute(DelegateExecution execution) { - WorkflowInstanceStateRepository workflowInstanceStateRepository = - (WorkflowInstanceStateRepository) - Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); + try { + WorkflowInstanceStateRepository workflowInstanceStateRepository = + (WorkflowInstanceStateRepository) + Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); - switch (execution.getEventName()) { - case "start" -> addNewStage(execution, workflowInstanceStateRepository); - case "end" -> updateStage(execution, workflowInstanceStateRepository); - default -> LOG.debug( + switch (execution.getEventName()) { + case "start" -> addNewStage(execution, workflowInstanceStateRepository); + case "end" -> updateStage(execution, workflowInstanceStateRepository); + default -> LOG.debug( + String.format( + "WorkflowStageUpdaterListener does not support listening for the event: '%s'", + execution.getEventName())); + } + } catch (Exception exc) { + LOG.error( String.format( - "WorkflowStageUpdaterListener does not support listening for the event: '%s'", - execution.getEventName())); + "[%s] Failed due to: %s ", + getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc.getMessage()), + exc); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java index d2c2fbcaef3..536ee685b9c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java @@ -1,9 +1,15 @@ package org.openmetadata.service.governance.workflows.elements; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; +import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; + import java.util.ArrayList; import java.util.List; +import org.flowable.bpmn.model.Activity; +import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.EndEvent; +import org.flowable.bpmn.model.ErrorEventDefinition; import org.flowable.bpmn.model.FlowNode; import org.flowable.bpmn.model.FlowableListener; import org.flowable.bpmn.model.Process; @@ -16,6 +22,10 @@ import org.openmetadata.service.governance.workflows.flowable.builders.FlowableL public interface NodeInterface { void addToWorkflow(BpmnModel model, Process process); + default BoundaryEvent getRuntimeExceptionBoundaryEvent() { + return null; + } + default void attachWorkflowInstanceStageListeners(FlowNode flowableNode) { List events = List.of("start", "end"); attachWorkflowInstanceStageListeners(flowableNode, events); @@ -59,4 +69,17 @@ public interface NodeInterface { .build(); endEvent.getExecutionListeners().add(listener); } + + default BoundaryEvent getRuntimeExceptionBoundaryEvent(Activity activity) { + ErrorEventDefinition runtimeExceptionDefinition = new ErrorEventDefinition(); + runtimeExceptionDefinition.setErrorCode(WORKFLOW_RUNTIME_EXCEPTION); + + BoundaryEvent runtimeExceptionBoundaryEvent = new BoundaryEvent(); + runtimeExceptionBoundaryEvent.setId( + getFlowableElementId(activity.getId(), "runtimeExceptionBoundaryEvent")); + runtimeExceptionBoundaryEvent.addEventDefinition(runtimeExceptionDefinition); + + runtimeExceptionBoundaryEvent.setAttachedToRef(activity); + return runtimeExceptionBoundaryEvent; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java index 6b379923b35..e4a65079702 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java @@ -2,6 +2,7 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTa import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; +import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.EndEvent; import org.flowable.bpmn.model.FieldExtension; @@ -21,6 +22,7 @@ import org.openmetadata.service.governance.workflows.flowable.builders.SubProces public class CheckEntityAttributesTask implements NodeInterface { private final SubProcess subProcess; + private final BoundaryEvent runtimeExceptionBoundaryEvent; public CheckEntityAttributesTask(CheckEntityAttributesTaskDefinition nodeDefinition) { String subProcessId = nodeDefinition.getName(); @@ -45,9 +47,15 @@ public class CheckEntityAttributesTask implements NodeInterface { attachWorkflowInstanceStageListeners(subProcess); + this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess); this.subProcess = subProcess; } + @Override + public BoundaryEvent getRuntimeExceptionBoundaryEvent() { + return runtimeExceptionBoundaryEvent; + } + private ServiceTask getCheckEntityAttributesServiceTask(String subProcessId, String rules) { FieldExtension rulesExpr = new FieldExtensionBuilder().fieldName("rulesExpr").fieldValue(rules).build(); @@ -63,5 +71,6 @@ public class CheckEntityAttributesTask implements NodeInterface { public void addToWorkflow(BpmnModel model, Process process) { process.addFlowElement(subProcess); + process.addFlowElement(runtimeExceptionBoundaryEvent); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetEntityCertificationTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetEntityCertificationTask.java index 4c8c0d1538e..c9f0c972a5c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetEntityCertificationTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetEntityCertificationTask.java @@ -3,6 +3,7 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTa import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; import java.util.Optional; +import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.EndEvent; import org.flowable.bpmn.model.FieldExtension; @@ -23,6 +24,7 @@ import org.openmetadata.service.governance.workflows.flowable.builders.SubProces public class SetEntityCertificationTask implements NodeInterface { private final SubProcess subProcess; + private final BoundaryEvent runtimeExceptionBoundaryEvent; public SetEntityCertificationTask(SetEntityCertificationTaskDefinition nodeDefinition) { String subProcessId = nodeDefinition.getName(); @@ -50,9 +52,15 @@ public class SetEntityCertificationTask implements NodeInterface { attachWorkflowInstanceStageListeners(subProcess); + this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess); this.subProcess = subProcess; } + @Override + public BoundaryEvent getRuntimeExceptionBoundaryEvent() { + return runtimeExceptionBoundaryEvent; + } + private ServiceTask getSetEntityCertificationServiceTask( String subProcessId, CertificationConfiguration.CertificationEnum certification) { FieldExtension certificationExpr = @@ -75,5 +83,6 @@ public class SetEntityCertificationTask implements NodeInterface { public void addToWorkflow(BpmnModel model, Process process) { process.addFlowElement(subProcess); + process.addFlowElement(runtimeExceptionBoundaryEvent); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetGlossaryTermStatusTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetGlossaryTermStatusTask.java index f1d1362b103..55872a21546 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetGlossaryTermStatusTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/SetGlossaryTermStatusTask.java @@ -2,6 +2,7 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTa import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; +import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.EndEvent; import org.flowable.bpmn.model.FieldExtension; @@ -21,6 +22,7 @@ import org.openmetadata.service.governance.workflows.flowable.builders.SubProces public class SetGlossaryTermStatusTask implements NodeInterface { private final SubProcess subProcess; + private final BoundaryEvent runtimeExceptionBoundaryEvent; public SetGlossaryTermStatusTask(SetGlossaryTermStatusTaskDefinition nodeDefinition) { String subProcessId = nodeDefinition.getName(); @@ -46,9 +48,15 @@ public class SetGlossaryTermStatusTask implements NodeInterface { attachWorkflowInstanceStageListeners(subProcess); + this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess); this.subProcess = subProcess; } + @Override + public BoundaryEvent getRuntimeExceptionBoundaryEvent() { + return runtimeExceptionBoundaryEvent; + } + private ServiceTask getSetGlossaryTermStatusServiceTask(String subProcessId, String status) { FieldExtension statusExpr = new FieldExtensionBuilder().fieldName("statusExpr").fieldValue(status).build(); @@ -64,5 +72,6 @@ public class SetGlossaryTermStatusTask implements NodeInterface { public void addToWorkflow(BpmnModel model, Process process) { process.addFlowElement(subProcess); + process.addFlowElement(runtimeExceptionBoundaryEvent); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java index 404b6cdf9ca..7723107f45f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java @@ -1,11 +1,16 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; import io.github.jamsesso.jsonlogic.JsonLogic; import io.github.jamsesso.jsonlogic.JsonLogicException; +import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.BpmnError; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; import org.openmetadata.schema.EntityInterface; @@ -14,15 +19,25 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.util.JsonUtils; +@Slf4j public class CheckEntityAttributesImpl implements JavaDelegate { private Expression rulesExpr; @Override public void execute(DelegateExecution execution) { - String rules = (String) rulesExpr.getValue(execution); - MessageParser.EntityLink entityLink = - MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE)); - execution.setVariable(RESULT_VARIABLE, checkAttributes(entityLink, rules)); + try { + String rules = (String) rulesExpr.getValue(execution); + MessageParser.EntityLink entityLink = + MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE)); + execution.setVariable(RESULT_VARIABLE, checkAttributes(entityLink, rules)); + } catch (Exception exc) { + LOG.error( + String.format( + "[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())), + exc); + execution.setVariable(EXCEPTION_VARIABLE, exc.toString()); + throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage()); + } } private Boolean checkAttributes(MessageParser.EntityLink entityLink, String rules) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityCertificationImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityCertificationImpl.java index 62a6e0509b8..857605f3bee 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityCertificationImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityCertificationImpl.java @@ -1,11 +1,16 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RESOLVED_BY_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; import java.util.Optional; import javax.json.JsonPatch; +import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.BpmnError; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; import org.openmetadata.schema.EntityInterface; @@ -18,25 +23,35 @@ import org.openmetadata.service.resources.tags.TagLabelUtil; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.JsonUtils; +@Slf4j public class SetEntityCertificationImpl implements JavaDelegate { private Expression certificationExpr; @Override public void execute(DelegateExecution execution) { - MessageParser.EntityLink entityLink = - MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE)); - String entityType = entityLink.getEntityType(); - EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); + try { + MessageParser.EntityLink entityLink = + MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE)); + String entityType = entityLink.getEntityType(); + EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); - String certification = - Optional.ofNullable(certificationExpr) - .map(certificationExpr -> (String) certificationExpr.getValue(execution)) - .orElse(null); - String user = - Optional.ofNullable((String) execution.getVariable(RESOLVED_BY_VARIABLE)) - .orElse(entity.getUpdatedBy()); + String certification = + Optional.ofNullable(certificationExpr) + .map(certificationExpr -> (String) certificationExpr.getValue(execution)) + .orElse(null); + String user = + Optional.ofNullable((String) execution.getVariable(RESOLVED_BY_VARIABLE)) + .orElse(entity.getUpdatedBy()); - setStatus(entity, entityType, user, certification); + setStatus(entity, entityType, user, certification); + } catch (Exception exc) { + LOG.error( + String.format( + "[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())), + exc); + execution.setVariable(EXCEPTION_VARIABLE, exc.toString()); + throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage()); + } } private void setStatus( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetGlossaryTermStatusImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetGlossaryTermStatusImpl.java index 70e126d7e9a..6d2d4060cfd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetGlossaryTermStatusImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetGlossaryTermStatusImpl.java @@ -1,12 +1,17 @@ package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RESOLVED_BY_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; import java.util.Objects; import java.util.Optional; import javax.json.JsonPatch; +import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.BpmnError; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; import org.openmetadata.schema.entity.data.GlossaryTerm; @@ -16,21 +21,31 @@ import org.openmetadata.service.jdbi3.GlossaryTermRepository; import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.util.JsonUtils; +@Slf4j public class SetGlossaryTermStatusImpl implements JavaDelegate { private Expression statusExpr; @Override public void execute(DelegateExecution execution) { - MessageParser.EntityLink entityLink = - MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE)); - GlossaryTerm glossaryTerm = Entity.getEntity(entityLink, "*", Include.ALL); + try { + MessageParser.EntityLink entityLink = + MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE)); + GlossaryTerm glossaryTerm = Entity.getEntity(entityLink, "*", Include.ALL); - String status = (String) statusExpr.getValue(execution); - String user = - Optional.ofNullable((String) execution.getVariable(RESOLVED_BY_VARIABLE)) - .orElse(glossaryTerm.getUpdatedBy()); + String status = (String) statusExpr.getValue(execution); + String user = + Optional.ofNullable((String) execution.getVariable(RESOLVED_BY_VARIABLE)) + .orElse(glossaryTerm.getUpdatedBy()); - setStatus(glossaryTerm, user, status); + setStatus(glossaryTerm, user, status); + } catch (Exception exc) { + LOG.error( + String.format( + "[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())), + exc); + execution.setVariable(EXCEPTION_VARIABLE, exc.toString()); + throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage()); + } } private void setStatus(GlossaryTerm glossaryTerm, String user, String status) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/endEvent/EndEvent.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/endEvent/EndEvent.java index 29389e3d745..377142d31c9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/endEvent/EndEvent.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/endEvent/EndEvent.java @@ -1,14 +1,21 @@ package org.openmetadata.service.governance.workflows.elements.nodes.endEvent; +import lombok.Getter; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.Process; import org.openmetadata.schema.governance.workflows.elements.nodes.endEvent.EndEventDefinition; import org.openmetadata.service.governance.workflows.elements.NodeInterface; import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder; +@Getter public class EndEvent implements NodeInterface { private final org.flowable.bpmn.model.EndEvent endEvent; + public EndEvent(String id) { + this.endEvent = new EndEventBuilder().id(id).build(); + attachWorkflowInstanceStageListeners(endEvent); + } + public EndEvent(EndEventDefinition nodeDefinition) { this.endEvent = new EndEventBuilder().id(nodeDefinition.getName()).build(); attachWorkflowInstanceStageListeners(endEvent); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/UserApprovalTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/UserApprovalTask.java index 3a653d6800b..464cf57c96a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/UserApprovalTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/UserApprovalTask.java @@ -34,6 +34,7 @@ import org.openmetadata.service.util.JsonUtils; public class UserApprovalTask implements NodeInterface { private final SubProcess subProcess; + private final BoundaryEvent runtimeExceptionBoundaryEvent; private final List messages = new ArrayList<>(); public UserApprovalTask(UserApprovalTaskDefinition nodeDefinition) { @@ -92,9 +93,15 @@ public class UserApprovalTask implements NodeInterface { attachWorkflowInstanceStageListeners(subProcess); + this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess); this.subProcess = subProcess; } + @Override + public BoundaryEvent getRuntimeExceptionBoundaryEvent() { + return runtimeExceptionBoundaryEvent; + } + private ServiceTask getSetAssigneesVariableServiceTask( String subProcessId, FieldExtension assigneesExpr, FieldExtension assigneesVarNameExpr) { ServiceTask serviceTask = @@ -146,6 +153,7 @@ public class UserApprovalTask implements NodeInterface { public void addToWorkflow(BpmnModel model, Process process) { process.addFlowElement(subProcess); + process.addFlowElement(runtimeExceptionBoundaryEvent); for (Message message : messages) { model.addMessage(message); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/CreateApprovalTaskImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/CreateApprovalTaskImpl.java index bd8df6612fc..58b3c922d67 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/CreateApprovalTaskImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/CreateApprovalTaskImpl.java @@ -1,12 +1,17 @@ package org.openmetadata.service.governance.workflows.elements.nodes.userTask.impl; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.STAGE_INSTANCE_STATE_ID_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.delegate.BpmnError; import org.flowable.engine.delegate.TaskListener; import org.flowable.identitylink.api.IdentityLink; import org.flowable.task.service.delegate.DelegateTask; @@ -27,23 +32,35 @@ import org.openmetadata.service.resources.feeds.FeedResource; import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.util.WebsocketNotificationHandler; +@Slf4j public class CreateApprovalTaskImpl implements TaskListener { @Override public void notify(DelegateTask delegateTask) { - List assignees = getAssignees(delegateTask); - MessageParser.EntityLink entityLink = - MessageParser.EntityLink.parse((String) delegateTask.getVariable(RELATED_ENTITY_VARIABLE)); - GlossaryTerm entity = Entity.getEntity(entityLink, "*", Include.ALL); + try { + List assignees = getAssignees(delegateTask); + MessageParser.EntityLink entityLink = + MessageParser.EntityLink.parse( + (String) delegateTask.getVariable(RELATED_ENTITY_VARIABLE)); + GlossaryTerm entity = Entity.getEntity(entityLink, "*", Include.ALL); - Thread task = createApprovalTask(entity, assignees); - WorkflowHandler.getInstance().setCustomTaskId(delegateTask.getId(), task.getId()); + Thread task = createApprovalTask(entity, assignees); + WorkflowHandler.getInstance().setCustomTaskId(delegateTask.getId(), task.getId()); - UUID workflowInstanceStateId = - (UUID) delegateTask.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE); - WorkflowInstanceStateRepository workflowInstanceStateRepository = - (WorkflowInstanceStateRepository) - Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); - workflowInstanceStateRepository.updateStageWithTask(task.getId(), workflowInstanceStateId); + UUID workflowInstanceStateId = + (UUID) delegateTask.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE); + WorkflowInstanceStateRepository workflowInstanceStateRepository = + (WorkflowInstanceStateRepository) + Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); + workflowInstanceStateRepository.updateStageWithTask(task.getId(), workflowInstanceStateId); + } catch (Exception exc) { + LOG.error( + String.format( + "[%s] Failure: ", + getProcessDefinitionKeyFromId(delegateTask.getProcessDefinitionId())), + exc); + delegateTask.setVariable(EXCEPTION_VARIABLE, exc.toString()); + throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage()); + } } private List getAssignees(DelegateTask delegateTask) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/SetApprovalAssigneesImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/SetApprovalAssigneesImpl.java index c6fadbcb4f0..f3a2ed0585d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/SetApprovalAssigneesImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/SetApprovalAssigneesImpl.java @@ -1,12 +1,17 @@ package org.openmetadata.service.governance.workflows.elements.nodes.userTask.impl; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.BpmnError; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; import org.openmetadata.schema.EntityInterface; @@ -16,33 +21,44 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.util.JsonUtils; +@Slf4j public class SetApprovalAssigneesImpl implements JavaDelegate { private Expression assigneesExpr; private Expression assigneesVarNameExpr; @Override public void execute(DelegateExecution execution) { - Map assigneesConfig = - JsonUtils.readOrConvertValue(assigneesExpr.getValue(execution), Map.class); - Boolean addReviewers = (Boolean) assigneesConfig.get("addReviewers"); - Optional> oExtraAssignees = - Optional.ofNullable( - JsonUtils.readOrConvertValue(assigneesConfig.get("extraAssignees"), List.class)); + try { + Map assigneesConfig = + JsonUtils.readOrConvertValue(assigneesExpr.getValue(execution), Map.class); + Boolean addReviewers = (Boolean) assigneesConfig.get("addReviewers"); + Optional> oExtraAssignees = + Optional.ofNullable( + JsonUtils.readOrConvertValue(assigneesConfig.get("extraAssignees"), List.class)); - List assignees = new ArrayList<>(); + List assignees = new ArrayList<>(); - if (addReviewers) { - MessageParser.EntityLink entityLink = - MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE)); - EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); - assignees.addAll(getEntityLinkStringFromEntityReference(entity.getReviewers())); + if (addReviewers) { + MessageParser.EntityLink entityLink = + MessageParser.EntityLink.parse((String) execution.getVariable(RELATED_ENTITY_VARIABLE)); + EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); + assignees.addAll(getEntityLinkStringFromEntityReference(entity.getReviewers())); + } + + oExtraAssignees.ifPresent( + extraAssignees -> + assignees.addAll(getEntityLinkStringFromEntityReference(extraAssignees))); + + execution.setVariableLocal( + assigneesVarNameExpr.getValue(execution).toString(), JsonUtils.pojoToJson(assignees)); + } catch (Exception exc) { + LOG.error( + String.format( + "[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())), + exc); + execution.setVariable(EXCEPTION_VARIABLE, exc.toString()); + throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage()); } - - oExtraAssignees.ifPresent( - extraAssignees -> assignees.addAll(getEntityLinkStringFromEntityReference(extraAssignees))); - - execution.setVariableLocal( - assigneesVarNameExpr.getValue(execution).toString(), JsonUtils.pojoToJson(assignees)); } private List getEntityLinkStringFromEntityReference(List assignees) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/SetCandidateUsersImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/SetCandidateUsersImpl.java index cdd423c3851..ee3234593f3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/SetCandidateUsersImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/impl/SetCandidateUsersImpl.java @@ -1,20 +1,37 @@ package org.openmetadata.service.governance.workflows.elements.nodes.userTask.impl; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; + import java.util.List; +import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.BpmnError; import org.flowable.engine.delegate.TaskListener; import org.flowable.task.service.delegate.DelegateTask; import org.openmetadata.service.util.JsonUtils; +@Slf4j public class SetCandidateUsersImpl implements TaskListener { private Expression assigneesVarNameExpr; @Override public void notify(DelegateTask delegateTask) { - List assignees = - JsonUtils.readOrConvertValue( - delegateTask.getVariable(assigneesVarNameExpr.getValue(delegateTask).toString()), - List.class); - delegateTask.addCandidateUsers(assignees); + try { + List assignees = + JsonUtils.readOrConvertValue( + delegateTask.getVariable(assigneesVarNameExpr.getValue(delegateTask).toString()), + List.class); + delegateTask.addCandidateUsers(assignees); + } catch (Exception exc) { + LOG.error( + String.format( + "[%s] Failure: ", + getProcessDefinitionKeyFromId(delegateTask.getProcessDefinitionId())), + exc); + delegateTask.setVariable(EXCEPTION_VARIABLE, exc.toString()); + throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage()); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java index 9f4a8ce47f8..f8761bf9665 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java @@ -1,15 +1,19 @@ package org.openmetadata.service.governance.workflows.elements.triggers; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; import lombok.Getter; +import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.CallActivity; import org.flowable.bpmn.model.EndEvent; +import org.flowable.bpmn.model.ErrorEventDefinition; import org.flowable.bpmn.model.FieldExtension; import org.flowable.bpmn.model.IOParameter; import org.flowable.bpmn.model.Process; @@ -55,6 +59,21 @@ public class EventBasedEntityTrigger implements TriggerInterface { CallActivity workflowTrigger = getWorkflowTrigger(triggerWorkflowId, mainWorkflowName); process.addFlowElement(workflowTrigger); + ErrorEventDefinition runtimeExceptionDefinition = new ErrorEventDefinition(); + runtimeExceptionDefinition.setErrorCode(WORKFLOW_RUNTIME_EXCEPTION); + + BoundaryEvent runtimeExceptionBoundaryEvent = new BoundaryEvent(); + runtimeExceptionBoundaryEvent.setId( + getFlowableElementId(workflowTrigger.getId(), "runtimeExceptionBoundaryEvent")); + runtimeExceptionBoundaryEvent.addEventDefinition(runtimeExceptionDefinition); + + runtimeExceptionBoundaryEvent.setAttachedToRef(workflowTrigger); + process.addFlowElement(runtimeExceptionBoundaryEvent); + + EndEvent errorEndEvent = + new EndEventBuilder().id(getFlowableElementId(triggerWorkflowId, "errorEndEvent")).build(); + process.addFlowElement(errorEndEvent); + EndEvent endEvent = new EndEventBuilder().id(getFlowableElementId(triggerWorkflowId, "endEvent")).build(); process.addFlowElement(endEvent); @@ -77,6 +96,8 @@ public class EventBasedEntityTrigger implements TriggerInterface { process.addFlowElement(filterNotPassed); // WorkflowTrigger -> End process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), endEvent.getId())); + process.addFlowElement( + new SequenceFlow(runtimeExceptionBoundaryEvent.getId(), errorEndEvent.getId())); this.process = process; this.triggerWorkflowId = triggerWorkflowId; @@ -126,7 +147,12 @@ public class EventBasedEntityTrigger implements TriggerInterface { inputParameter.setSource(RELATED_ENTITY_VARIABLE); inputParameter.setTarget(RELATED_ENTITY_VARIABLE); + IOParameter outputParameter = new IOParameter(); + outputParameter.setSource(EXCEPTION_VARIABLE); + outputParameter.setTarget(EXCEPTION_VARIABLE); + workflowTrigger.setInParameters(List.of(inputParameter)); + workflowTrigger.setOutParameters(List.of(outputParameter)); return workflowTrigger; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java index 84935751035..3a9eb500e80 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java @@ -1,5 +1,6 @@ package org.openmetadata.service.governance.workflows.elements.triggers; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; @@ -121,7 +122,12 @@ public class PeriodicBatchEntityTrigger implements TriggerInterface { inputParameter.setSource(RELATED_ENTITY_VARIABLE); inputParameter.setTarget(RELATED_ENTITY_VARIABLE); + IOParameter outputParameter = new IOParameter(); + outputParameter.setSource(EXCEPTION_VARIABLE); + outputParameter.setTarget(EXCEPTION_VARIABLE); + workflowTrigger.setInParameters(List.of(inputParameter)); + workflowTrigger.setOutParameters(List.of(outputParameter)); workflowTrigger.setLoopCharacteristics(multiInstance); return workflowTrigger; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java index e6d44cc29e1..4fc00c29b66 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java @@ -1,22 +1,28 @@ package org.openmetadata.service.governance.workflows.flowable; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import lombok.Getter; +import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.Process; +import org.flowable.bpmn.model.SequenceFlow; import org.openmetadata.schema.governance.workflows.WorkflowDefinition; import org.openmetadata.schema.governance.workflows.elements.EdgeDefinition; import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface; import org.openmetadata.service.governance.workflows.elements.Edge; import org.openmetadata.service.governance.workflows.elements.NodeFactory; +import org.openmetadata.service.governance.workflows.elements.NodeInterface; +import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent; import org.openmetadata.service.util.JsonUtils; @Getter public class MainWorkflow { private final BpmnModel model; private final String workflowName; + private final List runtimeExceptionBoundaryEvents = new ArrayList<>(); public MainWorkflow(WorkflowDefinition workflowDefinition) { BpmnModel model = new BpmnModel(); @@ -33,8 +39,12 @@ public class MainWorkflow { // Add Nodes for (Object nodeDefinitionObj : (List) workflowDefinition.getNodes()) { - NodeFactory.createNode(JsonUtils.readOrConvertValue(nodeDefinitionObj, Map.class)) - .addToWorkflow(model, process); + NodeInterface node = + NodeFactory.createNode(JsonUtils.readOrConvertValue(nodeDefinitionObj, Map.class)); + node.addToWorkflow(model, process); + + Optional.ofNullable(node.getRuntimeExceptionBoundaryEvent()) + .ifPresent(runtimeExceptionBoundaryEvents::add); } // Add Edges @@ -43,7 +53,18 @@ public class MainWorkflow { edge.addToWorkflow(model, process); } + // Configure Exception Flow + configureRuntimeExceptionFlow(process); + this.model = model; this.workflowName = workflowName; } + + private void configureRuntimeExceptionFlow(Process process) { + EndEvent errorEndEvent = new EndEvent("Error"); + process.addFlowElement(errorEndEvent.getEndEvent()); + for (BoundaryEvent event : runtimeExceptionBoundaryEvents) { + process.addFlowElement(new SequenceFlow(event.getId(), errorEndEvent.getEndEvent().getId())); + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java index acf8de971d9..17103ba77a0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java @@ -197,6 +197,9 @@ public class FeedRepository { if (repository.supportsTags) { fieldList.add("tags"); } + if (repository.supportsReviewers) { + fieldList.add("reviewers"); + } return String.join(",", fieldList.toArray(new String[0])); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/GlossaryTermRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/GlossaryTermRepository.java index 000dcda39d9..4e01a8cb944 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/GlossaryTermRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/GlossaryTermRepository.java @@ -598,6 +598,9 @@ public class GlossaryTermRepository extends EntityRepository { @Override public EntityInterface performTask(String user, ResolveTask resolveTask) { // TODO: Resolve this outside + GlossaryTerm glossaryTerm = (GlossaryTerm) threadContext.getAboutEntity(); + checkUpdatedByReviewer(glossaryTerm, user); + UUID taskId = threadContext.getThread().getId(); Map variables = new HashMap<>(); variables.put(RESULT_VARIABLE, resolveTask.getNewValue().equalsIgnoreCase("approved")); @@ -607,7 +610,6 @@ public class GlossaryTermRepository extends EntityRepository { // TODO: performTask returns the updated Entity and the flow applies the new value. // This should be changed with the new Governance Workflows. - GlossaryTerm glossaryTerm = (GlossaryTerm) threadContext.getAboutEntity(); // glossaryTerm.setStatus(Status.APPROVED); return glossaryTerm; } @@ -652,7 +654,7 @@ public class GlossaryTermRepository extends EntityRepository { } } - private void checkUpdatedByReviewer(GlossaryTerm term, String updatedBy) { + public static void checkUpdatedByReviewer(GlossaryTerm term, String updatedBy) { // Only list of allowed reviewers can change the status from DRAFT to APPROVED List reviewers = term.getReviewers(); if (!nullOrEmpty(reviewers)) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java index 07d314eca3a..33c71f7c643 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java @@ -1,5 +1,7 @@ package org.openmetadata.service.jdbi3; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; + import java.util.Map; import java.util.UUID; import org.openmetadata.schema.governance.workflows.WorkflowInstance; @@ -41,12 +43,17 @@ public class WorkflowInstanceRepository extends EntityTimeSeriesRepository variables) { WorkflowInstance workflowInstance = JsonUtils.readValue(timeSeriesDao.getById(workflowInstanceId), WorkflowInstance.class); workflowInstance.setEndedAt(endedAt); + if (variables.containsKey(EXCEPTION_VARIABLE)) { + workflowInstance.setException(true); + } + getTimeSeriesDao().update(JsonUtils.pojoToJson(workflowInstance), workflowInstanceId); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java index c578a7db902..bcea6ac0b62 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java @@ -1,5 +1,7 @@ package org.openmetadata.service.jdbi3; +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; + import java.util.Map; import java.util.UUID; import org.openmetadata.schema.governance.workflows.Stage; @@ -61,6 +63,10 @@ public class WorkflowInstanceStateRepository stage.setEndedAt(endedAt); stage.setVariables(variables); + if (variables.containsKey(EXCEPTION_VARIABLE)) { + workflowInstanceState.setException(true); + } + workflowInstanceState.setStage(stage); getTimeSeriesDao().update(JsonUtils.pojoToJson(workflowInstanceState), workflowInstanceStateId); diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstance.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstance.json index 81beb132d15..c63feb258fa 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstance.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstance.json @@ -30,6 +30,10 @@ "timestamp": { "description": "Timestamp on which the workflow instance state was created.", "$ref": "../../type/basic.json#/definitions/timestamp" + }, + "exception": { + "description": "If the Workflow Instance has errors, 'True'. Else, 'False'.", + "type": "boolean" } }, "required": [], diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstanceState.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstanceState.json index 01ed9dc19a1..fb11362f3b7 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstanceState.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstanceState.json @@ -54,6 +54,10 @@ "timestamp": { "description": "Timestamp on which the workflow instance state was created.", "$ref": "../../type/basic.json#/definitions/timestamp" + }, + "exception": { + "description": "If the Workflow Instance has errors, 'True'. Else, 'False'.", + "type": "boolean" } }, "required": [],