MINOR: Remove workflow instance states (#18959)

* Remove Workflow Instance States until better solution is found

* Remove Workflow Instance States until better solution is found

* Remove Workflow Instance States until better solution is found

* Fix issue with OpenSearchClient
This commit is contained in:
IceS2 2024-12-09 15:43:37 +01:00 committed by GitHub
parent 663e7f4c50
commit de0889d590
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 12 additions and 34 deletions

View File

@ -1,6 +1,5 @@
package org.openmetadata.service.governance.workflows; package org.openmetadata.service.governance.workflows;
import static org.openmetadata.service.governance.workflows.Workflow.STAGE_INSTANCE_STATE_ID_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
import java.util.UUID; import java.util.UUID;
@ -9,21 +8,12 @@ import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate; import org.flowable.engine.delegate.JavaDelegate;
import org.openmetadata.service.Entity; import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.WorkflowInstanceRepository; import org.openmetadata.service.jdbi3.WorkflowInstanceRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;
@Slf4j @Slf4j
public class MainWorkflowTerminationListener implements JavaDelegate { public class MainWorkflowTerminationListener implements JavaDelegate {
@Override @Override
public void execute(DelegateExecution execution) { public void execute(DelegateExecution execution) {
try { try {
WorkflowInstanceStateRepository workflowInstanceStateRepository =
(WorkflowInstanceStateRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
UUID workflowInstanceStateId = (UUID) execution.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE);
workflowInstanceStateRepository.updateStage(
workflowInstanceStateId, System.currentTimeMillis(), execution.getVariables());
WorkflowInstanceRepository workflowInstanceRepository = WorkflowInstanceRepository workflowInstanceRepository =
(WorkflowInstanceRepository) (WorkflowInstanceRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE); Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);

View File

@ -45,8 +45,6 @@ public class CheckEntityAttributesTask implements NodeInterface {
subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), checkEntityAttributes.getId())); subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), checkEntityAttributes.getId()));
subProcess.addFlowElement(new SequenceFlow(checkEntityAttributes.getId(), endEvent.getId())); subProcess.addFlowElement(new SequenceFlow(checkEntityAttributes.getId(), endEvent.getId()));
attachWorkflowInstanceStageListeners(subProcess);
this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess); this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess; this.subProcess = subProcess;
} }

View File

@ -50,8 +50,6 @@ public class SetEntityCertificationTask implements NodeInterface {
subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), setEntityCertification.getId())); subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), setEntityCertification.getId()));
subProcess.addFlowElement(new SequenceFlow(setEntityCertification.getId(), endEvent.getId())); subProcess.addFlowElement(new SequenceFlow(setEntityCertification.getId(), endEvent.getId()));
attachWorkflowInstanceStageListeners(subProcess);
this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess); this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess; this.subProcess = subProcess;
} }

View File

@ -46,8 +46,6 @@ public class SetGlossaryTermStatusTask implements NodeInterface {
subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), setGlossaryTermStatus.getId())); subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), setGlossaryTermStatus.getId()));
subProcess.addFlowElement(new SequenceFlow(setGlossaryTermStatus.getId(), endEvent.getId())); subProcess.addFlowElement(new SequenceFlow(setGlossaryTermStatus.getId(), endEvent.getId()));
attachWorkflowInstanceStageListeners(subProcess);
this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess); this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess; this.subProcess = subProcess;
} }

View File

