From 9d51add73926af724da0e59e12908b47c46713a9 Mon Sep 17 00:00:00 2001 From: IceS2 Date: Wed, 19 Feb 2025 13:49:46 +0100 Subject: [PATCH] MINOR: Add run app task (#19841) * Add Run App Task * Work on Run App Task * Rollback task changes * Fixing type convertion * Fix Run External App wait * Fix Run App Task --------- Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com> --- .../workflows/elements/NodeFactory.java | 3 + .../automatedTask/runApp/RunAppDelegate.java | 64 ++++++ .../automatedTask/runApp/RunAppImpl.java | 199 ++++++++++++++++++ .../automatedTask/runApp/RunAppTask.java | 110 ++++++++++ .../workflows/flowable/BaseDelegate.java | 44 ++++ .../WorkflowNodeDefinitionInterface.java | 2 + .../workflows/elements/nodeSubType.json | 1 + .../createIngestionPipelineTask.json | 7 + .../nodes/automatedTask/runAppTask.json | 87 ++++++++ .../runIngestionPipelineTask.json | 7 + .../workflows/elements/nodeSubType.ts | 1 + .../createIngestionPipelineTask.ts | 10 +- .../nodes/automatedTask/runAppTask.ts | 55 +++++ .../automatedTask/runIngestionPipelineTask.ts | 8 +- 14 files changed, 596 insertions(+), 2 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppDelegate.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppTask.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/BaseDelegate.java create mode 100644 openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/runAppTask.json create mode 100644 openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/runAppTask.ts diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeFactory.java index d6037032431..c7265a5c91b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeFactory.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeFactory.java @@ -4,6 +4,7 @@ import org.openmetadata.schema.governance.workflows.elements.NodeSubType; import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CreateIngestionPipelineTaskDefinition; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunIngestionPipelineTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTaskDefinition; @@ -16,6 +17,7 @@ import org.openmetadata.service.governance.workflows.elements.nodes.automatedTas import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.RunIngestionPipelineTask; import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTask; import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTask; +import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.runApp.RunAppTask; import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent; import org.openmetadata.service.governance.workflows.elements.nodes.gateway.ParallelGateway; import org.openmetadata.service.governance.workflows.elements.nodes.startEvent.StartEvent; @@ -37,6 +39,7 @@ public class NodeFactory { (CreateIngestionPipelineTaskDefinition) nodeDefinition); case RUN_INGESTION_PIPELINE_TASK -> new RunIngestionPipelineTask( (RunIngestionPipelineTaskDefinition) nodeDefinition); + case RUN_APP_TASK -> new RunAppTask((RunAppTaskDefinition) nodeDefinition); case PARALLEL_GATEWAY -> new ParallelGateway((ParallelGatewayDefinition) nodeDefinition); }; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppDelegate.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppDelegate.java new file mode 100644 index 00000000000..0e1fb26f18d --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppDelegate.java @@ -0,0 +1,64 @@ +package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.runApp; + +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; + +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.BpmnError; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.JavaDelegate; +import org.openmetadata.sdk.PipelineServiceClientInterface; +import org.openmetadata.service.governance.workflows.WorkflowVariableHandler; +import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.util.JsonUtils; + +@Slf4j +public class RunAppDelegate implements JavaDelegate { + private Expression inputNamespaceMapExpr; + private Expression pipelineServiceClientExpr; + private Expression appNameExpr; + private Expression waitForCompletionExpr; + private Expression timeoutSecondsExpr; + + @Override + public void execute(DelegateExecution execution) { + WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution); + try { + Map inputNamespaceMap = + JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class); + + String appName = (String) appNameExpr.getValue(execution); + boolean waitForCompletion = + Boolean.parseBoolean((String) waitForCompletionExpr.getValue(execution)); + long timeoutSeconds = Long.parseLong((String) timeoutSecondsExpr.getValue(execution)); + + PipelineServiceClientInterface pipelineServiceClient = + (PipelineServiceClientInterface) pipelineServiceClientExpr.getValue(execution); + + MessageParser.EntityLink entityLink = + MessageParser.EntityLink.parse( + (String) + varHandler.getNamespacedVariable( + inputNamespaceMap.get(RELATED_ENTITY_VARIABLE), RELATED_ENTITY_VARIABLE)); + + boolean success = + new RunAppImpl() + .execute( + pipelineServiceClient, appName, waitForCompletion, timeoutSeconds, entityLink); + + varHandler.setNodeVariable(RESULT_VARIABLE, success); + } catch (Exception exc) { + LOG.error( + String.format( + "[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())), + exc); + varHandler.setGlobalVariable(EXCEPTION_VARIABLE, exc.toString()); + throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage()); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java new file mode 100644 index 00000000000..c3e9860194d --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java @@ -0,0 +1,199 @@ +package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.runApp; + +import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.service.util.EntityUtil.Fields.EMPTY_FIELDS; + +import java.util.List; +import java.util.Set; +import javax.json.JsonPatch; +import lombok.SneakyThrows; +import org.openmetadata.schema.ServiceEntityInterface; +import org.openmetadata.schema.entity.app.App; +import org.openmetadata.schema.entity.app.AppRunRecord; +import org.openmetadata.schema.entity.app.AppType; +import org.openmetadata.schema.entity.app.external.CollateAIAppConfig; +import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType; +import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.Include; +import org.openmetadata.sdk.PipelineServiceClientInterface; +import org.openmetadata.service.Entity; +import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.apps.ApplicationHandler; +import org.openmetadata.service.jdbi3.AppRepository; +import org.openmetadata.service.jdbi3.IngestionPipelineRepository; +import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.util.EntityUtil; +import org.openmetadata.service.util.JsonUtils; +import org.openmetadata.service.util.OpenMetadataConnectionBuilder; + +public class RunAppImpl { + public boolean execute( + PipelineServiceClientInterface pipelineServiceClient, + String appName, + boolean waitForCompletion, + long timeoutSeconds, + MessageParser.EntityLink entityLink) { + ServiceEntityInterface service = Entity.getEntity(entityLink, "owners", Include.NON_DELETED); + + AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); + App app = + appRepository.getByName(null, appName, new EntityUtil.Fields(Set.of("bot", "pipelines"))); + + if (!validateAppShouldRun(app, service)) { + return true; + } + + App updatedApp = getUpdatedApp(app, service); + + updateApp(appRepository, app, updatedApp); + + long startTime = System.currentTimeMillis(); + long timeoutMillis = timeoutSeconds * 1000; + boolean success = true; + + if (app.getAppType().equals(AppType.Internal)) { + success = runApp(appRepository, app, waitForCompletion, startTime, timeoutMillis); + } else { + success = runApp(pipelineServiceClient, app, waitForCompletion, startTime, timeoutMillis); + } + + updateApp(appRepository, updatedApp, app); + return success; + } + + private boolean validateAppShouldRun(App app, ServiceEntityInterface service) { + // We only want to run the CollateAIApplication for Databases + if (Entity.getEntityTypeFromObject(service).equals(Entity.DATABASE_SERVICE) + && app.getName().equals("CollateAIApplication")) { + return true; + } else { + return false; + } + } + + private App getUpdatedApp(App app, ServiceEntityInterface service) { + App updatedApp = JsonUtils.deepCopy(app, App.class); + Object updatedConfig = JsonUtils.deepCopy(app.getAppConfiguration(), Object.class); + + if (app.getName().equals("CollateAIApplication")) { + (JsonUtils.convertValue(updatedConfig, CollateAIAppConfig.class)) + .withFilter( + String.format( + "{\"query\":{\"bool\":{\"must\":[{\"bool\":{\"must\":[{\"term\":{\"Tier.TagFQN\":\"Tier.Tier1\"}}]}},{\"bool\":{\"must\":[{\"term\":{\"entityType\":\"table\"}}]}},{\"bool\":{\"must\":[{\"term\":{\"service.name.keyword\":\"%s\"}}]}}]}}}", + service.getName().toLowerCase())); + } + updatedApp.withAppConfiguration(updatedConfig); + return updatedApp; + } + + private void updateApp(AppRepository repository, App originalApp, App updatedApp) { + JsonPatch patch = JsonUtils.getJsonPatch(originalApp, updatedApp); + repository.patch(null, originalApp.getId(), "admin", patch); + } + + // Internal App Logic + @SneakyThrows + private boolean runApp( + AppRepository repository, + App app, + boolean waitForCompletion, + long startTime, + long timeoutMillis) { + ApplicationHandler.getInstance() + .triggerApplicationOnDemand(app, Entity.getCollectionDAO(), Entity.getSearchRepository()); + + if (waitForCompletion) { + return waitForCompletion(repository, app, startTime, timeoutMillis); + } else { + return true; + } + } + + private boolean waitForCompletion( + AppRepository repository, App app, long startTime, long timeoutMillis) { + AppRunRecord appRunRecord = null; + + do { + try { + if (System.currentTimeMillis() - startTime > timeoutMillis) { + return false; + } + appRunRecord = repository.getLatestAppRunsAfterStartTime(app, startTime); + } catch (Exception ignore) { + } + } while (!isRunCompleted(appRunRecord)); + + return appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS) + || appRunRecord.getStatus().equals(AppRunRecord.Status.COMPLETED); + } + + private boolean isRunCompleted(AppRunRecord appRunRecord) { + if (appRunRecord == null) { + return false; + } + return !nullOrEmpty(appRunRecord.getExecutionTime()); + } + + // External App Logic + private boolean runApp( + PipelineServiceClientInterface pipelineServiceClient, + App app, + boolean waitForCompletion, + long startTime, + long timeoutMillis) { + EntityReference pipelineRef = app.getPipelines().get(0); + + IngestionPipelineRepository repository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + OpenMetadataApplicationConfig config = repository.getOpenMetadataApplicationConfig(); + + IngestionPipeline ingestionPipeline = repository.get(null, pipelineRef.getId(), EMPTY_FIELDS); + ingestionPipeline.setOpenMetadataServerConnection( + new OpenMetadataConnectionBuilder(config).build()); + + ServiceEntityInterface service = + Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED); + + pipelineServiceClient.deployPipeline(ingestionPipeline, service); + pipelineServiceClient.runPipeline(ingestionPipeline, service); + + if (waitForCompletion) { + return waitForCompletion(repository, ingestionPipeline, startTime, timeoutMillis); + } else { + return true; + } + } + + private boolean waitForCompletion( + IngestionPipelineRepository repository, + IngestionPipeline ingestionPipeline, + long startTime, + long timeoutMillis) { + while (true) { + if (System.currentTimeMillis() - startTime > timeoutMillis) { + return false; + } + + List statuses = + repository + .listPipelineStatus( + ingestionPipeline.getFullyQualifiedName(), startTime, startTime + timeoutMillis) + .getData(); + + if (statuses.isEmpty()) { + continue; + } + + PipelineStatus status = statuses.get(statuses.size() - 1); + + if (status.getPipelineState().equals(PipelineStatusType.FAILED)) { + return false; + } else if (status.getPipelineState().equals(PipelineStatusType.SUCCESS) + || status.getPipelineState().equals(PipelineStatusType.PARTIAL_SUCCESS)) { + return true; + } + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppTask.java new file mode 100644 index 00000000000..1e31677752d --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppTask.java @@ -0,0 +1,110 @@ +package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.runApp; + +import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; + +import org.flowable.bpmn.model.BoundaryEvent; +import org.flowable.bpmn.model.BpmnModel; +import org.flowable.bpmn.model.EndEvent; +import org.flowable.bpmn.model.FieldExtension; +import org.flowable.bpmn.model.Process; +import org.flowable.bpmn.model.SequenceFlow; +import org.flowable.bpmn.model.ServiceTask; +import org.flowable.bpmn.model.StartEvent; +import org.flowable.bpmn.model.SubProcess; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition; +import org.openmetadata.service.governance.workflows.elements.NodeInterface; +import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder; +import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder; +import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder; +import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder; +import org.openmetadata.service.governance.workflows.flowable.builders.SubProcessBuilder; +import org.openmetadata.service.util.JsonUtils; + +public class RunAppTask implements NodeInterface { + private final SubProcess subProcess; + private final BoundaryEvent runtimeExceptionBoundaryEvent; + + public RunAppTask(RunAppTaskDefinition nodeDefinition) { + String subProcessId = nodeDefinition.getName(); + + SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build(); + + StartEvent startEvent = + new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build(); + + ServiceTask runApp = + getRunAppServiceTask( + subProcessId, + nodeDefinition.getConfig().getAppName(), + nodeDefinition.getConfig().getWaitForCompletion(), + nodeDefinition.getConfig().getTimeoutSeconds(), + JsonUtils.pojoToJson(nodeDefinition.getInputNamespaceMap())); + + EndEvent endEvent = + new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build(); + + subProcess.addFlowElement(startEvent); + subProcess.addFlowElement(runApp); + subProcess.addFlowElement(endEvent); + + subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), runApp.getId())); + subProcess.addFlowElement(new SequenceFlow(runApp.getId(), endEvent.getId())); + + this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess); + this.subProcess = subProcess; + } + + @Override + public BoundaryEvent getRuntimeExceptionBoundaryEvent() { + return runtimeExceptionBoundaryEvent; + } + + private ServiceTask getRunAppServiceTask( + String subProcessId, + String appName, + boolean waitForCompletion, + long timeoutSeconds, + String inputNamespaceMap) { + FieldExtension appNameExpr = + new FieldExtensionBuilder().fieldName("appNameExpr").fieldValue(appName).build(); + + FieldExtension waitExpr = + new FieldExtensionBuilder() + .fieldName("waitForCompletionExpr") + .fieldValue(String.valueOf(waitForCompletion)) + .build(); + FieldExtension timeoutSecondsExpr = + new FieldExtensionBuilder() + .fieldName("timeoutSecondsExpr") + .fieldValue(String.valueOf(timeoutSeconds)) + .build(); + + FieldExtension inputNamespaceMapExpr = + new FieldExtensionBuilder() + .fieldName("inputNamespaceMapExpr") + .fieldValue(inputNamespaceMap) + .build(); + + FieldExtension pipelineServiceClientExpr = + new FieldExtensionBuilder() + .fieldName("pipelineServiceClientExpr") + .expression("${PipelineServiceClient}") + .build(); + + return new ServiceTaskBuilder() + .id(getFlowableElementId(subProcessId, "triggerIngestionWorkflow")) + .implementation(RunAppDelegate.class.getName()) + .addFieldExtension(appNameExpr) + .addFieldExtension(waitExpr) + .addFieldExtension(timeoutSecondsExpr) + .addFieldExtension(inputNamespaceMapExpr) + .addFieldExtension(pipelineServiceClientExpr) + .setAsync(true) + .build(); + } + + public void addToWorkflow(BpmnModel model, Process process) { + process.addFlowElement(subProcess); + process.addFlowElement(runtimeExceptionBoundaryEvent); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/BaseDelegate.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/BaseDelegate.java new file mode 100644 index 00000000000..01fea92c5e4 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/BaseDelegate.java @@ -0,0 +1,44 @@ +package org.openmetadata.service.governance.workflows.flowable; + +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; + +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.BpmnError; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.JavaDelegate; +import org.openmetadata.service.governance.workflows.WorkflowVariableHandler; +import org.openmetadata.service.util.JsonUtils; + +@Slf4j +public abstract class BaseDelegate implements JavaDelegate { + private Expression inputNamespaceMapExpr; + private Expression configMapExpr; + + protected WorkflowVariableHandler varHandler; + protected Map inputNamespaceMap; + protected Map configMap; + + protected abstract void innerExecute(DelegateExecution execution); + + @Override + public void execute(DelegateExecution execution) { + varHandler = new WorkflowVariableHandler(execution); + try { + inputNamespaceMap = + JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class); + configMap = JsonUtils.readOrConvertValue(configMapExpr.getValue(execution), Map.class); + innerExecute(execution); + } catch (Exception exc) { + LOG.error( + String.format( + "[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())), + exc); + varHandler.setGlobalVariable(EXCEPTION_VARIABLE, exc.toString()); + throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage()); + } + } +} diff --git a/openmetadata-spec/src/main/java/org/openmetadata/schema/governance/workflows/elements/WorkflowNodeDefinitionInterface.java b/openmetadata-spec/src/main/java/org/openmetadata/schema/governance/workflows/elements/WorkflowNodeDefinitionInterface.java index 43cbc321fec..cb732360ff3 100644 --- a/openmetadata-spec/src/main/java/org/openmetadata/schema/governance/workflows/elements/WorkflowNodeDefinitionInterface.java +++ b/openmetadata-spec/src/main/java/org/openmetadata/schema/governance/workflows/elements/WorkflowNodeDefinitionInterface.java @@ -8,6 +8,7 @@ import java.util.Map; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CreateIngestionPipelineTaskDefinition; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunIngestionPipelineTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTaskDefinition; @@ -36,6 +37,7 @@ import org.openmetadata.schema.governance.workflows.elements.nodes.userTask.User @JsonSubTypes.Type( value = RunIngestionPipelineTaskDefinition.class, name = "runIngestionPipelineTask"), + @JsonSubTypes.Type(value = RunAppTaskDefinition.class, name = "runAppTask"), @JsonSubTypes.Type(value = ParallelGatewayDefinition.class, name = "parallelGateway"), }) public interface WorkflowNodeDefinitionInterface { diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodeSubType.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodeSubType.json index 8503c16d171..71ef2f61527 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodeSubType.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodeSubType.json @@ -14,6 +14,7 @@ "startEvent", "createIngestionPipelineTask", "runIngestionPipelineTask", + "runAppTask", "parallelGateway" ] } diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/createIngestionPipelineTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/createIngestionPipelineTask.json index de586c05422..470b4974356 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/createIngestionPipelineTask.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/createIngestionPipelineTask.json @@ -18,14 +18,17 @@ "default": "createIngestionPipelineTask" }, "name": { + "title": "Name", "description": "Name that identifies this Node.", "$ref": "../../../../../type/basic.json#/definitions/entityName" }, "displayName": { + "title": "Display Name", "description": "Display Name that identifies this Node.", "type": "string" }, "description": { + "title": "Description", "description": "Description of the Node.", "$ref": "../../../../../type/basic.json#/definitions/markdown" }, @@ -33,9 +36,13 @@ "type": "object", "properties": { "pipelineType": { + "title": "Pipeline Type", + "description": "Define which ingestion pipeline type should be created", "$ref": "../../../../../entity/services/ingestionPipelines/ingestionPipeline.json#/definitions/pipelineType" }, "deploy": { + "title": "Deploy", + "description": "Set if the created pipeline should also be deployed", "type": "boolean", "default": true } diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/runAppTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/runAppTask.json new file mode 100644 index 00000000000..08744a2f6fb --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/runAppTask.json @@ -0,0 +1,87 @@ +{ + "$id": "https://open-metadata.org/schema/governance/workflows/elements/nodes/automatedTask/runAppTask.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "RunAppTaskDefinition", + "description": "Runs an App based on its name.", + "javaInterfaces": [ + "org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface" + ], + "javaType": "org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition", + "type": "object", + "properties": { + "type": { + "type": "string", + "default": "automatedTask" + }, + "subType": { + "type": "string", + "default": "runAppTask" + }, + "name": { + "title": "Name", + "description": "Name that identifies this Node.", + "$ref": "../../../../../type/basic.json#/definitions/entityName" + }, + "displayName": { + "title": "Display Name", + "description": "Display Name that identifies this Node.", + "type": "string" + }, + "description": { + "title": "Description", + "description": "Description of the Node.", + "$ref": "../../../../../type/basic.json#/definitions/markdown" + }, + "config": { + "type": "object", + "properties": { + "appName": { + "title": "App Name", + "description": "Set which App should Run", + "type": "string" + }, + "waitForCompletion": { + "title": "Wait for Completion", + "description": "Set if this step should wait until the App finishes running", + "type": "boolean", + "default": true + }, + "timeoutSeconds": { + "title": "Timeout Seconds", + "description": "Set the amount of seconds to wait before defining the App has timed out.", + "type": "integer", + "default": 3600 + } + }, + "additionalProperties": false, + "required": ["appName", "waitForCompletion", "timeoutSeconds"] + }, + "input": { + "type": "array", + "items": { "type": "string" }, + "default": ["relatedEntity"], + "additionalItems": false, + "minItems": 1, + "maxItems": 1 + }, + "inputNamespaceMap": { + "type": "object", + "properties": { + "relatedEntity": { + "type": "string", + "default": "global" + } + }, + "additionalProperties": false, + "required": ["relatedEntity"] + }, + "branches": { + "type": "array", + "items": { "type": "string" }, + "default": ["true", "false"], + "additionalItems": false, + "minItems": 2, + "maxItems": 2 + } + } +} \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/runIngestionPipelineTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/runIngestionPipelineTask.json index 75b4d4ed2b4..167058537d0 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/runIngestionPipelineTask.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/runIngestionPipelineTask.json @@ -18,14 +18,17 @@ "default": "runIngestionPipelineTask" }, "name": { + "title": "Name", "description": "Name that identifies this Node.", "$ref": "../../../../../type/basic.json#/definitions/entityName" }, "displayName": { + "title": "Display Name", "description": "Display Name that identifies this Node.", "type": "string" }, "description": { + "title": "Description", "description": "Description of the Node.", "$ref": "../../../../../type/basic.json#/definitions/markdown" }, @@ -33,10 +36,14 @@ "type": "object", "properties": { "waitForCompletion": { + "title": "Wait for Completion", + "description": "Set if this step should wait until the Ingestion Pipeline finishes running", "type": "boolean", "default": true }, "timeoutSeconds": { + "title": "Timeout Seconds", + "description": "Set the amount of seconds to wait before defining the Ingestion Pipeline has timed out.", "type": "integer", "default": 3600 } diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodeSubType.ts b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodeSubType.ts index 56a15977c63..c67fb24c7b9 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodeSubType.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodeSubType.ts @@ -18,6 +18,7 @@ export enum NodeSubType { CreateIngestionPipelineTask = "createIngestionPipelineTask", EndEvent = "endEvent", ParallelGateway = "parallelGateway", + RunAppTask = "runAppTask", RunIngestionPipelineTask = "runIngestionPipelineTask", SetEntityCertificationTask = "setEntityCertificationTask", SetGlossaryTermStatusTask = "setGlossaryTermStatusTask", diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/createIngestionPipelineTask.ts b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/createIngestionPipelineTask.ts index 108355adbbe..3c57056edd3 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/createIngestionPipelineTask.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/createIngestionPipelineTask.ts @@ -37,11 +37,19 @@ export interface CreateIngestionPipelineTask { } export interface Config { - deploy: boolean; + /** + * Set if the created pipeline should also be deployed + */ + deploy: boolean; + /** + * Define which ingestion pipeline type should be created + */ pipelineType: PipelineType; } /** + * Define which ingestion pipeline type should be created + * * Type of Pipeline - metadata, usage */ export enum PipelineType { diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/runAppTask.ts b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/runAppTask.ts new file mode 100644 index 00000000000..8337437f775 --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/runAppTask.ts @@ -0,0 +1,55 @@ +/* + * Copyright 2025 Collate. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Runs an App based on its name. + */ +export interface RunAppTask { + branches?: string[]; + config?: Config; + /** + * Description of the Node. + */ + description?: string; + /** + * Display Name that identifies this Node. + */ + displayName?: string; + input?: string[]; + inputNamespaceMap?: InputNamespaceMap; + /** + * Name that identifies this Node. + */ + name?: string; + subType?: string; + type?: string; + [property: string]: any; +} + +export interface Config { + /** + * Set which App should Run + */ + appName: string; + /** + * Set the amount of seconds to wait before defining the App has timed out. + */ + timeoutSeconds: number; + /** + * Set if this step should wait until the App finishes running + */ + waitForCompletion: boolean; +} + +export interface InputNamespaceMap { + relatedEntity: string; +} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/runIngestionPipelineTask.ts b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/runIngestionPipelineTask.ts index ddca1fce27d..eb0ae5615fc 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/runIngestionPipelineTask.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/runIngestionPipelineTask.ts @@ -36,7 +36,13 @@ export interface RunIngestionPipelineTask { } export interface Config { - timeoutSeconds: number; + /** + * Set the amount of seconds to wait before defining the Ingestion Pipeline has timed out. + */ + timeoutSeconds: number; + /** + * Set if this step should wait until the Ingestion Pipeline finishes running + */ waitForCompletion: boolean; }