From 012aa9b804aff570fce2b3d34d9f266494c2e707 Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Thu, 25 Jul 2024 15:46:01 +0200 Subject: [PATCH] MINOR: fix: scheduled apps (#17167) * fix: scheduled apps run scheduling procedures for apps with Scheduled or ScheduledOrManual schedule type * format --- .../apps/AbstractNativeApplication.java | 20 +++++- .../service/resources/apps/AppResource.java | 16 +++-- .../resources/apps/AppsResourceTest.java | 68 +++++++++++-------- .../openmetadata/service/util/TestUtils.java | 11 +-- 4 files changed, 71 insertions(+), 44 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java index 80a14f2d221..14e811eb1e2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java @@ -3,6 +3,7 @@ package org.openmetadata.service.apps; import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME; import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME; import static org.openmetadata.service.exception.CatalogExceptionMessage.NO_MANUAL_TRIGGER_ERR; +import static org.openmetadata.service.resources.apps.AppResource.SCHEDULED_TYPES; import java.util.List; import lombok.Getter; @@ -24,6 +25,7 @@ import org.openmetadata.schema.metadataIngestion.ApplicationPipeline; import org.openmetadata.schema.metadataIngestion.SourceConfig; import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection; import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.ProviderType; import org.openmetadata.schema.type.Relationship; import org.openmetadata.service.Entity; @@ -68,8 +70,8 @@ public class AbstractNativeApplication implements NativeApplication { && app.getAppSchedule().getScheduleTimeline().equals(ScheduleTimeline.NONE))) { return; } - if (app.getAppType() == AppType.Internal - && app.getScheduleType().equals(ScheduleType.Scheduled)) { + if (app.getAppType().equals(AppType.Internal) + && (SCHEDULED_TYPES.contains(app.getScheduleType()))) { try { ApplicationHandler.getInstance().removeOldJobs(app); ApplicationHandler.getInstance().migrateQuartzConfig(app); @@ -82,7 +84,7 @@ public class AbstractNativeApplication implements NativeApplication { } scheduleInternal(); } else if (app.getAppType() == AppType.External - && app.getScheduleType().equals(ScheduleType.Scheduled)) { + && (SCHEDULED_TYPES.contains(app.getScheduleType()))) { scheduleExternal(); } } @@ -114,6 +116,7 @@ public class AbstractNativeApplication implements NativeApplication { try { bindExistingIngestionToApplication(ingestionPipelineRepository); + updateAppConfig(ingestionPipelineRepository, this.getApp().getAppConfiguration()); } catch (EntityNotFoundException ex) { ApplicationConfig config = JsonUtils.convertValue(this.getApp().getAppConfiguration(), ApplicationConfig.class); @@ -151,6 +154,17 @@ public class AbstractNativeApplication implements NativeApplication { } } + private void updateAppConfig(IngestionPipelineRepository repository, Object appConfiguration) { + String fqn = FullyQualifiedName.add(SERVICE_NAME, this.getApp().getName()); + IngestionPipeline updated = repository.findByName(fqn, Include.NON_DELETED); + ApplicationPipeline appPipeline = + JsonUtils.convertValue(updated.getSourceConfig().getConfig(), ApplicationPipeline.class); + IngestionPipeline original = JsonUtils.deepCopy(updated, IngestionPipeline.class); + updated.setSourceConfig( + updated.getSourceConfig().withConfig(appPipeline.withAppConfig(appConfiguration))); + repository.update(null, original, updated); + } + private void createAndBindIngestionPipeline( IngestionPipelineRepository ingestionPipelineRepository, ApplicationConfig config) { MetadataServiceRepository serviceEntityRepository = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 7cf4705987e..141b1013b2e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -101,6 +101,8 @@ public class AppResource extends EntityResource { private PipelineServiceClientInterface pipelineServiceClient; static final String FIELDS = "owner"; private SearchRepository searchRepository; + public static List SCHEDULED_TYPES = + List.of(ScheduleType.Scheduled, ScheduleType.ScheduledOrManual); @Override public void initialize(OpenMetadataApplicationConfig config) { @@ -145,7 +147,7 @@ public class AppResource extends EntityResource { } // Schedule - if (app.getScheduleType().equals(ScheduleType.Scheduled)) { + if (SCHEDULED_TYPES.contains(app.getScheduleType())) { ApplicationHandler.getInstance() .installApplication(app, Entity.getCollectionDAO(), searchRepository); } @@ -559,7 +561,7 @@ public class AppResource extends EntityResource { securityContext, getResourceContext(), new OperationContext(Entity.APPLICATION, MetadataOperation.CREATE)); - if (app.getScheduleType().equals(ScheduleType.Scheduled)) { + if (SCHEDULED_TYPES.contains(app.getScheduleType())) { ApplicationHandler.getInstance() .installApplication(app, Entity.getCollectionDAO(), searchRepository); ApplicationHandler.getInstance() @@ -604,7 +606,7 @@ public class AppResource extends EntityResource { AppScheduler.getInstance().deleteScheduledApplication(app); Response response = patchInternal(uriInfo, securityContext, id, patch); App updatedApp = (App) response.getEntity(); - if (app.getScheduleType().equals(ScheduleType.Scheduled)) { + if (SCHEDULED_TYPES.contains(app.getScheduleType())) { ApplicationHandler.getInstance() .installApplication(updatedApp, Entity.getCollectionDAO(), searchRepository); } @@ -648,7 +650,7 @@ public class AppResource extends EntityResource { AppScheduler.getInstance().deleteScheduledApplication(app); Response response = patchInternal(uriInfo, securityContext, fqn, patch); App updatedApp = (App) response.getEntity(); - if (app.getScheduleType().equals(ScheduleType.Scheduled)) { + if (SCHEDULED_TYPES.contains(app.getScheduleType())) { ApplicationHandler.getInstance() .installApplication(updatedApp, Entity.getCollectionDAO(), searchRepository); } @@ -683,7 +685,7 @@ public class AppResource extends EntityResource { new EntityUtil.Fields(repository.getMarketPlace().getAllowedFields())); App app = getApplication(definition, create, securityContext.getUserPrincipal().getName()); AppScheduler.getInstance().deleteScheduledApplication(app); - if (app.getScheduleType().equals(ScheduleType.Scheduled)) { + if (SCHEDULED_TYPES.contains(app.getScheduleType())) { ApplicationHandler.getInstance() .installApplication(app, Entity.getCollectionDAO(), searchRepository); } @@ -782,7 +784,7 @@ public class AppResource extends EntityResource { Response response = restoreEntity(uriInfo, securityContext, restore.getId()); if (response.getStatus() == Response.Status.OK.getStatusCode()) { App app = (App) response.getEntity(); - if (app.getScheduleType().equals(ScheduleType.Scheduled)) { + if (SCHEDULED_TYPES.contains(app.getScheduleType())) { ApplicationHandler.getInstance() .installApplication(app, Entity.getCollectionDAO(), searchRepository); } @@ -818,7 +820,7 @@ public class AppResource extends EntityResource { @Context SecurityContext securityContext) { App app = repository.getByName(uriInfo, name, new EntityUtil.Fields(repository.getAllowedFields())); - if (app.getScheduleType().equals(ScheduleType.Scheduled)) { + if (SCHEDULED_TYPES.contains(app.getScheduleType())) { ApplicationHandler.getInstance() .installApplication(app, repository.getDaoCollection(), searchRepository); return Response.status(Response.Status.OK).entity("App is Scheduled.").build(); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java index ce4d65cb150..83550ebcb38 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java @@ -3,12 +3,16 @@ package org.openmetadata.service.resources.apps; import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.OK; import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS; +import static org.openmetadata.service.util.TestUtils.assertEventually; import static org.openmetadata.service.util.TestUtils.assertResponseContains; import static org.openmetadata.service.util.TestUtils.readResponse; +import io.github.resilience4j.retry.RetryConfig; +import io.github.resilience4j.retry.RetryRegistry; import java.io.IOException; +import java.time.Duration; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.Objects; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; import lombok.SneakyThrows; @@ -26,6 +30,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.resources.EntityResourceTest; import org.openmetadata.service.security.SecurityUtil; +import org.openmetadata.service.util.RetryableAssertionError; import org.openmetadata.service.util.TestUtils; @Slf4j @@ -38,6 +43,14 @@ public class AppsResourceTest extends EntityResourceTest { supportedNameCharacters = "_-."; } + public static RetryRegistry appTriggerRetry = + RetryRegistry.of( + RetryConfig.custom() + .maxAttempts(60) // about 30 seconds + .waitDuration(Duration.ofMillis(500)) + .retryExceptions(RetryableAssertionError.class) + .build()); + @Override @SneakyThrows public CreateApp createRequest(String name) { @@ -78,34 +91,31 @@ public class AppsResourceTest extends EntityResourceTest { } @Test - void post_trigger_app_200() throws HttpResponseException, InterruptedException { - postTriggerApp("SearchIndexingApplication", ADMIN_AUTH_HEADERS); - AppRunRecord latestRun = null; - while (latestRun == null) { - try { - latestRun = getLatestAppRun("SearchIndexingApplication", ADMIN_AUTH_HEADERS); - Thread.sleep(1000); - } catch (HttpResponseException ex) { - LOG.info("Waiting for the app to start running"); - } - } - assert latestRun.getStatus().equals(AppRunRecord.Status.RUNNING); - TimeUnit timeout = TimeUnit.SECONDS; - long timeoutValue = 30; - long startTime = System.currentTimeMillis(); - while (latestRun.getStatus().equals(AppRunRecord.Status.RUNNING)) { - // skip this loop in CI because it causes weird problems - if (TestUtils.isCI()) { - break; - } - assert !latestRun.getStatus().equals(AppRunRecord.Status.FAILED); - if (System.currentTimeMillis() - startTime > timeout.toMillis(timeoutValue)) { - throw new AssertionError( - String.format("Expected the app to succeed within %d %s", timeoutValue, timeout)); - } - TimeUnit.MILLISECONDS.sleep(500); - latestRun = getLatestAppRun("SearchIndexingApplication", ADMIN_AUTH_HEADERS); - } + void post_trigger_app_200() throws HttpResponseException { + String appName = "SearchIndexingApplication"; + postTriggerApp(appName, ADMIN_AUTH_HEADERS); + assertAppRanAfterTrigger(appName); + } + + private void assertAppRanAfterTrigger(String appName) { + assertEventually( + "appIsRunning", + () -> { + try { + assert Objects.nonNull(getLatestAppRun(appName, ADMIN_AUTH_HEADERS)); + } catch (HttpResponseException ex) { + throw new AssertionError(ex); + } + }, + appTriggerRetry); + assertEventually( + "appSuccess", + () -> { + assert getLatestAppRun(appName, ADMIN_AUTH_HEADERS) + .getStatus() + .equals(AppRunRecord.Status.SUCCESS); + }, + appTriggerRetry); } @Test diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java b/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java index 00aa857cef3..273d60f00da 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java @@ -227,10 +227,6 @@ public final class TestUtils { } } - public static boolean isCI() { - return System.getenv("CI") != null; - } - public enum UpdateType { CREATED, // Not updated instead entity was created NO_CHANGE, // PUT/PATCH made no change to the entity and the version remains the same @@ -669,9 +665,14 @@ public final class TestUtils { } public static void assertEventually(String name, CheckedRunnable runnable) { + assertEventually(name, runnable, elasticSearchRetryRegistry); + } + + public static void assertEventually( + String name, CheckedRunnable runnable, RetryRegistry retryRegistry) { try { Retry.decorateCheckedRunnable( - elasticSearchRetryRegistry.retry(name), + retryRegistry.retry(name), () -> { try { runnable.run();