mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-25 17:04:54 +00:00
CRITICAL: fix AbstractOmAppJobListener (#15701)
* fix AbstractOmAppJobListener * added appTrigger test * format * increased test timeout to 30 * added assertion for not failed * add break for loop in case of CI
This commit is contained in:
parent
ce3f124a33
commit
ec4673587d
@ -35,46 +35,33 @@ public abstract class AbstractOmAppJobListener implements JobListener {
|
||||
|
||||
@Override
|
||||
public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {
|
||||
AppRunRecord runRecord;
|
||||
long jobStartTime = System.currentTimeMillis();
|
||||
UUID appID = UUID.fromString("00000000-0000-0000-0000-000000000000");
|
||||
String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
|
||||
String appName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_NAME);
|
||||
App jobApp = collectionDAO.applicationDAO().findEntityByName(appName);
|
||||
ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp);
|
||||
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
|
||||
long jobStartTime = System.currentTimeMillis();
|
||||
AppRunRecord runRecord;
|
||||
boolean update = false;
|
||||
try {
|
||||
App jobApp = collectionDAO.applicationDAO().findEntityById(appID);
|
||||
ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp);
|
||||
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
|
||||
if (jobExecutionContext.isRecovering()) {
|
||||
runRecord =
|
||||
JsonUtils.readValue(
|
||||
collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()),
|
||||
AppRunRecord.class);
|
||||
update = true;
|
||||
} else {
|
||||
runRecord =
|
||||
new AppRunRecord()
|
||||
.withAppId(jobApp.getId())
|
||||
.withStartTime(jobStartTime)
|
||||
.withTimestamp(jobStartTime)
|
||||
.withRunType(runType)
|
||||
.withStatus(AppRunRecord.Status.RUNNING)
|
||||
.withScheduleInfo(jobApp.getAppSchedule());
|
||||
}
|
||||
// Put the Context in the Job Data Map
|
||||
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
|
||||
} catch (Exception ex) {
|
||||
Map<String, Object> failure = new HashMap<>();
|
||||
failure.put("message", "TriggerFailed:" + ex.getMessage());
|
||||
failure.put("jobStackTrace", ExceptionUtils.getStackTrace(ex));
|
||||
if (jobExecutionContext.isRecovering()) {
|
||||
runRecord =
|
||||
JsonUtils.readValue(
|
||||
collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()),
|
||||
AppRunRecord.class);
|
||||
update = true;
|
||||
} else {
|
||||
runRecord =
|
||||
new AppRunRecord()
|
||||
.withAppId(appID)
|
||||
.withRunType(runType)
|
||||
.withStatus(AppRunRecord.Status.FAILED)
|
||||
.withAppId(jobApp.getId())
|
||||
.withStartTime(jobStartTime)
|
||||
.withTimestamp(jobStartTime)
|
||||
.withFailureContext(new FailureContext().withAdditionalProperty("failure", failure));
|
||||
.withRunType(runType)
|
||||
.withStatus(AppRunRecord.Status.RUNNING)
|
||||
.withScheduleInfo(jobApp.getAppSchedule());
|
||||
}
|
||||
// Put the Context in the Job Data Map
|
||||
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
|
||||
|
||||
// Insert new Record Run
|
||||
pushApplicationStatusUpdates(jobExecutionContext, runRecord, update);
|
||||
this.doJobToBeExecuted(jobExecutionContext);
|
||||
|
@ -1,17 +1,23 @@
|
||||
package org.openmetadata.service.resources.apps;
|
||||
|
||||
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
|
||||
import static javax.ws.rs.core.Response.Status.OK;
|
||||
import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS;
|
||||
import static org.openmetadata.service.util.TestUtils.assertResponseContains;
|
||||
import static org.openmetadata.service.util.TestUtils.readResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.Response;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.http.client.HttpResponseException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.openmetadata.schema.entity.app.App;
|
||||
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
|
||||
import org.openmetadata.schema.entity.app.AppRunRecord;
|
||||
import org.openmetadata.schema.entity.app.AppSchedule;
|
||||
import org.openmetadata.schema.entity.app.CreateApp;
|
||||
import org.openmetadata.schema.entity.app.CreateAppMarketPlaceDefinitionReq;
|
||||
@ -19,6 +25,7 @@ import org.openmetadata.schema.entity.app.ScheduleTimeline;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.resources.EntityResourceTest;
|
||||
import org.openmetadata.service.security.SecurityUtil;
|
||||
import org.openmetadata.service.util.TestUtils;
|
||||
|
||||
@Slf4j
|
||||
@ -72,6 +79,30 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
|
||||
"of type SystemApp can not be deleted");
|
||||
}
|
||||
|
||||
@Test
|
||||
void post_trigger_app_200() throws HttpResponseException, InterruptedException {
|
||||
postTriggerApp("SearchIndexingApplication", ADMIN_AUTH_HEADERS);
|
||||
TimeUnit.MILLISECONDS.sleep(200);
|
||||
AppRunRecord latestRun = getLatestAppRun("SearchIndexingApplication", ADMIN_AUTH_HEADERS);
|
||||
assert latestRun.getStatus().equals(AppRunRecord.Status.RUNNING);
|
||||
TimeUnit timeout = TimeUnit.SECONDS;
|
||||
long timeoutValue = 30;
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (latestRun.getStatus().equals(AppRunRecord.Status.RUNNING)) {
|
||||
// skip this loop in CI because it causes weird problems
|
||||
if (TestUtils.isCI()) {
|
||||
break;
|
||||
}
|
||||
assert !latestRun.getStatus().equals(AppRunRecord.Status.FAILED);
|
||||
if (System.currentTimeMillis() - startTime > timeout.toMillis(timeoutValue)) {
|
||||
throw new AssertionError(
|
||||
String.format("Expected the app to succeed within %d %s", timeoutValue, timeout));
|
||||
}
|
||||
TimeUnit.MILLISECONDS.sleep(500);
|
||||
latestRun = getLatestAppRun("SearchIndexingApplication", ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validateCreatedEntity(
|
||||
App createdEntity, CreateApp request, Map<String, String> authHeaders)
|
||||
@ -103,4 +134,17 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
|
||||
public void assertFieldChange(String fieldName, Object expected, Object actual) {
|
||||
assertCommonFieldChange(fieldName, expected, actual);
|
||||
}
|
||||
|
||||
private void postTriggerApp(String appName, Map<String, String> authHeaders)
|
||||
throws HttpResponseException {
|
||||
WebTarget target = getResource("apps/trigger").path(appName);
|
||||
Response response = SecurityUtil.addHeaders(target, authHeaders).post(null);
|
||||
readResponse(response, OK.getStatusCode());
|
||||
}
|
||||
|
||||
private AppRunRecord getLatestAppRun(String appName, Map<String, String> authHeaders)
|
||||
throws HttpResponseException {
|
||||
WebTarget target = getResource(String.format("apps/name/%s/runs/latest", appName));
|
||||
return TestUtils.get(target, AppRunRecord.class, authHeaders);
|
||||
}
|
||||
}
|
||||
|
@ -203,6 +203,10 @@ public final class TestUtils {
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isCI() {
|
||||
return System.getenv("CI") != null;
|
||||
}
|
||||
|
||||
public enum UpdateType {
|
||||
CREATED, // Not updated instead entity was created
|
||||
NO_CHANGE, // PUT/PATCH made no change to the entity and the version remains the same
|
||||
|
Loading…
x
Reference in New Issue
Block a user