From b934ed48707f3896d5509cda9f96c77eae42fcd6 Mon Sep 17 00:00:00 2001
From: mohitdeuex <105265192+mohitdeuex@users.noreply.github.com>
Date: Wed, 28 Sep 2022 22:55:00 +0530
Subject: [PATCH] Stream and Batch Global ES Error Logging added (#7780)

* [ES Reindex] Added ES Reindexing from API
* [ES Reindex] checkstyle
* [ES Reindex] return complete record for status info
* fixes
* added limit param to get no of records
* Review Comments + Stream mode
* Review Comments
* Fix for failing tests
* Global ES monitoring with Stream + batch mode to allow Reindexing
---
 .../service/OpenMetadataApplication.java      |   8 +-
 .../ElasticSearchEventPublisher.java          |  97 +++++-
 .../service/jdbi3/CollectionDAO.java          |   2 +-
 .../BuildSearchIndexResource.java             | 309 +++++++++++-------
 .../elasticSearch/BulkProcessorListener.java  | 126 ++++---
 .../schema/api/createEventPublisherJob.json   |   6 +-
 .../schema/settings/eventPublisherJob.json    |   7 +-
 7 files changed, 354 insertions(+), 201 deletions(-)

diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java
index a619b2a22eb..dbe524cb0c2 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java
@@ -69,6 +69,7 @@ import org.openmetadata.service.exception.CatalogGenericExceptionMapper;
 import org.openmetadata.service.exception.ConstraintViolationExceptionMapper;
 import org.openmetadata.service.exception.JsonMappingExceptionMapper;
 import org.openmetadata.service.fernet.Fernet;
+import org.openmetadata.service.jdbi3.CollectionDAO;
 import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
 import org.openmetadata.service.migration.Migration;
 import org.openmetadata.service.migration.MigrationConfiguration;
@@ -145,7 +146,7 @@ public class OpenMetadataApplication extends Application());
     this.client = ElasticSearchClientUtils.createElasticSearchClient(esConfig);
     esIndexDefinition = new ElasticSearchIndexDefinition(client);
     esIndexDefinition.createIndexes();
+    this.dao = dao;
+    // needs Db connection
+    registerElasticSearchJobs();
   }

   @Override
@@ -158,16 +172,27 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
       }
     } catch (DocumentMissingException ex) {
       LOG.error("Missing Document", ex);
+      updateElasticSearchFailureStatus(
+          EventPublisherJob.Status.ACTIVEWITHERROR, "Missing Document while Updating ES.");
     } catch (ElasticsearchException e) {
       LOG.error("failed to update ES doc");
       LOG.debug(e.getMessage());
       if (e.status() == RestStatus.GATEWAY_TIMEOUT || e.status() == RestStatus.REQUEST_TIMEOUT) {
         LOG.error("Error in publishing to ElasticSearch");
+        updateElasticSearchFailureStatus(
+            EventPublisherJob.Status.ACTIVEWITHERROR,
+            String.format("Timeout when updating ES request. Reason %s", e.getMessage()));
         throw new ElasticSearchRetriableException(e.getMessage());
       } else {
+        updateElasticSearchFailureStatus(
+            EventPublisherJob.Status.ACTIVEWITHERROR,
+            String.format("Failed while updating ES. Reason %s", e.getMessage()));
         LOG.error(e.getMessage(), e);
       }
     } catch (IOException ie) {
+      updateElasticSearchFailureStatus(
+          EventPublisherJob.Status.ACTIVEWITHERROR,
+          String.format("Issue in updating ES request. Reason %s", ie.getMessage()));
       throw new EventPublisherException(ie.getMessage());
     }
   }
Reason %s", ie.getMessage())); throw new EventPublisherException(ie.getMessage()); } } @@ -662,6 +687,76 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { } } + public void registerElasticSearchJobs() { + try { + dao.entityExtensionTimeSeriesDao() + .delete( + BuildSearchIndexResource.ELASTIC_SEARCH_ENTITY_FQN_BATCH, + BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION); + dao.entityExtensionTimeSeriesDao() + .delete(ELASTIC_SEARCH_ENTITY_FQN_STREAM, BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION); + long startTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); + FailureDetails failureDetails = new FailureDetails().withLastFailedAt(0L).withLastFailedReason("No Failures"); + EventPublisherJob batchJob = + new EventPublisherJob() + .withName("Elastic Search Batch") + .withPublisherType(CreateEventPublisherJob.PublisherType.ELASTIC_SEARCH) + .withRunMode(CreateEventPublisherJob.RunMode.BATCH) + .withStatus(EventPublisherJob.Status.ACTIVE) + .withTimestamp(startTime) + .withStartedBy("admin") + .withStartTime(startTime) + .withFailureDetails(failureDetails); + EventPublisherJob streamJob = + new EventPublisherJob() + .withName("Elastic Search Stream") + .withPublisherType(CreateEventPublisherJob.PublisherType.ELASTIC_SEARCH) + .withRunMode(CreateEventPublisherJob.RunMode.STREAM) + .withStatus(EventPublisherJob.Status.ACTIVE) + .withTimestamp(startTime) + .withStartedBy("admin") + .withStartTime(startTime) + .withFailureDetails(failureDetails); + dao.entityExtensionTimeSeriesDao() + .insert( + BuildSearchIndexResource.ELASTIC_SEARCH_ENTITY_FQN_BATCH, + BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION, + "eventPublisherJob", + JsonUtils.pojoToJson(batchJob)); + dao.entityExtensionTimeSeriesDao() + .insert( + ELASTIC_SEARCH_ENTITY_FQN_STREAM, + BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION, + "eventPublisherJob", + JsonUtils.pojoToJson(streamJob)); + } catch (Exception e) { + LOG.error("Failed to register Elastic Search Job"); + } + } + + public void updateElasticSearchFailureStatus(EventPublisherJob.Status status, String failureMessage) { + try { + long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); + String recordString = + dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION); + EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class); + long originalLastUpdate = lastRecord.getTimestamp(); + lastRecord.setStatus(status); + lastRecord.setTimestamp(updateTime); + lastRecord.setFailureDetails( + new FailureDetails().withLastFailedAt(updateTime).withLastFailedReason(failureMessage)); + + dao.entityExtensionTimeSeriesDao() + .update( + ELASTIC_SEARCH_ENTITY_FQN_STREAM, + ELASTIC_SEARCH_EXTENSION, + JsonUtils.pojoToJson(lastRecord), + originalLastUpdate); + } catch (Exception e) { + LOG.error("Failed to Update Elastic Search Job Info"); + } + } + public void close() { try { this.client.close(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index bcdd64f234d..aecb97deb56 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -2997,7 +2997,7 @@ public interface CollectionDAO { List getExtensions(@Bind("id") String id, 
@Bind("extensionPrefix") String extensionPrefix); @SqlUpdate("DELETE FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension") - void delete(@Bind("entityId") String id, @Bind("extension") String extension); + void delete(@Bind("entityFQN") String entityFQN, @Bind("extension") String extension); @SqlUpdate("DELETE FROM entity_extension_time_series WHERE entityFQN = :entityFQN") void deleteAll(@Bind("entityFQN") String entityFQN); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java index 4bc2e1d957d..a06bf59e0f6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java @@ -13,7 +13,6 @@ import static org.openmetadata.service.Entity.USER; import com.fasterxml.jackson.core.JsonProcessingException; import io.swagger.annotations.Api; import io.swagger.v3.oas.annotations.Operation; -import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.responses.ApiResponse; import java.io.IOException; import java.time.LocalDateTime; @@ -21,19 +20,18 @@ import java.time.ZoneId; import java.util.Date; import java.util.List; import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import javax.validation.Valid; -import javax.validation.constraints.Max; -import javax.validation.constraints.Min; import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -80,14 +78,13 @@ import org.openmetadata.service.util.ResultList; @Slf4j public class BuildSearchIndexResource { public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher"; - public static final String ELASTIC_SEARCH_ENTITY_FQN = "eventPublisher:ElasticSearch"; + public static final String ELASTIC_SEARCH_ENTITY_FQN_STREAM = "eventPublisher:ElasticSearch:STREAM"; + public static final String ELASTIC_SEARCH_ENTITY_FQN_BATCH = "eventPublisher:ElasticSearch:BATCH"; private final RestHighLevelClient client; private final ElasticSearchIndexDefinition elasticSearchIndexDefinition; private final CollectionDAO dao; private final Authorizer authorizer; - private final BulkProcessorListener elasticSearchBulkProcessorListener; - private final BulkProcessor bulkProcessor; - private final ExecutorService threadScheduler = Executors.newFixedThreadPool(2); + private final ExecutorService threadScheduler; public BuildSearchIndexResource(CollectionDAO dao, Authorizer authorizer) { this.client = @@ -98,17 +95,20 @@ public class BuildSearchIndexResource { this.dao = dao; this.authorizer = authorizer; this.elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(client); - this.elasticSearchBulkProcessorListener = new BulkProcessorListener(dao); + this.threadScheduler = + new ThreadPoolExecutor( + 2, 2, 0L, TimeUnit.MILLISECONDS, 
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java
index 4bc2e1d957d..a06bf59e0f6 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticSearch/BuildSearchIndexResource.java
@@ -13,7 +13,6 @@ import static org.openmetadata.service.Entity.USER;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import io.swagger.annotations.Api;
 import io.swagger.v3.oas.annotations.Operation;
-import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.responses.ApiResponse;
 import java.io.IOException;
 import java.time.LocalDateTime;
@@ -21,19 +20,18 @@ import java.time.ZoneId;
 import java.util.Date;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import javax.validation.Valid;
-import javax.validation.constraints.Max;
-import javax.validation.constraints.Min;
 import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -80,14 +78,13 @@ public class BuildSearchIndexResource {
   public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher";
-  public static final String ELASTIC_SEARCH_ENTITY_FQN = "eventPublisher:ElasticSearch";
+  public static final String ELASTIC_SEARCH_ENTITY_FQN_STREAM = "eventPublisher:ElasticSearch:STREAM";
+  public static final String ELASTIC_SEARCH_ENTITY_FQN_BATCH = "eventPublisher:ElasticSearch:BATCH";
   private final RestHighLevelClient client;
   private final ElasticSearchIndexDefinition elasticSearchIndexDefinition;
   private final CollectionDAO dao;
   private final Authorizer authorizer;
-  private final BulkProcessorListener elasticSearchBulkProcessorListener;
-  private final BulkProcessor bulkProcessor;
-  private final ExecutorService threadScheduler = Executors.newFixedThreadPool(2);
+  private final ExecutorService threadScheduler;

   public BuildSearchIndexResource(CollectionDAO dao, Authorizer authorizer) {
     this.client =
@@ -98,17 +95,20 @@ public class BuildSearchIndexResource {
     this.dao = dao;
     this.authorizer = authorizer;
     this.elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(client);
-    this.elasticSearchBulkProcessorListener = new BulkProcessorListener(dao);
+    this.threadScheduler =
+        new ThreadPoolExecutor(
+            2, 2, 0L, TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());
+  }
+
+  private BulkProcessor getBulkProcessor(BulkProcessorListener listener) {
     BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
         (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
-    // Setup a bulk Processor
-    BulkProcessor.Builder builder =
-        BulkProcessor.builder(bulkConsumer, elasticSearchBulkProcessorListener, "es-reindex");
+    BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener, "es-reindex");
     builder.setBulkActions(100);
     builder.setConcurrentRequests(2);
     builder.setFlushInterval(TimeValue.timeValueSeconds(60L));
     builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
-    this.bulkProcessor = builder.build();
+    return builder.build();
   }

   @POST
@@ -128,11 +128,15 @@ public class BuildSearchIndexResource {
     // Only admins can issue a reindex request
     authorizer.authorizeAdmin(securityContext, false);
     String startedBy = securityContext.getUserPrincipal().getName();
-    return startReindexing(uriInfo, startedBy, createRequest);
+    if (createRequest.getRunMode() == RunMode.BATCH) {
+      return startReindexingBatchMode(uriInfo, startedBy, createRequest);
+    } else {
+      return startReindexingStreamMode(uriInfo, startedBy, createRequest);
+    }
   }

   @GET
-  @Path("/reindex/status")
+  @Path("/reindex/status/{runMode}")
   @Operation(
       operationId = "getReindexAllLastJobStatus",
       summary = "Get Last Run Reindex All Job Status",
        @ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found")
      })
   public Response reindexAllJobLastStatus(
-      @Context UriInfo uriInfo,
-      @Context SecurityContext securityContext,
-      @Parameter(description = "Limit the number users returned. (1 to 1000000, default = 10)")
-          @DefaultValue("10")
-          @Min(0)
-          @Max(1000000)
-          @QueryParam("limit")
-          int limitParam)
+      @Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("runMode") String runMode)
       throws IOException {
     // Only admins can issue a reindex request
     authorizer.authorizeAdmin(securityContext, false);
     // Check if there is a running job for reindex for requested entity
-    List<String> records =
-        dao.entityExtensionTimeSeriesDao()
-            .getLastLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, limitParam);
-    if (records != null) {
-      return Response.status(Response.Status.OK)
-          .entity(JsonUtils.readObjects(records, EventPublisherJob.class))
-          .build();
+    String record;
+    if (runMode.equals(RunMode.BATCH.toString())) {
+      record =
+          dao.entityExtensionTimeSeriesDao()
+              .getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION);
+    } else if (runMode.equals(RunMode.STREAM.toString())) {
+      record =
+          dao.entityExtensionTimeSeriesDao()
+              .getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
+    } else {
+      return Response.status(Response.Status.BAD_REQUEST).entity("Invalid Run Mode").build();
+    }
+    if (record != null) {
+      return Response.status(Response.Status.OK).entity(JsonUtils.readValue(record, EventPublisherJob.class)).build();
     }
     return Response.status(Response.Status.NOT_FOUND).entity("No Last Run.").build();
   }

-  private synchronized Response startReindexing(
+  private synchronized Response startReindexingStreamMode(
       UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) throws IOException {
-    String reindexJobString =
-        dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION);
-    EventPublisherJob reindexJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
-    if (reindexJob != null
-        && ((System.currentTimeMillis() - reindexJob.getTimestamp() > 3600000)
-            || reindexJob.getStatus() == EventPublisherJob.Status.SUCCESS)) {
-      return Response.status(Response.Status.FORBIDDEN)
-          .entity("Reindexing is Running Already. Cannot issue new request.")
-          .build();
-    } else {
-      // create a new Job
-      Long startTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
-      EventPublisherJob newJob =
-          new EventPublisherJob()
-              .withName(createRequest.getName())
-              .withPublisherType(createRequest.getPublisherType())
-              .withRunMode(createRequest.getRunMode())
-              .withStatus(EventPublisherJob.Status.RUNNING)
-              .withTimestamp(startTime)
-              .withStartedBy(startedBy)
-              .withStartTime(startTime)
-              .withEntities(createRequest.getEntities());
+    // create a new Job
+    threadScheduler.submit(
+        () -> {
+          try {
+            this.submitStreamJob(uriInfo, startedBy, createRequest);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
+    return Response.status(Response.Status.OK).entity("Reindexing Started").build();
+  }

-      dao.entityExtensionTimeSeriesDao()
-          .insert(
-              ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, "eventPublisherJob", JsonUtils.pojoToJson(newJob));
+  private synchronized Response startReindexingBatchMode(
+      UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) throws IOException {
+    // create a new Job
+    threadScheduler.submit(
+        () -> {
+          try {
+            this.submitBatchJob(uriInfo, startedBy, createRequest);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
+    return Response.status(Response.Status.OK).entity("Reindexing Started").build();
+  }

-      // Update Listener for only Batch
-      if (createRequest.getRunMode() == RunMode.BATCH) {
-        elasticSearchBulkProcessorListener.setRequestIssuer(startedBy);
-        elasticSearchBulkProcessorListener.setCreateRequest(createRequest);
-        elasticSearchBulkProcessorListener.setEntityFQN(ELASTIC_SEARCH_ENTITY_FQN);
-        elasticSearchBulkProcessorListener.setStartTime(startTime);
-        elasticSearchBulkProcessorListener.resetCounters();
+  private synchronized void submitStreamJob(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest)
+      throws JsonProcessingException {
+    try {
+      if (createRequest.getEntities().contains("all")) {
+        updateEntityStream(uriInfo, TABLE, createRequest);
+        updateEntityStream(uriInfo, TOPIC, createRequest);
+        updateEntityStream(uriInfo, DASHBOARD, createRequest);
+        updateEntityStream(uriInfo, PIPELINE, createRequest);
+        updateEntityStream(uriInfo, USER, createRequest);
+        updateEntityStream(uriInfo, TEAM, createRequest);
+        updateEntityStream(uriInfo, GLOSSARY_TERM, createRequest);
+        updateEntityStream(uriInfo, MLMODEL, createRequest);
+        updateEntityStream(uriInfo, TAG, createRequest);
+      } else {
+        for (String entityName : createRequest.getEntities()) {
+          updateEntityStream(uriInfo, entityName, createRequest);
+        }
       }
-
-      // Start Full Reindexing
-      threadScheduler.submit(
-          () -> {
-            try {
-              if (createRequest.getEntities().contains("all")) {
-                updateEntity(uriInfo, TABLE, createRequest);
-                updateEntity(uriInfo, TOPIC, createRequest);
-                updateEntity(uriInfo, DASHBOARD, createRequest);
-                updateEntity(uriInfo, PIPELINE, createRequest);
-                updateEntity(uriInfo, USER, createRequest);
-                updateEntity(uriInfo, TEAM, createRequest);
-                updateEntity(uriInfo, GLOSSARY_TERM, createRequest);
-                updateEntity(uriInfo, MLMODEL, createRequest);
-                updateEntity(uriInfo, TAG, createRequest);
-              } else {
-                for (String entityName : createRequest.getEntities()) {
-                  updateEntity(uriInfo, entityName, createRequest);
-                }
-              }
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            }
-          });
-      return Response.status(Response.Status.OK).entity("Reindexing Started").build();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
   }

-  private synchronized void updateEntity(UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest)
+  private synchronized void submitBatchJob(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest)
       throws IOException {
-    elasticSearchBulkProcessorListener.allowTotalRequestUpdate();
+    long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
+    String recordString =
+        dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION);
+    EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
+    long originalLastUpdate = lastRecord.getTimestamp();
+    lastRecord.setStatus(EventPublisherJob.Status.ACTIVE);
+    lastRecord.setTimestamp(updateTime);
+    dao.entityExtensionTimeSeriesDao()
+        .update(
+            ELASTIC_SEARCH_ENTITY_FQN_BATCH,
+            ELASTIC_SEARCH_EXTENSION,
+            JsonUtils.pojoToJson(lastRecord),
+            originalLastUpdate);
+
+    // Update Listener for only Batch
+    BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(dao);
+    BulkProcessor processor = getBulkProcessor(bulkProcessorListener);
+
+    try {
+      if (createRequest.getEntities().contains("all")) {
+        updateEntityBatch(processor, bulkProcessorListener, uriInfo, TABLE, createRequest);
+        updateEntityBatch(processor, bulkProcessorListener, uriInfo, TOPIC, createRequest);
+        updateEntityBatch(processor, bulkProcessorListener, uriInfo, DASHBOARD, createRequest);
+        updateEntityBatch(processor, bulkProcessorListener, uriInfo, PIPELINE, createRequest);
+        updateEntityBatch(processor, bulkProcessorListener, uriInfo, MLMODEL, createRequest);
+        updateEntityBatch(processor, bulkProcessorListener, uriInfo, USER, createRequest);
+        updateEntityBatch(processor, bulkProcessorListener, uriInfo, TEAM, createRequest);
+        updateEntityBatch(processor, bulkProcessorListener, uriInfo, GLOSSARY_TERM, createRequest);
+        updateEntityBatch(processor, bulkProcessorListener, uriInfo, TAG, createRequest);
+      } else {
+        for (String entityName : createRequest.getEntities()) {
+          updateEntityBatch(processor, bulkProcessorListener, uriInfo, entityName, createRequest);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private synchronized void updateEntityBatch(
+      BulkProcessor processor,
+      BulkProcessorListener listener,
+      UriInfo uriInfo,
+      String entityType,
+      CreateEventPublisherJob createRequest)
+      throws IOException {
+    listener.allowTotalRequestUpdate();
     ElasticSearchIndexDefinition.ElasticSearchIndexType indexType =
         elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
@@ -259,35 +296,67 @@ public class BuildSearchIndexResource {
             new ListFilter(Include.ALL),
             createRequest.getBatchSize(),
             after);
-    if (createRequest.getRunMode() == RunMode.BATCH) {
-      elasticSearchBulkProcessorListener.addRequests(result.getPaging().getTotal());
-      updateElasticSearchForEntityBatch(entityType, result.getData());
-    } else {
-      updateElasticSearchForEntityStream(entityType, result.getData());
-    }
+      listener.addRequests(result.getPaging().getTotal());
+      updateElasticSearchForEntityBatch(processor, entityType, result.getData());
+      after = result.getPaging().getAfter();
+    } while (after != null);
+  }
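Both updateEntityBatch above and updateEntityStream below page through the repository with the same cursor contract: listAfter returns one page plus an "after" cursor, and the loop stops once the cursor comes back null. A tiny self-contained model of that contract, with Page and fetchPage as hypothetical stand-ins for ResultList and EntityRepository.listAfter:

import java.util.List;

public class CursorWalk {
  // Hypothetical page type: data plus the cursor to resume from, or null
  // when the collection is exhausted.
  record Page(List<String> data, String after) {}

  static Page fetchPage(List<String> all, String after, int batchSize) {
    int start = after == null ? 0 : Integer.parseInt(after);
    int end = Math.min(start + batchSize, all.size());
    return new Page(all.subList(start, end), end < all.size() ? String.valueOf(end) : null);
  }

  public static void main(String[] args) {
    List<String> entities = List.of("t1", "t2", "t3", "t4", "t5");
    String after = null;
    do {
      // Mirrors the do/while above: process a page, then continue from the
      // returned cursor until it is null.
      Page page = fetchPage(entities, after, 2);
      System.out.println("indexing " + page.data());
      after = page.after();
    } while (after != null);
  }
}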
+
+  private synchronized void updateEntityStream(
+      UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest) throws IOException {
+
+    ElasticSearchIndexDefinition.ElasticSearchIndexType indexType =
+        elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
+
+    if (createRequest.getRecreateIndex()) {
+      // Delete index
+      elasticSearchIndexDefinition.deleteIndex(indexType);
+      // Create index
+      elasticSearchIndexDefinition.createIndex(indexType);
+    }
+
+    // Start fetching a list of Entities and pushing them to ES
+    EntityRepository<EntityInterface> entityRepository = Entity.getEntityRepository(entityType);
+    List<String> allowedFields = entityRepository.getAllowedFields();
+    String fields = String.join(",", allowedFields);
+    ResultList<EntityInterface> result;
+    String after = null;
+    do {
+      result =
+          entityRepository.listAfter(
+              uriInfo,
+              new EntityUtil.Fields(allowedFields, fields),
+              new ListFilter(Include.ALL),
+              createRequest.getBatchSize(),
+              after);
+      updateElasticSearchForEntityStream(entityType, result.getData());
+      after = result.getPaging().getAfter();
+    } while (after != null);
-    if (createRequest.getRunMode() == RunMode.STREAM) {
-      String reindexJobString =
-          dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION);
-      EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
-      long lastUpdateTime = latestJob.getTimestamp();
-      Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
-      latestJob.setTimestamp(time);
-      latestJob.setEndTime(time);
-      if (latestJob.getFailureDetails() != null) {
-        latestJob.setStatus(EventPublisherJob.Status.FAILED);
-      } else {
-        latestJob.setStatus(EventPublisherJob.Status.SUCCESS);
-      }
-      dao.entityExtensionTimeSeriesDao()
-          .update(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(latestJob), lastUpdateTime);
+    // Mark the Job end
+    String reindexJobString =
+        dao.entityExtensionTimeSeriesDao()
+            .getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
+    EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
+    long lastUpdateTime = latestJob.getTimestamp();
+    Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
+    latestJob.setTimestamp(time);
+    latestJob.setEndTime(time);
+    if (latestJob.getFailureDetails() != null) {
+      latestJob.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR);
+    } else {
+      latestJob.setStatus(EventPublisherJob.Status.ACTIVE);
     }
+    dao.entityExtensionTimeSeriesDao()
+        .update(
+            ELASTIC_SEARCH_ENTITY_FQN_STREAM,
+            ELASTIC_SEARCH_EXTENSION,
+            JsonUtils.pojoToJson(latestJob),
+            lastUpdateTime);
   }

-  private synchronized void updateElasticSearchForEntityBatch(String entityType, List<EntityInterface> entities)
-      throws IOException {
+  private synchronized void updateElasticSearchForEntityBatch(
+      BulkProcessor bulkProcessor, String entityType, List<EntityInterface> entities) throws IOException {
     for (EntityInterface entity : entities) {
       if (entityType.equals(TABLE)) {
         ((Table) entity).getColumns().forEach(table -> table.setProfile(null));
@@ -299,7 +368,8 @@ public class BuildSearchIndexResource {
   private synchronized void updateElasticSearchForEntityStream(String entityType, List<EntityInterface> entities)
       throws IOException {
     String reindexJobString =
-        dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION);
+        dao.entityExtensionTimeSeriesDao()
+            .getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
     EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
     Long lastUpdateTime = latestJob.getTimestamp();
     for (EntityInterface entity : entities) {
@@ -313,10 +383,15 @@ public class BuildSearchIndexResource {
       } catch (IOException ex) {
         failureDetails = new FailureDetails().withLastFailedAt(time).withLastFailedReason(ex.getMessage());
         latestJob.setFailureDetails(failureDetails);
+        latestJob.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR);
       }
       latestJob.setTimestamp(time);
       dao.entityExtensionTimeSeriesDao()
-          .update(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(latestJob), lastUpdateTime);
+          .update(
+              ELASTIC_SEARCH_ENTITY_FQN_STREAM,
+              ELASTIC_SEARCH_EXTENSION,
+              JsonUtils.pojoToJson(latestJob),
+              lastUpdateTime);
       lastUpdateTime = time;
     }
   }
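One design note before the listener: every reindex request is now funneled through the threadScheduler built in the resource constructor, a fixed pool of two workers over a five-slot queue with CallerRunsPolicy, so overflow submissions execute on the calling HTTP thread instead of being rejected. A standalone demo of that behavior, using the same pool shape (the class and job names here are illustrative only):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as the resource's threadScheduler: 2 workers, queue of 5,
    // CallerRunsPolicy instead of the default AbortPolicy.
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            2, 2, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());

    for (int i = 0; i < 10; i++) {
      final int job = i;
      pool.submit(
          () -> {
            // Typically the first 7 submissions land on pool threads (2
            // running + 5 queued); the overflow runs on "main" instead of
            // throwing RejectedExecutionException.
            System.out.println("job " + job + " on " + Thread.currentThread().getName());
            try {
              Thread.sleep(100);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}

The trade-off: a flood of reindex requests slows the submitters down rather than failing them, which keeps the endpoint simple at the cost of occasionally blocking a request thread.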
Reason : %s", failure.getId(), failure.getMessage())); - failedCount++; - } - } - successCount = bulkResponse.getItems().length - failedCount; - updateFailedAndSuccess(failedCount, successCount); - // update stats in DB - Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); - EventPublisherJob updateJob = - new EventPublisherJob() - .withName(createRequest.getName()) - .withPublisherType(createRequest.getPublisherType()) - .withRunMode(createRequest.getRunMode()) - .withStatus(EventPublisherJob.Status.RUNNING) - .withTimestamp(time) - .withStats(new Stats().withFailed(totalFailedCount).withSuccess(totalSuccessCount).withTotal(totalRequests)) - .withStartedBy(requestIssuer) - .withFailureDetails(failureDetails) - .withStartTime(originalStartTime) - .withEntities(createRequest.getEntities()); - if (totalRequests == totalFailedCount + totalSuccessCount) { - updateJob.setStatus(EventPublisherJob.Status.SUCCESS); - updateJob.setEndTime(time); - } + // Get last Update Details try { - dao.entityExtensionTimeSeriesDao() - .update(entityFQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(updateJob), startTime); - } catch (JsonProcessingException e) { - LOG.error("Failed in Converting to Json."); + boolean batchHasFailures = false; + int failedCount = 0; + // Checking for failure in Items + FailureDetails failureDetails = new FailureDetails(); + for (BulkItemResponse bulkItemResponse : bulkResponse) { + if (bulkItemResponse.isFailed()) { + BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); + ; + failureDetails.setLastFailedReason( + String.format("ID [%s]. Reason : %s", failure.getId(), failure.getMessage())); + failedCount++; + batchHasFailures = true; + } + } + updateFailedAndSuccess(failedCount, bulkResponse.getItems().length - failedCount); + + // update stats in DB + EventPublisherJob.Status status = + batchHasFailures ? EventPublisherJob.Status.ACTIVEWITHERROR : EventPublisherJob.Status.ACTIVE; + Stats stats = new Stats().withFailed(totalFailedCount).withSuccess(totalSuccessCount).withTotal(totalRequests); + FailureDetails hasFailureDetails = batchHasFailures ? failureDetails : null; + updateElasticSearchStatus(status, hasFailureDetails, stats); + } catch (RuntimeException e) { + LOG.error("Error in processing Bulk"); } - startTime = time; } @Override public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) { LOG.error("Failed to execute bulk", throwable); updateFailedAndSuccess(bulkRequest.numberOfActions(), 0); - } - - public String getEntityFQN() { - return entityFQN; - } - - public void setRequestIssuer(String adminName) { - this.requestIssuer = adminName; - } - - public void setEntityFQN(String entityFQN) { - this.entityFQN = entityFQN; + EventPublisherJob.Status status = EventPublisherJob.Status.ACTIVEWITHERROR; + Stats stats = new Stats().withFailed(totalFailedCount).withSuccess(totalSuccessCount).withTotal(totalRequests); + FailureDetails hasFailureDetails = + new FailureDetails() + .withLastFailedReason(String.format("Batch Failed Completely. 
Reason : %s ", throwable.getMessage())); + updateElasticSearchStatus(status, hasFailureDetails, stats); } public synchronized void addRequests(int count) { @@ -128,16 +102,28 @@ public class BulkProcessorListener implements BulkProcessor.Listener { totalSuccessCount += successCount; } - public void setStartTime(Long time) { - this.startTime = time; - this.originalStartTime = time; - } - - public CreateEventPublisherJob getCreateRequest() { - return createRequest; - } - - public void setCreateRequest(CreateEventPublisherJob createRequest) { - this.createRequest = createRequest; + public void updateElasticSearchStatus(EventPublisherJob.Status status, FailureDetails failDetails, Stats newStats) { + try { + long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); + String recordString = + dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION); + EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class); + long originalLastUpdate = lastRecord.getTimestamp(); + lastRecord.setStatus(status); + lastRecord.setTimestamp(updateTime); + if (failDetails != null) { + lastRecord.setFailureDetails( + new FailureDetails().withLastFailedAt(updateTime).withLastFailedReason(failDetails.getLastFailedReason())); + } + lastRecord.setStats(newStats); + dao.entityExtensionTimeSeriesDao() + .update( + ELASTIC_SEARCH_ENTITY_FQN_BATCH, + ELASTIC_SEARCH_EXTENSION, + JsonUtils.pojoToJson(lastRecord), + originalLastUpdate); + } catch (Exception e) { + LOG.error("Failed to Update Elastic Search Job Info"); + } } } diff --git a/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json index f07206aecd8..c9d875eb827 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/createEventPublisherJob.json @@ -6,10 +6,6 @@ "type": "object", "javaType": "org.openmetadata.schema.api.CreateEventPublisherJob", "properties": { - "name": { - "description": "Name of the Job", - "type": "string" - }, "publisherType": { "$ref": "../settings/eventPublisherJob.json#/definitions/publisherType" }, @@ -36,6 +32,6 @@ "default": 100 } }, - "required": ["name", "publisherType","runMode"], + "required": ["publisherType", "runMode"], "additionalProperties": false } diff --git a/openmetadata-spec/src/main/resources/json/schema/settings/eventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/settings/eventPublisherJob.json index 0054ad69f7c..0c82835facb 100644 --- a/openmetadata-spec/src/main/resources/json/schema/settings/eventPublisherJob.json +++ b/openmetadata-spec/src/main/resources/json/schema/settings/eventPublisherJob.json @@ -72,10 +72,9 @@ "description": "This schema publisher run job status.", "type": "string", "enum": [ - "Failed", - "Success", - "Aborted", - "Running" + "ACTIVE", + "RETRY", + "ACTIVEWITHERROR" ] }, "failureDetails": {