From 70a2aa1778dbcbccdecda3b2ae2b4fa78cccdc7f Mon Sep 17 00:00:00 2001 From: IceS2 Date: Tue, 4 Mar 2025 14:08:45 +0100 Subject: [PATCH] Workflow improvements (#20054) --- .../automatedTask/CreateIngestionPipelineTask.java | 2 +- .../impl/CreateIngestionPipelineImpl.java | 14 +++++++++++++- .../nodes/automatedTask/runApp/RunAppImpl.java | 13 +++++++++++-- .../nodes/automatedTask/runApp/RunAppTask.java | 2 +- 4 files changed, 26 insertions(+), 5 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CreateIngestionPipelineTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CreateIngestionPipelineTask.java index 8de4a530c1e..28acaf5d745 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CreateIngestionPipelineTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CreateIngestionPipelineTask.java @@ -30,7 +30,7 @@ public class CreateIngestionPipelineTask implements NodeInterface { String subProcessId = nodeDefinition.getName(); SubProcess subProcess = - new SubProcessBuilder().id(subProcessId).setAsync(true).exclusive(false).build(); + new SubProcessBuilder().id(subProcessId).setAsync(true).exclusive(true).build(); StartEvent startEvent = new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CreateIngestionPipelineImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CreateIngestionPipelineImpl.java index cbf3858db4b..8a175366874 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CreateIngestionPipelineImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CreateIngestionPipelineImpl.java @@ -146,15 +146,27 @@ public class CreateIngestionPipelineImpl implements JavaDelegate { private IngestionPipeline createIngestionPipeline( IngestionPipelineMapper mapper, PipelineType pipelineType, ServiceEntityInterface service) { + String displayName = String.format("[%s] %s", service.getName(), pipelineType); IngestionPipelineRepository repository = (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + for (String ingestionPipelineStr : + repository.listAllByParentFqn(service.getFullyQualifiedName())) { + IngestionPipeline ingestionPipeline = + JsonUtils.readOrConvertValue(ingestionPipelineStr, IngestionPipeline.class); + if (ingestionPipeline.getPipelineType().equals(pipelineType) + && ingestionPipeline.getDisplayName().equals(displayName)) { + return ingestionPipeline; + } + } + ; + org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline create = new org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline() .withAirflowConfig(new AirflowConfig().withStartDate(getYesterdayDate())) .withLoggerLevel(LogLevels.INFO) .withName(UUID.randomUUID().toString()) - .withDisplayName(String.format("[%s] %s", service.getName(), pipelineType)) + .withDisplayName(displayName) .withOwners(service.getOwners()) .withPipelineType(pipelineType) .withService(service.getEntityReference()) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java index 6bb9b30d336..53be9a78d14 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Set; import javax.json.JsonPatch; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.ServiceEntityInterface; import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppRunRecord; @@ -29,6 +30,7 @@ import org.openmetadata.sdk.PipelineServiceClientInterface; import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.apps.ApplicationHandler; +import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.jdbi3.AppRepository; import org.openmetadata.service.jdbi3.IngestionPipelineRepository; import org.openmetadata.service.resources.feeds.MessageParser; @@ -36,6 +38,7 @@ import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.OpenMetadataConnectionBuilder; +@Slf4j public class RunAppImpl { public boolean execute( PipelineServiceClientInterface pipelineServiceClient, @@ -46,8 +49,14 @@ public class RunAppImpl { ServiceEntityInterface service = Entity.getEntity(entityLink, "owners", Include.NON_DELETED); AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); - App app = - appRepository.getByName(null, appName, new EntityUtil.Fields(Set.of("bot", "pipelines"))); + App app; + try { + app = + appRepository.getByName(null, appName, new EntityUtil.Fields(Set.of("bot", "pipelines"))); + } catch (EntityNotFoundException ex) { + LOG.warn(String.format("App: '%s' is not Installed. Skipping", appName)); + return true; + } if (!validateAppShouldRun(app, service)) { return true; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppTask.java index 8dc98e905d3..1a078a65618 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppTask.java @@ -28,7 +28,7 @@ public class RunAppTask implements NodeInterface { String subProcessId = nodeDefinition.getName(); SubProcess subProcess = - new SubProcessBuilder().id(subProcessId).setAsync(true).exclusive(false).build(); + new SubProcessBuilder().id(subProcessId).setAsync(true).exclusive(true).build(); StartEvent startEvent = new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build();