Stream and Batch Global ES Error Logging added (#7780)

* [ES Reindex] Added ES Reindexing from API

* [ES Reindex] checkstyle

* [ES Reindex] return complete record for status info

* fixes

* Added limit param to get the number of records

* Review Comments + Stream mode

* Review Comments

* Fix for failing tests

* Global ES monitoring with Stream + batch mode to allow Reindexing
mohitdeuex 2022-09-28 22:55:00 +05:30 committed by GitHub
parent 0b416564a3
commit b934ed4870
7 changed files with 354 additions and 201 deletions

Changed file: OpenMetadataApplication.java

@@ -69,6 +69,7 @@ import org.openmetadata.service.exception.CatalogGenericExceptionMapper;
import org.openmetadata.service.exception.ConstraintViolationExceptionMapper;
import org.openmetadata.service.exception.JsonMappingExceptionMapper;
import org.openmetadata.service.fernet.Fernet;
+import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
import org.openmetadata.service.migration.Migration;
import org.openmetadata.service.migration.MigrationConfiguration;
@@ -145,7 +146,7 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
registerEventFilter(catalogConfig, environment, jdbi);
environment.lifecycle().manage(new ManagedShutdown());
// Register Event publishers
-registerEventPublisher(catalogConfig);
+registerEventPublisher(catalogConfig, jdbi);
// Check if migration is need from local secret manager to configured one and migrate
new SecretsManagerMigrationService(secretsManager, catalogConfig.getClusterName())
@@ -263,11 +264,12 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
}
}
-private void registerEventPublisher(OpenMetadataApplicationConfig openMetadataApplicationConfig) {
+private void registerEventPublisher(OpenMetadataApplicationConfig openMetadataApplicationConfig, Jdbi jdbi) {
// register ElasticSearch Event publisher
if (openMetadataApplicationConfig.getElasticSearchConfiguration() != null) {
ElasticSearchEventPublisher elasticSearchEventPublisher =
-new ElasticSearchEventPublisher(openMetadataApplicationConfig.getElasticSearchConfiguration());
+new ElasticSearchEventPublisher(
+openMetadataApplicationConfig.getElasticSearchConfiguration(), jdbi.onDemand(CollectionDAO.class));
EventPubSub.addEventHandler(elasticSearchEventPublisher);
}
}
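A wiring sketch, not from the diff, of what the new constructor argument enables: the application obtains a JDBI on-demand proxy of CollectionDAO and hands it to the publisher so job status can be persisted to the entity_extension_time_series store. The connection details and the esConfig variable below are placeholders.

import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;
import org.openmetadata.service.jdbi3.CollectionDAO;

// Illustrative only; in the server this happens inside OpenMetadataApplication.registerEventPublisher.
Jdbi jdbi = Jdbi.create("jdbc:mysql://localhost:3306/openmetadata_db", "user", "password"); // placeholder credentials
jdbi.installPlugin(new SqlObjectPlugin()); // required for on-demand SQL objects
CollectionDAO dao = jdbi.onDemand(CollectionDAO.class); // thread-safe proxy; each call gets its own handle
ElasticSearchEventPublisher publisher = new ElasticSearchEventPublisher(esConfig, dao); // esConfig: the app's ElasticSearchConfiguration
EventPubSub.addEventHandler(publisher);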

Changed file: ElasticSearchEventPublisher.java

@@ -17,9 +17,14 @@ package org.openmetadata.service.elasticsearch;
import static org.openmetadata.service.Entity.FIELD_FOLLOWERS;
import static org.openmetadata.service.Entity.FIELD_USAGE_SUMMARY;
+import static org.openmetadata.service.resources.elasticSearch.BuildSearchIndexResource.ELASTIC_SEARCH_ENTITY_FQN_STREAM;
+import static org.openmetadata.service.resources.elasticSearch.BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION;
import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -41,6 +46,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
+import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.api.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.entity.data.Dashboard;
import org.openmetadata.schema.entity.data.Database;
@@ -59,6 +65,8 @@ import org.openmetadata.schema.entity.services.PipelineService;
import org.openmetadata.schema.entity.tags.Tag;
import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.entity.teams.User;
+import org.openmetadata.schema.settings.EventPublisherJob;
+import org.openmetadata.schema.settings.FailureDetails;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
@@ -70,6 +78,8 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition.ElasticSearchIndexType;
import org.openmetadata.service.events.AbstractEventPublisher;
import org.openmetadata.service.events.errors.EventPublisherException;
+import org.openmetadata.service.jdbi3.CollectionDAO;
+import org.openmetadata.service.resources.elasticSearch.BuildSearchIndexResource;
import org.openmetadata.service.resources.events.EventResource.ChangeEventList;
import org.openmetadata.service.util.ElasticSearchClientUtils;
import org.openmetadata.service.util.JsonUtils;
@@ -78,14 +88,18 @@ import org.openmetadata.service.util.JsonUtils;
public class ElasticSearchEventPublisher extends AbstractEventPublisher {
private final RestHighLevelClient client;
private final ElasticSearchIndexDefinition esIndexDefinition;
+private final CollectionDAO dao;
private static final String SERVICE_NAME = "service.name";
private static final String DATABASE_NAME = "database.name";
-public ElasticSearchEventPublisher(ElasticSearchConfiguration esConfig) {
+public ElasticSearchEventPublisher(ElasticSearchConfiguration esConfig, CollectionDAO dao) {
super(esConfig.getBatchSize(), new ArrayList<>());
this.client = ElasticSearchClientUtils.createElasticSearchClient(esConfig);
esIndexDefinition = new ElasticSearchIndexDefinition(client);
esIndexDefinition.createIndexes();
+this.dao = dao;
+// needs Db connection
+registerElasticSearchJobs();
}
@Override
@@ -158,16 +172,27 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
}
} catch (DocumentMissingException ex) {
LOG.error("Missing Document", ex);
+updateElasticSearchFailureStatus(
+EventPublisherJob.Status.ACTIVEWITHERROR, "Missing Document while Updating ES.");
} catch (ElasticsearchException e) {
LOG.error("failed to update ES doc");
LOG.debug(e.getMessage());
if (e.status() == RestStatus.GATEWAY_TIMEOUT || e.status() == RestStatus.REQUEST_TIMEOUT) {
LOG.error("Error in publishing to ElasticSearch");
+updateElasticSearchFailureStatus(
+EventPublisherJob.Status.ACTIVEWITHERROR,
+String.format("Timeout when updating ES request. Reason %s", e.getMessage()));
throw new ElasticSearchRetriableException(e.getMessage());
} else {
+updateElasticSearchFailureStatus(
+EventPublisherJob.Status.ACTIVEWITHERROR,
+String.format("Failed while updating ES. Reason %s", e.getMessage()));
LOG.error(e.getMessage(), e);
}
} catch (IOException ie) {
+updateElasticSearchFailureStatus(
+EventPublisherJob.Status.ACTIVEWITHERROR,
+String.format("Issue in updating ES request. Reason %s", ie.getMessage()));
throw new EventPublisherException(ie.getMessage());
}
}
@@ -662,6 +687,76 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
}
}
public void registerElasticSearchJobs() {
try {
dao.entityExtensionTimeSeriesDao()
.delete(
BuildSearchIndexResource.ELASTIC_SEARCH_ENTITY_FQN_BATCH,
BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION);
dao.entityExtensionTimeSeriesDao()
.delete(ELASTIC_SEARCH_ENTITY_FQN_STREAM, BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION);
long startTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
FailureDetails failureDetails = new FailureDetails().withLastFailedAt(0L).withLastFailedReason("No Failures");
EventPublisherJob batchJob =
new EventPublisherJob()
.withName("Elastic Search Batch")
.withPublisherType(CreateEventPublisherJob.PublisherType.ELASTIC_SEARCH)
.withRunMode(CreateEventPublisherJob.RunMode.BATCH)
.withStatus(EventPublisherJob.Status.ACTIVE)
.withTimestamp(startTime)
.withStartedBy("admin")
.withStartTime(startTime)
.withFailureDetails(failureDetails);
EventPublisherJob streamJob =
new EventPublisherJob()
.withName("Elastic Search Stream")
.withPublisherType(CreateEventPublisherJob.PublisherType.ELASTIC_SEARCH)
.withRunMode(CreateEventPublisherJob.RunMode.STREAM)
.withStatus(EventPublisherJob.Status.ACTIVE)
.withTimestamp(startTime)
.withStartedBy("admin")
.withStartTime(startTime)
.withFailureDetails(failureDetails);
dao.entityExtensionTimeSeriesDao()
.insert(
BuildSearchIndexResource.ELASTIC_SEARCH_ENTITY_FQN_BATCH,
BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION,
"eventPublisherJob",
JsonUtils.pojoToJson(batchJob));
dao.entityExtensionTimeSeriesDao()
.insert(
ELASTIC_SEARCH_ENTITY_FQN_STREAM,
BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION,
"eventPublisherJob",
JsonUtils.pojoToJson(streamJob));
} catch (Exception e) {
LOG.error("Failed to register Elastic Search Job");
}
}
public void updateElasticSearchFailureStatus(EventPublisherJob.Status status, String failureMessage) {
try {
long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
String recordString =
dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
long originalLastUpdate = lastRecord.getTimestamp();
lastRecord.setStatus(status);
lastRecord.setTimestamp(updateTime);
lastRecord.setFailureDetails(
new FailureDetails().withLastFailedAt(updateTime).withLastFailedReason(failureMessage));
dao.entityExtensionTimeSeriesDao()
.update(
ELASTIC_SEARCH_ENTITY_FQN_STREAM,
ELASTIC_SEARCH_EXTENSION,
JsonUtils.pojoToJson(lastRecord),
originalLastUpdate);
} catch (Exception e) {
LOG.error("Failed to Update Elastic Search Job Info");
}
}
public void close() {
try {
this.client.close();

Changed file: CollectionDAO.java

@@ -2997,7 +2997,7 @@ public interface CollectionDAO {
List<ExtensionRecord> getExtensions(@Bind("id") String id, @Bind("extensionPrefix") String extensionPrefix);
@SqlUpdate("DELETE FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension")
-void delete(@Bind("entityId") String id, @Bind("extension") String extension);
+void delete(@Bind("entityFQN") String entityFQN, @Bind("extension") String extension);
@SqlUpdate("DELETE FROM entity_extension_time_series WHERE entityFQN = :entityFQN")
void deleteAll(@Bind("entityFQN") String entityFQN);
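The corrected @Bind name above is the substance of this hunk: JDBI matches SQL-object arguments to the named placeholders in the SQL by the @Bind value, so binding the first argument as "entityId" leaves :entityFQN undefined and the DELETE fails at execution time. A minimal sketch of the pattern with a hypothetical interface name (the annotations and SQL shape mirror the real DAO):

import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

public interface ExampleTimeSeriesDAO {
  // The @Bind value must match the :placeholder in the SQL exactly.
  @SqlUpdate("DELETE FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension")
  void delete(@Bind("entityFQN") String entityFQN, @Bind("extension") String extension);
}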

Changed file: BuildSearchIndexResource.java

@@ -13,7 +13,6 @@ import static org.openmetadata.service.Entity.USER;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.Operation;
-import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException;
import java.time.LocalDateTime;
@@ -21,19 +20,18 @@ import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.validation.Valid;
-import javax.validation.constraints.Max;
-import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -80,14 +78,13 @@ import org.openmetadata.service.util.ResultList;
@Slf4j
public class BuildSearchIndexResource {
public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher";
-public static final String ELASTIC_SEARCH_ENTITY_FQN = "eventPublisher:ElasticSearch";
+public static final String ELASTIC_SEARCH_ENTITY_FQN_STREAM = "eventPublisher:ElasticSearch:STREAM";
+public static final String ELASTIC_SEARCH_ENTITY_FQN_BATCH = "eventPublisher:ElasticSearch:BATCH";
private final RestHighLevelClient client;
private final ElasticSearchIndexDefinition elasticSearchIndexDefinition;
private final CollectionDAO dao;
private final Authorizer authorizer;
-private final BulkProcessorListener elasticSearchBulkProcessorListener;
-private final BulkProcessor bulkProcessor;
-private final ExecutorService threadScheduler = Executors.newFixedThreadPool(2);
+private final ExecutorService threadScheduler;
public BuildSearchIndexResource(CollectionDAO dao, Authorizer authorizer) {
this.client =
@@ -98,17 +95,20 @@ public class BuildSearchIndexResource {
this.dao = dao;
this.authorizer = authorizer;
this.elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(client);
-this.elasticSearchBulkProcessorListener = new BulkProcessorListener(dao);
+this.threadScheduler =
+new ThreadPoolExecutor(
+2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());
+}
+private BulkProcessor getBulkProcessor(BulkProcessorListener listener) {
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
-// Setup a bulk Processor
-BulkProcessor.Builder builder =
-BulkProcessor.builder(bulkConsumer, elasticSearchBulkProcessorListener, "es-reindex");
+BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener, "es-reindex");
builder.setBulkActions(100);
builder.setConcurrentRequests(2);
builder.setFlushInterval(TimeValue.timeValueSeconds(60L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
-this.bulkProcessor = builder.build();
+return builder.build();
}
@POST
@@ -128,11 +128,15 @@ public class BuildSearchIndexResource {
// Only admins can issue a reindex request
authorizer.authorizeAdmin(securityContext, false);
String startedBy = securityContext.getUserPrincipal().getName();
-return startReindexing(uriInfo, startedBy, createRequest);
+if (createRequest.getRunMode() == RunMode.BATCH) {
+return startReindexingBatchMode(uriInfo, startedBy, createRequest);
+} else {
+return startReindexingStreamMode(uriInfo, startedBy, createRequest);
+}
}
@GET
-@Path("/reindex/status")
+@Path("/reindex/status/{runMode}")
@Operation(
operationId = "getReindexAllLastJobStatus",
summary = "Get Last Run Reindex All Job Status",
@@ -143,97 +147,130 @@ public class BuildSearchIndexResource {
@ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found")
})
public Response reindexAllJobLastStatus(
-@Context UriInfo uriInfo,
-@Context SecurityContext securityContext,
-@Parameter(description = "Limit the number users returned. (1 to 1000000, default = 10)")
-@DefaultValue("10")
-@Min(0)
-@Max(1000000)
-@QueryParam("limit")
-int limitParam)
+@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("runMode") String runMode)
throws IOException {
// Only admins can issue a reindex request
authorizer.authorizeAdmin(securityContext, false);
// Check if there is a running job for reindex for requested entity
-List<String> records =
-dao.entityExtensionTimeSeriesDao()
-.getLastLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, limitParam);
-if (records != null) {
-return Response.status(Response.Status.OK)
-.entity(JsonUtils.readObjects(records, EventPublisherJob.class))
-.build();
+String record;
+if (runMode.equals(RunMode.BATCH.toString())) {
+record =
+dao.entityExtensionTimeSeriesDao()
+.getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION);
+} else if (runMode.equals(RunMode.STREAM.toString())) {
+record =
+dao.entityExtensionTimeSeriesDao()
+.getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
+} else {
+return Response.status(Response.Status.BAD_REQUEST).entity("Invalid Run Mode").build();
+}
+if (record != null) {
+return Response.status(Response.Status.OK).entity(JsonUtils.readValue(record, EventPublisherJob.class)).build();
}
return Response.status(Response.Status.NOT_FOUND).entity("No Last Run.").build();
}
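A usage sketch, not part of the diff, of how a client might drive the reworked endpoints. The base URL and path prefix are assumptions (the class-level @Path of BuildSearchIndexResource is not shown in this diff), the admin auth header is omitted, and the enum spellings in the JSON body and status path are illustrative; only publisherType and runMode are required after the schema change further below.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ReindexClientSketch {
  public static void main(String[] args) throws Exception {
    HttpClient http = HttpClient.newHttpClient();
    String base = "http://localhost:8585/api/v1/search"; // assumed base path
    // Kick off a BATCH reindex of all entities; "name" is no longer part of the request.
    String body = "{\"publisherType\":\"elasticSearch\",\"runMode\":\"batch\",\"entities\":[\"all\"],\"batchSize\":100}";
    HttpRequest reindex = HttpRequest.newBuilder()
        .uri(URI.create(base + "/reindex"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
    System.out.println(http.send(reindex, HttpResponse.BodyHandlers.ofString()).body()); // "Reindexing Started"
    // Poll the last-run status per run mode; the mode is now a path parameter, not a limit query param.
    HttpRequest status = HttpRequest.newBuilder()
        .uri(URI.create(base + "/reindex/status/BATCH"))
        .GET()
        .build();
    System.out.println(http.send(status, HttpResponse.BodyHandlers.ofString()).body()); // EventPublisherJob JSON
  }
}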
-private synchronized Response startReindexing(
-UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) throws IOException {
-String reindexJobString =
-dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION);
-EventPublisherJob reindexJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
-if (reindexJob != null
-&& ((System.currentTimeMillis() - reindexJob.getTimestamp() > 3600000)
-|| reindexJob.getStatus() == EventPublisherJob.Status.SUCCESS)) {
-return Response.status(Response.Status.FORBIDDEN)
-.entity("Reindexing is Running Already. Cannot issue new request.")
-.build();
-} else {
-// create a new Job
-Long startTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
-EventPublisherJob newJob =
-new EventPublisherJob()
-.withName(createRequest.getName())
-.withPublisherType(createRequest.getPublisherType())
-.withRunMode(createRequest.getRunMode())
-.withStatus(EventPublisherJob.Status.RUNNING)
-.withTimestamp(startTime)
-.withStartedBy(startedBy)
-.withStartTime(startTime)
-.withEntities(createRequest.getEntities());
-dao.entityExtensionTimeSeriesDao()
-.insert(
-ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, "eventPublisherJob", JsonUtils.pojoToJson(newJob));
-// Update Listener for only Batch
-if (createRequest.getRunMode() == RunMode.BATCH) {
-elasticSearchBulkProcessorListener.setRequestIssuer(startedBy);
-elasticSearchBulkProcessorListener.setCreateRequest(createRequest);
-elasticSearchBulkProcessorListener.setEntityFQN(ELASTIC_SEARCH_ENTITY_FQN);
-elasticSearchBulkProcessorListener.setStartTime(startTime);
-elasticSearchBulkProcessorListener.resetCounters();
-}
-// Start Full Reindexing
-threadScheduler.submit(
-() -> {
-try {
-if (createRequest.getEntities().contains("all")) {
-updateEntity(uriInfo, TABLE, createRequest);
-updateEntity(uriInfo, TOPIC, createRequest);
-updateEntity(uriInfo, DASHBOARD, createRequest);
-updateEntity(uriInfo, PIPELINE, createRequest);
-updateEntity(uriInfo, USER, createRequest);
-updateEntity(uriInfo, TEAM, createRequest);
-updateEntity(uriInfo, GLOSSARY_TERM, createRequest);
-updateEntity(uriInfo, MLMODEL, createRequest);
-updateEntity(uriInfo, TAG, createRequest);
-} else {
-for (String entityName : createRequest.getEntities()) {
-updateEntity(uriInfo, entityName, createRequest);
-}
-}
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-});
-return Response.status(Response.Status.OK).entity("Reindexing Started").build();
-}
-}
+private synchronized Response startReindexingStreamMode(
+UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) throws IOException {
+// create a new Job
+threadScheduler.submit(
+() -> {
+try {
+this.submitStreamJob(uriInfo, startedBy, createRequest);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+});
+return Response.status(Response.Status.OK).entity("Reindexing Started").build();
+}
+private synchronized Response startReindexingBatchMode(
+UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) throws IOException {
+// create a new Job
+threadScheduler.submit(
+() -> {
+try {
+this.submitBatchJob(uriInfo, startedBy, createRequest);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+});
+return Response.status(Response.Status.OK).entity("Reindexing Started").build();
+}
+private synchronized void submitStreamJob(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest)
+throws JsonProcessingException {
+try {
+if (createRequest.getEntities().contains("all")) {
+updateEntityStream(uriInfo, TABLE, createRequest);
+updateEntityStream(uriInfo, TOPIC, createRequest);
+updateEntityStream(uriInfo, DASHBOARD, createRequest);
+updateEntityStream(uriInfo, PIPELINE, createRequest);
+updateEntityStream(uriInfo, USER, createRequest);
+updateEntityStream(uriInfo, TEAM, createRequest);
+updateEntityStream(uriInfo, GLOSSARY_TERM, createRequest);
+updateEntityStream(uriInfo, MLMODEL, createRequest);
+updateEntityStream(uriInfo, TAG, createRequest);
+} else {
+for (String entityName : createRequest.getEntities()) {
+updateEntityStream(uriInfo, entityName, createRequest);
+}
+}
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
-private synchronized void updateEntity(UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest)
-throws IOException {
-elasticSearchBulkProcessorListener.allowTotalRequestUpdate();
+private synchronized void submitBatchJob(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest)
+throws IOException {
+long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
+String recordString =
+dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION);
+EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
+long originalLastUpdate = lastRecord.getTimestamp();
+lastRecord.setStatus(EventPublisherJob.Status.ACTIVE);
+lastRecord.setTimestamp(updateTime);
+dao.entityExtensionTimeSeriesDao()
+.update(
+ELASTIC_SEARCH_ENTITY_FQN_BATCH,
+ELASTIC_SEARCH_EXTENSION,
+JsonUtils.pojoToJson(lastRecord),
+originalLastUpdate);
+// Update Listener for only Batch
+BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(dao);
+BulkProcessor processor = getBulkProcessor(bulkProcessorListener);
+try {
+if (createRequest.getEntities().contains("all")) {
+updateEntityBatch(processor, bulkProcessorListener, uriInfo, TABLE, createRequest);
+updateEntityBatch(processor, bulkProcessorListener, uriInfo, TOPIC, createRequest);
+updateEntityBatch(processor, bulkProcessorListener, uriInfo, DASHBOARD, createRequest);
+updateEntityBatch(processor, bulkProcessorListener, uriInfo, PIPELINE, createRequest);
+updateEntityBatch(processor, bulkProcessorListener, uriInfo, MLMODEL, createRequest);
+updateEntityBatch(processor, bulkProcessorListener, uriInfo, USER, createRequest);
+updateEntityBatch(processor, bulkProcessorListener, uriInfo, TEAM, createRequest);
+updateEntityBatch(processor, bulkProcessorListener, uriInfo, GLOSSARY_TERM, createRequest);
+updateEntityBatch(processor, bulkProcessorListener, uriInfo, TAG, createRequest);
+} else {
+for (String entityName : createRequest.getEntities()) {
+updateEntityBatch(processor, bulkProcessorListener, uriInfo, entityName, createRequest);
+}
+}
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+private synchronized void updateEntityBatch(
+BulkProcessor processor,
+BulkProcessorListener listener,
+UriInfo uriInfo,
+String entityType,
+CreateEventPublisherJob createRequest)
+throws IOException {
+listener.allowTotalRequestUpdate();
ElasticSearchIndexDefinition.ElasticSearchIndexType indexType =
elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
@@ -259,35 +296,67 @@ public class BuildSearchIndexResource {
new ListFilter(Include.ALL),
createRequest.getBatchSize(),
after);
-if (createRequest.getRunMode() == RunMode.BATCH) {
-elasticSearchBulkProcessorListener.addRequests(result.getPaging().getTotal());
-updateElasticSearchForEntityBatch(entityType, result.getData());
-} else {
-updateElasticSearchForEntityStream(entityType, result.getData());
-}
-after = result.getPaging().getAfter();
-} while (after != null);
-if (createRequest.getRunMode() == RunMode.STREAM) {
-String reindexJobString =
-dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION);
-EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
-long lastUpdateTime = latestJob.getTimestamp();
-Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
-latestJob.setTimestamp(time);
-latestJob.setEndTime(time);
-if (latestJob.getFailureDetails() != null) {
-latestJob.setStatus(EventPublisherJob.Status.FAILED);
-} else {
-latestJob.setStatus(EventPublisherJob.Status.SUCCESS);
-}
-dao.entityExtensionTimeSeriesDao()
-.update(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(latestJob), lastUpdateTime);
-}
-}
+listener.addRequests(result.getPaging().getTotal());
+updateElasticSearchForEntityBatch(processor, entityType, result.getData());
+after = result.getPaging().getAfter();
+} while (after != null);
+}
+private synchronized void updateEntityStream(
+UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest) throws IOException {
+ElasticSearchIndexDefinition.ElasticSearchIndexType indexType =
+elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
+if (createRequest.getRecreateIndex()) {
+// Delete index
+elasticSearchIndexDefinition.deleteIndex(indexType);
+// Create index
+elasticSearchIndexDefinition.createIndex(indexType);
+}
+// Start fetching a list of Entities and pushing them to ES
+EntityRepository<EntityInterface> entityRepository = Entity.getEntityRepository(entityType);
+List<String> allowedFields = entityRepository.getAllowedFields();
+String fields = String.join(",", allowedFields);
+ResultList<EntityInterface> result;
+String after = null;
+do {
+result =
+entityRepository.listAfter(
+uriInfo,
+new EntityUtil.Fields(allowedFields, fields),
+new ListFilter(Include.ALL),
+createRequest.getBatchSize(),
+after);
+updateElasticSearchForEntityStream(entityType, result.getData());
+after = result.getPaging().getAfter();
+} while (after != null);
+// Mark the Job end
+String reindexJobString =
+dao.entityExtensionTimeSeriesDao()
+.getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
+EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
+long lastUpdateTime = latestJob.getTimestamp();
+Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
+latestJob.setTimestamp(time);
+latestJob.setEndTime(time);
+if (latestJob.getFailureDetails() != null) {
+latestJob.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR);
+} else {
+latestJob.setStatus(EventPublisherJob.Status.ACTIVE);
+}
+dao.entityExtensionTimeSeriesDao()
+.update(
+ELASTIC_SEARCH_ENTITY_FQN_STREAM,
+ELASTIC_SEARCH_EXTENSION,
+JsonUtils.pojoToJson(latestJob),
+lastUpdateTime);
+}
-private synchronized void updateElasticSearchForEntityBatch(String entityType, List<EntityInterface> entities)
-throws IOException {
+private synchronized void updateElasticSearchForEntityBatch(
+BulkProcessor bulkProcessor, String entityType, List<EntityInterface> entities) throws IOException {
for (EntityInterface entity : entities) {
if (entityType.equals(TABLE)) {
((Table) entity).getColumns().forEach(table -> table.setProfile(null));
@@ -299,7 +368,8 @@ public class BuildSearchIndexResource {
private synchronized void updateElasticSearchForEntityStream(String entityType, List<EntityInterface> entities)
throws IOException {
String reindexJobString =
-dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION);
+dao.entityExtensionTimeSeriesDao()
+.getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
Long lastUpdateTime = latestJob.getTimestamp();
for (EntityInterface entity : entities) {
@@ -313,10 +383,15 @@ public class BuildSearchIndexResource {
} catch (IOException ex) {
failureDetails = new FailureDetails().withLastFailedAt(time).withLastFailedReason(ex.getMessage());
latestJob.setFailureDetails(failureDetails);
+latestJob.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR);
}
latestJob.setTimestamp(time);
dao.entityExtensionTimeSeriesDao()
-.update(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(latestJob), lastUpdateTime);
+.update(
+ELASTIC_SEARCH_ENTITY_FQN_STREAM,
+ELASTIC_SEARCH_EXTENSION,
+JsonUtils.pojoToJson(latestJob),
+lastUpdateTime);
lastUpdateTime = time;
}
}

Changed file: BulkProcessorListener.java

@@ -1,8 +1,8 @@
package org.openmetadata.service.resources.elasticSearch;
+import static org.openmetadata.service.resources.elasticSearch.BuildSearchIndexResource.ELASTIC_SEARCH_ENTITY_FQN_BATCH;
import static org.openmetadata.service.resources.elasticSearch.BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION;
-import com.fasterxml.jackson.core.JsonProcessingException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
@@ -11,7 +11,6 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.settings.EventPublisherJob;
import org.openmetadata.schema.settings.FailureDetails;
import org.openmetadata.schema.settings.Stats;
@@ -24,16 +23,11 @@ public class BulkProcessorListener implements BulkProcessor.Listener {
private volatile int totalSuccessCount = 0;
private volatile int totalFailedCount = 0;
private volatile int totalRequests = 0;
-private CreateEventPublisherJob createRequest;
-private String requestIssuer = "anonymous";
-private String entityFQN;
private final CollectionDAO dao;
-private Long startTime;
-private Long originalStartTime;
public BulkProcessorListener(CollectionDAO dao) {
this.dao = dao;
this.resetCounters();
}
@Override
@@ -44,65 +38,45 @@ public class BulkProcessorListener implements BulkProcessor.Listener {
@Override
public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
-int failedCount = 0;
-int successCount;
-FailureDetails failureDetails = null;
-for (BulkItemResponse bulkItemResponse : bulkResponse) {
-if (bulkItemResponse.isFailed()) {
-BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
-failureDetails = new FailureDetails();
-failureDetails.setLastFailedAt(
-Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime());
-failureDetails.setLastFailedReason(
-String.format("ID [%s]. Reason : %s", failure.getId(), failure.getMessage()));
-failedCount++;
-}
-}
-successCount = bulkResponse.getItems().length - failedCount;
-updateFailedAndSuccess(failedCount, successCount);
-// update stats in DB
-Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
-EventPublisherJob updateJob =
-new EventPublisherJob()
-.withName(createRequest.getName())
-.withPublisherType(createRequest.getPublisherType())
-.withRunMode(createRequest.getRunMode())
-.withStatus(EventPublisherJob.Status.RUNNING)
-.withTimestamp(time)
-.withStats(new Stats().withFailed(totalFailedCount).withSuccess(totalSuccessCount).withTotal(totalRequests))
-.withStartedBy(requestIssuer)
-.withFailureDetails(failureDetails)
-.withStartTime(originalStartTime)
-.withEntities(createRequest.getEntities());
-if (totalRequests == totalFailedCount + totalSuccessCount) {
-updateJob.setStatus(EventPublisherJob.Status.SUCCESS);
-updateJob.setEndTime(time);
-}
-try {
-dao.entityExtensionTimeSeriesDao()
-.update(entityFQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(updateJob), startTime);
-} catch (JsonProcessingException e) {
-LOG.error("Failed in Converting to Json.");
-}
-startTime = time;
+// Get last Update Details
+try {
+boolean batchHasFailures = false;
+int failedCount = 0;
+// Checking for failure in Items
+FailureDetails failureDetails = new FailureDetails();
+for (BulkItemResponse bulkItemResponse : bulkResponse) {
+if (bulkItemResponse.isFailed()) {
+BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
+failureDetails.setLastFailedReason(
+String.format("ID [%s]. Reason : %s", failure.getId(), failure.getMessage()));
+failedCount++;
+batchHasFailures = true;
+}
+}
+updateFailedAndSuccess(failedCount, bulkResponse.getItems().length - failedCount);
+// update stats in DB
+EventPublisherJob.Status status =
+batchHasFailures ? EventPublisherJob.Status.ACTIVEWITHERROR : EventPublisherJob.Status.ACTIVE;
+Stats stats = new Stats().withFailed(totalFailedCount).withSuccess(totalSuccessCount).withTotal(totalRequests);
+FailureDetails hasFailureDetails = batchHasFailures ? failureDetails : null;
+updateElasticSearchStatus(status, hasFailureDetails, stats);
+} catch (RuntimeException e) {
+LOG.error("Error in processing Bulk");
+}
}
@Override
public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
LOG.error("Failed to execute bulk", throwable);
updateFailedAndSuccess(bulkRequest.numberOfActions(), 0);
-}
-public String getEntityFQN() {
-return entityFQN;
-}
-public void setRequestIssuer(String adminName) {
-this.requestIssuer = adminName;
-}
-public void setEntityFQN(String entityFQN) {
-this.entityFQN = entityFQN;
-}
+EventPublisherJob.Status status = EventPublisherJob.Status.ACTIVEWITHERROR;
+Stats stats = new Stats().withFailed(totalFailedCount).withSuccess(totalSuccessCount).withTotal(totalRequests);
+FailureDetails hasFailureDetails =
+new FailureDetails()
+.withLastFailedReason(String.format("Batch Failed Completely. Reason : %s ", throwable.getMessage()));
+updateElasticSearchStatus(status, hasFailureDetails, stats);
+}
public synchronized void addRequests(int count) {
@@ -128,16 +102,28 @@ public class BulkProcessorListener implements BulkProcessor.Listener {
totalSuccessCount += successCount;
}
-public void setStartTime(Long time) {
-this.startTime = time;
-this.originalStartTime = time;
-}
-public CreateEventPublisherJob getCreateRequest() {
-return createRequest;
-}
-public void setCreateRequest(CreateEventPublisherJob createRequest) {
-this.createRequest = createRequest;
-}
+public void updateElasticSearchStatus(EventPublisherJob.Status status, FailureDetails failDetails, Stats newStats) {
+try {
+long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
+String recordString =
+dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION);
+EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
+long originalLastUpdate = lastRecord.getTimestamp();
+lastRecord.setStatus(status);
+lastRecord.setTimestamp(updateTime);
+if (failDetails != null) {
+lastRecord.setFailureDetails(
+new FailureDetails().withLastFailedAt(updateTime).withLastFailedReason(failDetails.getLastFailedReason()));
+}
+lastRecord.setStats(newStats);
+dao.entityExtensionTimeSeriesDao()
+.update(
+ELASTIC_SEARCH_ENTITY_FQN_BATCH,
+ELASTIC_SEARCH_EXTENSION,
+JsonUtils.pojoToJson(lastRecord),
+originalLastUpdate);
+} catch (Exception e) {
+LOG.error("Failed to Update Elastic Search Job Info");
+}
+}
}

Changed file: CreateEventPublisherJob JSON schema

@@ -6,10 +6,6 @@
"type": "object",
"javaType": "org.openmetadata.schema.api.CreateEventPublisherJob",
"properties": {
-"name": {
-"description": "Name of the Job",
-"type": "string"
-},
"publisherType": {
"$ref": "../settings/eventPublisherJob.json#/definitions/publisherType"
},
@@ -36,6 +32,6 @@
"default": 100
}
},
-"required": ["name", "publisherType","runMode"],
+"required": ["publisherType", "runMode"],
"additionalProperties": false
}

Changed file: eventPublisherJob.json (EventPublisherJob status schema)

@@ -72,10 +72,9 @@
"description": "This schema publisher run job status.",
"type": "string",
"enum": [
-"Failed",
-"Success",
-"Aborted",
-"Running"
+"ACTIVE",
+"RETRY",
+"ACTIVEWITHERROR"
]
},
"failureDetails": {