- Fix Poll Interval (#15453)

* - Fix Poll Interval

* - Set the poll interval to 1 second in tests

* - Add randomized webhook tests
- Disable subscription tests for Query

* - Checkstyle Fix

* - Remove on-demand jobs as well
- On update, remove the already-initialized instance

* - Use a table alias in the change_event query

* - Fix typo

* - Search Index Fix
Mohit Yadav 2024-03-06 02:21:28 +05:30 committed by GitHub
parent dd05b67449
commit 9491e04088
12 changed files with 102 additions and 90 deletions

View File: ApplicationHandler.java

@@ -2,7 +2,6 @@ package org.openmetadata.service.apps;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.service.exception.UnhandledServerException;
@@ -12,12 +11,6 @@ import org.openmetadata.service.search.SearchRepository;
@Slf4j
public class ApplicationHandler {
-private static HashMap<String, Object> instances = new HashMap<>();
-public static Object getAppInstance(String className) {
-return instances.get(className);
-}
private ApplicationHandler() {
/*Helper*/
}
@@ -53,8 +46,6 @@ public class ApplicationHandler {
Method initMethod = resource.getClass().getMethod("init", App.class);
initMethod.invoke(resource, app);
-instances.put(app.getClassName(), resource);
return resource;
}
@@ -63,11 +54,7 @@
App app, CollectionDAO daoCollection, SearchRepository searchRepository, String methodName) {
// Native Application
try {
-Object resource = getAppInstance(app.getClassName());
-if (resource == null) {
-resource = runAppInit(app, daoCollection, searchRepository);
-}
+Object resource = runAppInit(app, daoCollection, searchRepository);
// Call method on demand
Method scheduleMethod = resource.getClass().getMethod(methodName);
scheduleMethod.invoke(resource);
@@ -77,13 +64,9 @@
| IllegalAccessException
| InvocationTargetException e) {
LOG.error("Exception encountered", e);
-throw new UnhandledServerException("Exception encountered", e);
+throw new UnhandledServerException(e.getCause().getMessage());
} catch (ClassNotFoundException e) {
-throw new UnhandledServerException("Exception encountered", e);
+throw new UnhandledServerException(e.getCause().getMessage());
}
}
-public static void removeUninstalledApp(String className) {
-instances.remove(className);
-}
}
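
Note: with the static instances map gone, ApplicationHandler builds and initializes the application object on every dispatch instead of reusing a cached one, so an updated app presumably no longer runs against a stale instance (the "On update, remove the already-initialized instance" bullet). A self-contained sketch of the reflective init-then-invoke pattern used above; class and method names here are illustrative, not OpenMetadata's:

    import java.lang.reflect.Method;

    public class ReflectiveDispatchDemo {
      public static class SampleApp {
        public void init(String config) { System.out.println("init: " + config); }
        public void startApp() { System.out.println("startApp"); }
      }

      public static void main(String[] args) throws Exception {
        // Build and initialize a fresh instance per call, mirroring the cache removal above.
        Object resource =
            Class.forName(SampleApp.class.getName()).getDeclaredConstructor().newInstance();
        resource.getClass().getMethod("init", String.class).invoke(resource, "demo-config");
        // Dispatch by method name, like runMethodFromApplication's methodName parameter.
        Method onDemand = resource.getClass().getMethod("startApp");
        onDemand.invoke(resource);
      }
    }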

View File: SearchIndexApp.java

@@ -1,9 +1,9 @@
package org.openmetadata.service.apps.bundles.searchIndex;
+import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
-import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
import java.util.ArrayList;
@@ -166,8 +166,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
// Run ReIndexing
-entitiesReIndex();
-dataInsightReindex();
+entitiesReIndex(jobExecutionContext);
+dataInsightReindex(jobExecutionContext);
// Mark Job as Completed
updateJobStatus();
} catch (Exception ex) {
@@ -182,12 +182,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(indexingError);
} finally {
-// store job details in Database
-jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
-// Update Record to db
-updateRecordToDb(jobExecutionContext);
-// Send update
-sendUpdates();
+sendUpdates(jobExecutionContext);
}
}
@@ -212,7 +208,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
pushAppStatusUpdates(jobExecutionContext, appRecord, true);
}
-private void entitiesReIndex() {
+private void entitiesReIndex(JobExecutionContext jobExecutionContext) {
Map<String, Object> contextData = new HashMap<>();
for (PaginatedEntitiesSource paginatedEntitiesSource : paginatedEntitiesSources) {
reCreateIndexes(paginatedEntitiesSource.getEntityType());
@@ -222,18 +218,32 @@ public class SearchIndexApp extends AbstractNativeApplication {
try {
resultList = paginatedEntitiesSource.readNext(null);
if (!resultList.getData().isEmpty()) {
-searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
+if (!resultList.getErrors().isEmpty()) {
+searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
+throw new SearchIndexException(
+new IndexingError()
+.withErrorSource(READER)
+.withLastFailedCursor(paginatedEntitiesSource.getLastFailedCursor())
+.withSubmittedCount(paginatedEntitiesSource.getBatchSize())
+.withSuccessCount(resultList.getData().size())
+.withFailedCount(resultList.getErrors().size())
+.withMessage(
+"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
+.withFailedEntities(resultList.getErrors()));
+}
}
} catch (SearchIndexException rx) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(rx.getIndexingError());
+} finally {
+updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
+sendUpdates(jobExecutionContext);
}
}
-updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
-sendUpdates();
}
}
-private void dataInsightReindex() {
+private void dataInsightReindex(JobExecutionContext jobExecutionContext) {
Map<String, Object> contextData = new HashMap<>();
for (PaginatedDataInsightSource paginatedDataInsightSource : paginatedDataInsightSources) {
reCreateIndexes(paginatedDataInsightSource.getEntityType());
@@ -247,17 +257,23 @@ public class SearchIndexApp extends AbstractNativeApplication {
dataInsightProcessor.process(resultList, contextData), contextData);
}
} catch (SearchIndexException ex) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(ex.getIndexingError());
+} finally {
+updateStats(
+paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
+sendUpdates(jobExecutionContext);
}
}
-updateStats(
-paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
-sendUpdates();
}
}
-private void sendUpdates() {
+private void sendUpdates(JobExecutionContext jobExecutionContext) {
try {
+// store job details in Database
+jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
+// Update Record to db
+updateRecordToDb(jobExecutionContext);
if (WebSocketManager.getInstance() != null) {
WebSocketManager.getInstance()
.broadCastMessageToAll(
@@ -286,8 +302,17 @@ public class SearchIndexApp extends AbstractNativeApplication {
new StepStats()
.withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), collectionDAO));
}
-getUpdatedStats(
-stats, currentEntityStats.getSuccessRecords(), currentEntityStats.getFailedRecords());
+stats.setSuccessRecords(
+entityLevelStats.getAdditionalProperties().values().stream()
+.map(s -> (StepStats) s)
+.mapToInt(StepStats::getSuccessRecords)
+.sum());
+stats.setFailedRecords(
+entityLevelStats.getAdditionalProperties().values().stream()
+.map(s -> (StepStats) s)
+.mapToInt(StepStats::getFailedRecords)
+.sum());
// Update for the Job
jobDataStats.setJobStats(stats);
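
Note: the removed getUpdatedStats call folded each batch into running totals, while the new code recomputes the job totals from the per-entity stats map on every update, so repeated updates presumably cannot drift or double-count. A standalone sketch of that aggregation, with a stand-in for org.openmetadata.schema.system.StepStats:

    import java.util.List;

    public class StatsRollupDemo {
      // Stand-in for StepStats; only the two counters matter here.
      static final class StepStats {
        private final int successRecords, failedRecords;
        StepStats(int success, int failed) { successRecords = success; failedRecords = failed; }
        int getSuccessRecords() { return successRecords; }
        int getFailedRecords() { return failedRecords; }
      }

      public static void main(String[] args) {
        // One StepStats per entity type, as in entityLevelStats.getAdditionalProperties().
        List<StepStats> perEntity =
            List.of(new StepStats(10, 2), new StepStats(5, 0), new StepStats(7, 1));
        int success = perEntity.stream().mapToInt(StepStats::getSuccessRecords).sum();
        int failed = perEntity.stream().mapToInt(StepStats::getFailedRecords).sum();
        System.out.println("success=" + success + " failed=" + failed); // success=22 failed=3
      }
    }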

View File: AbstractOmAppJobListener.java

@@ -125,6 +125,10 @@ public abstract class AbstractOmAppJobListener implements JobListener {
JobExecutionContext context, AppRunRecord runRecord, boolean update) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
if (dataMap.containsKey(SCHEDULED_APP_RUN_EXTENSION)) {
+// Update the Run Record in Data Map
+dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
+// Push Updates to the Database
App jobApp =
JsonUtils.readOrConvertValue(
context.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class);
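
Note: the listener now refreshes the serialized run record in the Quartz JobDataMap before pushing it to the database, so later readers of the map within the same execution see the current state. A minimal sketch of the stash-as-JSON pattern; Jackson (2.12+) stands in for OpenMetadata's JsonUtils, and the map key is a placeholder for SCHEDULED_APP_RUN_EXTENSION:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.HashMap;
    import java.util.Map;

    public class RunRecordStashDemo {
      record AppRunRecord(String appName, String status) {}

      public static void main(String[] args) throws Exception {
        Map<String, Object> dataMap = new HashMap<>(); // stands in for Quartz's JobDataMap
        AppRunRecord runRecord = new AppRunRecord("SearchIndexingApplication", "running");
        // Refresh the stored copy first, then persist, so both views stay in sync.
        dataMap.put("scheduledAppRunExtension", new ObjectMapper().writeValueAsString(runRecord));
        System.out.println(dataMap);
      }
    }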

View File: AppScheduler.java

@@ -135,14 +135,13 @@ public class AppScheduler {
public void addApplicationSchedule(App application) {
try {
-if (scheduler.getJobDetail(new JobKey(application.getId().toString(), APPS_JOB_GROUP))
-!= null) {
+if (scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP)) != null) {
LOG.info("Job already exists for the application, skipping the scheduling");
return;
}
AppRuntime context = getAppRuntime(application);
if (Boolean.TRUE.equals(context.getEnabled())) {
-JobDetail jobDetail = jobBuilder(application, application.getId().toString());
+JobDetail jobDetail = jobBuilder(application, application.getName());
Trigger trigger = trigger(application);
scheduler.scheduleJob(jobDetail, trigger);
} else {
@@ -155,8 +154,18 @@
}
public void deleteScheduledApplication(App app) throws SchedulerException {
-scheduler.deleteJob(new JobKey(app.getId().toString(), APPS_JOB_GROUP));
-scheduler.unscheduleJob(new TriggerKey(app.getId().toString(), APPS_TRIGGER_GROUP));
+// Scheduled Jobs
+scheduler.deleteJob(new JobKey(app.getName(), APPS_JOB_GROUP));
+scheduler.unscheduleJob(new TriggerKey(app.getName(), APPS_TRIGGER_GROUP));
+// OnDemand Jobs
+scheduler.deleteJob(
+new JobKey(
+String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()), APPS_JOB_GROUP));
+scheduler.unscheduleJob(
+new TriggerKey(
+String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()),
+APPS_TRIGGER_GROUP));
}
private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException {
@@ -175,7 +184,7 @@
private Trigger trigger(App app) {
return TriggerBuilder.newTrigger()
-.withIdentity(app.getId().toString(), APPS_TRIGGER_GROUP)
+.withIdentity(app.getName(), APPS_TRIGGER_GROUP)
.withSchedule(getCronSchedule(app.getAppSchedule()))
.build();
}
@@ -210,11 +219,11 @@
public void triggerOnDemandApplication(App application) {
try {
JobDetail jobDetailScheduled =
-scheduler.getJobDetail(new JobKey(application.getId().toString(), APPS_JOB_GROUP));
+scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP));
JobDetail jobDetailOnDemand =
scheduler.getJobDetail(
new JobKey(
-String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()),
+String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()),
APPS_JOB_GROUP));
// Check if the job is already running
List<JobExecutionContext> currentJobs = scheduler.getCurrentlyExecutingJobs();
@@ -233,12 +242,12 @@
JobDetail newJobDetail =
jobBuilder(
application,
-String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()));
+String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()));
newJobDetail.getJobDataMap().put("triggerType", AppRunType.OnDemand.value());
Trigger trigger =
TriggerBuilder.newTrigger()
.withIdentity(
-String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()),
+String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()),
APPS_TRIGGER_GROUP)
.startNow()
.build();
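
Note: jobs and triggers are now keyed by the application's name rather than its UUID, and deletion cleans up the on-demand variant alongside the scheduled one; a name-based key stays stable across reinstalls, where a regenerated UUID would presumably leave orphaned jobs behind. A sketch of the cleanup under those naming rules (group strings are placeholders for APPS_JOB_GROUP / APPS_TRIGGER_GROUP, and the suffix assumes AppRunType.OnDemand.value() is "OnDemand"):

    import org.quartz.JobKey;
    import org.quartz.Scheduler;
    import org.quartz.SchedulerException;
    import org.quartz.TriggerKey;

    public final class AppJobCleanup {
      private static final String JOB_GROUP = "appsJobGroup";
      private static final String TRIGGER_GROUP = "appsTriggerGroup";

      public static void deleteAllJobsFor(Scheduler scheduler, String appName)
          throws SchedulerException {
        // Scheduled variant, keyed by the app name itself.
        scheduler.deleteJob(new JobKey(appName, JOB_GROUP));
        scheduler.unscheduleJob(new TriggerKey(appName, TRIGGER_GROUP));
        // On-demand variant, named "<appName>-OnDemand" per the format string in the diff.
        String onDemand = appName + "-OnDemand";
        scheduler.deleteJob(new JobKey(onDemand, JOB_GROUP));
        scheduler.unscheduleJob(new TriggerKey(onDemand, TRIGGER_GROUP));
      }
    }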

View File: EventSubscriptionScheduler.java

@@ -132,7 +132,8 @@ public class EventSubscriptionScheduler {
private Trigger trigger(EventSubscription eventSubscription) {
return TriggerBuilder.newTrigger()
.withIdentity(eventSubscription.getId().toString(), ALERT_TRIGGER_GROUP)
-.withSchedule(SimpleScheduleBuilder.repeatMinutelyForever(1))
+.withSchedule(
+SimpleScheduleBuilder.repeatSecondlyForever(eventSubscription.getPollInterval()))
.startNow()
.build();
}
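
This is the fix the commit is named for: the trigger previously hard-coded a one-minute repeat and ignored the subscription's configured pollInterval; it now repeats every pollInterval seconds. Quartz rejects a non-positive repeat interval, which is presumably why the test value later in this commit moves from 0 to 1. A sketch of the corrected trigger construction (the group string is a placeholder for ALERT_TRIGGER_GROUP):

    import org.quartz.SimpleScheduleBuilder;
    import org.quartz.Trigger;
    import org.quartz.TriggerBuilder;

    public final class PollTriggerDemo {
      public static Trigger pollTrigger(String subscriptionId, int pollIntervalSeconds) {
        return TriggerBuilder.newTrigger()
            .withIdentity(subscriptionId, "alertTriggerGroup")
            // Honor the per-subscription interval instead of a fixed one-minute repeat.
            .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(pollIntervalSeconds))
            .startNow()
            .build();
      }
    }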

View File: CollectionDAO.java

@@ -3223,7 +3223,7 @@ public interface CollectionDAO {
@Bind("eventType") String eventType, @Bind("timestamp") long timestamp);
@SqlQuery(
"SELECT json FROM change_event where offset > :offset ORDER BY eventTime ASC LIMIT :limit")
"SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC LIMIT :limit")
List<String> list(@Bind("limit") long limit, @Bind("offset") long offset);
@SqlQuery("SELECT count(*) FROM change_event")

View File: AppResource.java

@@ -5,7 +5,6 @@ import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.APPLICATION;
import static org.openmetadata.service.Entity.BOT;
import static org.openmetadata.service.Entity.FIELD_OWNER;
-import static org.openmetadata.service.apps.ApplicationHandler.removeUninstalledApp;
import static org.openmetadata.service.jdbi3.EntityRepository.getEntitiesFromSeedData;
import io.swagger.v3.oas.annotations.ExternalDocumentation;
@@ -1031,8 +1030,5 @@ public class AppResource extends EntityResource<App, AppRepository> {
}
}
}
-// Remove App from instances Map Lookup
-removeUninstalledApp(installedApp.getClassName());
}
}

View File: PaginatedEntitiesSource.java

@@ -72,7 +72,7 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
private ResultList<? extends EntityInterface> read(String cursor) throws SearchIndexException {
LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize);
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
-ResultList<? extends EntityInterface> result = null;
+ResultList<? extends EntityInterface> result;
try {
result =
entityRepository.listAfterWithSkipFailure(
@@ -83,31 +83,19 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
cursor);
if (!result.getErrors().isEmpty()) {
lastFailedCursor = this.cursor;
-throw new SearchIndexException(
-new IndexingError()
-.withErrorSource(READER)
-.withLastFailedCursor(lastFailedCursor)
-.withSubmittedCount(batchSize)
-.withSuccessCount(result.getData().size())
-.withFailedCount(result.getErrors().size())
-.withMessage(
-"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
-.withFailedEntities(result.getErrors()));
+if (result.getPaging().getAfter() == null) {
+isDone = true;
+} else {
+this.cursor = result.getPaging().getAfter();
+}
+updateStats(result.getData().size(), result.getErrors().size());
+return result;
}
LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, result.getData().size(), result.getErrors().size());
updateStats(result.getData().size(), result.getErrors().size());
-} catch (SearchIndexException ex) {
-lastFailedCursor = this.cursor;
-if (result.getPaging().getAfter() == null) {
-isDone = true;
-} else {
-this.cursor = result.getPaging().getAfter();
-}
-updateStats(result.getData().size(), result.getErrors().size());
-throw ex;
} catch (Exception e) {
lastFailedCursor = this.cursor;
IndexingError indexingError =
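
The reader used to throw as soon as a batch contained errors, leaving the cursor parked on the failing page; the rewritten read records the failed cursor, still advances paging, updates stats, and returns the partial batch, while the batch-level throw moves up into SearchIndexApp (second file above). A toy sketch of that advance-past-failures idea; Batch stands in for ResultList and its paging:

    import java.util.List;

    public class CursorAdvanceDemo {
      record Batch(List<String> data, List<String> errors, String after) {}

      static String cursor = null;
      static String lastFailedCursor = null;
      static boolean done = false;

      static Batch readNext(Batch fetched) {
        if (!fetched.errors().isEmpty()) {
          lastFailedCursor = cursor; // remember where failures happened...
        }
        if (fetched.after() == null) {
          done = true;               // ...but still finish,
        } else {
          cursor = fetched.after();  // ...or advance, so one bad batch cannot stall the run
        }
        return fetched;              // surface partial data instead of throwing here
      }

      public static void main(String[] args) {
        readNext(new Batch(List.of("table1"), List.of("table2: bad json"), "page2"));
        System.out.println("cursor=" + cursor + " lastFailed=" + lastFailedCursor + " done=" + done);
      }
    }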

View File: EntityResourceTest.java

@@ -370,7 +370,7 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
// Run webhook related tests randomly. This will ensure these tests are not run for every entity
// every time junit tests are run to save time. But over the course of development of a release,
// when tests are run enough times, the webhook tests are run for all the entities.
-public static boolean runWebhookTests;
+public boolean runWebhookTests = new Random().nextBoolean();
protected boolean supportsSearchIndex = false;
@@ -438,22 +438,21 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
new BotResourceTest().setupBots();
new QueryResourceTest().setupQuery(test);
-runWebhookTests = new Random().nextBoolean();
-// if (true) {
-webhookCallbackResource.clearEvents();
-EventSubscriptionResourceTest alertResourceTest = new EventSubscriptionResourceTest();
-alertResourceTest.startWebhookSubscription();
-alertResourceTest.startWebhookEntitySubscriptions(entityType);
-// }
+if (runWebhookTests) {
+webhookCallbackResource.clearEvents();
+EventSubscriptionResourceTest alertResourceTest = new EventSubscriptionResourceTest();
+alertResourceTest.startWebhookSubscription();
+alertResourceTest.startWebhookEntitySubscriptions(entityType);
+}
}
@AfterAll
public void afterAllTests() throws Exception {
-// if (true) {
-EventSubscriptionResourceTest alertResourceTest = new EventSubscriptionResourceTest();
-alertResourceTest.validateWebhookEvents();
-alertResourceTest.validateWebhookEntityEvents(entityType);
-// }
+if (runWebhookTests) {
+EventSubscriptionResourceTest alertResourceTest = new EventSubscriptionResourceTest();
+alertResourceTest.validateWebhookEvents();
+alertResourceTest.validateWebhookEntityEvents(entityType);
+}
delete_recursiveTest();
}
@@ -2613,6 +2612,9 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
ChangeDescription expectedChangeDescription,
Map<String, String> authHeaders)
throws IOException {
+if (!runWebhookTests) {
+return;
+}
validateChangeEvents(
entityInterface,
timestamp,
@@ -2754,6 +2756,9 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
EventType expectedEventType,
Double expectedVersion,
Map<String, String> authHeaders) {
+if (!runWebhookTests) {
+return;
+}
String updatedBy = SecurityUtil.getPrincipalName(authHeaders);
EventHolder eventHolder = new EventHolder();
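
These hooks previously ran unconditionally behind commented-out guards; the flag is now an instance field rolled once per test class, setup and teardown honor it, and the change-event validators return early when it is off so assertions always match what setup registered. QueryResourceTest simply pins the flag to false in its constructor (two files below). A tiny sketch of the coin-flip gating, with illustrative names:

    import java.util.Random;

    public class RandomGateDemo {
      // Roughly half of all runs exercise the webhook path for a given entity,
      // spreading full coverage across many CI runs instead of paying for it every time.
      boolean runWebhookTests = new Random().nextBoolean();

      void setup() {
        if (runWebhookTests) {
          System.out.println("webhook subscriptions started");
        }
      }

      public static void main(String[] args) {
        new RandomGateDemo().setup();
      }
    }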

View File: EventSubscriptionResourceTest.java

@@ -643,7 +643,7 @@ public class EventSubscriptionResourceTest
.withEnabled(true)
.withBatchSize(10)
.withRetries(0)
-.withPollInterval(0)
+.withPollInterval(1)
.withAlertType(CreateEventSubscription.AlertType.NOTIFICATION);
}

View File: QueryResourceTest.java

@@ -48,6 +48,7 @@ public class QueryResourceTest extends EntityResourceTest<Query, CreateQuery> {
super(
Entity.QUERY, Query.class, QueryResource.QueryList.class, "queries", QueryResource.FIELDS);
supportsSearchIndex = true;
+runWebhookTests = false;
}
@BeforeAll

View File: JSON schema (pollInterval default)

@@ -334,7 +334,7 @@
"pollInterval": {
"description": "Poll Interval in seconds.",
"type": "integer",
"default": 10
"default": 60
},
"input": {
"description": "Input for the Filters.",