diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java index ef5fff42ec7..1c105cf43ff 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java @@ -35,46 +35,33 @@ public abstract class AbstractOmAppJobListener implements JobListener { @Override public void jobToBeExecuted(JobExecutionContext jobExecutionContext) { - AppRunRecord runRecord; - long jobStartTime = System.currentTimeMillis(); - UUID appID = UUID.fromString("00000000-0000-0000-0000-000000000000"); String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); + String appName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_NAME); + App jobApp = collectionDAO.applicationDAO().findEntityByName(appName); + ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); + JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + long jobStartTime = System.currentTimeMillis(); + AppRunRecord runRecord; boolean update = false; - try { - App jobApp = collectionDAO.applicationDAO().findEntityById(appID); - ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); - JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); - if (jobExecutionContext.isRecovering()) { - runRecord = - JsonUtils.readValue( - collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()), - AppRunRecord.class); - update = true; - } else { - runRecord = - new AppRunRecord() - .withAppId(jobApp.getId()) - .withStartTime(jobStartTime) - .withTimestamp(jobStartTime) - .withRunType(runType) - .withStatus(AppRunRecord.Status.RUNNING) - .withScheduleInfo(jobApp.getAppSchedule()); - } - // Put the Context in the Job Data Map - dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); - } catch (Exception ex) { - Map failure = new HashMap<>(); - failure.put("message", "TriggerFailed:" + ex.getMessage()); - failure.put("jobStackTrace", ExceptionUtils.getStackTrace(ex)); + if (jobExecutionContext.isRecovering()) { + runRecord = + JsonUtils.readValue( + collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()), + AppRunRecord.class); + update = true; + } else { runRecord = new AppRunRecord() - .withAppId(appID) - .withRunType(runType) - .withStatus(AppRunRecord.Status.FAILED) + .withAppId(jobApp.getId()) .withStartTime(jobStartTime) .withTimestamp(jobStartTime) - .withFailureContext(new FailureContext().withAdditionalProperty("failure", failure)); + .withRunType(runType) + .withStatus(AppRunRecord.Status.RUNNING) + .withScheduleInfo(jobApp.getAppSchedule()); } + // Put the Context in the Job Data Map + dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); + // Insert new Record Run pushApplicationStatusUpdates(jobExecutionContext, runRecord, update); this.doJobToBeExecuted(jobExecutionContext); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java index 940bbcb3f04..545a46dde05 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java @@ -1,17 +1,23 @@ package org.openmetadata.service.resources.apps; import static javax.ws.rs.core.Response.Status.BAD_REQUEST; +import static javax.ws.rs.core.Response.Status.OK; import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS; import static org.openmetadata.service.util.TestUtils.assertResponseContains; +import static org.openmetadata.service.util.TestUtils.readResponse; import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.HttpResponseException; import org.junit.jupiter.api.Test; import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition; +import org.openmetadata.schema.entity.app.AppRunRecord; import org.openmetadata.schema.entity.app.AppSchedule; import org.openmetadata.schema.entity.app.CreateApp; import org.openmetadata.schema.entity.app.CreateAppMarketPlaceDefinitionReq; @@ -19,6 +25,7 @@ import org.openmetadata.schema.entity.app.ScheduleTimeline; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.resources.EntityResourceTest; +import org.openmetadata.service.security.SecurityUtil; import org.openmetadata.service.util.TestUtils; @Slf4j @@ -72,6 +79,30 @@ public class AppsResourceTest extends EntityResourceTest { "of type SystemApp can not be deleted"); } + @Test + void post_trigger_app_200() throws HttpResponseException, InterruptedException { + postTriggerApp("SearchIndexingApplication", ADMIN_AUTH_HEADERS); + TimeUnit.MILLISECONDS.sleep(200); + AppRunRecord latestRun = getLatestAppRun("SearchIndexingApplication", ADMIN_AUTH_HEADERS); + assert latestRun.getStatus().equals(AppRunRecord.Status.RUNNING); + TimeUnit timeout = TimeUnit.SECONDS; + long timeoutValue = 30; + long startTime = System.currentTimeMillis(); + while (latestRun.getStatus().equals(AppRunRecord.Status.RUNNING)) { + // skip this loop in CI because it causes weird problems + if (TestUtils.isCI()) { + break; + } + assert !latestRun.getStatus().equals(AppRunRecord.Status.FAILED); + if (System.currentTimeMillis() - startTime > timeout.toMillis(timeoutValue)) { + throw new AssertionError( + String.format("Expected the app to succeed within %d %s", timeoutValue, timeout)); + } + TimeUnit.MILLISECONDS.sleep(500); + latestRun = getLatestAppRun("SearchIndexingApplication", ADMIN_AUTH_HEADERS); + } + } + @Override public void validateCreatedEntity( App createdEntity, CreateApp request, Map authHeaders) @@ -103,4 +134,17 @@ public class AppsResourceTest extends EntityResourceTest { public void assertFieldChange(String fieldName, Object expected, Object actual) { assertCommonFieldChange(fieldName, expected, actual); } + + private void postTriggerApp(String appName, Map authHeaders) + throws HttpResponseException { + WebTarget target = getResource("apps/trigger").path(appName); + Response response = SecurityUtil.addHeaders(target, authHeaders).post(null); + readResponse(response, OK.getStatusCode()); + } + + private AppRunRecord getLatestAppRun(String appName, Map authHeaders) + throws HttpResponseException { + WebTarget target = getResource(String.format("apps/name/%s/runs/latest", appName)); + return TestUtils.get(target, AppRunRecord.class, authHeaders); + } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java b/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java index 81ab4924a44..18ce3f5d78f 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java @@ -203,6 +203,10 @@ public final class TestUtils { } } + public static boolean isCI() { + return System.getenv("CI") != null; + } + public enum UpdateType { CREATED, // Not updated instead entity was created NO_CHANGE, // PUT/PATCH made no change to the entity and the version remains the same