[Issue-16487] Add Stop for Search Indexing (#17914)

* Add Stop for Search Indexing

* Update Logs and Message

* test for stop func

* remove test
This commit is contained in:
Mohit Yadav 2024-09-25 18:29:32 +05:30 committed by GitHub
parent 194691b251
commit afd7887bd8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 174 additions and 42 deletions

View File

@ -36,4 +36,9 @@ UPDATE test_suite
SET json = JSON_REMOVE(json, '$.testCaseResultSummary');
UPDATE test_case
SET json = JSON_REMOVE(json, '$.testCaseResult');
SET json = JSON_REMOVE(json, '$.testCaseResult');
-- Add Supports interrupts to SearchIndexingApplication
UPDATE installed_apps SET json = JSON_SET(json, '$.supportsInterrupt', true) where name = 'SearchIndexingApplication';
UPDATE apps_marketplace SET json = JSON_SET(json, '$.supportsInterrupt', true) where name = 'SearchIndexingApplication';

View File

@ -41,3 +41,20 @@ SET json = json - 'testCaseResultSummary';
UPDATE test_case
SET json = json - 'testCaseResult';
-- Add Supports interrupts to SearchIndexingApplication
UPDATE apps_marketplace
SET json = jsonb_set(
json::jsonb,
'{supportsInterrupt}',
to_jsonb(true)
)
where name = 'SearchIndexingApplication';
UPDATE installed_apps
SET json = jsonb_set(
json::jsonb,
'{supportsInterrupt}',
to_jsonb(true)
)
where name = 'SearchIndexingApplication';

View File

@ -41,6 +41,7 @@ import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
import org.quartz.JobExecutionContext;
import org.quartz.SchedulerException;
import org.quartz.UnableToInterruptJobException;
@Getter
@Slf4j
@ -48,6 +49,7 @@ public class AbstractNativeApplication implements NativeApplication {
protected CollectionDAO collectionDAO;
private App app;
protected SearchRepository searchRepository;
protected boolean isJobInterrupted = false;
// Default service that contains external apps' Ingestion Pipelines
private static final String SERVICE_NAME = "OpenMetadata";
@ -296,4 +298,10 @@ public class AbstractNativeApplication implements NativeApplication {
OmAppJobListener listener = getJobListener(jobExecutionContext);
listener.pushApplicationStatusUpdates(jobExecutionContext, appRecord, update);
}
@Override
public void interrupt() throws UnableToInterruptJobException {
LOG.info("Interrupting the job for app: {}", this.app.getName());
isJobInterrupted = true;
}
}

View File

@ -1,10 +1,10 @@
package org.openmetadata.service.apps;
import org.openmetadata.schema.entity.app.App;
import org.quartz.Job;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
public interface NativeApplication extends Job {
public interface NativeApplication extends InterruptableJob {
void init(App app);
void install();

View File

@ -235,7 +235,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
reCreateIndexes(paginatedSource.getEntityType());
contextData.put(ENTITY_TYPE_KEY, paginatedSource.getEntityType());
Object resultList;
while (!stopped && !paginatedSource.isDone()) {
while (!isJobInterrupted && !stopped && !paginatedSource.isDone()) {
try {
resultList = paginatedSource.readNext(null);
if (!TIME_SERIES_ENTITIES.contains(paginatedSource.getEntityType())) {
@ -264,6 +264,10 @@ public class SearchIndexApp extends AbstractNativeApplication {
paginatedSource.updateStats(
rx.getIndexingError().getSuccessCount(), rx.getIndexingError().getFailedCount());
} finally {
if (isJobInterrupted) {
LOG.info("Search Indexing will now return since the Job has been interrupted.");
jobData.setStatus(EventPublisherJob.Status.STOPPED);
}
updateStats(paginatedSource.getEntityType(), paginatedSource.getStats());
sendUpdates(jobExecutionContext);
}

View File

@ -261,4 +261,32 @@ public class AppScheduler {
LOG.error("Failed in running job", ex);
}
}
public void stopApplicationRun(App application) {
if (application.getFullyQualifiedName() == null) {
throw new IllegalArgumentException("Application's fullyQualifiedName is null.");
}
try {
// Interrupt any scheduled job
JobDetail jobDetailScheduled =
scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP));
if (jobDetailScheduled != null) {
LOG.debug("Stopping Scheduled Execution for App : {}", application.getName());
scheduler.interrupt(jobDetailScheduled.getKey());
}
// Interrupt any on-demand job
JobDetail jobDetailOnDemand =
scheduler.getJobDetail(
new JobKey(
String.format("%s-%s", application.getName(), ON_DEMAND_JOB), APPS_JOB_GROUP));
if (jobDetailOnDemand != null) {
LOG.debug("Stopping On Demand Execution for App : {}", application.getName());
scheduler.interrupt(jobDetailOnDemand.getKey());
}
} catch (Exception ex) {
LOG.error("Failed to stop job execution.", ex);
}
}
}

View File

@ -480,7 +480,8 @@ public class AppMarketPlaceResource
.withFeatures(create.getFeatures())
.withSourcePythonClass(create.getSourcePythonClass())
.withAllowConfiguration(create.getAllowConfiguration())
.withSystem(create.getSystem());
.withSystem(create.getSystem())
.withSupportsInterrupt(create.getSupportsInterrupt());
// Validate App
validateApplication(app);

View File

@ -730,7 +730,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
limits.invalidateCache(entityType);
// Remove from Pipeline Service
deleteApp(securityContext, app, hardDelete);
deleteApp(securityContext, app);
return deleteByName(uriInfo, securityContext, name, true, hardDelete);
}
@ -766,7 +766,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
.performCleanup(app, Entity.getCollectionDAO(), searchRepository);
// Remove from Pipeline Service
deleteApp(securityContext, app, hardDelete);
deleteApp(securityContext, app);
// Remove from repository
return delete(uriInfo, securityContext, id, true, hardDelete);
}
@ -881,7 +881,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
@Operation(
operationId = "triggerApplicationRun",
summary = "Trigger an Application run",
description = "Trigger a Application run by id.",
description = "Trigger a Application run by name.",
responses = {
@ApiResponse(
responseCode = "200",
@ -905,15 +905,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
return Response.status(Response.Status.OK).entity("Application Triggered").build();
} else {
if (!app.getPipelines().isEmpty()) {
EntityReference pipelineRef = app.getPipelines().get(0);
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
ingestionPipeline.setOpenMetadataServerConnection(app.getOpenMetadataServerConnection());
decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true);
IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app);
ServiceEntityInterface service =
Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED);
PipelineServiceClientResponse response =
@ -924,6 +916,47 @@ public class AppResource extends EntityResource<App, AppRepository> {
throw new BadRequestException("Failed to trigger application.");
}
@POST
@Path("/stop/{name}")
@Operation(
operationId = "stopApplicationRun",
summary = "Stop a Application run",
description = "Stop a application run by name.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Application stopped status code",
content = @Content(mediaType = "application/json")),
@ApiResponse(
responseCode = "404",
description = "Application for instance {id} is not found")
})
public Response stopApplicationRun(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Name of the App", schema = @Schema(type = "string"))
@PathParam("name")
String name) {
EntityUtil.Fields fields = getFields(String.format("%s,bot,pipelines", FIELD_OWNERS));
App app = repository.getByName(uriInfo, name, fields);
if (Boolean.TRUE.equals(app.getSupportsInterrupt())) {
if (app.getAppType().equals(AppType.Internal)) {
AppScheduler.getInstance().stopApplicationRun(app);
return Response.status(Response.Status.OK)
.entity("Application will be stopped in some time.")
.build();
} else {
if (!app.getPipelines().isEmpty()) {
IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app);
PipelineServiceClientResponse response =
pipelineServiceClient.killIngestion(ingestionPipeline);
return Response.status(response.getCode()).entity(response).build();
}
}
}
throw new BadRequestException("Application does not support Interrupts.");
}
@POST
@Path("/deploy/{name}")
@Operation(
@ -953,21 +986,14 @@ public class AppResource extends EntityResource<App, AppRepository> {
return Response.status(Response.Status.OK).entity("Application Deployed").build();
} else {
if (!app.getPipelines().isEmpty()) {
EntityReference pipelineRef = app.getPipelines().get(0);
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
ingestionPipeline.setOpenMetadataServerConnection(app.getOpenMetadataServerConnection());
decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true);
IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app);
ServiceEntityInterface service =
Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED);
PipelineServiceClientResponse status =
pipelineServiceClient.deployPipeline(ingestionPipeline, service);
if (status.getCode() == 200) {
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
ingestionPipelineRepository.createOrUpdate(uriInfo, ingestionPipeline);
} else {
ingestionPipeline.setDeployed(false);
@ -1032,7 +1058,8 @@ public class AppResource extends EntityResource<App, AppRepository> {
.withFeatures(marketPlaceDefinition.getFeatures())
.withSourcePythonClass(marketPlaceDefinition.getSourcePythonClass())
.withAllowConfiguration(marketPlaceDefinition.getAllowConfiguration())
.withSystem(marketPlaceDefinition.getSystem());
.withSystem(marketPlaceDefinition.getSystem())
.withSupportsInterrupt(marketPlaceDefinition.getSupportsInterrupt());
// validate Bot if provided
validateAndAddBot(app, createAppRequest.getBot());
@ -1048,7 +1075,23 @@ public class AppResource extends EntityResource<App, AppRepository> {
}
}
private void deleteApp(SecurityContext securityContext, App installedApp, boolean hardDelete) {
private IngestionPipeline getIngestionPipeline(
UriInfo uriInfo, SecurityContext securityContext, App app) {
EntityReference pipelineRef = app.getPipelines().get(0);
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
ingestionPipeline.setOpenMetadataServerConnection(app.getOpenMetadataServerConnection());
decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true);
return ingestionPipeline;
}
private void deleteApp(SecurityContext securityContext, App installedApp) {
if (installedApp.getAppType().equals(AppType.Internal)) {
try {
AppScheduler.getInstance().deleteScheduledApplication(installedApp);
@ -1058,13 +1101,8 @@ public class AppResource extends EntityResource<App, AppRepository> {
}
} else {
if (!nullOrEmpty(installedApp.getPipelines())) {
EntityReference pipelineRef = installedApp.getPipelines().get(0);
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
null, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
getIngestionPipeline(null, securityContext, installedApp);
try {
pipelineServiceClient.deletePipeline(ingestionPipeline);
} catch (Exception ex) {

View File

@ -50,5 +50,6 @@
"appSchedule": {
"scheduleTimeline": "Custom",
"cronExpression": "0 0 * * *"
}
},
"supportsInterrupt": true
}

View File

@ -15,6 +15,7 @@
"runtime": {
"enabled": true
},
"supportsInterrupt": true,
"appConfiguration": {
"entities": [
"table",

View File

@ -335,10 +335,11 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
void post_trigger_app_200() throws HttpResponseException {
String appName = "SearchIndexingApplication";
postTriggerApp(appName, ADMIN_AUTH_HEADERS);
assertAppRanAfterTrigger(appName);
assertAppStatusAvailableAfterTrigger(appName);
assertAppRanAfterTriggerWithStatus(appName, AppRunRecord.Status.SUCCESS);
}
private void assertAppRanAfterTrigger(String appName) {
private void assertAppStatusAvailableAfterTrigger(String appName) {
assertEventually(
"appIsRunning",
() -> {
@ -349,12 +350,13 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
}
},
APP_TRIGGER_RETRY);
}
private void assertAppRanAfterTriggerWithStatus(String appName, AppRunRecord.Status status) {
assertEventually(
"appSuccess",
"appStatus",
() -> {
assert getLatestAppRun(appName, ADMIN_AUTH_HEADERS)
.getStatus()
.equals(AppRunRecord.Status.SUCCESS);
assert getLatestAppRun(appName, ADMIN_AUTH_HEADERS).getStatus().equals(status);
},
APP_TRIGGER_RETRY);
}
@ -406,6 +408,13 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
readResponse(response, OK.getStatusCode());
}
private void postAppStop(String appName, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = getResource("apps/stop").path(appName);
Response response = SecurityUtil.addHeaders(target, authHeaders).post(null);
readResponse(response, OK.getStatusCode());
}
private AppRunRecord getLatestAppRun(String appName, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = getResource(String.format("apps/name/%s/runs/latest", appName));

View File

@ -246,6 +246,11 @@
"domain" : {
"description": "Domain the asset belongs to. When not set, the asset inherits the domain from the parent it belongs to.",
"$ref": "../../type/entityReference.json"
},
"supportsInterrupt": {
"description": "If the app run can be interrupted as part of the execution.",
"type": "boolean",
"default": false
}
},
"additionalProperties": false,

View File

@ -38,6 +38,11 @@
"domain" : {
"description": "Fully qualified name of the domain the Table belongs to.",
"type": "string"
},
"supportsInterrupt": {
"description": "If the app run can be interrupted as part of the execution.",
"type": "boolean",
"default": false
}
},
"additionalProperties": false

View File

@ -144,6 +144,11 @@
"domain" : {
"description": "Domain the asset belongs to. When not set, the asset inherits the domain from the parent it belongs to.",
"$ref": "../../../type/entityReference.json"
},
"supportsInterrupt": {
"description": "If the app run can be interrupted as part of the execution.",
"type": "boolean",
"default": false
}
},
"additionalProperties": false,

View File

@ -106,6 +106,11 @@
"domain" : {
"description": "Fully qualified name of the domain the Table belongs to.",
"type": "string"
},
"supportsInterrupt": {
"description": "If the app run can be interrupted as part of the execution.",
"type": "boolean",
"default": false
}
},
"additionalProperties": false,