MINOR: fix: scheduled apps (#17167)

* fix: scheduled apps

run scheduling procedures for apps with Scheduled or ScheduledOrManual schedule type

* format
This commit is contained in:
Imri Paran 2024-07-25 15:46:01 +02:00 committed by GitHub
parent 70f0268e26
commit 012aa9b804
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 71 additions and 44 deletions

View File

@ -3,6 +3,7 @@ package org.openmetadata.service.apps;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME;
import static org.openmetadata.service.exception.CatalogExceptionMessage.NO_MANUAL_TRIGGER_ERR;
import static org.openmetadata.service.resources.apps.AppResource.SCHEDULED_TYPES;
import java.util.List;
import lombok.Getter;
@ -24,6 +25,7 @@ import org.openmetadata.schema.metadataIngestion.ApplicationPipeline;
import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
@ -68,8 +70,8 @@ public class AbstractNativeApplication implements NativeApplication {
&& app.getAppSchedule().getScheduleTimeline().equals(ScheduleTimeline.NONE))) {
return;
}
if (app.getAppType() == AppType.Internal
&& app.getScheduleType().equals(ScheduleType.Scheduled)) {
if (app.getAppType().equals(AppType.Internal)
&& (SCHEDULED_TYPES.contains(app.getScheduleType()))) {
try {
ApplicationHandler.getInstance().removeOldJobs(app);
ApplicationHandler.getInstance().migrateQuartzConfig(app);
@ -82,7 +84,7 @@ public class AbstractNativeApplication implements NativeApplication {
}
scheduleInternal();
} else if (app.getAppType() == AppType.External
&& app.getScheduleType().equals(ScheduleType.Scheduled)) {
&& (SCHEDULED_TYPES.contains(app.getScheduleType()))) {
scheduleExternal();
}
}
@ -114,6 +116,7 @@ public class AbstractNativeApplication implements NativeApplication {
try {
bindExistingIngestionToApplication(ingestionPipelineRepository);
updateAppConfig(ingestionPipelineRepository, this.getApp().getAppConfiguration());
} catch (EntityNotFoundException ex) {
ApplicationConfig config =
JsonUtils.convertValue(this.getApp().getAppConfiguration(), ApplicationConfig.class);
@ -151,6 +154,17 @@ public class AbstractNativeApplication implements NativeApplication {
}
}
private void updateAppConfig(IngestionPipelineRepository repository, Object appConfiguration) {
String fqn = FullyQualifiedName.add(SERVICE_NAME, this.getApp().getName());
IngestionPipeline updated = repository.findByName(fqn, Include.NON_DELETED);
ApplicationPipeline appPipeline =
JsonUtils.convertValue(updated.getSourceConfig().getConfig(), ApplicationPipeline.class);
IngestionPipeline original = JsonUtils.deepCopy(updated, IngestionPipeline.class);
updated.setSourceConfig(
updated.getSourceConfig().withConfig(appPipeline.withAppConfig(appConfiguration)));
repository.update(null, original, updated);
}
private void createAndBindIngestionPipeline(
IngestionPipelineRepository ingestionPipelineRepository, ApplicationConfig config) {
MetadataServiceRepository serviceEntityRepository =

View File

@ -101,6 +101,8 @@ public class AppResource extends EntityResource<App, AppRepository> {
private PipelineServiceClientInterface pipelineServiceClient;
static final String FIELDS = "owner";
private SearchRepository searchRepository;
public static List<ScheduleType> SCHEDULED_TYPES =
List.of(ScheduleType.Scheduled, ScheduleType.ScheduledOrManual);
@Override
public void initialize(OpenMetadataApplicationConfig config) {
@ -145,7 +147,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
}
// Schedule
if (app.getScheduleType().equals(ScheduleType.Scheduled)) {
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(app, Entity.getCollectionDAO(), searchRepository);
}
@ -559,7 +561,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
securityContext,
getResourceContext(),
new OperationContext(Entity.APPLICATION, MetadataOperation.CREATE));
if (app.getScheduleType().equals(ScheduleType.Scheduled)) {
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(app, Entity.getCollectionDAO(), searchRepository);
ApplicationHandler.getInstance()
@ -604,7 +606,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
AppScheduler.getInstance().deleteScheduledApplication(app);
Response response = patchInternal(uriInfo, securityContext, id, patch);
App updatedApp = (App) response.getEntity();
if (app.getScheduleType().equals(ScheduleType.Scheduled)) {
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(updatedApp, Entity.getCollectionDAO(), searchRepository);
}
@ -648,7 +650,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
AppScheduler.getInstance().deleteScheduledApplication(app);
Response response = patchInternal(uriInfo, securityContext, fqn, patch);
App updatedApp = (App) response.getEntity();
if (app.getScheduleType().equals(ScheduleType.Scheduled)) {
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(updatedApp, Entity.getCollectionDAO(), searchRepository);
}
@ -683,7 +685,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
new EntityUtil.Fields(repository.getMarketPlace().getAllowedFields()));
App app = getApplication(definition, create, securityContext.getUserPrincipal().getName());
AppScheduler.getInstance().deleteScheduledApplication(app);
if (app.getScheduleType().equals(ScheduleType.Scheduled)) {
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(app, Entity.getCollectionDAO(), searchRepository);
}
@ -782,7 +784,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
Response response = restoreEntity(uriInfo, securityContext, restore.getId());
if (response.getStatus() == Response.Status.OK.getStatusCode()) {
App app = (App) response.getEntity();
if (app.getScheduleType().equals(ScheduleType.Scheduled)) {
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(app, Entity.getCollectionDAO(), searchRepository);
}
@ -818,7 +820,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
@Context SecurityContext securityContext) {
App app =
repository.getByName(uriInfo, name, new EntityUtil.Fields(repository.getAllowedFields()));
if (app.getScheduleType().equals(ScheduleType.Scheduled)) {
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(app, repository.getDaoCollection(), searchRepository);
return Response.status(Response.Status.OK).entity("App is Scheduled.").build();

View File

@ -3,12 +3,16 @@ package org.openmetadata.service.resources.apps;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS;
import static org.openmetadata.service.util.TestUtils.assertEventually;
import static org.openmetadata.service.util.TestUtils.assertResponseContains;
import static org.openmetadata.service.util.TestUtils.readResponse;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Objects;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import lombok.SneakyThrows;
@ -26,6 +30,7 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.resources.EntityResourceTest;
import org.openmetadata.service.security.SecurityUtil;
import org.openmetadata.service.util.RetryableAssertionError;
import org.openmetadata.service.util.TestUtils;
@Slf4j
@ -38,6 +43,14 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
supportedNameCharacters = "_-.";
}
public static RetryRegistry appTriggerRetry =
RetryRegistry.of(
RetryConfig.custom()
.maxAttempts(60) // about 30 seconds
.waitDuration(Duration.ofMillis(500))
.retryExceptions(RetryableAssertionError.class)
.build());
@Override
@SneakyThrows
public CreateApp createRequest(String name) {
@ -78,34 +91,31 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
}
@Test
void post_trigger_app_200() throws HttpResponseException, InterruptedException {
postTriggerApp("SearchIndexingApplication", ADMIN_AUTH_HEADERS);
AppRunRecord latestRun = null;
while (latestRun == null) {
try {
latestRun = getLatestAppRun("SearchIndexingApplication", ADMIN_AUTH_HEADERS);
Thread.sleep(1000);
} catch (HttpResponseException ex) {
LOG.info("Waiting for the app to start running");
}
}
assert latestRun.getStatus().equals(AppRunRecord.Status.RUNNING);
TimeUnit timeout = TimeUnit.SECONDS;
long timeoutValue = 30;
long startTime = System.currentTimeMillis();
while (latestRun.getStatus().equals(AppRunRecord.Status.RUNNING)) {
// skip this loop in CI because it causes weird problems
if (TestUtils.isCI()) {
break;
}
assert !latestRun.getStatus().equals(AppRunRecord.Status.FAILED);
if (System.currentTimeMillis() - startTime > timeout.toMillis(timeoutValue)) {
throw new AssertionError(
String.format("Expected the app to succeed within %d %s", timeoutValue, timeout));
}
TimeUnit.MILLISECONDS.sleep(500);
latestRun = getLatestAppRun("SearchIndexingApplication", ADMIN_AUTH_HEADERS);
}
void post_trigger_app_200() throws HttpResponseException {
String appName = "SearchIndexingApplication";
postTriggerApp(appName, ADMIN_AUTH_HEADERS);
assertAppRanAfterTrigger(appName);
}
private void assertAppRanAfterTrigger(String appName) {
assertEventually(
"appIsRunning",
() -> {
try {
assert Objects.nonNull(getLatestAppRun(appName, ADMIN_AUTH_HEADERS));
} catch (HttpResponseException ex) {
throw new AssertionError(ex);
}
},
appTriggerRetry);
assertEventually(
"appSuccess",
() -> {
assert getLatestAppRun(appName, ADMIN_AUTH_HEADERS)
.getStatus()
.equals(AppRunRecord.Status.SUCCESS);
},
appTriggerRetry);
}
@Test

View File

@ -227,10 +227,6 @@ public final class TestUtils {
}
}
public static boolean isCI() {
return System.getenv("CI") != null;
}
public enum UpdateType {
CREATED, // Not updated instead entity was created
NO_CHANGE, // PUT/PATCH made no change to the entity and the version remains the same
@ -669,9 +665,14 @@ public final class TestUtils {
}
public static void assertEventually(String name, CheckedRunnable runnable) {
assertEventually(name, runnable, elasticSearchRetryRegistry);
}
public static void assertEventually(
String name, CheckedRunnable runnable, RetryRegistry retryRegistry) {
try {
Retry.decorateCheckedRunnable(
elasticSearchRetryRegistry.retry(name),
retryRegistry.retry(name),
() -> {
try {
runnable.run();