diff --git a/bootstrap/sql/migrations/native/1.6.0/mysql/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.6.0/mysql/postDataMigrationSQLScript.sql index 85774e35576..f756307bb5a 100644 --- a/bootstrap/sql/migrations/native/1.6.0/mysql/postDataMigrationSQLScript.sql +++ b/bootstrap/sql/migrations/native/1.6.0/mysql/postDataMigrationSQLScript.sql @@ -36,4 +36,9 @@ UPDATE test_suite SET json = JSON_REMOVE(json, '$.testCaseResultSummary'); UPDATE test_case -SET json = JSON_REMOVE(json, '$.testCaseResult'); \ No newline at end of file +SET json = JSON_REMOVE(json, '$.testCaseResult'); + +-- Add Supports interrupts to SearchIndexingApplication +UPDATE installed_apps SET json = JSON_SET(json, '$.supportsInterrupt', true) where name = 'SearchIndexingApplication'; +UPDATE apps_marketplace SET json = JSON_SET(json, '$.supportsInterrupt', true) where name = 'SearchIndexingApplication'; + diff --git a/bootstrap/sql/migrations/native/1.6.0/postgres/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.6.0/postgres/postDataMigrationSQLScript.sql index 206e9c68692..7f911147079 100644 --- a/bootstrap/sql/migrations/native/1.6.0/postgres/postDataMigrationSQLScript.sql +++ b/bootstrap/sql/migrations/native/1.6.0/postgres/postDataMigrationSQLScript.sql @@ -41,3 +41,20 @@ SET json = json - 'testCaseResultSummary'; UPDATE test_case SET json = json - 'testCaseResult'; + +-- Add Supports interrupts to SearchIndexingApplication +UPDATE apps_marketplace +SET json = jsonb_set( + json::jsonb, + '{supportsInterrupt}', + to_jsonb(true) +) +where name = 'SearchIndexingApplication'; + +UPDATE installed_apps +SET json = jsonb_set( + json::jsonb, + '{supportsInterrupt}', + to_jsonb(true) +) +where name = 'SearchIndexingApplication'; \ No newline at end of file diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java index ab10a1107fc..7cc3328c3c5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java @@ -41,6 +41,7 @@ import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.OpenMetadataConnectionBuilder; import org.quartz.JobExecutionContext; import org.quartz.SchedulerException; +import org.quartz.UnableToInterruptJobException; @Getter @Slf4j @@ -48,6 +49,7 @@ public class AbstractNativeApplication implements NativeApplication { protected CollectionDAO collectionDAO; private App app; protected SearchRepository searchRepository; + protected boolean isJobInterrupted = false; // Default service that contains external apps' Ingestion Pipelines private static final String SERVICE_NAME = "OpenMetadata"; @@ -296,4 +298,10 @@ public class AbstractNativeApplication implements NativeApplication { OmAppJobListener listener = getJobListener(jobExecutionContext); listener.pushApplicationStatusUpdates(jobExecutionContext, appRecord, update); } + + @Override + public void interrupt() throws UnableToInterruptJobException { + LOG.info("Interrupting the job for app: {}", this.app.getName()); + isJobInterrupted = true; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/NativeApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/NativeApplication.java index 12afceec62b..0836d8a879d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/NativeApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/NativeApplication.java @@ -1,10 +1,10 @@ package org.openmetadata.service.apps; import org.openmetadata.schema.entity.app.App; -import org.quartz.Job; +import org.quartz.InterruptableJob; import org.quartz.JobExecutionContext; -public interface NativeApplication extends Job { +public interface NativeApplication extends InterruptableJob { void init(App app); void install(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index d05dcd12e0d..977f82cd5e0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -235,7 +235,7 @@ public class SearchIndexApp extends AbstractNativeApplication { reCreateIndexes(paginatedSource.getEntityType()); contextData.put(ENTITY_TYPE_KEY, paginatedSource.getEntityType()); Object resultList; - while (!stopped && !paginatedSource.isDone()) { + while (!isJobInterrupted && !stopped && !paginatedSource.isDone()) { try { resultList = paginatedSource.readNext(null); if (!TIME_SERIES_ENTITIES.contains(paginatedSource.getEntityType())) { @@ -264,6 +264,10 @@ public class SearchIndexApp extends AbstractNativeApplication { paginatedSource.updateStats( rx.getIndexingError().getSuccessCount(), rx.getIndexingError().getFailedCount()); } finally { + if (isJobInterrupted) { + LOG.info("Search Indexing will now return since the Job has been interrupted."); + jobData.setStatus(EventPublisherJob.Status.STOPPED); + } updateStats(paginatedSource.getEntityType(), paginatedSource.getStats()); sendUpdates(jobExecutionContext); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index c8319680733..38bd2c2c870 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -261,4 +261,32 @@ public class AppScheduler { LOG.error("Failed in running job", ex); } } + + public void stopApplicationRun(App application) { + if (application.getFullyQualifiedName() == null) { + throw new IllegalArgumentException("Application's fullyQualifiedName is null."); + } + try { + // Interrupt any scheduled job + JobDetail jobDetailScheduled = + scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP)); + if (jobDetailScheduled != null) { + LOG.debug("Stopping Scheduled Execution for App : {}", application.getName()); + scheduler.interrupt(jobDetailScheduled.getKey()); + } + + // Interrupt any on-demand job + JobDetail jobDetailOnDemand = + scheduler.getJobDetail( + new JobKey( + String.format("%s-%s", application.getName(), ON_DEMAND_JOB), APPS_JOB_GROUP)); + + if (jobDetailOnDemand != null) { + LOG.debug("Stopping On Demand Execution for App : {}", application.getName()); + scheduler.interrupt(jobDetailOnDemand.getKey()); + } + } catch (Exception ex) { + LOG.error("Failed to stop job execution.", ex); + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppMarketPlaceResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppMarketPlaceResource.java index f8dbb6d4ff9..01049f76a36 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppMarketPlaceResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppMarketPlaceResource.java @@ -480,7 +480,8 @@ public class AppMarketPlaceResource .withFeatures(create.getFeatures()) .withSourcePythonClass(create.getSourcePythonClass()) .withAllowConfiguration(create.getAllowConfiguration()) - .withSystem(create.getSystem()); + .withSystem(create.getSystem()) + .withSupportsInterrupt(create.getSupportsInterrupt()); // Validate App validateApplication(app); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 043b69b4328..871008186f6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -730,7 +730,7 @@ public class AppResource extends EntityResource { limits.invalidateCache(entityType); // Remove from Pipeline Service - deleteApp(securityContext, app, hardDelete); + deleteApp(securityContext, app); return deleteByName(uriInfo, securityContext, name, true, hardDelete); } @@ -766,7 +766,7 @@ public class AppResource extends EntityResource { .performCleanup(app, Entity.getCollectionDAO(), searchRepository); // Remove from Pipeline Service - deleteApp(securityContext, app, hardDelete); + deleteApp(securityContext, app); // Remove from repository return delete(uriInfo, securityContext, id, true, hardDelete); } @@ -881,7 +881,7 @@ public class AppResource extends EntityResource { @Operation( operationId = "triggerApplicationRun", summary = "Trigger an Application run", - description = "Trigger a Application run by id.", + description = "Trigger a Application run by name.", responses = { @ApiResponse( responseCode = "200", @@ -905,15 +905,7 @@ public class AppResource extends EntityResource { return Response.status(Response.Status.OK).entity("Application Triggered").build(); } else { if (!app.getPipelines().isEmpty()) { - EntityReference pipelineRef = app.getPipelines().get(0); - IngestionPipelineRepository ingestionPipelineRepository = - (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); - - IngestionPipeline ingestionPipeline = - ingestionPipelineRepository.get( - uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS)); - ingestionPipeline.setOpenMetadataServerConnection(app.getOpenMetadataServerConnection()); - decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true); + IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); ServiceEntityInterface service = Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED); PipelineServiceClientResponse response = @@ -924,6 +916,47 @@ public class AppResource extends EntityResource { throw new BadRequestException("Failed to trigger application."); } + @POST + @Path("/stop/{name}") + @Operation( + operationId = "stopApplicationRun", + summary = "Stop a Application run", + description = "Stop a application run by name.", + responses = { + @ApiResponse( + responseCode = "200", + description = "Application stopped status code", + content = @Content(mediaType = "application/json")), + @ApiResponse( + responseCode = "404", + description = "Application for instance {id} is not found") + }) + public Response stopApplicationRun( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Name of the App", schema = @Schema(type = "string")) + @PathParam("name") + String name) { + EntityUtil.Fields fields = getFields(String.format("%s,bot,pipelines", FIELD_OWNERS)); + App app = repository.getByName(uriInfo, name, fields); + if (Boolean.TRUE.equals(app.getSupportsInterrupt())) { + if (app.getAppType().equals(AppType.Internal)) { + AppScheduler.getInstance().stopApplicationRun(app); + return Response.status(Response.Status.OK) + .entity("Application will be stopped in some time.") + .build(); + } else { + if (!app.getPipelines().isEmpty()) { + IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); + PipelineServiceClientResponse response = + pipelineServiceClient.killIngestion(ingestionPipeline); + return Response.status(response.getCode()).entity(response).build(); + } + } + } + throw new BadRequestException("Application does not support Interrupts."); + } + @POST @Path("/deploy/{name}") @Operation( @@ -953,21 +986,14 @@ public class AppResource extends EntityResource { return Response.status(Response.Status.OK).entity("Application Deployed").build(); } else { if (!app.getPipelines().isEmpty()) { - EntityReference pipelineRef = app.getPipelines().get(0); - IngestionPipelineRepository ingestionPipelineRepository = - (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); - - IngestionPipeline ingestionPipeline = - ingestionPipelineRepository.get( - uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS)); - - ingestionPipeline.setOpenMetadataServerConnection(app.getOpenMetadataServerConnection()); - decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true); + IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); ServiceEntityInterface service = Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED); PipelineServiceClientResponse status = pipelineServiceClient.deployPipeline(ingestionPipeline, service); if (status.getCode() == 200) { + IngestionPipelineRepository ingestionPipelineRepository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); ingestionPipelineRepository.createOrUpdate(uriInfo, ingestionPipeline); } else { ingestionPipeline.setDeployed(false); @@ -1032,7 +1058,8 @@ public class AppResource extends EntityResource { .withFeatures(marketPlaceDefinition.getFeatures()) .withSourcePythonClass(marketPlaceDefinition.getSourcePythonClass()) .withAllowConfiguration(marketPlaceDefinition.getAllowConfiguration()) - .withSystem(marketPlaceDefinition.getSystem()); + .withSystem(marketPlaceDefinition.getSystem()) + .withSupportsInterrupt(marketPlaceDefinition.getSupportsInterrupt()); // validate Bot if provided validateAndAddBot(app, createAppRequest.getBot()); @@ -1048,7 +1075,23 @@ public class AppResource extends EntityResource { } } - private void deleteApp(SecurityContext securityContext, App installedApp, boolean hardDelete) { + private IngestionPipeline getIngestionPipeline( + UriInfo uriInfo, SecurityContext securityContext, App app) { + EntityReference pipelineRef = app.getPipelines().get(0); + IngestionPipelineRepository ingestionPipelineRepository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + + IngestionPipeline ingestionPipeline = + ingestionPipelineRepository.get( + uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS)); + + ingestionPipeline.setOpenMetadataServerConnection(app.getOpenMetadataServerConnection()); + decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true); + + return ingestionPipeline; + } + + private void deleteApp(SecurityContext securityContext, App installedApp) { if (installedApp.getAppType().equals(AppType.Internal)) { try { AppScheduler.getInstance().deleteScheduledApplication(installedApp); @@ -1058,13 +1101,8 @@ public class AppResource extends EntityResource { } } else { if (!nullOrEmpty(installedApp.getPipelines())) { - EntityReference pipelineRef = installedApp.getPipelines().get(0); - IngestionPipelineRepository ingestionPipelineRepository = - (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); - IngestionPipeline ingestionPipeline = - ingestionPipelineRepository.get( - null, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS)); + getIngestionPipeline(null, securityContext, installedApp); try { pipelineServiceClient.deletePipeline(ingestionPipeline); } catch (Exception ex) { diff --git a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json index 1cd3e21d6d5..285124f81f5 100644 --- a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json +++ b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json @@ -50,5 +50,6 @@ "appSchedule": { "scheduleTimeline": "Custom", "cronExpression": "0 0 * * *" - } + }, + "supportsInterrupt": true } diff --git a/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json b/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json index fbb2e167f72..c8bb323a914 100644 --- a/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json +++ b/openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/SearchIndexingApplication.json @@ -15,6 +15,7 @@ "runtime": { "enabled": true }, + "supportsInterrupt": true, "appConfiguration": { "entities": [ "table", diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java index 7437db7365c..2a3ac09f7b9 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/apps/AppsResourceTest.java @@ -335,10 +335,11 @@ public class AppsResourceTest extends EntityResourceTest { void post_trigger_app_200() throws HttpResponseException { String appName = "SearchIndexingApplication"; postTriggerApp(appName, ADMIN_AUTH_HEADERS); - assertAppRanAfterTrigger(appName); + assertAppStatusAvailableAfterTrigger(appName); + assertAppRanAfterTriggerWithStatus(appName, AppRunRecord.Status.SUCCESS); } - private void assertAppRanAfterTrigger(String appName) { + private void assertAppStatusAvailableAfterTrigger(String appName) { assertEventually( "appIsRunning", () -> { @@ -349,12 +350,13 @@ public class AppsResourceTest extends EntityResourceTest { } }, APP_TRIGGER_RETRY); + } + + private void assertAppRanAfterTriggerWithStatus(String appName, AppRunRecord.Status status) { assertEventually( - "appSuccess", + "appStatus", () -> { - assert getLatestAppRun(appName, ADMIN_AUTH_HEADERS) - .getStatus() - .equals(AppRunRecord.Status.SUCCESS); + assert getLatestAppRun(appName, ADMIN_AUTH_HEADERS).getStatus().equals(status); }, APP_TRIGGER_RETRY); } @@ -406,6 +408,13 @@ public class AppsResourceTest extends EntityResourceTest { readResponse(response, OK.getStatusCode()); } + private void postAppStop(String appName, Map authHeaders) + throws HttpResponseException { + WebTarget target = getResource("apps/stop").path(appName); + Response response = SecurityUtil.addHeaders(target, authHeaders).post(null); + readResponse(response, OK.getStatusCode()); + } + private AppRunRecord getLatestAppRun(String appName, Map authHeaders) throws HttpResponseException { WebTarget target = getResource(String.format("apps/name/%s/runs/latest", appName)); diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/app.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/app.json index 743b91fa33f..830224c3f16 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/app.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/app.json @@ -246,6 +246,11 @@ "domain" : { "description": "Domain the asset belongs to. When not set, the asset inherits the domain from the parent it belongs to.", "$ref": "../../type/entityReference.json" + }, + "supportsInterrupt": { + "description": "If the app run can be interrupted as part of the execution.", + "type": "boolean", + "default": false } }, "additionalProperties": false, diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/createAppRequest.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/createAppRequest.json index c0e6377c6eb..76dcc1a6eb5 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/createAppRequest.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/createAppRequest.json @@ -38,6 +38,11 @@ "domain" : { "description": "Fully qualified name of the domain the Table belongs to.", "type": "string" + }, + "supportsInterrupt": { + "description": "If the app run can be interrupted as part of the execution.", + "type": "boolean", + "default": false } }, "additionalProperties": false diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/marketplace/appMarketPlaceDefinition.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/marketplace/appMarketPlaceDefinition.json index fd885dc5a9e..12644408cd5 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/marketplace/appMarketPlaceDefinition.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/marketplace/appMarketPlaceDefinition.json @@ -144,6 +144,11 @@ "domain" : { "description": "Domain the asset belongs to. When not set, the asset inherits the domain from the parent it belongs to.", "$ref": "../../../type/entityReference.json" + }, + "supportsInterrupt": { + "description": "If the app run can be interrupted as part of the execution.", + "type": "boolean", + "default": false } }, "additionalProperties": false, diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/marketplace/createAppMarketPlaceDefinitionReq.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/marketplace/createAppMarketPlaceDefinitionReq.json index c4b7e423250..b5ebcbfc4e2 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/marketplace/createAppMarketPlaceDefinitionReq.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/marketplace/createAppMarketPlaceDefinitionReq.json @@ -106,6 +106,11 @@ "domain" : { "description": "Fully qualified name of the domain the Table belongs to.", "type": "string" + }, + "supportsInterrupt": { + "description": "If the app run can be interrupted as part of the execution.", + "type": "boolean", + "default": false } }, "additionalProperties": false,