Wait for Indexing Job to complete (#14986)

This commit is contained in:
Mohit Yadav 2024-02-01 17:04:21 +05:30 committed by GitHub
parent 306adfcca0
commit 8f94e54395
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 11 additions and 4 deletions

View File

@ -250,9 +250,11 @@ public class SearchIndexApp extends AbstractNativeApplication {
private void sendUpdates() {
try {
WebSocketManager.getInstance()
.broadCastMessageToAll(
WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL, JsonUtils.pojoToJson(jobData));
if (WebSocketManager.getInstance() != null) {
WebSocketManager.getInstance()
.broadCastMessageToAll(
WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL, JsonUtils.pojoToJson(jobData));
}
} catch (Exception ex) {
LOG.error("Failed to send updated stats with WebSocket", ex);
}

View File

@ -6,6 +6,7 @@ import static org.quartz.impl.matchers.GroupMatcher.jobGroupEquals;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.AppRuntime;
@ -37,7 +38,7 @@ public class AppScheduler {
public static final String SEARCH_CLIENT_KEY = "searchClientKey";
private static AppScheduler instance;
private static volatile boolean initialized = false;
private final Scheduler scheduler;
@Getter private final Scheduler scheduler;
private static final ConcurrentHashMap<UUID, JobDetail> appJobsKeyMap = new ConcurrentHashMap<>();
private final CollectionDAO collectionDAO;
private final SearchRepository searchClient;

View File

@ -100,6 +100,7 @@ public class TeamRepository extends EntityRepository<Team> {
TEAM_UPDATE_FIELDS);
this.quoteFqn = true;
supportsSearch = true;
initOrganization();
}
@Override

View File

@ -252,6 +252,9 @@ public class OpenMetadataOperations implements Callable<Integer> {
config.getElasticSearchConfiguration().getSearchIndexMappingLanguage()))
.withRuntime(new ScheduledExecutionContext().withEnabled(true));
AppScheduler.getInstance().triggerOnDemandApplication(searchIndexApp);
do {
Thread.sleep(3000l);
} while (!AppScheduler.getInstance().getScheduler().getCurrentlyExecutingJobs().isEmpty());
return 0;
} catch (Exception e) {
LOG.error("Failed to reindex due to ", e);