mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-28 07:58:31 +00:00
[ES Reindex] Added ES Reindexing from API (#7717)
* [ES Reindex] Added ES Reindexing from API * [ES Reindex] checkstyle * [ES Reindex] return complete record for status info * fixes * added limit param to get no of records * Review Comments + Stream mode * Review Comments * Fix for failing tests
This commit is contained in:
parent
7f81c28e58
commit
8d33da7f3d
@ -87,7 +87,7 @@ public class ElasticSearchIndexDefinition {
|
||||
return exists;
|
||||
}
|
||||
|
||||
private boolean createIndex(ElasticSearchIndexType elasticSearchIndexType) {
|
||||
public boolean createIndex(ElasticSearchIndexType elasticSearchIndexType) {
|
||||
try {
|
||||
GetIndexRequest gRequest = new GetIndexRequest(elasticSearchIndexType.indexName);
|
||||
gRequest.local(false);
|
||||
@ -132,7 +132,7 @@ public class ElasticSearchIndexDefinition {
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteIndex(ElasticSearchIndexType elasticSearchIndexType) {
|
||||
public void deleteIndex(ElasticSearchIndexType elasticSearchIndexType) {
|
||||
try {
|
||||
GetIndexRequest gRequest = new GetIndexRequest(elasticSearchIndexType.indexName);
|
||||
gRequest.local(false);
|
||||
|
||||
@ -0,0 +1,43 @@
|
||||
package org.openmetadata.service.elasticsearch;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.entity.data.Dashboard;
|
||||
import org.openmetadata.schema.entity.data.GlossaryTerm;
|
||||
import org.openmetadata.schema.entity.data.MlModel;
|
||||
import org.openmetadata.schema.entity.data.Pipeline;
|
||||
import org.openmetadata.schema.entity.data.Table;
|
||||
import org.openmetadata.schema.entity.data.Topic;
|
||||
import org.openmetadata.schema.entity.tags.Tag;
|
||||
import org.openmetadata.schema.entity.teams.Team;
|
||||
import org.openmetadata.schema.entity.teams.User;
|
||||
import org.openmetadata.service.Entity;
|
||||
|
||||
@Slf4j
|
||||
public class ElasticSearchIndexFactory {
|
||||
public static ElasticSearchIndex buildIndex(String entityType, EntityInterface entity) {
|
||||
switch (entityType) {
|
||||
case Entity.TABLE:
|
||||
return new TableIndex((Table) entity);
|
||||
case Entity.DASHBOARD:
|
||||
return new DashboardIndex((Dashboard) entity);
|
||||
case Entity.TOPIC:
|
||||
return new TopicIndex((Topic) entity);
|
||||
case Entity.PIPELINE:
|
||||
return new PipelineIndex((Pipeline) entity);
|
||||
case Entity.USER:
|
||||
return new UserIndex((User) entity);
|
||||
case Entity.TEAM:
|
||||
return new TeamIndex((Team) entity);
|
||||
case Entity.GLOSSARY_TERM:
|
||||
return new GlossaryTermIndex((GlossaryTerm) entity);
|
||||
case Entity.MLMODEL:
|
||||
return new MlModelIndex((MlModel) entity);
|
||||
case Entity.TAG:
|
||||
return new TagIndex((Tag) entity);
|
||||
default:
|
||||
LOG.warn("Ignoring Entity Type {}", entityType);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -7,7 +7,7 @@ import org.openmetadata.schema.entity.data.MlModel;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
public class MlModelIndex {
|
||||
public class MlModelIndex implements ElasticSearchIndex {
|
||||
MlModel mlModel;
|
||||
|
||||
public MlModelIndex(MlModel mlModel) {
|
||||
|
||||
@ -7,7 +7,7 @@ import org.openmetadata.schema.entity.teams.User;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
public class UserIndex {
|
||||
public class UserIndex implements ElasticSearchIndex {
|
||||
User user;
|
||||
final List<String> excludeFields = List.of("owns", "changeDescription");
|
||||
|
||||
|
||||
@ -2983,6 +2983,12 @@ public interface CollectionDAO {
|
||||
+ "ORDER BY timestamp DESC LIMIT 1")
|
||||
String getLatestExtension(@Bind("entityFQN") String entityFQN, @Bind("extension") String extension);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension "
|
||||
+ "ORDER BY timestamp DESC LIMIT :limit")
|
||||
List<String> getLastLatestExtension(
|
||||
@Bind("entityFQN") String entityFQN, @Bind("extension") String extension, @Bind("limit") int limit);
|
||||
|
||||
@RegisterRowMapper(ExtensionMapper.class)
|
||||
@SqlQuery(
|
||||
"SELECT extension, json FROM entity_extension WHERE id = :id AND extension "
|
||||
|
||||
@ -0,0 +1,335 @@
|
||||
package org.openmetadata.service.resources.elasticSearch;
|
||||
|
||||
import static org.openmetadata.service.Entity.DASHBOARD;
|
||||
import static org.openmetadata.service.Entity.GLOSSARY_TERM;
|
||||
import static org.openmetadata.service.Entity.MLMODEL;
|
||||
import static org.openmetadata.service.Entity.PIPELINE;
|
||||
import static org.openmetadata.service.Entity.TABLE;
|
||||
import static org.openmetadata.service.Entity.TAG;
|
||||
import static org.openmetadata.service.Entity.TEAM;
|
||||
import static org.openmetadata.service.Entity.TOPIC;
|
||||
import static org.openmetadata.service.Entity.USER;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.function.BiConsumer;
|
||||
import javax.validation.Valid;
|
||||
import javax.validation.constraints.Max;
|
||||
import javax.validation.constraints.Min;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.SecurityContext;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.CreateEventPublisherJob;
|
||||
import org.openmetadata.schema.api.CreateEventPublisherJob.RunMode;
|
||||
import org.openmetadata.schema.api.configuration.elasticsearch.ElasticSearchConfiguration;
|
||||
import org.openmetadata.schema.entity.data.Table;
|
||||
import org.openmetadata.schema.settings.EventPublisherJob;
|
||||
import org.openmetadata.schema.settings.FailureDetails;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
|
||||
import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository;
|
||||
import org.openmetadata.service.jdbi3.ListFilter;
|
||||
import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.security.Authorizer;
|
||||
import org.openmetadata.service.util.ConfigurationHolder;
|
||||
import org.openmetadata.service.util.ElasticSearchClientUtils;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
|
||||
@Path("/v1/indexResource")
|
||||
@Api(value = "Elastic Search Collection", tags = "Elastic Search Collection")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
@Collection(name = "indexResource")
|
||||
@Slf4j
|
||||
public class BuildSearchIndexResource {
|
||||
public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher";
|
||||
public static final String ELASTIC_SEARCH_ENTITY_FQN = "eventPublisher:ElasticSearch";
|
||||
private final RestHighLevelClient client;
|
||||
private final ElasticSearchIndexDefinition elasticSearchIndexDefinition;
|
||||
private final CollectionDAO dao;
|
||||
private final Authorizer authorizer;
|
||||
private final BulkProcessorListener elasticSearchBulkProcessorListener;
|
||||
private final BulkProcessor bulkProcessor;
|
||||
private final ExecutorService threadScheduler = Executors.newFixedThreadPool(2);
|
||||
|
||||
public BuildSearchIndexResource(CollectionDAO dao, Authorizer authorizer) {
|
||||
this.client =
|
||||
ElasticSearchClientUtils.createElasticSearchClient(
|
||||
ConfigurationHolder.getInstance()
|
||||
.getConfig(
|
||||
ConfigurationHolder.ConfigurationType.ELASTICSEARCHCONFIG, ElasticSearchConfiguration.class));
|
||||
this.dao = dao;
|
||||
this.authorizer = authorizer;
|
||||
this.elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(client);
|
||||
this.elasticSearchBulkProcessorListener = new BulkProcessorListener(dao);
|
||||
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
|
||||
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
|
||||
// Setup a bulk Processor
|
||||
BulkProcessor.Builder builder =
|
||||
BulkProcessor.builder(bulkConsumer, elasticSearchBulkProcessorListener, "es-reindex");
|
||||
builder.setBulkActions(100);
|
||||
builder.setConcurrentRequests(2);
|
||||
builder.setFlushInterval(TimeValue.timeValueSeconds(60L));
|
||||
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
|
||||
this.bulkProcessor = builder.build();
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("/reindex")
|
||||
@Operation(
|
||||
operationId = "reindexEntities",
|
||||
summary = "Reindex Entities",
|
||||
tags = "indexResource",
|
||||
description = "Reindex Elastic Search Entities",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "Success"),
|
||||
@ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found")
|
||||
})
|
||||
public Response reindexAllEntities(
|
||||
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateEventPublisherJob createRequest)
|
||||
throws IOException {
|
||||
// Only admins can issue a reindex request
|
||||
authorizer.authorizeAdmin(securityContext, false);
|
||||
String startedBy = securityContext.getUserPrincipal().getName();
|
||||
return startReindexing(uriInfo, startedBy, createRequest);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/reindex/status")
|
||||
@Operation(
|
||||
operationId = "getReindexAllLastJobStatus",
|
||||
summary = "Get Last Run Reindex All Job Status",
|
||||
tags = "indexResource",
|
||||
description = "Reindex All job last status",
|
||||
responses = {
|
||||
@ApiResponse(responseCode = "200", description = "Success"),
|
||||
@ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found")
|
||||
})
|
||||
public Response reindexAllJobLastStatus(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "Limit the number users returned. (1 to 1000000, default = 10)")
|
||||
@DefaultValue("10")
|
||||
@Min(0)
|
||||
@Max(1000000)
|
||||
@QueryParam("limit")
|
||||
int limitParam)
|
||||
throws IOException {
|
||||
// Only admins can issue a reindex request
|
||||
authorizer.authorizeAdmin(securityContext, false);
|
||||
// Check if there is a running job for reindex for requested entity
|
||||
List<String> records =
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.getLastLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, limitParam);
|
||||
if (records != null) {
|
||||
return Response.status(Response.Status.OK)
|
||||
.entity(JsonUtils.readObjects(records, EventPublisherJob.class))
|
||||
.build();
|
||||
}
|
||||
return Response.status(Response.Status.NOT_FOUND).entity("No Last Run.").build();
|
||||
}
|
||||
|
||||
private synchronized Response startReindexing(
|
||||
UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) throws IOException {
|
||||
String reindexJobString =
|
||||
dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION);
|
||||
EventPublisherJob reindexJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
|
||||
if (reindexJob != null
|
||||
&& ((System.currentTimeMillis() - reindexJob.getTimestamp() > 3600000)
|
||||
|| reindexJob.getStatus() == EventPublisherJob.Status.SUCCESS)) {
|
||||
return Response.status(Response.Status.FORBIDDEN)
|
||||
.entity("Reindexing is Running Already. Cannot issue new request.")
|
||||
.build();
|
||||
} else {
|
||||
// create a new Job
|
||||
Long startTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
|
||||
EventPublisherJob newJob =
|
||||
new EventPublisherJob()
|
||||
.withName(createRequest.getName())
|
||||
.withPublisherType(createRequest.getPublisherType())
|
||||
.withRunMode(createRequest.getRunMode())
|
||||
.withStatus(EventPublisherJob.Status.RUNNING)
|
||||
.withTimestamp(startTime)
|
||||
.withStartedBy(startedBy)
|
||||
.withStartTime(startTime)
|
||||
.withEntities(createRequest.getEntities());
|
||||
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.insert(
|
||||
ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, "eventPublisherJob", JsonUtils.pojoToJson(newJob));
|
||||
|
||||
// Update Listener for only Batch
|
||||
if (createRequest.getRunMode() == RunMode.BATCH) {
|
||||
elasticSearchBulkProcessorListener.setRequestIssuer(startedBy);
|
||||
elasticSearchBulkProcessorListener.setCreateRequest(createRequest);
|
||||
elasticSearchBulkProcessorListener.setEntityFQN(ELASTIC_SEARCH_ENTITY_FQN);
|
||||
elasticSearchBulkProcessorListener.setStartTime(startTime);
|
||||
elasticSearchBulkProcessorListener.resetCounters();
|
||||
}
|
||||
|
||||
// Start Full Reindexing
|
||||
threadScheduler.submit(
|
||||
() -> {
|
||||
try {
|
||||
if (createRequest.getEntities().contains("all")) {
|
||||
updateEntity(uriInfo, TABLE, createRequest);
|
||||
updateEntity(uriInfo, TOPIC, createRequest);
|
||||
updateEntity(uriInfo, DASHBOARD, createRequest);
|
||||
updateEntity(uriInfo, PIPELINE, createRequest);
|
||||
updateEntity(uriInfo, USER, createRequest);
|
||||
updateEntity(uriInfo, TEAM, createRequest);
|
||||
updateEntity(uriInfo, GLOSSARY_TERM, createRequest);
|
||||
updateEntity(uriInfo, MLMODEL, createRequest);
|
||||
updateEntity(uriInfo, TAG, createRequest);
|
||||
} else {
|
||||
for (String entityName : createRequest.getEntities()) {
|
||||
updateEntity(uriInfo, entityName, createRequest);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
return Response.status(Response.Status.OK).entity("Reindexing Started").build();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void updateEntity(UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest)
|
||||
throws IOException {
|
||||
elasticSearchBulkProcessorListener.allowTotalRequestUpdate();
|
||||
|
||||
ElasticSearchIndexDefinition.ElasticSearchIndexType indexType =
|
||||
elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
|
||||
|
||||
if (createRequest.getRecreateIndex()) {
|
||||
// Delete index
|
||||
elasticSearchIndexDefinition.deleteIndex(indexType);
|
||||
// Create index
|
||||
elasticSearchIndexDefinition.createIndex(indexType);
|
||||
}
|
||||
|
||||
// Start fetching a list of Entities and pushing them to ES
|
||||
EntityRepository<EntityInterface> entityRepository = Entity.getEntityRepository(entityType);
|
||||
List<String> allowedFields = entityRepository.getAllowedFields();
|
||||
String fields = String.join(",", allowedFields);
|
||||
ResultList<EntityInterface> result;
|
||||
String after = null;
|
||||
do {
|
||||
result =
|
||||
entityRepository.listAfter(
|
||||
uriInfo,
|
||||
new EntityUtil.Fields(allowedFields, fields),
|
||||
new ListFilter(Include.ALL),
|
||||
createRequest.getBatchSize(),
|
||||
after);
|
||||
if (createRequest.getRunMode() == RunMode.BATCH) {
|
||||
elasticSearchBulkProcessorListener.addRequests(result.getPaging().getTotal());
|
||||
updateElasticSearchForEntityBatch(entityType, result.getData());
|
||||
} else {
|
||||
updateElasticSearchForEntityStream(entityType, result.getData());
|
||||
}
|
||||
after = result.getPaging().getAfter();
|
||||
} while (after != null);
|
||||
|
||||
if (createRequest.getRunMode() == RunMode.STREAM) {
|
||||
String reindexJobString =
|
||||
dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION);
|
||||
EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
|
||||
long lastUpdateTime = latestJob.getTimestamp();
|
||||
Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
|
||||
latestJob.setTimestamp(time);
|
||||
latestJob.setEndTime(time);
|
||||
if (latestJob.getFailureDetails() != null) {
|
||||
latestJob.setStatus(EventPublisherJob.Status.FAILED);
|
||||
} else {
|
||||
latestJob.setStatus(EventPublisherJob.Status.SUCCESS);
|
||||
}
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.update(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(latestJob), lastUpdateTime);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void updateElasticSearchForEntityBatch(String entityType, List<EntityInterface> entities)
|
||||
throws IOException {
|
||||
for (EntityInterface entity : entities) {
|
||||
if (entityType.equals(TABLE)) {
|
||||
((Table) entity).getColumns().forEach(table -> table.setProfile(null));
|
||||
}
|
||||
bulkProcessor.add(getUpdateRequest(entityType, entity));
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void updateElasticSearchForEntityStream(String entityType, List<EntityInterface> entities)
|
||||
throws IOException {
|
||||
String reindexJobString =
|
||||
dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION);
|
||||
EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
|
||||
Long lastUpdateTime = latestJob.getTimestamp();
|
||||
for (EntityInterface entity : entities) {
|
||||
if (entityType.equals(TABLE)) {
|
||||
((Table) entity).getColumns().forEach(table -> table.setProfile(null));
|
||||
}
|
||||
FailureDetails failureDetails = null;
|
||||
Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
|
||||
try {
|
||||
client.update(getUpdateRequest(entityType, entity), RequestOptions.DEFAULT);
|
||||
} catch (IOException ex) {
|
||||
failureDetails = new FailureDetails().withLastFailedAt(time).withLastFailedReason(ex.getMessage());
|
||||
latestJob.setFailureDetails(failureDetails);
|
||||
}
|
||||
latestJob.setTimestamp(time);
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.update(ELASTIC_SEARCH_ENTITY_FQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(latestJob), lastUpdateTime);
|
||||
lastUpdateTime = time;
|
||||
}
|
||||
}
|
||||
|
||||
private UpdateRequest getUpdateRequest(String entityType, EntityInterface entity) throws JsonProcessingException {
|
||||
UpdateRequest updateRequest =
|
||||
new UpdateRequest(
|
||||
elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType).indexName, entity.getId().toString());
|
||||
updateRequest.doc(
|
||||
JsonUtils.pojoToJson(
|
||||
Objects.requireNonNull(ElasticSearchIndexFactory.buildIndex(entityType, entity)).buildESDoc()),
|
||||
XContentType.JSON);
|
||||
updateRequest.docAsUpsert(true);
|
||||
return updateRequest;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,143 @@
|
||||
package org.openmetadata.service.resources.elasticSearch;
|
||||
|
||||
import static org.openmetadata.service.resources.elasticSearch.BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Date;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.openmetadata.schema.api.CreateEventPublisherJob;
|
||||
import org.openmetadata.schema.settings.EventPublisherJob;
|
||||
import org.openmetadata.schema.settings.FailureDetails;
|
||||
import org.openmetadata.schema.settings.Stats;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
public class BulkProcessorListener implements BulkProcessor.Listener {
|
||||
private volatile boolean updateTotalRequest = true;
|
||||
private volatile int totalSuccessCount = 0;
|
||||
private volatile int totalFailedCount = 0;
|
||||
private volatile int totalRequests = 0;
|
||||
|
||||
private CreateEventPublisherJob createRequest;
|
||||
private String requestIssuer = "anonymous";
|
||||
private String entityFQN;
|
||||
private final CollectionDAO dao;
|
||||
private Long startTime;
|
||||
private Long originalStartTime;
|
||||
|
||||
public BulkProcessorListener(CollectionDAO dao) {
|
||||
this.dao = dao;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeBulk(long executionId, BulkRequest bulkRequest) {
|
||||
int numberOfActions = bulkRequest.numberOfActions();
|
||||
LOG.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
|
||||
int failedCount = 0;
|
||||
int successCount;
|
||||
FailureDetails failureDetails = null;
|
||||
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
||||
if (bulkItemResponse.isFailed()) {
|
||||
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
|
||||
failureDetails = new FailureDetails();
|
||||
failureDetails.setLastFailedAt(
|
||||
Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime());
|
||||
failureDetails.setLastFailedReason(
|
||||
String.format("ID [%s]. Reason : %s", failure.getId(), failure.getMessage()));
|
||||
failedCount++;
|
||||
}
|
||||
}
|
||||
successCount = bulkResponse.getItems().length - failedCount;
|
||||
updateFailedAndSuccess(failedCount, successCount);
|
||||
// update stats in DB
|
||||
Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
|
||||
EventPublisherJob updateJob =
|
||||
new EventPublisherJob()
|
||||
.withName(createRequest.getName())
|
||||
.withPublisherType(createRequest.getPublisherType())
|
||||
.withRunMode(createRequest.getRunMode())
|
||||
.withStatus(EventPublisherJob.Status.RUNNING)
|
||||
.withTimestamp(time)
|
||||
.withStats(new Stats().withFailed(totalFailedCount).withSuccess(totalSuccessCount).withTotal(totalRequests))
|
||||
.withStartedBy(requestIssuer)
|
||||
.withFailureDetails(failureDetails)
|
||||
.withStartTime(originalStartTime)
|
||||
.withEntities(createRequest.getEntities());
|
||||
if (totalRequests == totalFailedCount + totalSuccessCount) {
|
||||
updateJob.setStatus(EventPublisherJob.Status.SUCCESS);
|
||||
updateJob.setEndTime(time);
|
||||
}
|
||||
try {
|
||||
dao.entityExtensionTimeSeriesDao()
|
||||
.update(entityFQN, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(updateJob), startTime);
|
||||
} catch (JsonProcessingException e) {
|
||||
LOG.error("Failed in Converting to Json.");
|
||||
}
|
||||
startTime = time;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
|
||||
LOG.error("Failed to execute bulk", throwable);
|
||||
updateFailedAndSuccess(bulkRequest.numberOfActions(), 0);
|
||||
}
|
||||
|
||||
public String getEntityFQN() {
|
||||
return entityFQN;
|
||||
}
|
||||
|
||||
public void setRequestIssuer(String adminName) {
|
||||
this.requestIssuer = adminName;
|
||||
}
|
||||
|
||||
public void setEntityFQN(String entityFQN) {
|
||||
this.entityFQN = entityFQN;
|
||||
}
|
||||
|
||||
public synchronized void addRequests(int count) {
|
||||
if (updateTotalRequest) {
|
||||
totalRequests += count;
|
||||
}
|
||||
updateTotalRequest = false;
|
||||
}
|
||||
|
||||
public synchronized void allowTotalRequestUpdate() {
|
||||
updateTotalRequest = true;
|
||||
}
|
||||
|
||||
public synchronized void resetCounters() {
|
||||
totalRequests = 0;
|
||||
totalFailedCount = 0;
|
||||
totalSuccessCount = 0;
|
||||
updateTotalRequest = true;
|
||||
}
|
||||
|
||||
public synchronized void updateFailedAndSuccess(int failedCount, int successCount) {
|
||||
totalFailedCount += failedCount;
|
||||
totalSuccessCount += successCount;
|
||||
}
|
||||
|
||||
public void setStartTime(Long time) {
|
||||
this.startTime = time;
|
||||
this.originalStartTime = time;
|
||||
}
|
||||
|
||||
public CreateEventPublisherJob getCreateRequest() {
|
||||
return createRequest;
|
||||
}
|
||||
|
||||
public void setCreateRequest(CreateEventPublisherJob createRequest) {
|
||||
this.createRequest = createRequest;
|
||||
}
|
||||
}
|
||||
@ -11,7 +11,8 @@ public class ConfigurationHolder {
|
||||
public enum ConfigurationType {
|
||||
AUTHORIZERCONFIG("authorizerConfiguration"),
|
||||
AUTHENTICATIONCONFIG("authenticationConfiguration"),
|
||||
SMTPCONFIG("email");
|
||||
SMTPCONFIG("email"),
|
||||
ELASTICSEARCHCONFIG("elasticsearch");
|
||||
private String value;
|
||||
|
||||
ConfigurationType(String value) {
|
||||
@ -41,27 +42,34 @@ public class ConfigurationHolder {
|
||||
private final ConcurrentHashMap<ConfigurationType, Object> CONFIG_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
public void init(OpenMetadataApplicationConfig config) {
|
||||
for (Field field : OpenMetadataApplicationConfig.class.getDeclaredFields()) {
|
||||
if (field.isAnnotationPresent(JsonProperty.class)) {
|
||||
String configType = field.getAnnotation(JsonProperty.class).value();
|
||||
if (configType != null && !configType.equals("")) {
|
||||
ConfigurationType configTypeForEnum = ConfigurationType.fromValue(configType);
|
||||
if (configTypeForEnum == null) continue;
|
||||
switch (configTypeForEnum) {
|
||||
case AUTHORIZERCONFIG:
|
||||
CONFIG_MAP.put(ConfigurationType.AUTHORIZERCONFIG, config.getAuthorizerConfiguration());
|
||||
break;
|
||||
case AUTHENTICATIONCONFIG:
|
||||
CONFIG_MAP.put(ConfigurationType.AUTHENTICATIONCONFIG, config.getAuthenticationConfiguration());
|
||||
break;
|
||||
case SMTPCONFIG:
|
||||
CONFIG_MAP.put(ConfigurationType.SMTPCONFIG, config.getSmtpSettings());
|
||||
break;
|
||||
default:
|
||||
LOG.info("Currently AuthorizerConfig, AuthenticatioConfig, and SMTP these can be added");
|
||||
try {
|
||||
for (Field field : OpenMetadataApplicationConfig.class.getDeclaredFields()) {
|
||||
if (field.isAnnotationPresent(JsonProperty.class)) {
|
||||
String configType = field.getAnnotation(JsonProperty.class).value();
|
||||
if (configType != null && !configType.equals("")) {
|
||||
ConfigurationType configTypeForEnum = ConfigurationType.fromValue(configType);
|
||||
if (configTypeForEnum == null) continue;
|
||||
switch (configTypeForEnum) {
|
||||
case AUTHORIZERCONFIG:
|
||||
CONFIG_MAP.put(ConfigurationType.AUTHORIZERCONFIG, config.getAuthorizerConfiguration());
|
||||
break;
|
||||
case AUTHENTICATIONCONFIG:
|
||||
CONFIG_MAP.put(ConfigurationType.AUTHENTICATIONCONFIG, config.getAuthenticationConfiguration());
|
||||
break;
|
||||
case SMTPCONFIG:
|
||||
CONFIG_MAP.put(ConfigurationType.SMTPCONFIG, config.getSmtpSettings());
|
||||
break;
|
||||
case ELASTICSEARCHCONFIG:
|
||||
CONFIG_MAP.put(ConfigurationType.ELASTICSEARCHCONFIG, config.getElasticSearchConfiguration());
|
||||
break;
|
||||
default:
|
||||
LOG.info("Currently AuthorizerConfig, AuthenticatioConfig, SMTP and ES these can be added");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Failed in initialising Configuration Holder : Reason : {}", ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -179,5 +179,17 @@ email:
|
||||
password: ""
|
||||
transportationStrategy: "SMTP_TLS"
|
||||
|
||||
elasticsearch:
|
||||
host: "localhost"
|
||||
port: "9200"
|
||||
scheme: "http"
|
||||
username: ""
|
||||
password: ""
|
||||
truststorePath: ""
|
||||
truststorePassword: ""
|
||||
connectionTimeoutSecs: 5
|
||||
socketTimeoutSecs: 60
|
||||
batchSize: 10
|
||||
|
||||
slackChat:
|
||||
slackUrl: "http://localhost:8080"
|
||||
|
||||
@ -0,0 +1,41 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/entity/feed/createEventPublisherJob.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "CreateEventPublisherJob",
|
||||
"description": "This schema defines Event Publisher Run Result.",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.api.CreateEventPublisherJob",
|
||||
"properties": {
|
||||
"name": {
|
||||
"description": "Name of the Job",
|
||||
"type": "string"
|
||||
},
|
||||
"publisherType": {
|
||||
"$ref": "../settings/eventPublisherJob.json#/definitions/publisherType"
|
||||
},
|
||||
"runMode": {
|
||||
"$ref": "../settings/eventPublisherJob.json#/definitions/runMode"
|
||||
},
|
||||
"entities": {
|
||||
"description": "List of Entities to Reindex",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
},
|
||||
"default": ["all"],
|
||||
"uniqueItems": true
|
||||
},
|
||||
"recreateIndex": {
|
||||
"description": "This schema publisher run modes.",
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
},
|
||||
"batchSize": {
|
||||
"description": "Maximum number of events sent in a batch (Default 10).",
|
||||
"type": "integer",
|
||||
"default": 100
|
||||
}
|
||||
},
|
||||
"required": ["name", "publisherType","runMode"],
|
||||
"additionalProperties": false
|
||||
}
|
||||
@ -0,0 +1,110 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/entity/feed/eventPublisherResult.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "EventPublisherResult",
|
||||
"description": "This schema defines Event Publisher Run Result.",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.settings.EventPublisherJob",
|
||||
"definitions": {
|
||||
"stats": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"success": {
|
||||
"description": "Count of Success Record",
|
||||
"type": "integer",
|
||||
"default": 0
|
||||
},
|
||||
"failed": {
|
||||
"description": "Count of Failed Records",
|
||||
"type": "integer",
|
||||
"default": 0
|
||||
},
|
||||
"total": {
|
||||
"description": "Count of Failed Records",
|
||||
"type": "integer",
|
||||
"default": 0
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"runMode": {
|
||||
"description": "This schema publisher run modes.",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"stream",
|
||||
"batch"
|
||||
]
|
||||
},
|
||||
"publisherType": {
|
||||
"description": "This schema event Publisher Types",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"elasticSearch",
|
||||
"kafka"
|
||||
]
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"name": {
|
||||
"description": "Name of the result",
|
||||
"type": "string"
|
||||
},
|
||||
"startedBy": {
|
||||
"description": "Job started by",
|
||||
"type": "string"
|
||||
},
|
||||
"publisherType": {
|
||||
"$ref": "#/definitions/publisherType"
|
||||
},
|
||||
"runMode": {
|
||||
"$ref": "#/definitions/runMode"
|
||||
},
|
||||
"timestamp": {
|
||||
"$ref": "../type/basic.json#/definitions/timestamp"
|
||||
},
|
||||
"startTime": {
|
||||
"$ref": "../type/basic.json#/definitions/timestamp"
|
||||
},
|
||||
"endTime": {
|
||||
"$ref": "../type/basic.json#/definitions/timestamp"
|
||||
},
|
||||
"status": {
|
||||
"description": "This schema publisher run job status.",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"Failed",
|
||||
"Success",
|
||||
"Aborted",
|
||||
"Running"
|
||||
]
|
||||
},
|
||||
"failureDetails": {
|
||||
"description": "Failure details are set only when `status` is not `success`.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"lastFailedAt": {
|
||||
"description": "Last non-successful callback time in UNIX UTC epoch time in milliseconds.",
|
||||
"$ref": "../type/basic.json#/definitions/timestamp"
|
||||
},
|
||||
"lastFailedReason": {
|
||||
"description": "Last non-successful activity response reason received during callback.",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"stats": {
|
||||
"$ref": "#/definitions/stats"
|
||||
},
|
||||
"entities": {
|
||||
"description": "List of Entities to Reindex",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
},
|
||||
"uniqueItems": true
|
||||
}
|
||||
},
|
||||
"required": ["name", "publisherType","runMode", "timestamp", "status"],
|
||||
"additionalProperties": false
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user