Added websocket updates on jobchannel (#8014)

This commit is contained in:
Mohit Yadav 2022-10-07 17:48:37 +05:30 committed by GitHub
parent 861c846932
commit 46bda6dc32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 68 deletions

View File

@ -13,6 +13,7 @@ import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@ -46,6 +47,7 @@ import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.api.CreateEventPublisherJob.RunMode;
import org.openmetadata.schema.api.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.settings.EventPublisherJob;
import org.openmetadata.schema.settings.FailureDetails;
import org.openmetadata.schema.settings.Stats;
@ -56,6 +58,7 @@ import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.UserRepository;
import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.util.ConfigurationHolder;
@ -79,6 +82,7 @@ public class BuildSearchIndexResource {
private final CollectionDAO dao;
private final Authorizer authorizer;
private final ExecutorService threadScheduler;
private final UserRepository userRepository;
public BuildSearchIndexResource(CollectionDAO dao, Authorizer authorizer) {
if (ConfigurationHolder.getInstance()
@ -92,6 +96,7 @@ public class BuildSearchIndexResource {
this.elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(client, dao);
}
this.dao = dao;
this.userRepository = new UserRepository(dao);
this.authorizer = authorizer;
this.threadScheduler =
new ThreadPoolExecutor(
@ -121,16 +126,16 @@ public class BuildSearchIndexResource {
@ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found")
})
public Response reindexAllEntities(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Valid CreateEventPublisherJob createRequest) {
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateEventPublisherJob createRequest)
throws IOException {
// Only admins can issue a reindex request
authorizer.authorizeAdmin(securityContext, false);
String startedBy = securityContext.getUserPrincipal().getName();
User user =
userRepository.getByName(null, securityContext.getUserPrincipal().getName(), userRepository.getFields("id"));
if (createRequest.getRunMode() == RunMode.BATCH) {
return startReindexingBatchMode(uriInfo, startedBy, createRequest);
return startReindexingBatchMode(uriInfo, user.getId(), createRequest);
} else {
return startReindexingStreamMode(uriInfo, startedBy, createRequest);
return startReindexingStreamMode(uriInfo, user.getId(), createRequest);
}
}
@ -170,14 +175,14 @@ public class BuildSearchIndexResource {
}
private synchronized Response startReindexingStreamMode(
UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) {
UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) {
// create a new Job
threadScheduler.submit(() -> this.submitStreamJob(uriInfo, startedBy, createRequest));
return Response.status(Response.Status.OK).entity("Reindexing Started").build();
}
private synchronized Response startReindexingBatchMode(
UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) {
UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) {
// create a new Job
threadScheduler.submit(
() -> {
@ -190,17 +195,38 @@ public class BuildSearchIndexResource {
return Response.status(Response.Status.OK).entity("Reindexing Started").build();
}
private synchronized void submitStreamJob(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) {
private synchronized void submitStreamJob(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) {
try {
for (String entityName : createRequest.getEntities()) {
updateEntityStream(uriInfo, entityName, createRequest);
updateEntityStream(uriInfo, startedBy, entityName, createRequest);
}
// Mark the Job end
String reindexJobString =
dao.entityExtensionTimeSeriesDao()
.getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
long lastUpdateTime = latestJob.getTimestamp();
Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
latestJob.setTimestamp(time);
latestJob.setEndTime(time);
if (latestJob.getFailureDetails() != null) {
latestJob.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR);
} else {
latestJob.setStatus(EventPublisherJob.Status.ACTIVE);
}
dao.entityExtensionTimeSeriesDao()
.update(
ELASTIC_SEARCH_ENTITY_FQN_STREAM,
ELASTIC_SEARCH_EXTENSION,
JsonUtils.pojoToJson(latestJob),
lastUpdateTime);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private synchronized void submitBatchJob(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest)
private synchronized void submitBatchJob(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest)
throws IOException {
long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
String recordString =
@ -208,6 +234,7 @@ public class BuildSearchIndexResource {
EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
long originalLastUpdate = lastRecord.getTimestamp();
lastRecord.setStatus(EventPublisherJob.Status.ACTIVE);
lastRecord.setStats(new Stats().withFailed(0).withTotal(0).withSuccess(0));
lastRecord.setTimestamp(updateTime);
lastRecord.setEntities(createRequest.getEntities());
dao.entityExtensionTimeSeriesDao()
@ -218,33 +245,13 @@ public class BuildSearchIndexResource {
originalLastUpdate);
// Update Listener for only Batch
BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(dao);
BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(dao, startedBy);
BulkProcessor processor =
getBulkProcessor(bulkProcessorListener, createRequest.getBatchSize(), createRequest.getFlushIntervalInSec());
for (String entityName : createRequest.getEntities()) {
updateEntityBatch(processor, bulkProcessorListener, uriInfo, entityName, createRequest);
}
// mark the job as done
EventPublisherJob lastReadRecord =
JsonUtils.readValue(
dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION),
EventPublisherJob.class);
long lastRecordTimestamp = lastReadRecord.getTimestamp();
lastReadRecord.setStatus(EventPublisherJob.Status.IDLE);
lastReadRecord.setTimestamp(Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime());
lastReadRecord.setStats(
new Stats()
.withTotal(bulkProcessorListener.getTotalRequests())
.withSuccess(bulkProcessorListener.getTotalSuccessCount())
.withFailed(bulkProcessorListener.getTotalFailedCount()));
dao.entityExtensionTimeSeriesDao()
.update(
ELASTIC_SEARCH_ENTITY_FQN_BATCH,
ELASTIC_SEARCH_EXTENSION,
JsonUtils.pojoToJson(lastReadRecord),
lastRecordTimestamp);
}
private synchronized void updateEntityBatch(
@ -294,7 +301,7 @@ public class BuildSearchIndexResource {
}
private synchronized void updateEntityStream(
UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest) throws IOException {
UriInfo uriInfo, UUID startedBy, String entityType, CreateEventPublisherJob createRequest) throws IOException {
ElasticSearchIndexDefinition.ElasticSearchIndexType indexType =
elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
@ -327,27 +334,6 @@ public class BuildSearchIndexResource {
} catch (Exception ex) {
LOG.error("Failed in listing all Entities of type : {}", entityType);
}
// Mark the Job end
String reindexJobString =
dao.entityExtensionTimeSeriesDao()
.getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
long lastUpdateTime = latestJob.getTimestamp();
Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
latestJob.setTimestamp(time);
latestJob.setEndTime(time);
if (latestJob.getFailureDetails() != null) {
latestJob.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR);
} else {
latestJob.setStatus(EventPublisherJob.Status.ACTIVE);
}
dao.entityExtensionTimeSeriesDao()
.update(
ELASTIC_SEARCH_ENTITY_FQN_STREAM,
ELASTIC_SEARCH_EXTENSION,
JsonUtils.pojoToJson(latestJob),
lastUpdateTime);
}
private synchronized void updateElasticSearchForEntityBatch(

View File

@ -6,6 +6,7 @@ import static org.openmetadata.service.resources.elasticSearch.BuildSearchIndexR
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@ -15,6 +16,7 @@ import org.openmetadata.schema.settings.EventPublisherJob;
import org.openmetadata.schema.settings.FailureDetails;
import org.openmetadata.schema.settings.Stats;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
@Slf4j
@ -24,9 +26,11 @@ public class BulkProcessorListener implements BulkProcessor.Listener {
private volatile int totalFailedCount = 0;
private volatile int totalRequests = 0;
private final CollectionDAO dao;
private final UUID startedBy;
public BulkProcessorListener(CollectionDAO dao) {
public BulkProcessorListener(CollectionDAO dao, UUID startedBy) {
this.dao = dao;
this.startedBy = startedBy;
this.resetCounters();
}
@ -107,7 +111,11 @@ public class BulkProcessorListener implements BulkProcessor.Listener {
dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION);
EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
long originalLastUpdate = lastRecord.getTimestamp();
lastRecord.setStatus(status);
if (totalRequests == totalFailedCount + totalSuccessCount) {
lastRecord.setStatus(EventPublisherJob.Status.IDLE);
} else {
lastRecord.setStatus(status);
}
lastRecord.setTimestamp(updateTime);
if (failDetails != null) {
lastRecord.setFailureDetails(
@ -120,20 +128,10 @@ public class BulkProcessorListener implements BulkProcessor.Listener {
ELASTIC_SEARCH_EXTENSION,
JsonUtils.pojoToJson(lastRecord),
originalLastUpdate);
WebSocketManager.getInstance()
.sendToOne(this.startedBy, WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL, JsonUtils.pojoToJson(lastRecord));
} catch (Exception e) {
LOG.error("Failed to Update Elastic Search Job Info");
}
}
public int getTotalSuccessCount() {
return totalSuccessCount;
}
public int getTotalFailedCount() {
return totalFailedCount;
}
public int getTotalRequests() {
return totalRequests;
}
}

View File

@ -20,6 +20,7 @@ public class WebSocketManager {
@Getter private final SocketIoServer socketIoServer;
public static final String FEED_BROADCAST_CHANNEL = "activityFeed";
public static final String TASK_BROADCAST_CHANNEL = "taskChannel";
public static final String JOB_STATUS_BROADCAST_CHANNEL = "jobStatus";
public static final String MENTION_CHANNEL = "mentionChannel";
public static final String ANNOUNCEMENT_CHANNEL = "announcementChannel";
private final Map<UUID, Map<String, SocketIoSocket>> activityFeedEndpoints = new ConcurrentHashMap<>();