@ -13,12 +13,10 @@ public class EndEvent implements NodeInterface {
public EndEvent(String id) { public EndEvent(String id) {
this.endEvent = new EndEventBuilder().id(id).build(); this.endEvent = new EndEventBuilder().id(id).build();
attachWorkflowInstanceStageListeners(endEvent);
} }
public EndEvent(EndEventDefinition nodeDefinition) { public EndEvent(EndEventDefinition nodeDefinition) {
this.endEvent = new EndEventBuilder().id(nodeDefinition.getName()).build(); this.endEvent = new EndEventBuilder().id(nodeDefinition.getName()).build();
attachWorkflowInstanceStageListeners(endEvent);
} }
public void addToWorkflow(BpmnModel model, Process process) { public void addToWorkflow(BpmnModel model, Process process) {

View File

@ -12,7 +12,6 @@ public class StartEvent implements NodeInterface {
public StartEvent(StartEventDefinition nodeDefinition) { public StartEvent(StartEventDefinition nodeDefinition) {
this.startEvent = new StartEventBuilder().id(nodeDefinition.getName()).build(); this.startEvent = new StartEventBuilder().id(nodeDefinition.getName()).build();
attachWorkflowInstanceExecutionIdSetterListener(startEvent); attachWorkflowInstanceExecutionIdSetterListener(startEvent);
attachWorkflowInstanceStageListeners(startEvent);
} }
public void addToWorkflow(BpmnModel model, Process process) { public void addToWorkflow(BpmnModel model, Process process) {

View File

@ -91,8 +91,6 @@ public class UserApprovalTask implements NodeInterface {
subProcess.addFlowElement(new SequenceFlow(userTask.getId(), endEvent.getId())); subProcess.addFlowElement(new SequenceFlow(userTask.getId(), endEvent.getId()));
subProcess.addFlowElement(new SequenceFlow(terminationEvent.getId(), terminatedEvent.getId())); subProcess.addFlowElement(new SequenceFlow(terminationEvent.getId(), terminatedEvent.getId()));
attachWorkflowInstanceStageListeners(subProcess);
this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess); this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
this.subProcess = subProcess; this.subProcess = subProcess;
} }

View File

@ -2,7 +2,6 @@ package org.openmetadata.service.governance.workflows.elements.nodes.userTask.im
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.STAGE_INSTANCE_STATE_ID_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
@ -27,7 +26,6 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.governance.workflows.WorkflowHandler; import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.jdbi3.FeedRepository; import org.openmetadata.service.jdbi3.FeedRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;
import org.openmetadata.service.resources.feeds.FeedResource; import org.openmetadata.service.resources.feeds.FeedResource;
import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.util.WebsocketNotificationHandler; import org.openmetadata.service.util.WebsocketNotificationHandler;
@ -45,13 +43,6 @@ public class CreateApprovalTaskImpl implements TaskListener {
Thread task = createApprovalTask(entity, assignees); Thread task = createApprovalTask(entity, assignees);
WorkflowHandler.getInstance().setCustomTaskId(delegateTask.getId(), task.getId()); WorkflowHandler.getInstance().setCustomTaskId(delegateTask.getId(), task.getId());
UUID workflowInstanceStateId =
(UUID) delegateTask.getVariable(STAGE_INSTANCE_STATE_ID_VARIABLE);
WorkflowInstanceStateRepository workflowInstanceStateRepository =
(WorkflowInstanceStateRepository)
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
workflowInstanceStateRepository.updateStageWithTask(task.getId(), workflowInstanceStateId);
} catch (Exception exc) { } catch (Exception exc) {
LOG.error( LOG.error(
String.format( String.format(

View File

@ -3,6 +3,7 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import org.openmetadata.schema.governance.workflows.WorkflowInstance; import org.openmetadata.schema.governance.workflows.WorkflowInstance;
import org.openmetadata.service.Entity; import org.openmetadata.service.Entity;
@ -50,7 +51,7 @@ public class WorkflowInstanceRepository extends EntityTimeSeriesRepository<Workf
workflowInstance.setEndedAt(endedAt); workflowInstance.setEndedAt(endedAt);
if (variables.containsKey(EXCEPTION_VARIABLE)) { if (Optional.ofNullable(variables.getOrDefault(EXCEPTION_VARIABLE, null)).isPresent()) {
workflowInstance.setException(true); workflowInstance.setException(true);
} }

View File

@ -737,9 +737,16 @@ public class OpenSearchClient implements SearchClient {
new os.org.opensearch.action.search.SearchRequest(index).source(searchSourceBuilder), new os.org.opensearch.action.search.SearchRequest(index).source(searchSourceBuilder),
RequestOptions.DEFAULT); RequestOptions.DEFAULT);
SearchHits searchHits = response.getHits(); SearchHits searchHits = response.getHits();
SearchHit[] hits = searchHits.getHits(); List<SearchHit> hits = List.of(searchHits.getHits());
Arrays.stream(hits).forEach(hit -> results.add(hit.getSourceAsMap())); Object[] lastHitSortValues = null;
return new SearchResultListMapper(results, searchHits.getTotalHits().value);
if (!hits.isEmpty()) {
lastHitSortValues = hits.get(hits.size() - 1).getSortValues();
}
hits.forEach(hit -> results.add(hit.getSourceAsMap()));
return new SearchResultListMapper(
results, searchHits.getTotalHits().value, lastHitSortValues);
} catch (OpenSearchStatusException e) { } catch (OpenSearchStatusException e) {
if (e.status() == RestStatus.NOT_FOUND) { if (e.status() == RestStatus.NOT_FOUND) {
throw new SearchIndexNotFoundException(String.format("Failed to to find index %s", index)); throw new SearchIndexNotFoundException(String.format("Failed to to find index %s", index));