From 8cee06f58369d19cefd6187af9c4aa54d7ffea22 Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Thu, 4 Sep 2025 22:55:50 +0530 Subject: [PATCH] Concurrency issues in search (#23249) * Fix - 23213 : WebSocketManager Issue on concurrent modification * Await till consumers comelete the job --------- Co-authored-by: Pere Miquel Brull (cherry picked from commit 080f0d21fb35a5c8a8150dc46592e07c8ecae588) --- .../service/apps/bundles/searchIndex/SearchIndexApp.java | 7 +++---- .../org/openmetadata/service/socket/WebSocketManager.java | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 58e940f85f3..e6911298ace 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -756,10 +756,9 @@ public class SearchIndexApp extends AbstractNativeApplication { private void waitForConsumersToComplete(CountDownLatch consumerLatch) throws InterruptedException { - boolean finished = consumerLatch.await(5, TimeUnit.MINUTES); - if (!finished) { - LOG.warn("Consumers did not finish within timeout"); - } + LOG.info("Waiting for all consumers to complete their work..."); + consumerLatch.await(); // Wait indefinitely for consumers to finish all work + LOG.info("All consumers have completed their work"); } private void handleInterruption(InterruptedException e) throws InterruptedException { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java index 6b57fa59d47..e41dc1fd622 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java @@ -107,7 +107,7 @@ public class WebSocketManager { userSocketConnections = activityFeedEndpoints.containsKey(id) ? activityFeedEndpoints.get(id) - : new HashMap<>(); + : new ConcurrentHashMap<>(); userSocketConnections.put(socket.getId(), socket); activityFeedEndpoints.put(id, userSocketConnections); }