[Fix] Application Fixes, Updated AppRunType and Removed Server App Inits (#15649)

* - Fix App Run Type
- Remove Init on App Start
- Migrate stats setup for SearchIndexing to execution

* fix the AppLogsViewer unit test failing due to appRunType type changes

---------

Co-authored-by: Aniket Katkar <aniketkatkar97@gmail.com>
This commit is contained in:
Mohit Yadav 2024-03-22 08:48:23 +05:30 committed by GitHub
parent 4ddcc0375c
commit f03ae2d6ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 56 additions and 94 deletions

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.apps.bundles.searchIndex;
import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS; import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
@ -19,7 +20,6 @@ import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData; import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord; import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppRunType;
import org.openmetadata.schema.entity.app.FailureContext; import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext; import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
@ -110,58 +110,23 @@ public class SearchIndexApp extends AbstractNativeApplication {
if (request.getEntities().contains(ALL)) { if (request.getEntities().contains(ALL)) {
request.setEntities(ALL_ENTITIES); request.setEntities(ALL_ENTITIES);
} }
int totalRecords = getTotalRequestToProcess(request.getEntities(), collectionDAO); jobData = request;
this.jobData = request;
this.jobData.setStats(
new Stats()
.withJobStats(
new StepStats()
.withTotalRecords(totalRecords)
.withFailedRecords(0)
.withSuccessRecords(0)));
request
.getEntities()
.forEach(
entityType -> {
if (!isDataInsightIndex(entityType)) {
List<String> fields = List.of("*");
PaginatedEntitiesSource source =
new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields);
if (!CommonUtil.nullOrEmpty(request.getAfterCursor())) {
source.setCursor(request.getAfterCursor());
}
paginatedEntitiesSources.add(source);
} else {
paginatedDataInsightSources.add(
new PaginatedDataInsightSource(
collectionDAO, entityType, jobData.getBatchSize()));
}
});
if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords);
this.dataInsightProcessor = new OpenSearchDataInsightProcessor(totalRecords);
this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords);
} else {
this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords);
this.dataInsightProcessor = new ElasticSearchDataInsightProcessor(totalRecords);
this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords);
}
} }
@Override @Override
public void startApp(JobExecutionContext jobExecutionContext) { public void startApp(JobExecutionContext jobExecutionContext) {
try { try {
initializeJob();
LOG.info("Executing Reindexing Job with JobData : {}", jobData); LOG.info("Executing Reindexing Job with JobData : {}", jobData);
// Update Job Status // Update Job Status
jobData.setStatus(EventPublisherJob.Status.RUNNING); jobData.setStatus(EventPublisherJob.Status.RUNNING);
// Make recreate as false for onDemand // Make recreate as false for onDemand
AppRunType runType = String runType =
AppRunType.fromValue( (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
(String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"));
// Schedule Run has recreate as false always // Schedule Run has re-create set to false
if (runType.equals(AppRunType.Scheduled)) { if (!runType.equals(ON_DEMAND_JOB)) {
jobData.setRecreateIndex(false); jobData.setRecreateIndex(false);
} }
@ -187,6 +152,44 @@ public class SearchIndexApp extends AbstractNativeApplication {
} }
} }
private void initializeJob() {
int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO);
this.jobData.setStats(
new Stats()
.withJobStats(
new StepStats()
.withTotalRecords(totalRecords)
.withFailedRecords(0)
.withSuccessRecords(0)));
jobData
.getEntities()
.forEach(
entityType -> {
if (!isDataInsightIndex(entityType)) {
List<String> fields = List.of("*");
PaginatedEntitiesSource source =
new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields);
if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) {
source.setCursor(jobData.getAfterCursor());
}
paginatedEntitiesSources.add(source);
} else {
paginatedDataInsightSources.add(
new PaginatedDataInsightSource(
collectionDAO, entityType, jobData.getBatchSize()));
}
});
if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords);
this.dataInsightProcessor = new OpenSearchDataInsightProcessor(totalRecords);
this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords);
} else {
this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords);
this.dataInsightProcessor = new ElasticSearchDataInsightProcessor(totalRecords);
this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords);
}
}
public void updateRecordToDb(JobExecutionContext jobExecutionContext) { public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
AppRunRecord appRecord = getJobRecord(jobExecutionContext); AppRunRecord appRecord = getJobRecord(jobExecutionContext);

View File

@ -8,7 +8,6 @@ import java.util.UUID;
import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord; import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppRunType;
import org.openmetadata.schema.entity.app.FailureContext; import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext; import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.CollectionDAO;
@ -35,9 +34,7 @@ public abstract class AbstractOmAppJobListener implements JobListener {
@Override @Override
public void jobToBeExecuted(JobExecutionContext jobExecutionContext) { public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {
AppRunType runType = String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
AppRunType.fromValue(
(String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"));
App jobApp = App jobApp =
JsonUtils.readOrConvertValue( JsonUtils.readOrConvertValue(
jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class);

View File

@ -17,7 +17,6 @@ import lombok.extern.slf4j.Slf4j;
import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.AppRuntime; import org.openmetadata.schema.AppRuntime;
import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunType;
import org.openmetadata.schema.entity.app.AppSchedule; import org.openmetadata.schema.entity.app.AppSchedule;
import org.openmetadata.schema.entity.app.ScheduleTimeline; import org.openmetadata.schema.entity.app.ScheduleTimeline;
import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.OpenMetadataApplicationConfig;
@ -44,6 +43,7 @@ import org.quartz.impl.StdSchedulerFactory;
@Slf4j @Slf4j
public class AppScheduler { public class AppScheduler {
private static final Map<String, String> defaultAppScheduleConfig = new HashMap<>(); private static final Map<String, String> defaultAppScheduleConfig = new HashMap<>();
public static final String ON_DEMAND_JOB = "OnDemandJob";
static { static {
defaultAppScheduleConfig.put("org.quartz.scheduler.instanceName", "AppScheduler"); defaultAppScheduleConfig.put("org.quartz.scheduler.instanceName", "AppScheduler");
@ -143,11 +143,7 @@ public class AppScheduler {
AppRuntime context = getAppRuntime(application); AppRuntime context = getAppRuntime(application);
if (Boolean.TRUE.equals(context.getEnabled())) { if (Boolean.TRUE.equals(context.getEnabled())) {
JobDetail jobDetail = jobBuilder(application, application.getName()); JobDetail jobDetail = jobBuilder(application, application.getName());
if (!application if (!application.getAppSchedule().getScheduleTimeline().equals(ScheduleTimeline.NONE)) {
.getAppSchedule()
.getScheduleTimeline()
.value()
.equals(ScheduleTimeline.NONE)) {
Trigger trigger = trigger(application); Trigger trigger = trigger(application);
scheduler.scheduleJob(jobDetail, trigger); scheduler.scheduleJob(jobDetail, trigger);
} }
@ -167,12 +163,9 @@ public class AppScheduler {
// OnDemand Jobs // OnDemand Jobs
scheduler.deleteJob( scheduler.deleteJob(
new JobKey( new JobKey(String.format("%s-%s", app.getName(), ON_DEMAND_JOB), APPS_JOB_GROUP));
String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()), APPS_JOB_GROUP));
scheduler.unscheduleJob( scheduler.unscheduleJob(
new TriggerKey( new TriggerKey(String.format("%s-%s", app.getName(), ON_DEMAND_JOB), APPS_TRIGGER_GROUP));
String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()),
APPS_TRIGGER_GROUP));
} }
private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException { private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException {
@ -230,8 +223,7 @@ public class AppScheduler {
JobDetail jobDetailOnDemand = JobDetail jobDetailOnDemand =
scheduler.getJobDetail( scheduler.getJobDetail(
new JobKey( new JobKey(
String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()), String.format("%s-%s", application.getName(), ON_DEMAND_JOB), APPS_JOB_GROUP));
APPS_JOB_GROUP));
// Check if the job is already running // Check if the job is already running
List<JobExecutionContext> currentJobs = scheduler.getCurrentlyExecutingJobs(); List<JobExecutionContext> currentJobs = scheduler.getCurrentlyExecutingJobs();
for (JobExecutionContext context : currentJobs) { for (JobExecutionContext context : currentJobs) {
@ -247,14 +239,12 @@ public class AppScheduler {
AppRuntime context = getAppRuntime(application); AppRuntime context = getAppRuntime(application);
if (Boolean.TRUE.equals(context.getEnabled())) { if (Boolean.TRUE.equals(context.getEnabled())) {
JobDetail newJobDetail = JobDetail newJobDetail =
jobBuilder( jobBuilder(application, String.format("%s-%s", application.getName(), ON_DEMAND_JOB));
application, newJobDetail.getJobDataMap().put("triggerType", ON_DEMAND_JOB);
String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()));
newJobDetail.getJobDataMap().put("triggerType", AppRunType.OnDemand.value());
Trigger trigger = Trigger trigger =
TriggerBuilder.newTrigger() TriggerBuilder.newTrigger()
.withIdentity( .withIdentity(
String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()), String.format("%s-%s", application.getName(), ON_DEMAND_JOB),
APPS_TRIGGER_GROUP) APPS_TRIGGER_GROUP)
.startNow() .startNow()
.build(); .build();

View File

@ -140,20 +140,6 @@ public class AppResource extends EntityResource<App, AppRepository> {
.installApplication(app, Entity.getCollectionDAO(), searchRepository); .installApplication(app, Entity.getCollectionDAO(), searchRepository);
} }
} }
// Initialize installed applications
for (App installedApp : repository.listAll()) {
App appWithBot = getAppForInit(installedApp.getName());
if (appWithBot == null) {
LOG.error(
String.format(
"Failed to init app [%s]. GET should return the installed app",
installedApp.getName()));
} else {
ApplicationHandler.getInstance().runAppInit(appWithBot, dao, searchRepository);
LOG.info(String.format("Initialized installed app [%s]", installedApp.getName()));
}
}
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Failed in Create App Requests", ex); LOG.error("Failed in Create App Requests", ex);
} }

View File

@ -25,21 +25,8 @@
] ]
}, },
"runType": { "runType": {
"javaType": "org.openmetadata.schema.entity.app.AppRunType",
"description": "This schema defines the type of application Run.", "description": "This schema defines the type of application Run.",
"type": "string", "type": "string"
"enum": [
"Scheduled",
"OnDemand"
],
"javaEnums": [
{
"name": "Scheduled"
},
{
"name": "OnDemand"
}
]
}, },
"startTime": { "startTime": {
"description": "Start of the job status.", "description": "Start of the job status.",

View File

@ -14,7 +14,6 @@ import { render, screen } from '@testing-library/react';
import userEvent from '@testing-library/user-event'; import userEvent from '@testing-library/user-event';
import React from 'react'; import React from 'react';
import { import {
RunType,
ScheduleTimeline, ScheduleTimeline,
Status, Status,
} from '../../../../generated/entity/applications/appRunRecord'; } from '../../../../generated/entity/applications/appRunRecord';
@ -60,7 +59,7 @@ const mockProps1 = {
data: { data: {
appId: '6e4d3dcf-238d-4874-b4e4-dd863ede6544', appId: '6e4d3dcf-238d-4874-b4e4-dd863ede6544',
status: Status.Success, status: Status.Success,
runType: RunType.OnDemand, runType: 'OnDemand',
startTime: 1706871884587, startTime: 1706871884587,
endTime: 1706871891251, endTime: 1706871891251,
timestamp: 1706871884587, timestamp: 1706871884587,