diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java index e19cc37f6dd..8980c618263 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java @@ -2,7 +2,6 @@ package org.openmetadata.service.apps; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.HashMap; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.entity.app.App; import org.openmetadata.service.exception.UnhandledServerException; @@ -12,12 +11,6 @@ import org.openmetadata.service.search.SearchRepository; @Slf4j public class ApplicationHandler { - private static HashMap instances = new HashMap<>(); - - public static Object getAppInstance(String className) { - return instances.get(className); - } - private ApplicationHandler() { /*Helper*/ } @@ -53,8 +46,6 @@ public class ApplicationHandler { Method initMethod = resource.getClass().getMethod("init", App.class); initMethod.invoke(resource, app); - instances.put(app.getClassName(), resource); - return resource; } @@ -63,11 +54,7 @@ public class ApplicationHandler { App app, CollectionDAO daoCollection, SearchRepository searchRepository, String methodName) { // Native Application try { - Object resource = getAppInstance(app.getClassName()); - if (resource == null) { - resource = runAppInit(app, daoCollection, searchRepository); - } - + Object resource = runAppInit(app, daoCollection, searchRepository); // Call method on demand Method scheduleMethod = resource.getClass().getMethod(methodName); scheduleMethod.invoke(resource); @@ -77,13 +64,9 @@ public class ApplicationHandler { | IllegalAccessException | InvocationTargetException e) { LOG.error("Exception encountered", e); - throw new UnhandledServerException("Exception encountered", e); + throw new UnhandledServerException(e.getCause().getMessage()); } catch (ClassNotFoundException e) { - throw new UnhandledServerException("Exception encountered", e); + throw new UnhandledServerException(e.getCause().getMessage()); } } - - public static void removeUninstalledApp(String className) { - instances.remove(className); - } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 9127443e6db..c0aa0686b5c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -1,9 +1,9 @@ package org.openmetadata.service.apps.bundles.searchIndex; +import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess; -import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex; import java.util.ArrayList; @@ -166,8 +166,8 @@ public class SearchIndexApp extends AbstractNativeApplication { } // Run ReIndexing - entitiesReIndex(); - dataInsightReindex(); + entitiesReIndex(jobExecutionContext); + dataInsightReindex(jobExecutionContext); // Mark Job as Completed updateJobStatus(); } catch (Exception ex) { @@ -182,12 +182,8 @@ public class SearchIndexApp extends AbstractNativeApplication { jobData.setStatus(EventPublisherJob.Status.FAILED); jobData.setFailure(indexingError); } finally { - // store job details in Database - jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats()); - // Update Record to db - updateRecordToDb(jobExecutionContext); // Send update - sendUpdates(); + sendUpdates(jobExecutionContext); } } @@ -212,7 +208,7 @@ public class SearchIndexApp extends AbstractNativeApplication { pushAppStatusUpdates(jobExecutionContext, appRecord, true); } - private void entitiesReIndex() { + private void entitiesReIndex(JobExecutionContext jobExecutionContext) { Map contextData = new HashMap<>(); for (PaginatedEntitiesSource paginatedEntitiesSource : paginatedEntitiesSources) { reCreateIndexes(paginatedEntitiesSource.getEntityType()); @@ -222,18 +218,32 @@ public class SearchIndexApp extends AbstractNativeApplication { try { resultList = paginatedEntitiesSource.readNext(null); if (!resultList.getData().isEmpty()) { - searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData); + if (!resultList.getErrors().isEmpty()) { + searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData); + throw new SearchIndexException( + new IndexingError() + .withErrorSource(READER) + .withLastFailedCursor(paginatedEntitiesSource.getLastFailedCursor()) + .withSubmittedCount(paginatedEntitiesSource.getBatchSize()) + .withSuccessCount(resultList.getData().size()) + .withFailedCount(resultList.getErrors().size()) + .withMessage( + "Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.") + .withFailedEntities(resultList.getErrors())); + } } } catch (SearchIndexException rx) { + jobData.setStatus(EventPublisherJob.Status.FAILED); jobData.setFailure(rx.getIndexingError()); + } finally { + updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats()); + sendUpdates(jobExecutionContext); } } - updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats()); - sendUpdates(); } } - private void dataInsightReindex() { + private void dataInsightReindex(JobExecutionContext jobExecutionContext) { Map contextData = new HashMap<>(); for (PaginatedDataInsightSource paginatedDataInsightSource : paginatedDataInsightSources) { reCreateIndexes(paginatedDataInsightSource.getEntityType()); @@ -247,17 +257,23 @@ public class SearchIndexApp extends AbstractNativeApplication { dataInsightProcessor.process(resultList, contextData), contextData); } } catch (SearchIndexException ex) { + jobData.setStatus(EventPublisherJob.Status.FAILED); jobData.setFailure(ex.getIndexingError()); + } finally { + updateStats( + paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats()); + sendUpdates(jobExecutionContext); } } - updateStats( - paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats()); - sendUpdates(); } } - private void sendUpdates() { + private void sendUpdates(JobExecutionContext jobExecutionContext) { try { + // store job details in Database + jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats()); + // Update Record to db + updateRecordToDb(jobExecutionContext); if (WebSocketManager.getInstance() != null) { WebSocketManager.getInstance() .broadCastMessageToAll( @@ -286,8 +302,17 @@ public class SearchIndexApp extends AbstractNativeApplication { new StepStats() .withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), collectionDAO)); } - getUpdatedStats( - stats, currentEntityStats.getSuccessRecords(), currentEntityStats.getFailedRecords()); + + stats.setSuccessRecords( + entityLevelStats.getAdditionalProperties().values().stream() + .map(s -> (StepStats) s) + .mapToInt(StepStats::getSuccessRecords) + .sum()); + stats.setFailedRecords( + entityLevelStats.getAdditionalProperties().values().stream() + .map(s -> (StepStats) s) + .mapToInt(StepStats::getFailedRecords) + .sum()); // Update for the Job jobDataStats.setJobStats(stats); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java index 2d4afc2733b..72be015f76c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java @@ -125,6 +125,10 @@ public abstract class AbstractOmAppJobListener implements JobListener { JobExecutionContext context, AppRunRecord runRecord, boolean update) { JobDataMap dataMap = context.getJobDetail().getJobDataMap(); if (dataMap.containsKey(SCHEDULED_APP_RUN_EXTENSION)) { + // Update the Run Record in Data Map + dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); + + // Push Updates to the Database App jobApp = JsonUtils.readOrConvertValue( context.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index 3aebf1f85e8..7c6affad96c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -135,14 +135,13 @@ public class AppScheduler { public void addApplicationSchedule(App application) { try { - if (scheduler.getJobDetail(new JobKey(application.getId().toString(), APPS_JOB_GROUP)) - != null) { + if (scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP)) != null) { LOG.info("Job already exists for the application, skipping the scheduling"); return; } AppRuntime context = getAppRuntime(application); if (Boolean.TRUE.equals(context.getEnabled())) { - JobDetail jobDetail = jobBuilder(application, application.getId().toString()); + JobDetail jobDetail = jobBuilder(application, application.getName()); Trigger trigger = trigger(application); scheduler.scheduleJob(jobDetail, trigger); } else { @@ -155,8 +154,18 @@ public class AppScheduler { } public void deleteScheduledApplication(App app) throws SchedulerException { - scheduler.deleteJob(new JobKey(app.getId().toString(), APPS_JOB_GROUP)); - scheduler.unscheduleJob(new TriggerKey(app.getId().toString(), APPS_TRIGGER_GROUP)); + // Scheduled Jobs + scheduler.deleteJob(new JobKey(app.getName(), APPS_JOB_GROUP)); + scheduler.unscheduleJob(new TriggerKey(app.getName(), APPS_TRIGGER_GROUP)); + + // OnDemand Jobs + scheduler.deleteJob( + new JobKey( + String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()), APPS_JOB_GROUP)); + scheduler.unscheduleJob( + new TriggerKey( + String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()), + APPS_TRIGGER_GROUP)); } private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException { @@ -175,7 +184,7 @@ public class AppScheduler { private Trigger trigger(App app) { return TriggerBuilder.newTrigger() - .withIdentity(app.getId().toString(), APPS_TRIGGER_GROUP) + .withIdentity(app.getName(), APPS_TRIGGER_GROUP) .withSchedule(getCronSchedule(app.getAppSchedule())) .build(); } @@ -210,11 +219,11 @@ public class AppScheduler { public void triggerOnDemandApplication(App application) { try { JobDetail jobDetailScheduled = - scheduler.getJobDetail(new JobKey(application.getId().toString(), APPS_JOB_GROUP)); + scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP)); JobDetail jobDetailOnDemand = scheduler.getJobDetail( new JobKey( - String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()), + String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()), APPS_JOB_GROUP)); // Check if the job is already running List currentJobs = scheduler.getCurrentlyExecutingJobs(); @@ -233,12 +242,12 @@ public class AppScheduler { JobDetail newJobDetail = jobBuilder( application, - String.format("%s-%s", application.getId(), AppRunType.OnDemand.value())); + String.format("%s-%s", application.getName(), AppRunType.OnDemand.value())); newJobDetail.getJobDataMap().put("triggerType", AppRunType.OnDemand.value()); Trigger trigger = TriggerBuilder.newTrigger() .withIdentity( - String.format("%s-%s", application.getId(), AppRunType.OnDemand.value()), + String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()), APPS_TRIGGER_GROUP) .startNow() .build(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java index 30491c76fab..78257ddca41 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java @@ -132,7 +132,8 @@ public class EventSubscriptionScheduler { private Trigger trigger(EventSubscription eventSubscription) { return TriggerBuilder.newTrigger() .withIdentity(eventSubscription.getId().toString(), ALERT_TRIGGER_GROUP) - .withSchedule(SimpleScheduleBuilder.repeatMinutelyForever(1)) + .withSchedule( + SimpleScheduleBuilder.repeatSecondlyForever(eventSubscription.getPollInterval())) .startNow() .build(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index dde01a07652..7d9c5190852 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -3223,7 +3223,7 @@ public interface CollectionDAO { @Bind("eventType") String eventType, @Bind("timestamp") long timestamp); @SqlQuery( - "SELECT json FROM change_event where offset > :offset ORDER BY eventTime ASC LIMIT :limit") + "SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC LIMIT :limit") List list(@Bind("limit") long limit, @Bind("offset") long offset); @SqlQuery("SELECT count(*) FROM change_event") diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 6464059363c..3cf18bb2d13 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -5,7 +5,6 @@ import static org.openmetadata.schema.type.Include.ALL; import static org.openmetadata.service.Entity.APPLICATION; import static org.openmetadata.service.Entity.BOT; import static org.openmetadata.service.Entity.FIELD_OWNER; -import static org.openmetadata.service.apps.ApplicationHandler.removeUninstalledApp; import static org.openmetadata.service.jdbi3.EntityRepository.getEntitiesFromSeedData; import io.swagger.v3.oas.annotations.ExternalDocumentation; @@ -1031,8 +1030,5 @@ public class AppResource extends EntityResource { } } } - - // Remove App from instances Map Lookup - removeUninstalledApp(installedApp.getClassName()); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java index c6900e0cfaa..2a2a412350d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedEntitiesSource.java @@ -72,7 +72,7 @@ public class PaginatedEntitiesSource implements Source read(String cursor) throws SearchIndexException { LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize); EntityRepository entityRepository = Entity.getEntityRepository(entityType); - ResultList result = null; + ResultList result; try { result = entityRepository.listAfterWithSkipFailure( @@ -83,31 +83,19 @@ public class PaginatedEntitiesSource implements Source authHeaders) throws IOException { + if (!runWebhookTests) { + return; + } validateChangeEvents( entityInterface, timestamp, @@ -2754,6 +2756,9 @@ public abstract class EntityResourceTest authHeaders) { + if (!runWebhookTests) { + return; + } String updatedBy = SecurityUtil.getPrincipalName(authHeaders); EventHolder eventHolder = new EventHolder(); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/events/EventSubscriptionResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/events/EventSubscriptionResourceTest.java index 09ef641f200..ed46384bf8a 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/events/EventSubscriptionResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/events/EventSubscriptionResourceTest.java @@ -643,7 +643,7 @@ public class EventSubscriptionResourceTest .withEnabled(true) .withBatchSize(10) .withRetries(0) - .withPollInterval(0) + .withPollInterval(1) .withAlertType(CreateEventSubscription.AlertType.NOTIFICATION); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java index 9e99c42a54c..7bee5c9a094 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java @@ -48,6 +48,7 @@ public class QueryResourceTest extends EntityResourceTest { super( Entity.QUERY, Query.class, QueryResource.QueryList.class, "queries", QueryResource.FIELDS); supportsSearchIndex = true; + runWebhookTests = false; } @BeforeAll diff --git a/openmetadata-spec/src/main/resources/json/schema/events/eventSubscription.json b/openmetadata-spec/src/main/resources/json/schema/events/eventSubscription.json index 256d55f5bfe..dc33000d548 100644 --- a/openmetadata-spec/src/main/resources/json/schema/events/eventSubscription.json +++ b/openmetadata-spec/src/main/resources/json/schema/events/eventSubscription.json @@ -334,7 +334,7 @@ "pollInterval": { "description": "Poll Interval in seconds.", "type": "integer", - "default": 10 + "default": 60 }, "input": { "description": "Input for the Filters.",