From 46bda6dc32be8710fbea16f3a2277a34cef5dcb1 Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Fri, 7 Oct 2022 17:48:37 +0530 Subject: [PATCH] Added websocket updates on jobchannel (#8014) --- .../BuildSearchIndexResource.java | 94 ++++++++----------- .../elasticSearch/BulkProcessorListener.java | 26 +++-- .../service/socket/WebSocketManager.java | 1 + 3 files changed, 53 insertions(+), 68 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java index 4573742e968..c695e32a166 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java @@ -13,6 +13,7 @@ import java.time.ZoneId; import java.util.Date; import java.util.List; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -46,6 +47,7 @@ import org.openmetadata.schema.api.CreateEventPublisherJob; import org.openmetadata.schema.api.CreateEventPublisherJob.RunMode; import org.openmetadata.schema.api.configuration.elasticsearch.ElasticSearchConfiguration; import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.entity.teams.User; import org.openmetadata.schema.settings.EventPublisherJob; import org.openmetadata.schema.settings.FailureDetails; import org.openmetadata.schema.settings.Stats; @@ -56,6 +58,7 @@ import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.jdbi3.UserRepository; import org.openmetadata.service.resources.Collection; import org.openmetadata.service.security.Authorizer; import org.openmetadata.service.util.ConfigurationHolder; @@ -79,6 +82,7 @@ public class BuildSearchIndexResource { private final CollectionDAO dao; private final Authorizer authorizer; private final ExecutorService threadScheduler; + private final UserRepository userRepository; public BuildSearchIndexResource(CollectionDAO dao, Authorizer authorizer) { if (ConfigurationHolder.getInstance() @@ -92,6 +96,7 @@ public class BuildSearchIndexResource { this.elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(client, dao); } this.dao = dao; + this.userRepository = new UserRepository(dao); this.authorizer = authorizer; this.threadScheduler = new ThreadPoolExecutor( @@ -121,16 +126,16 @@ public class BuildSearchIndexResource { @ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found") }) public Response reindexAllEntities( - @Context UriInfo uriInfo, - @Context SecurityContext securityContext, - @Valid CreateEventPublisherJob createRequest) { + @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateEventPublisherJob createRequest) + throws IOException { // Only admins can issue a reindex request authorizer.authorizeAdmin(securityContext, false); - String startedBy = securityContext.getUserPrincipal().getName(); + User user = + userRepository.getByName(null, securityContext.getUserPrincipal().getName(), userRepository.getFields("id")); if (createRequest.getRunMode() == RunMode.BATCH) { - return startReindexingBatchMode(uriInfo, startedBy, createRequest); + return startReindexingBatchMode(uriInfo, user.getId(), createRequest); } else { - return startReindexingStreamMode(uriInfo, startedBy, createRequest); + return startReindexingStreamMode(uriInfo, user.getId(), createRequest); } } @@ -170,14 +175,14 @@ public class BuildSearchIndexResource { } private synchronized Response startReindexingStreamMode( - UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) { + UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) { // create a new Job threadScheduler.submit(() -> this.submitStreamJob(uriInfo, startedBy, createRequest)); return Response.status(Response.Status.OK).entity("Reindexing Started").build(); } private synchronized Response startReindexingBatchMode( - UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) { + UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) { // create a new Job threadScheduler.submit( () -> { @@ -190,17 +195,38 @@ public class BuildSearchIndexResource { return Response.status(Response.Status.OK).entity("Reindexing Started").build(); } - private synchronized void submitStreamJob(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) { + private synchronized void submitStreamJob(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) { try { for (String entityName : createRequest.getEntities()) { - updateEntityStream(uriInfo, entityName, createRequest); + updateEntityStream(uriInfo, startedBy, entityName, createRequest); } + + // Mark the Job end + String reindexJobString = + dao.entityExtensionTimeSeriesDao() + .getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION); + EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class); + long lastUpdateTime = latestJob.getTimestamp(); + Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); + latestJob.setTimestamp(time); + latestJob.setEndTime(time); + if (latestJob.getFailureDetails() != null) { + latestJob.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR); + } else { + latestJob.setStatus(EventPublisherJob.Status.ACTIVE); + } + dao.entityExtensionTimeSeriesDao() + .update( + ELASTIC_SEARCH_ENTITY_FQN_STREAM, + ELASTIC_SEARCH_EXTENSION, + JsonUtils.pojoToJson(latestJob), + lastUpdateTime); } catch (IOException e) { throw new RuntimeException(e); } } - private synchronized void submitBatchJob(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) + private synchronized void submitBatchJob(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) throws IOException { long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); String recordString = @@ -208,6 +234,7 @@ public class BuildSearchIndexResource { EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class); long originalLastUpdate = lastRecord.getTimestamp(); lastRecord.setStatus(EventPublisherJob.Status.ACTIVE); + lastRecord.setStats(new Stats().withFailed(0).withTotal(0).withSuccess(0)); lastRecord.setTimestamp(updateTime); lastRecord.setEntities(createRequest.getEntities()); dao.entityExtensionTimeSeriesDao() @@ -218,33 +245,13 @@ public class BuildSearchIndexResource { originalLastUpdate); // Update Listener for only Batch - BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(dao); + BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(dao, startedBy); BulkProcessor processor = getBulkProcessor(bulkProcessorListener, createRequest.getBatchSize(), createRequest.getFlushIntervalInSec()); for (String entityName : createRequest.getEntities()) { updateEntityBatch(processor, bulkProcessorListener, uriInfo, entityName, createRequest); } - - // mark the job as done - EventPublisherJob lastReadRecord = - JsonUtils.readValue( - dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION), - EventPublisherJob.class); - long lastRecordTimestamp = lastReadRecord.getTimestamp(); - lastReadRecord.setStatus(EventPublisherJob.Status.IDLE); - lastReadRecord.setTimestamp(Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime()); - lastReadRecord.setStats( - new Stats() - .withTotal(bulkProcessorListener.getTotalRequests()) - .withSuccess(bulkProcessorListener.getTotalSuccessCount()) - .withFailed(bulkProcessorListener.getTotalFailedCount())); - dao.entityExtensionTimeSeriesDao() - .update( - ELASTIC_SEARCH_ENTITY_FQN_BATCH, - ELASTIC_SEARCH_EXTENSION, - JsonUtils.pojoToJson(lastReadRecord), - lastRecordTimestamp); } private synchronized void updateEntityBatch( @@ -294,7 +301,7 @@ public class BuildSearchIndexResource { } private synchronized void updateEntityStream( - UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest) throws IOException { + UriInfo uriInfo, UUID startedBy, String entityType, CreateEventPublisherJob createRequest) throws IOException { ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType); @@ -327,27 +334,6 @@ public class BuildSearchIndexResource { } catch (Exception ex) { LOG.error("Failed in listing all Entities of type : {}", entityType); } - - // Mark the Job end - String reindexJobString = - dao.entityExtensionTimeSeriesDao() - .getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION); - EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class); - long lastUpdateTime = latestJob.getTimestamp(); - Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); - latestJob.setTimestamp(time); - latestJob.setEndTime(time); - if (latestJob.getFailureDetails() != null) { - latestJob.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR); - } else { - latestJob.setStatus(EventPublisherJob.Status.ACTIVE); - } - dao.entityExtensionTimeSeriesDao() - .update( - ELASTIC_SEARCH_ENTITY_FQN_STREAM, - ELASTIC_SEARCH_EXTENSION, - JsonUtils.pojoToJson(latestJob), - lastUpdateTime); } private synchronized void updateElasticSearchForEntityBatch( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BulkProcessorListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BulkProcessorListener.java index d940be8ffdb..1207e055a93 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BulkProcessorListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BulkProcessorListener.java @@ -6,6 +6,7 @@ import static org.openmetadata.service.resources.elasticSearch.BuildSearchIndexR import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Date; +import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; @@ -15,6 +16,7 @@ import org.openmetadata.schema.settings.EventPublisherJob; import org.openmetadata.schema.settings.FailureDetails; import org.openmetadata.schema.settings.Stats; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.socket.WebSocketManager; import org.openmetadata.service.util.JsonUtils; @Slf4j @@ -24,9 +26,11 @@ public class BulkProcessorListener implements BulkProcessor.Listener { private volatile int totalFailedCount = 0; private volatile int totalRequests = 0; private final CollectionDAO dao; + private final UUID startedBy; - public BulkProcessorListener(CollectionDAO dao) { + public BulkProcessorListener(CollectionDAO dao, UUID startedBy) { this.dao = dao; + this.startedBy = startedBy; this.resetCounters(); } @@ -107,7 +111,11 @@ public class BulkProcessorListener implements BulkProcessor.Listener { dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION); EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class); long originalLastUpdate = lastRecord.getTimestamp(); - lastRecord.setStatus(status); + if (totalRequests == totalFailedCount + totalSuccessCount) { + lastRecord.setStatus(EventPublisherJob.Status.IDLE); + } else { + lastRecord.setStatus(status); + } lastRecord.setTimestamp(updateTime); if (failDetails != null) { lastRecord.setFailureDetails( @@ -120,20 +128,10 @@ public class BulkProcessorListener implements BulkProcessor.Listener { ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(lastRecord), originalLastUpdate); + WebSocketManager.getInstance() + .sendToOne(this.startedBy, WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL, JsonUtils.pojoToJson(lastRecord)); } catch (Exception e) { LOG.error("Failed to Update Elastic Search Job Info"); } } - - public int getTotalSuccessCount() { - return totalSuccessCount; - } - - public int getTotalFailedCount() { - return totalFailedCount; - } - - public int getTotalRequests() { - return totalRequests; - } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java index 0506cd81b04..c164b595db7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java @@ -20,6 +20,7 @@ public class WebSocketManager { @Getter private final SocketIoServer socketIoServer; public static final String FEED_BROADCAST_CHANNEL = "activityFeed"; public static final String TASK_BROADCAST_CHANNEL = "taskChannel"; + public static final String JOB_STATUS_BROADCAST_CHANNEL = "jobStatus"; public static final String MENTION_CHANNEL = "mentionChannel"; public static final String ANNOUNCEMENT_CHANNEL = "announcementChannel"; private final Map> activityFeedEndpoints = new ConcurrentHashMap<>();