Missing try catch for es-migrate option

This commit is contained in:
mohitdeuex 2023-06-30 11:58:21 +05:30
parent a96b3ab5b4
commit f51781c32d
2 changed files with 28 additions and 23 deletions

View File

@ -254,29 +254,33 @@ public interface SearchClient {
@SneakyThrows
default void updateElasticSearchFailureStatus(String failedFor, String failureMessage) {
long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
String recordString =
getDao()
.entityExtensionTimeSeriesDao()
.getExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
long originalLastUpdate = lastRecord.getTimestamp();
lastRecord.setStatus(EventPublisherJob.Status.ACTIVE_WITH_ERROR);
lastRecord.setTimestamp(updateTime);
lastRecord.setFailure(
new Failure()
.withSinkError(
new FailureDetails()
.withContext(failedFor)
.withLastFailedAt(updateTime)
.withLastFailedReason(failureMessage)));
try {
long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
String recordString =
getDao()
.entityExtensionTimeSeriesDao()
.getExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
long originalLastUpdate = lastRecord.getTimestamp();
lastRecord.setStatus(EventPublisherJob.Status.ACTIVE_WITH_ERROR);
lastRecord.setTimestamp(updateTime);
lastRecord.setFailure(
new Failure()
.withSinkError(
new FailureDetails()
.withContext(failedFor)
.withLastFailedAt(updateTime)
.withLastFailedReason(failureMessage)));
getDao()
.entityExtensionTimeSeriesDao()
.update(
ELASTIC_SEARCH_ENTITY_FQN_STREAM,
ELASTIC_SEARCH_EXTENSION,
JsonUtils.pojoToJson(lastRecord),
originalLastUpdate);
getDao()
.entityExtensionTimeSeriesDao()
.update(
ELASTIC_SEARCH_ENTITY_FQN_STREAM,
ELASTIC_SEARCH_EXTENSION,
JsonUtils.pojoToJson(lastRecord),
originalLastUpdate);
} catch (Exception e) {
// Failure to update
}
}
}

View File

@ -211,6 +211,7 @@ public class ElasticSearchClientImpl implements SearchClient {
LOG.info("{} Updated {}", elasticSearchIndexType.indexName, putMappingResponse.isAcknowledged());
} else {
CreateIndexRequest request = new CreateIndexRequest(elasticSearchIndexType.indexName);
request.source(elasticSearchIndexMapping, XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
LOG.info("{} Created {}", elasticSearchIndexType.indexName, createIndexResponse.isAcknowledged());
}