Workflow improvements (#20054)

This commit is contained in:
IceS2 2025-03-04 14:08:45 +01:00 committed by GitHub
parent 6f0dbd01eb
commit 70a2aa1778
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 26 additions and 5 deletions

View File

@ -30,7 +30,7 @@ public class CreateIngestionPipelineTask implements NodeInterface {
String subProcessId = nodeDefinition.getName();
SubProcess subProcess =
new SubProcessBuilder().id(subProcessId).setAsync(true).exclusive(false).build();
new SubProcessBuilder().id(subProcessId).setAsync(true).exclusive(true).build();
StartEvent startEvent =
new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build();

View File

@ -146,15 +146,27 @@ public class CreateIngestionPipelineImpl implements JavaDelegate {
private IngestionPipeline createIngestionPipeline(
IngestionPipelineMapper mapper, PipelineType pipelineType, ServiceEntityInterface service) {
String displayName = String.format("[%s] %s", service.getName(), pipelineType);
IngestionPipelineRepository repository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
for (String ingestionPipelineStr :
repository.listAllByParentFqn(service.getFullyQualifiedName())) {
IngestionPipeline ingestionPipeline =
JsonUtils.readOrConvertValue(ingestionPipelineStr, IngestionPipeline.class);
if (ingestionPipeline.getPipelineType().equals(pipelineType)
&& ingestionPipeline.getDisplayName().equals(displayName)) {
return ingestionPipeline;
}
}
;
org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline create =
new org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline()
.withAirflowConfig(new AirflowConfig().withStartDate(getYesterdayDate()))
.withLoggerLevel(LogLevels.INFO)
.withName(UUID.randomUUID().toString())
.withDisplayName(String.format("[%s] %s", service.getName(), pipelineType))
.withDisplayName(displayName)
.withOwners(service.getOwners())
.withPipelineType(pipelineType)
.withService(service.getEntityReference())

View File

@ -7,6 +7,7 @@ import java.util.List;
import java.util.Set;
import javax.json.JsonPatch;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
@ -29,6 +30,7 @@ import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.apps.ApplicationHandler;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.resources.feeds.MessageParser;
@ -36,6 +38,7 @@ import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
@Slf4j
public class RunAppImpl {
public boolean execute(
PipelineServiceClientInterface pipelineServiceClient,
@ -46,8 +49,14 @@ public class RunAppImpl {
ServiceEntityInterface service = Entity.getEntity(entityLink, "owners", Include.NON_DELETED);
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
App app =
appRepository.getByName(null, appName, new EntityUtil.Fields(Set.of("bot", "pipelines")));
App app;
try {
app =
appRepository.getByName(null, appName, new EntityUtil.Fields(Set.of("bot", "pipelines")));
} catch (EntityNotFoundException ex) {
LOG.warn(String.format("App: '%s' is not Installed. Skipping", appName));
return true;
}
if (!validateAppShouldRun(app, service)) {
return true;

View File

@ -28,7 +28,7 @@ public class RunAppTask implements NodeInterface {
String subProcessId = nodeDefinition.getName();
SubProcess subProcess =
new SubProcessBuilder().id(subProcessId).setAsync(true).exclusive(false).build();
new SubProcessBuilder().id(subProcessId).setAsync(true).exclusive(true).build();
StartEvent startEvent =
new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build();