mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-10 15:59:57 +00:00
MINOR: Add run app task (#19841)
* Add Run App Task * Work on Run App Task * Rollback task changes * Fixing type convertion * Fix Run External App wait * Fix Run App Task --------- Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com>
This commit is contained in:
parent
b45bc77302
commit
9d51add739
@ -4,6 +4,7 @@ import org.openmetadata.schema.governance.workflows.elements.NodeSubType;
|
|||||||
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
|
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
|
||||||
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition;
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition;
|
||||||
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CreateIngestionPipelineTaskDefinition;
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CreateIngestionPipelineTaskDefinition;
|
||||||
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition;
|
||||||
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunIngestionPipelineTaskDefinition;
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunIngestionPipelineTaskDefinition;
|
||||||
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTaskDefinition;
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTaskDefinition;
|
||||||
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTaskDefinition;
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTaskDefinition;
|
||||||
@ -16,6 +17,7 @@ import org.openmetadata.service.governance.workflows.elements.nodes.automatedTas
|
|||||||
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.RunIngestionPipelineTask;
|
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.RunIngestionPipelineTask;
|
||||||
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTask;
|
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTask;
|
||||||
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTask;
|
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTask;
|
||||||
|
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.runApp.RunAppTask;
|
||||||
import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent;
|
import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent;
|
||||||
import org.openmetadata.service.governance.workflows.elements.nodes.gateway.ParallelGateway;
|
import org.openmetadata.service.governance.workflows.elements.nodes.gateway.ParallelGateway;
|
||||||
import org.openmetadata.service.governance.workflows.elements.nodes.startEvent.StartEvent;
|
import org.openmetadata.service.governance.workflows.elements.nodes.startEvent.StartEvent;
|
||||||
@ -37,6 +39,7 @@ public class NodeFactory {
|
|||||||
(CreateIngestionPipelineTaskDefinition) nodeDefinition);
|
(CreateIngestionPipelineTaskDefinition) nodeDefinition);
|
||||||
case RUN_INGESTION_PIPELINE_TASK -> new RunIngestionPipelineTask(
|
case RUN_INGESTION_PIPELINE_TASK -> new RunIngestionPipelineTask(
|
||||||
(RunIngestionPipelineTaskDefinition) nodeDefinition);
|
(RunIngestionPipelineTaskDefinition) nodeDefinition);
|
||||||
|
case RUN_APP_TASK -> new RunAppTask((RunAppTaskDefinition) nodeDefinition);
|
||||||
case PARALLEL_GATEWAY -> new ParallelGateway((ParallelGatewayDefinition) nodeDefinition);
|
case PARALLEL_GATEWAY -> new ParallelGateway((ParallelGatewayDefinition) nodeDefinition);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,64 @@
|
|||||||
|
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.runApp;
|
||||||
|
|
||||||
|
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||||
|
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
|
||||||
|
import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE;
|
||||||
|
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
|
||||||
|
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.flowable.common.engine.api.delegate.Expression;
|
||||||
|
import org.flowable.engine.delegate.BpmnError;
|
||||||
|
import org.flowable.engine.delegate.DelegateExecution;
|
||||||
|
import org.flowable.engine.delegate.JavaDelegate;
|
||||||
|
import org.openmetadata.sdk.PipelineServiceClientInterface;
|
||||||
|
import org.openmetadata.service.governance.workflows.WorkflowVariableHandler;
|
||||||
|
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||||
|
import org.openmetadata.service.util.JsonUtils;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class RunAppDelegate implements JavaDelegate {
|
||||||
|
private Expression inputNamespaceMapExpr;
|
||||||
|
private Expression pipelineServiceClientExpr;
|
||||||
|
private Expression appNameExpr;
|
||||||
|
private Expression waitForCompletionExpr;
|
||||||
|
private Expression timeoutSecondsExpr;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(DelegateExecution execution) {
|
||||||
|
WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution);
|
||||||
|
try {
|
||||||
|
Map<String, String> inputNamespaceMap =
|
||||||
|
JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class);
|
||||||
|
|
||||||
|
String appName = (String) appNameExpr.getValue(execution);
|
||||||
|
boolean waitForCompletion =
|
||||||
|
Boolean.parseBoolean((String) waitForCompletionExpr.getValue(execution));
|
||||||
|
long timeoutSeconds = Long.parseLong((String) timeoutSecondsExpr.getValue(execution));
|
||||||
|
|
||||||
|
PipelineServiceClientInterface pipelineServiceClient =
|
||||||
|
(PipelineServiceClientInterface) pipelineServiceClientExpr.getValue(execution);
|
||||||
|
|
||||||
|
MessageParser.EntityLink entityLink =
|
||||||
|
MessageParser.EntityLink.parse(
|
||||||
|
(String)
|
||||||
|
varHandler.getNamespacedVariable(
|
||||||
|
inputNamespaceMap.get(RELATED_ENTITY_VARIABLE), RELATED_ENTITY_VARIABLE));
|
||||||
|
|
||||||
|
boolean success =
|
||||||
|
new RunAppImpl()
|
||||||
|
.execute(
|
||||||
|
pipelineServiceClient, appName, waitForCompletion, timeoutSeconds, entityLink);
|
||||||
|
|
||||||
|
varHandler.setNodeVariable(RESULT_VARIABLE, success);
|
||||||
|
} catch (Exception exc) {
|
||||||
|
LOG.error(
|
||||||
|
String.format(
|
||||||
|
"[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())),
|
||||||
|
exc);
|
||||||
|
varHandler.setGlobalVariable(EXCEPTION_VARIABLE, exc.toString());
|
||||||
|
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,199 @@
|
|||||||
|
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.runApp;
|
||||||
|
|
||||||
|
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||||
|
import static org.openmetadata.service.util.EntityUtil.Fields.EMPTY_FIELDS;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import javax.json.JsonPatch;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import org.openmetadata.schema.ServiceEntityInterface;
|
||||||
|
import org.openmetadata.schema.entity.app.App;
|
||||||
|
import org.openmetadata.schema.entity.app.AppRunRecord;
|
||||||
|
import org.openmetadata.schema.entity.app.AppType;
|
||||||
|
import org.openmetadata.schema.entity.app.external.CollateAIAppConfig;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;
|
||||||
|
import org.openmetadata.schema.type.EntityReference;
|
||||||
|
import org.openmetadata.schema.type.Include;
|
||||||
|
import org.openmetadata.sdk.PipelineServiceClientInterface;
|
||||||
|
import org.openmetadata.service.Entity;
|
||||||
|
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||||
|
import org.openmetadata.service.apps.ApplicationHandler;
|
||||||
|
import org.openmetadata.service.jdbi3.AppRepository;
|
||||||
|
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
|
||||||
|
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||||
|
import org.openmetadata.service.util.EntityUtil;
|
||||||
|
import org.openmetadata.service.util.JsonUtils;
|
||||||
|
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
|
||||||
|
|
||||||
|
public class RunAppImpl {
|
||||||
|
public boolean execute(
|
||||||
|
PipelineServiceClientInterface pipelineServiceClient,
|
||||||
|
String appName,
|
||||||
|
boolean waitForCompletion,
|
||||||
|
long timeoutSeconds,
|
||||||
|
MessageParser.EntityLink entityLink) {
|
||||||
|
ServiceEntityInterface service = Entity.getEntity(entityLink, "owners", Include.NON_DELETED);
|
||||||
|
|
||||||
|
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
|
||||||
|
App app =
|
||||||
|
appRepository.getByName(null, appName, new EntityUtil.Fields(Set.of("bot", "pipelines")));
|
||||||
|
|
||||||
|
if (!validateAppShouldRun(app, service)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
App updatedApp = getUpdatedApp(app, service);
|
||||||
|
|
||||||
|
updateApp(appRepository, app, updatedApp);
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
long timeoutMillis = timeoutSeconds * 1000;
|
||||||
|
boolean success = true;
|
||||||
|
|
||||||
|
if (app.getAppType().equals(AppType.Internal)) {
|
||||||
|
success = runApp(appRepository, app, waitForCompletion, startTime, timeoutMillis);
|
||||||
|
} else {
|
||||||
|
success = runApp(pipelineServiceClient, app, waitForCompletion, startTime, timeoutMillis);
|
||||||
|
}
|
||||||
|
|
||||||
|
updateApp(appRepository, updatedApp, app);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean validateAppShouldRun(App app, ServiceEntityInterface service) {
|
||||||
|
// We only want to run the CollateAIApplication for Databases
|
||||||
|
if (Entity.getEntityTypeFromObject(service).equals(Entity.DATABASE_SERVICE)
|
||||||
|
&& app.getName().equals("CollateAIApplication")) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private App getUpdatedApp(App app, ServiceEntityInterface service) {
|
||||||
|
App updatedApp = JsonUtils.deepCopy(app, App.class);
|
||||||
|
Object updatedConfig = JsonUtils.deepCopy(app.getAppConfiguration(), Object.class);
|
||||||
|
|
||||||
|
if (app.getName().equals("CollateAIApplication")) {
|
||||||
|
(JsonUtils.convertValue(updatedConfig, CollateAIAppConfig.class))
|
||||||
|
.withFilter(
|
||||||
|
String.format(
|
||||||
|
"{\"query\":{\"bool\":{\"must\":[{\"bool\":{\"must\":[{\"term\":{\"Tier.TagFQN\":\"Tier.Tier1\"}}]}},{\"bool\":{\"must\":[{\"term\":{\"entityType\":\"table\"}}]}},{\"bool\":{\"must\":[{\"term\":{\"service.name.keyword\":\"%s\"}}]}}]}}}",
|
||||||
|
service.getName().toLowerCase()));
|
||||||
|
}
|
||||||
|
updatedApp.withAppConfiguration(updatedConfig);
|
||||||
|
return updatedApp;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateApp(AppRepository repository, App originalApp, App updatedApp) {
|
||||||
|
JsonPatch patch = JsonUtils.getJsonPatch(originalApp, updatedApp);
|
||||||
|
repository.patch(null, originalApp.getId(), "admin", patch);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Internal App Logic
|
||||||
|
@SneakyThrows
|
||||||
|
private boolean runApp(
|
||||||
|
AppRepository repository,
|
||||||
|
App app,
|
||||||
|
boolean waitForCompletion,
|
||||||
|
long startTime,
|
||||||
|
long timeoutMillis) {
|
||||||
|
ApplicationHandler.getInstance()
|
||||||
|
.triggerApplicationOnDemand(app, Entity.getCollectionDAO(), Entity.getSearchRepository());
|
||||||
|
|
||||||
|
if (waitForCompletion) {
|
||||||
|
return waitForCompletion(repository, app, startTime, timeoutMillis);
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean waitForCompletion(
|
||||||
|
AppRepository repository, App app, long startTime, long timeoutMillis) {
|
||||||
|
AppRunRecord appRunRecord = null;
|
||||||
|
|
||||||
|
do {
|
||||||
|
try {
|
||||||
|
if (System.currentTimeMillis() - startTime > timeoutMillis) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
appRunRecord = repository.getLatestAppRunsAfterStartTime(app, startTime);
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
}
|
||||||
|
} while (!isRunCompleted(appRunRecord));
|
||||||
|
|
||||||
|
return appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS)
|
||||||
|
|| appRunRecord.getStatus().equals(AppRunRecord.Status.COMPLETED);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isRunCompleted(AppRunRecord appRunRecord) {
|
||||||
|
if (appRunRecord == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return !nullOrEmpty(appRunRecord.getExecutionTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
// External App Logic
|
||||||
|
private boolean runApp(
|
||||||
|
PipelineServiceClientInterface pipelineServiceClient,
|
||||||
|
App app,
|
||||||
|
boolean waitForCompletion,
|
||||||
|
long startTime,
|
||||||
|
long timeoutMillis) {
|
||||||
|
EntityReference pipelineRef = app.getPipelines().get(0);
|
||||||
|
|
||||||
|
IngestionPipelineRepository repository =
|
||||||
|
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
|
||||||
|
OpenMetadataApplicationConfig config = repository.getOpenMetadataApplicationConfig();
|
||||||
|
|
||||||
|
IngestionPipeline ingestionPipeline = repository.get(null, pipelineRef.getId(), EMPTY_FIELDS);
|
||||||
|
ingestionPipeline.setOpenMetadataServerConnection(
|
||||||
|
new OpenMetadataConnectionBuilder(config).build());
|
||||||
|
|
||||||
|
ServiceEntityInterface service =
|
||||||
|
Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED);
|
||||||
|
|
||||||
|
pipelineServiceClient.deployPipeline(ingestionPipeline, service);
|
||||||
|
pipelineServiceClient.runPipeline(ingestionPipeline, service);
|
||||||
|
|
||||||
|
if (waitForCompletion) {
|
||||||
|
return waitForCompletion(repository, ingestionPipeline, startTime, timeoutMillis);
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean waitForCompletion(
|
||||||
|
IngestionPipelineRepository repository,
|
||||||
|
IngestionPipeline ingestionPipeline,
|
||||||
|
long startTime,
|
||||||
|
long timeoutMillis) {
|
||||||
|
while (true) {
|
||||||
|
if (System.currentTimeMillis() - startTime > timeoutMillis) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<PipelineStatus> statuses =
|
||||||
|
repository
|
||||||
|
.listPipelineStatus(
|
||||||
|
ingestionPipeline.getFullyQualifiedName(), startTime, startTime + timeoutMillis)
|
||||||
|
.getData();
|
||||||
|
|
||||||
|
if (statuses.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
PipelineStatus status = statuses.get(statuses.size() - 1);
|
||||||
|
|
||||||
|
if (status.getPipelineState().equals(PipelineStatusType.FAILED)) {
|
||||||
|
return false;
|
||||||
|
} else if (status.getPipelineState().equals(PipelineStatusType.SUCCESS)
|
||||||
|
|| status.getPipelineState().equals(PipelineStatusType.PARTIAL_SUCCESS)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,110 @@
|
|||||||
|
package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.runApp;
|
||||||
|
|
||||||
|
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;
|
||||||
|
|
||||||
|
import org.flowable.bpmn.model.BoundaryEvent;
|
||||||
|
import org.flowable.bpmn.model.BpmnModel;
|
||||||
|
import org.flowable.bpmn.model.EndEvent;
|
||||||
|
import org.flowable.bpmn.model.FieldExtension;
|
||||||
|
import org.flowable.bpmn.model.Process;
|
||||||
|
import org.flowable.bpmn.model.SequenceFlow;
|
||||||
|
import org.flowable.bpmn.model.ServiceTask;
|
||||||
|
import org.flowable.bpmn.model.StartEvent;
|
||||||
|
import org.flowable.bpmn.model.SubProcess;
|
||||||
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition;
|
||||||
|
import org.openmetadata.service.governance.workflows.elements.NodeInterface;
|
||||||
|
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
|
||||||
|
import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder;
|
||||||
|
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
|
||||||
|
import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder;
|
||||||
|
import org.openmetadata.service.governance.workflows.flowable.builders.SubProcessBuilder;
|
||||||
|
import org.openmetadata.service.util.JsonUtils;
|
||||||
|
|
||||||
|
public class RunAppTask implements NodeInterface {
|
||||||
|
private final SubProcess subProcess;
|
||||||
|
private final BoundaryEvent runtimeExceptionBoundaryEvent;
|
||||||
|
|
||||||
|
public RunAppTask(RunAppTaskDefinition nodeDefinition) {
|
||||||
|
String subProcessId = nodeDefinition.getName();
|
||||||
|
|
||||||
|
SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build();
|
||||||
|
|
||||||
|
StartEvent startEvent =
|
||||||
|
new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build();
|
||||||
|
|
||||||
|
ServiceTask runApp =
|
||||||
|
getRunAppServiceTask(
|
||||||
|
subProcessId,
|
||||||
|
nodeDefinition.getConfig().getAppName(),
|
||||||
|
nodeDefinition.getConfig().getWaitForCompletion(),
|
||||||
|
nodeDefinition.getConfig().getTimeoutSeconds(),
|
||||||
|
JsonUtils.pojoToJson(nodeDefinition.getInputNamespaceMap()));
|
||||||
|
|
||||||
|
EndEvent endEvent =
|
||||||
|
new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build();
|
||||||
|
|
||||||
|
subProcess.addFlowElement(startEvent);
|
||||||
|
subProcess.addFlowElement(runApp);
|
||||||
|
subProcess.addFlowElement(endEvent);
|
||||||
|
|
||||||
|
subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), runApp.getId()));
|
||||||
|
subProcess.addFlowElement(new SequenceFlow(runApp.getId(), endEvent.getId()));
|
||||||
|
|
||||||
|
this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(subProcess);
|
||||||
|
this.subProcess = subProcess;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
|
||||||
|
return runtimeExceptionBoundaryEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ServiceTask getRunAppServiceTask(
|
||||||
|
String subProcessId,
|
||||||
|
String appName,
|
||||||
|
boolean waitForCompletion,
|
||||||
|
long timeoutSeconds,
|
||||||
|
String inputNamespaceMap) {
|
||||||
|
FieldExtension appNameExpr =
|
||||||
|
new FieldExtensionBuilder().fieldName("appNameExpr").fieldValue(appName).build();
|
||||||
|
|
||||||
|
FieldExtension waitExpr =
|
||||||
|
new FieldExtensionBuilder()
|
||||||
|
.fieldName("waitForCompletionExpr")
|
||||||
|
.fieldValue(String.valueOf(waitForCompletion))
|
||||||
|
.build();
|
||||||
|
FieldExtension timeoutSecondsExpr =
|
||||||
|
new FieldExtensionBuilder()
|
||||||
|
.fieldName("timeoutSecondsExpr")
|
||||||
|
.fieldValue(String.valueOf(timeoutSeconds))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
FieldExtension inputNamespaceMapExpr =
|
||||||
|
new FieldExtensionBuilder()
|
||||||
|
.fieldName("inputNamespaceMapExpr")
|
||||||
|
.fieldValue(inputNamespaceMap)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
FieldExtension pipelineServiceClientExpr =
|
||||||
|
new FieldExtensionBuilder()
|
||||||
|
.fieldName("pipelineServiceClientExpr")
|
||||||
|
.expression("${PipelineServiceClient}")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return new ServiceTaskBuilder()
|
||||||
|
.id(getFlowableElementId(subProcessId, "triggerIngestionWorkflow"))
|
||||||
|
.implementation(RunAppDelegate.class.getName())
|
||||||
|
.addFieldExtension(appNameExpr)
|
||||||
|
.addFieldExtension(waitExpr)
|
||||||
|
.addFieldExtension(timeoutSecondsExpr)
|
||||||
|
.addFieldExtension(inputNamespaceMapExpr)
|
||||||
|
.addFieldExtension(pipelineServiceClientExpr)
|
||||||
|
.setAsync(true)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addToWorkflow(BpmnModel model, Process process) {
|
||||||
|
process.addFlowElement(subProcess);
|
||||||
|
process.addFlowElement(runtimeExceptionBoundaryEvent);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,44 @@
|
|||||||
|
package org.openmetadata.service.governance.workflows.flowable;
|
||||||
|
|
||||||
|
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
|
||||||
|
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
|
||||||
|
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.flowable.common.engine.api.delegate.Expression;
|
||||||
|
import org.flowable.engine.delegate.BpmnError;
|
||||||
|
import org.flowable.engine.delegate.DelegateExecution;
|
||||||
|
import org.flowable.engine.delegate.JavaDelegate;
|
||||||
|
import org.openmetadata.service.governance.workflows.WorkflowVariableHandler;
|
||||||
|
import org.openmetadata.service.util.JsonUtils;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public abstract class BaseDelegate implements JavaDelegate {
|
||||||
|
private Expression inputNamespaceMapExpr;
|
||||||
|
private Expression configMapExpr;
|
||||||
|
|
||||||
|
protected WorkflowVariableHandler varHandler;
|
||||||
|
protected Map<String, String> inputNamespaceMap;
|
||||||
|
protected Map<String, Object> configMap;
|
||||||
|
|
||||||
|
protected abstract void innerExecute(DelegateExecution execution);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(DelegateExecution execution) {
|
||||||
|
varHandler = new WorkflowVariableHandler(execution);
|
||||||
|
try {
|
||||||
|
inputNamespaceMap =
|
||||||
|
JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class);
|
||||||
|
configMap = JsonUtils.readOrConvertValue(configMapExpr.getValue(execution), Map.class);
|
||||||
|
innerExecute(execution);
|
||||||
|
} catch (Exception exc) {
|
||||||
|
LOG.error(
|
||||||
|
String.format(
|
||||||
|
"[%s] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())),
|
||||||
|
exc);
|
||||||
|
varHandler.setGlobalVariable(EXCEPTION_VARIABLE, exc.toString());
|
||||||
|
throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, exc.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -8,6 +8,7 @@ import java.util.Map;
|
|||||||
import org.openmetadata.common.utils.CommonUtil;
|
import org.openmetadata.common.utils.CommonUtil;
|
||||||
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition;
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition;
|
||||||
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CreateIngestionPipelineTaskDefinition;
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CreateIngestionPipelineTaskDefinition;
|
||||||
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition;
|
||||||
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunIngestionPipelineTaskDefinition;
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunIngestionPipelineTaskDefinition;
|
||||||
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTaskDefinition;
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTaskDefinition;
|
||||||
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTaskDefinition;
|
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTaskDefinition;
|
||||||
@ -36,6 +37,7 @@ import org.openmetadata.schema.governance.workflows.elements.nodes.userTask.User
|
|||||||
@JsonSubTypes.Type(
|
@JsonSubTypes.Type(
|
||||||
value = RunIngestionPipelineTaskDefinition.class,
|
value = RunIngestionPipelineTaskDefinition.class,
|
||||||
name = "runIngestionPipelineTask"),
|
name = "runIngestionPipelineTask"),
|
||||||
|
@JsonSubTypes.Type(value = RunAppTaskDefinition.class, name = "runAppTask"),
|
||||||
@JsonSubTypes.Type(value = ParallelGatewayDefinition.class, name = "parallelGateway"),
|
@JsonSubTypes.Type(value = ParallelGatewayDefinition.class, name = "parallelGateway"),
|
||||||
})
|
})
|
||||||
public interface WorkflowNodeDefinitionInterface {
|
public interface WorkflowNodeDefinitionInterface {
|
||||||
|
|||||||
@ -14,6 +14,7 @@
|
|||||||
"startEvent",
|
"startEvent",
|
||||||
"createIngestionPipelineTask",
|
"createIngestionPipelineTask",
|
||||||
"runIngestionPipelineTask",
|
"runIngestionPipelineTask",
|
||||||
|
"runAppTask",
|
||||||
"parallelGateway"
|
"parallelGateway"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,14 +18,17 @@
|
|||||||
"default": "createIngestionPipelineTask"
|
"default": "createIngestionPipelineTask"
|
||||||
},
|
},
|
||||||
"name": {
|
"name": {
|
||||||
|
"title": "Name",
|
||||||
"description": "Name that identifies this Node.",
|
"description": "Name that identifies this Node.",
|
||||||
"$ref": "../../../../../type/basic.json#/definitions/entityName"
|
"$ref": "../../../../../type/basic.json#/definitions/entityName"
|
||||||
},
|
},
|
||||||
"displayName": {
|
"displayName": {
|
||||||
|
"title": "Display Name",
|
||||||
"description": "Display Name that identifies this Node.",
|
"description": "Display Name that identifies this Node.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"description": {
|
"description": {
|
||||||
|
"title": "Description",
|
||||||
"description": "Description of the Node.",
|
"description": "Description of the Node.",
|
||||||
"$ref": "../../../../../type/basic.json#/definitions/markdown"
|
"$ref": "../../../../../type/basic.json#/definitions/markdown"
|
||||||
},
|
},
|
||||||
@ -33,9 +36,13 @@
|
|||||||
"type": "object",
|
"type": "object",
|
||||||
"properties": {
|
"properties": {
|
||||||
"pipelineType": {
|
"pipelineType": {
|
||||||
|
"title": "Pipeline Type",
|
||||||
|
"description": "Define which ingestion pipeline type should be created",
|
||||||
"$ref": "../../../../../entity/services/ingestionPipelines/ingestionPipeline.json#/definitions/pipelineType"
|
"$ref": "../../../../../entity/services/ingestionPipelines/ingestionPipeline.json#/definitions/pipelineType"
|
||||||
},
|
},
|
||||||
"deploy": {
|
"deploy": {
|
||||||
|
"title": "Deploy",
|
||||||
|
"description": "Set if the created pipeline should also be deployed",
|
||||||
"type": "boolean",
|
"type": "boolean",
|
||||||
"default": true
|
"default": true
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,87 @@
|
|||||||
|
{
|
||||||
|
"$id": "https://open-metadata.org/schema/governance/workflows/elements/nodes/automatedTask/runAppTask.json",
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"title": "RunAppTaskDefinition",
|
||||||
|
"description": "Runs an App based on its name.",
|
||||||
|
"javaInterfaces": [
|
||||||
|
"org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface"
|
||||||
|
],
|
||||||
|
"javaType": "org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"type": {
|
||||||
|
"type": "string",
|
||||||
|
"default": "automatedTask"
|
||||||
|
},
|
||||||
|
"subType": {
|
||||||
|
"type": "string",
|
||||||
|
"default": "runAppTask"
|
||||||
|
},
|
||||||
|
"name": {
|
||||||
|
"title": "Name",
|
||||||
|
"description": "Name that identifies this Node.",
|
||||||
|
"$ref": "../../../../../type/basic.json#/definitions/entityName"
|
||||||
|
},
|
||||||
|
"displayName": {
|
||||||
|
"title": "Display Name",
|
||||||
|
"description": "Display Name that identifies this Node.",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"description": {
|
||||||
|
"title": "Description",
|
||||||
|
"description": "Description of the Node.",
|
||||||
|
"$ref": "../../../../../type/basic.json#/definitions/markdown"
|
||||||
|
},
|
||||||
|
"config": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"appName": {
|
||||||
|
"title": "App Name",
|
||||||
|
"description": "Set which App should Run",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"waitForCompletion": {
|
||||||
|
"title": "Wait for Completion",
|
||||||
|
"description": "Set if this step should wait until the App finishes running",
|
||||||
|
"type": "boolean",
|
||||||
|
"default": true
|
||||||
|
},
|
||||||
|
"timeoutSeconds": {
|
||||||
|
"title": "Timeout Seconds",
|
||||||
|
"description": "Set the amount of seconds to wait before defining the App has timed out.",
|
||||||
|
"type": "integer",
|
||||||
|
"default": 3600
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"additionalProperties": false,
|
||||||
|
"required": ["appName", "waitForCompletion", "timeoutSeconds"]
|
||||||
|
},
|
||||||
|
"input": {
|
||||||
|
"type": "array",
|
||||||
|
"items": { "type": "string" },
|
||||||
|
"default": ["relatedEntity"],
|
||||||
|
"additionalItems": false,
|
||||||
|
"minItems": 1,
|
||||||
|
"maxItems": 1
|
||||||
|
},
|
||||||
|
"inputNamespaceMap": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"relatedEntity": {
|
||||||
|
"type": "string",
|
||||||
|
"default": "global"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"additionalProperties": false,
|
||||||
|
"required": ["relatedEntity"]
|
||||||
|
},
|
||||||
|
"branches": {
|
||||||
|
"type": "array",
|
||||||
|
"items": { "type": "string" },
|
||||||
|
"default": ["true", "false"],
|
||||||
|
"additionalItems": false,
|
||||||
|
"minItems": 2,
|
||||||
|
"maxItems": 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -18,14 +18,17 @@
|
|||||||
"default": "runIngestionPipelineTask"
|
"default": "runIngestionPipelineTask"
|
||||||
},
|
},
|
||||||
"name": {
|
"name": {
|
||||||
|
"title": "Name",
|
||||||
"description": "Name that identifies this Node.",
|
"description": "Name that identifies this Node.",
|
||||||
"$ref": "../../../../../type/basic.json#/definitions/entityName"
|
"$ref": "../../../../../type/basic.json#/definitions/entityName"
|
||||||
},
|
},
|
||||||
"displayName": {
|
"displayName": {
|
||||||
|
"title": "Display Name",
|
||||||
"description": "Display Name that identifies this Node.",
|
"description": "Display Name that identifies this Node.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"description": {
|
"description": {
|
||||||
|
"title": "Description",
|
||||||
"description": "Description of the Node.",
|
"description": "Description of the Node.",
|
||||||
"$ref": "../../../../../type/basic.json#/definitions/markdown"
|
"$ref": "../../../../../type/basic.json#/definitions/markdown"
|
||||||
},
|
},
|
||||||
@ -33,10 +36,14 @@
|
|||||||
"type": "object",
|
"type": "object",
|
||||||
"properties": {
|
"properties": {
|
||||||
"waitForCompletion": {
|
"waitForCompletion": {
|
||||||
|
"title": "Wait for Completion",
|
||||||
|
"description": "Set if this step should wait until the Ingestion Pipeline finishes running",
|
||||||
"type": "boolean",
|
"type": "boolean",
|
||||||
"default": true
|
"default": true
|
||||||
},
|
},
|
||||||
"timeoutSeconds": {
|
"timeoutSeconds": {
|
||||||
|
"title": "Timeout Seconds",
|
||||||
|
"description": "Set the amount of seconds to wait before defining the Ingestion Pipeline has timed out.",
|
||||||
"type": "integer",
|
"type": "integer",
|
||||||
"default": 3600
|
"default": 3600
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,6 +18,7 @@ export enum NodeSubType {
|
|||||||
CreateIngestionPipelineTask = "createIngestionPipelineTask",
|
CreateIngestionPipelineTask = "createIngestionPipelineTask",
|
||||||
EndEvent = "endEvent",
|
EndEvent = "endEvent",
|
||||||
ParallelGateway = "parallelGateway",
|
ParallelGateway = "parallelGateway",
|
||||||
|
RunAppTask = "runAppTask",
|
||||||
RunIngestionPipelineTask = "runIngestionPipelineTask",
|
RunIngestionPipelineTask = "runIngestionPipelineTask",
|
||||||
SetEntityCertificationTask = "setEntityCertificationTask",
|
SetEntityCertificationTask = "setEntityCertificationTask",
|
||||||
SetGlossaryTermStatusTask = "setGlossaryTermStatusTask",
|
SetGlossaryTermStatusTask = "setGlossaryTermStatusTask",
|
||||||
|
|||||||
@ -37,11 +37,19 @@ export interface CreateIngestionPipelineTask {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export interface Config {
|
export interface Config {
|
||||||
deploy: boolean;
|
/**
|
||||||
|
* Set if the created pipeline should also be deployed
|
||||||
|
*/
|
||||||
|
deploy: boolean;
|
||||||
|
/**
|
||||||
|
* Define which ingestion pipeline type should be created
|
||||||
|
*/
|
||||||
pipelineType: PipelineType;
|
pipelineType: PipelineType;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Define which ingestion pipeline type should be created
|
||||||
|
*
|
||||||
* Type of Pipeline - metadata, usage
|
* Type of Pipeline - metadata, usage
|
||||||
*/
|
*/
|
||||||
export enum PipelineType {
|
export enum PipelineType {
|
||||||
|
|||||||
@ -0,0 +1,55 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2025 Collate.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
* Runs an App based on its name.
|
||||||
|
*/
|
||||||
|
export interface RunAppTask {
|
||||||
|
branches?: string[];
|
||||||
|
config?: Config;
|
||||||
|
/**
|
||||||
|
* Description of the Node.
|
||||||
|
*/
|
||||||
|
description?: string;
|
||||||
|
/**
|
||||||
|
* Display Name that identifies this Node.
|
||||||
|
*/
|
||||||
|
displayName?: string;
|
||||||
|
input?: string[];
|
||||||
|
inputNamespaceMap?: InputNamespaceMap;
|
||||||
|
/**
|
||||||
|
* Name that identifies this Node.
|
||||||
|
*/
|
||||||
|
name?: string;
|
||||||
|
subType?: string;
|
||||||
|
type?: string;
|
||||||
|
[property: string]: any;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface Config {
|
||||||
|
/**
|
||||||
|
* Set which App should Run
|
||||||
|
*/
|
||||||
|
appName: string;
|
||||||
|
/**
|
||||||
|
* Set the amount of seconds to wait before defining the App has timed out.
|
||||||
|
*/
|
||||||
|
timeoutSeconds: number;
|
||||||
|
/**
|
||||||
|
* Set if this step should wait until the App finishes running
|
||||||
|
*/
|
||||||
|
waitForCompletion: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface InputNamespaceMap {
|
||||||
|
relatedEntity: string;
|
||||||
|
}
|
||||||
@ -36,7 +36,13 @@ export interface RunIngestionPipelineTask {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export interface Config {
|
export interface Config {
|
||||||
timeoutSeconds: number;
|
/**
|
||||||
|
* Set the amount of seconds to wait before defining the Ingestion Pipeline has timed out.
|
||||||
|
*/
|
||||||
|
timeoutSeconds: number;
|
||||||
|
/**
|
||||||
|
* Set if this step should wait until the Ingestion Pipeline finishes running
|
||||||
|
*/
|
||||||
waitForCompletion: boolean;
|
waitForCompletion: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user