MINOR - Test Case Incident Manager Reindex (#16123)

* feat: added reindex logic for test cases

* fix: broaden exception to catch ES/OS failure

* style: ran java linting

* fix: update buildESDoc method

* style: ran java linting

* fix: ui tests

* fix: update HashMap to Map

---------

Co-authored-by: Chira Madlani <chirag@getcollate.io>
Author: Teddy · 2024-05-15 10:38:58 +02:00 · committed by GitHub
parent 71b8c797e2
commit 3cf928bcc7
17 changed files with 454 additions and 300 deletions
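
At a glance: the job's two hard-coded pipelines (regular entities vs. data-insight report data) collapse into a single list of Sources, routed by the new TIME_SERIES_ENTITIES set. A minimal, self-contained sketch of that routing, assuming only the names visible in the diff below; the class itself is illustrative:

import java.util.List;
import java.util.Set;

// Sketch: how SearchIndexApp now decides which source type serves an entity type.
class SourceRoutingSketch {
  static final Set<String> TIME_SERIES_ENTITIES =
      Set.of(
          "entityReportData",
          "webAnalyticEntityViewReportData",
          "webAnalyticUserActivityReportData",
          "testCaseResolutionStatus");

  static String sourceFor(String entityType) {
    return TIME_SERIES_ENTITIES.contains(entityType)
        ? "PaginatedEntityTimeSeriesSource" // offset-paged over the time-series DAO
        : "PaginatedEntitiesSource"; // cursor-paged over the entity repository
  }

  public static void main(String[] args) {
    List.of("table", "testCase", "testCaseResolutionStatus")
        .forEach(e -> System.out.println(e + " -> " + sourceFor(e)));
  }
}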

View File

@ -6,20 +6,18 @@ import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_NAME_LIST_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
@ -33,20 +31,21 @@ import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.elasticsearch.ElasticSearchDataInsightProcessor;
import org.openmetadata.service.search.elasticsearch.ElasticSearchEntitiesProcessor;
import org.openmetadata.service.search.elasticsearch.ElasticSearchEntityTimeSeriesProcessor;
import org.openmetadata.service.search.elasticsearch.ElasticSearchIndexSink;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.search.opensearch.OpenSearchDataInsightProcessor;
import org.openmetadata.service.search.opensearch.OpenSearchEntitiesProcessor;
import org.openmetadata.service.search.opensearch.OpenSearchEntityTimeSeriesProcessor;
import org.openmetadata.service.search.opensearch.OpenSearchIndexSink;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Processor;
import org.openmetadata.service.workflows.interfaces.Sink;
import org.openmetadata.service.workflows.searchIndex.PaginatedDataInsightSource;
import org.openmetadata.service.workflows.interfaces.Source;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntityTimeSeriesSource;
import org.quartz.JobExecutionContext;
@Slf4j
@ -88,11 +87,17 @@ public class SearchIndexApp extends AbstractNativeApplication {
"webAnalyticUserActivityReportData",
"domain",
"storedProcedure",
"storageService");
private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>();
private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>();
"storageService",
"testCaseResolutionStatus");
public static final Set<String> TIME_SERIES_ENTITIES =
Set.of(
"entityReportData",
"webAnalyticEntityViewReportData",
"webAnalyticUserActivityReportData",
"testCaseResolutionStatus");
private final List<Source> paginatedSources = new ArrayList<>();
private Processor entityProcessor;
private Processor dataInsightProcessor;
private Processor entityTimeSeriesProcessor;
private Sink searchIndexSink;
@Getter EventPublisherJob jobData;
@ -133,8 +138,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
// Run ReIndexing
entitiesReIndex(jobExecutionContext);
dataInsightReindex(jobExecutionContext);
performReindex(jobExecutionContext);
// Mark Job as Completed
updateJobStatus();
} catch (Exception ex) {
@ -167,27 +171,31 @@ public class SearchIndexApp extends AbstractNativeApplication {
.getEntities()
.forEach(
entityType -> {
if (!isDataInsightIndex(entityType)) {
if (!TIME_SERIES_ENTITIES.contains(entityType)) {
List<String> fields = List.of("*");
PaginatedEntitiesSource source =
new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields);
if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) {
source.setCursor(jobData.getAfterCursor());
}
paginatedEntitiesSources.add(source);
paginatedSources.add(source);
} else {
paginatedDataInsightSources.add(
new PaginatedDataInsightSource(
collectionDAO, entityType, jobData.getBatchSize()));
PaginatedEntityTimeSeriesSource source =
new PaginatedEntityTimeSeriesSource(
entityType, jobData.getBatchSize(), List.of("*"));
if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) {
source.setCursor(jobData.getAfterCursor());
}
paginatedSources.add(source);
}
});
if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords);
this.dataInsightProcessor = new OpenSearchDataInsightProcessor(totalRecords);
this.entityTimeSeriesProcessor = new OpenSearchEntityTimeSeriesProcessor(totalRecords);
this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords);
} else {
this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords);
this.dataInsightProcessor = new ElasticSearchDataInsightProcessor(totalRecords);
this.entityTimeSeriesProcessor = new ElasticSearchEntityTimeSeriesProcessor(totalRecords);
this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords);
}
}
@ -213,89 +221,107 @@ public class SearchIndexApp extends AbstractNativeApplication {
pushAppStatusUpdates(jobExecutionContext, appRecord, true);
}
private void entitiesReIndex(JobExecutionContext jobExecutionContext) {
private void performReindex(JobExecutionContext jobExecutionContext) throws SearchIndexException {
Map<String, Object> contextData = new HashMap<>();
for (PaginatedEntitiesSource paginatedEntitiesSource : paginatedEntitiesSources) {
reCreateIndexes(paginatedEntitiesSource.getEntityType());
contextData.put(ENTITY_TYPE_KEY, paginatedEntitiesSource.getEntityType());
ResultList<? extends EntityInterface> resultList;
while (!stopped && !paginatedEntitiesSource.isDone()) {
for (Source paginatedSource : paginatedSources) {
List<String> entityName = new ArrayList<>();
reCreateIndexes(paginatedSource.getEntityType());
contextData.put(ENTITY_TYPE_KEY, paginatedSource.getEntityType());
Object resultList;
while (!stopped && !paginatedSource.isDone()) {
try {
resultList = paginatedEntitiesSource.readNext(null);
List<String> entityName =
resultList.getData().stream()
.map(
entity ->
String.format(
"%s %s",
paginatedEntitiesSource.getEntityType(),
entity.getFullyQualifiedName()))
.collect(Collectors.toList());
contextData.put(ENTITY_NAME_LIST_KEY, entityName);
if (!resultList.getData().isEmpty()) {
searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
if (!resultList.getErrors().isEmpty()) {
throw new SearchIndexException(
new IndexingError()
.withErrorSource(READER)
.withLastFailedCursor(paginatedEntitiesSource.getLastFailedCursor())
.withSubmittedCount(paginatedEntitiesSource.getBatchSize())
.withSuccessCount(resultList.getData().size())
.withFailedCount(resultList.getErrors().size())
.withMessage(
"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
.withFailedEntities(resultList.getErrors()));
}
paginatedEntitiesSource.updateStats(resultList.getData().size(), 0);
resultList = paginatedSource.readNext(null);
if (!TIME_SERIES_ENTITIES.contains(paginatedSource.getEntityType())) {
entityName =
getEntityNameFromEntity(
(ResultList<? extends EntityInterface>) resultList,
paginatedSource.getEntityType());
contextData.put(ENTITY_NAME_LIST_KEY, entityName);
processEntity(
(ResultList<? extends EntityInterface>) resultList, contextData, paginatedSource);
} else {
entityName =
getEntityNameFromEntityTimeSeries(
(ResultList<? extends EntityTimeSeriesInterface>) resultList,
paginatedSource.getEntityType());
contextData.put(ENTITY_NAME_LIST_KEY, entityName);
processEntityTimeSeries(
(ResultList<? extends EntityTimeSeriesInterface>) resultList,
contextData,
paginatedSource);
}
} catch (SearchIndexException rx) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(rx.getIndexingError());
paginatedEntitiesSource.updateStats(
paginatedSource.updateStats(
rx.getIndexingError().getSuccessCount(), rx.getIndexingError().getFailedCount());
} finally {
updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
updateStats(paginatedSource.getEntityType(), paginatedSource.getStats());
sendUpdates(jobExecutionContext);
}
}
}
}
private void dataInsightReindex(JobExecutionContext jobExecutionContext) {
Map<String, Object> contextData = new HashMap<>();
for (PaginatedDataInsightSource paginatedDataInsightSource : paginatedDataInsightSources) {
reCreateIndexes(paginatedDataInsightSource.getEntityType());
contextData.put(ENTITY_TYPE_KEY, paginatedDataInsightSource.getEntityType());
ResultList<ReportData> resultList;
while (!stopped && !paginatedDataInsightSource.isDone()) {
try {
resultList = paginatedDataInsightSource.readNext(null);
List<String> entityName =
resultList.getData().stream()
.map(
entity ->
String.format(
"%s %s", paginatedDataInsightSource.getEntityType(), entity.getId()))
.collect(Collectors.toList());
private List<String> getEntityNameFromEntity(
ResultList<? extends EntityInterface> resultList, String entityType) {
return resultList.getData().stream()
.map(entity -> String.format("%s %s", entityType, entity.getId()))
.toList();
}
contextData.put(ENTITY_NAME_LIST_KEY, entityName);
if (!resultList.getData().isEmpty()) {
searchIndexSink.write(
dataInsightProcessor.process(resultList, contextData), contextData);
}
paginatedDataInsightSource.updateStats(resultList.getData().size(), 0);
} catch (SearchIndexException ex) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(ex.getIndexingError());
paginatedDataInsightSource.updateStats(
ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount());
} finally {
updateStats(
paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
sendUpdates(jobExecutionContext);
}
private List<String> getEntityNameFromEntityTimeSeries(
ResultList<? extends EntityTimeSeriesInterface> resultList, String entityType) {
return resultList.getData().stream()
.map(entity -> String.format("%s %s", entityType, entity.getId()))
.toList();
}
private void processEntity(
ResultList<? extends EntityInterface> resultList,
Map<String, Object> contextData,
Source paginatedSource)
throws SearchIndexException {
if (!resultList.getData().isEmpty()) {
searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
if (!resultList.getErrors().isEmpty()) {
throw new SearchIndexException(
new IndexingError()
.withErrorSource(READER)
.withLastFailedCursor(paginatedSource.getLastFailedCursor())
.withSubmittedCount(paginatedSource.getBatchSize())
.withSuccessCount(resultList.getData().size())
.withFailedCount(resultList.getErrors().size())
.withMessage(
"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
.withFailedEntities(resultList.getErrors()));
}
paginatedSource.updateStats(resultList.getData().size(), 0);
}
}
private void processEntityTimeSeries(
ResultList<? extends EntityTimeSeriesInterface> resultList,
Map<String, Object> contextData,
Source paginatedSource)
throws SearchIndexException {
if (!resultList.getData().isEmpty()) {
searchIndexSink.write(
entityTimeSeriesProcessor.process(resultList, contextData), contextData);
if (!resultList.getErrors().isEmpty()) {
throw new SearchIndexException(
new IndexingError()
.withErrorSource(READER)
.withLastFailedCursor(paginatedSource.getLastFailedCursor())
.withSubmittedCount(paginatedSource.getBatchSize())
.withSuccessCount(resultList.getData().size())
.withFailedCount(resultList.getErrors().size())
.withMessage(
"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
.withFailedEntities(resultList.getErrors()));
}
paginatedSource.updateStats(resultList.getData().size(), 0);
}
}
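
Condensed, the loop above treats every source uniformly and branches only on the cast. A hypothetical rendering with index recreation, stats, and error context stripped out (Source and readNext(null) are the real interface; this helper is not):

// Hypothetical condensation of performReindex; not the actual method.
static void reindexAll(List<Source<?>> sources, Set<String> timeSeriesEntities)
    throws SearchIndexException {
  for (Source<?> source : sources) {
    while (!source.isDone()) {
      Object batch = source.readNext(null);
      if (timeSeriesEntities.contains(source.getEntityType())) {
        // processEntityTimeSeries((ResultList<? extends EntityTimeSeriesInterface>) batch, ...)
      } else {
        // processEntity((ResultList<? extends EntityInterface>) batch, ...)
      }
    }
  }
}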

View File

@ -115,6 +115,14 @@ public interface EntityTimeSeriesDAO {
update(getTimeSeriesTableName(), json, id.toString());
}
@SqlQuery(
"SELECT json FROM <table> <cond> " + "ORDER BY timestamp DESC LIMIT :limit OFFSET :offset")
List<String> listWithOffset(
@Define("table") String table,
@Define("cond") String cond,
@Bind("limit") int limit,
@Bind("offset") int offset);
@SqlQuery(
"SELECT json FROM <table> <cond> "
+ "AND timestamp BETWEEN :startTs AND :endTs "
@ -158,6 +166,10 @@ public interface EntityTimeSeriesDAO {
getTimeSeriesTableName(), filter.getCondition(), limit, offset, startTs, endTs);
}
default List<String> listWithOffset(ListFilter filter, int limit, int offset) {
return listWithOffset(getTimeSeriesTableName(), filter.getCondition(), limit, offset);
}
@ConnectionAwareSqlUpdate(
value =
"UPDATE <table> set json = :json where entityFQNHash=:entityFQNHash and extension=:extension and timestamp=:timestamp and json -> '$.operation' = :operation",
@ -198,6 +210,10 @@ public interface EntityTimeSeriesDAO {
return listCount(getTimeSeriesTableName(), filter.getCondition());
}
default int listCount() {
return listCount(new ListFilter(null));
}
@SqlQuery("SELECT count(*) FROM <table> <cond> AND timestamp BETWEEN :startTs AND :endTs")
int listCount(
@Define("table") String table,

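Together, the new listWithOffset(filter, limit, offset) overload and the no-argument listCount() default give the DAO plain offset paging with no timestamp window. A hedged sketch of draining a table with them (the drainAll helper is an assumption; the DAO calls are the ones added above):

import java.util.ArrayList;
import java.util.List;

// Sketch: page through every row of a time-series table in fixed batches.
static List<String> drainAll(EntityTimeSeriesDAO dao, int batchSize) {
  List<String> rows = new ArrayList<>();
  int total = dao.listCount(); // new default method: listCount(new ListFilter(null))
  for (int offset = 0; offset < total; offset += batchSize) {
    rows.addAll(dao.listWithOffset(new ListFilter(null), batchSize, offset));
  }
  return rows;
}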
View File

@ -3,11 +3,14 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.schema.type.Include.ALL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.Getter;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.system.EntityError;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
@ -129,9 +132,21 @@ public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInter
: null;
}
public final ResultList<T> getResultList(
List<T> entities,
String beforeCursor,
String afterCursor,
int total,
List<EntityError> errors) {
if (errors == null) {
return new ResultList<>(entities, beforeCursor, afterCursor, total);
}
return new ResultList<>(entities, errors, beforeCursor, afterCursor, total);
}
public final ResultList<T> getResultList(
List<T> entities, String beforeCursor, String afterCursor, int total) {
return new ResultList<>(entities, beforeCursor, afterCursor, total);
return getResultList(entities, beforeCursor, afterCursor, total, null);
}
/**
@ -139,36 +154,52 @@ public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInter
*
* @return ResultList
*/
protected ResultList<T> listWithOffset(
String offset, ListFilter filter, int limitParam, Long startTs, Long endTs, boolean latest) {
public ResultList<T> listWithOffset(
String offset,
ListFilter filter,
int limitParam,
Long startTs,
Long endTs,
boolean latest,
boolean skipErrors) {
int total = timeSeriesDao.listCount(filter, startTs, endTs, latest);
List<T> entityList = new ArrayList<>();
int offsetInt = offset != null ? Integer.parseInt(RestUtil.decodeCursor(offset)) : 0;
int afterOffsetInt = offsetInt + limitParam;
int beforeOffsetInt = offsetInt - limitParam;
// If offset is negative, then set it to 0 if you pass offset 4 and limit 10, then the previous
// page will be at offset 0
if (beforeOffsetInt < 0) beforeOffsetInt = 0;
// if offsetInt is 0 (i.e. either no offset or offset is 0), then set it to null as there is no
// previous page
String beforeOffset = (offsetInt == 0) ? null : String.valueOf(beforeOffsetInt);
// If afterOffset is greater than total, then set it to null to indicate end of list
String afterOffset = afterOffsetInt >= total ? null : String.valueOf(afterOffsetInt);
List<EntityError> errors = null;
int offsetInt = getOffset(offset);
String afterOffset = getAfterOffset(offsetInt, limitParam, total);
String beforeOffset = getBeforeOffset(offsetInt, limitParam);
if (limitParam > 0) {
List<String> jsons =
timeSeriesDao.listWithOffset(filter, limitParam, offsetInt, startTs, endTs, latest);
for (String json : jsons) {
T recordEntity = JsonUtils.readValue(json, entityClass);
setInheritedFields(recordEntity);
entityList.add(recordEntity);
Map<String, List<?>> entityListMap = getEntityList(jsons, skipErrors);
entityList = (List<T>) entityListMap.get("entityList");
if (skipErrors) {
errors = (List<EntityError>) entityListMap.get("errors");
}
return getResultList(entityList, beforeOffset, afterOffset, total);
return getResultList(entityList, beforeOffset, afterOffset, total, errors);
} else {
return getResultList(entityList, null, null, total);
}
}
public ResultList<T> listWithOffset(
String offset, ListFilter filter, int limitParam, boolean skipErrors) {
int total = timeSeriesDao.listCount(filter);
List<T> entityList = new ArrayList<>();
List<EntityError> errors = null;
int offsetInt = getOffset(offset);
String afterOffset = getAfterOffset(offsetInt, limitParam, total);
String beforeOffset = getBeforeOffset(offsetInt, limitParam);
if (limitParam > 0) {
List<String> jsons = timeSeriesDao.listWithOffset(filter, limitParam, offsetInt);
Map<String, List<?>> entityListMap = getEntityList(jsons, skipErrors);
entityList = (List<T>) entityListMap.get("entityList");
if (skipErrors) {
errors = (List<EntityError>) entityListMap.get("errors");
}
return getResultList(entityList, beforeOffset, afterOffset, total, errors);
} else {
return getResultList(entityList, null, null, total);
}
@ -176,7 +207,7 @@ public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInter
public ResultList<T> list(
String offset, Long startTs, Long endTs, int limitParam, ListFilter filter, boolean latest) {
return listWithOffset(offset, filter, limitParam, startTs, endTs, latest);
return listWithOffset(offset, filter, limitParam, startTs, endTs, latest, false);
}
public T getLatestRecord(String recordFQN) {
@ -212,4 +243,47 @@ public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInter
}
timeSeriesDao.deleteById(id);
}
private String getAfterOffset(int offsetInt, int limit, int total) {
int afterOffset = offsetInt + limit;
// If afterOffset is greater than total, then set it to null to indicate end of list
return afterOffset >= total ? null : String.valueOf(afterOffset);
}
private String getBeforeOffset(int offsetInt, int limit) {
int beforeOffsetInt = offsetInt - limit;
// If the computed before-offset is negative, clamp it to 0: e.g. offset 4 with
// limit 10 pages back to a previous page starting at offset 0
if (beforeOffsetInt < 0) beforeOffsetInt = 0;
// If offsetInt is 0 (no offset given, or offset 0), return null: there is no
// previous page
return (offsetInt == 0) ? null : String.valueOf(beforeOffsetInt);
}
private int getOffset(String offset) {
return offset != null ? Integer.parseInt(RestUtil.decodeCursor(offset)) : 0;
}
private Map<String, List<?>> getEntityList(List<String> jsons, boolean skipErrors) {
List<T> entityList = new ArrayList<>();
List<EntityError> errors = new ArrayList<>();
Map<String, List<?>> resultList = new HashMap<>();
for (String json : jsons) {
try {
T recordEntity = JsonUtils.readValue(json, entityClass);
setInheritedFields(recordEntity);
entityList.add(recordEntity);
} catch (Exception e) {
if (!skipErrors) {
throw e;
}
errors.add(new EntityError().withMessage(e.getMessage()));
}
}
resultList.put("entityList", entityList);
resultList.put("errors", errors);
return resultList;
}
}
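
The skipErrors flag is the piece that lets reindexing survive rows that fail to deserialize: with skipErrors=true a bad row becomes an EntityError on the returned ResultList instead of an exception that kills the whole page. A hypothetical caller (repository, cursor, LOG, and indexBatch are stand-ins):

// Sketch: tolerant page read during reindexing.
ResultList<TestCaseResolutionStatus> page =
    repository.listWithOffset(cursor, new ListFilter(null), 100, /* skipErrors */ true);
for (EntityError error : page.getErrors()) {
  LOG.warn("Skipped unreadable row: {}", error.getMessage()); // surfaced, not fatal
}
indexBatch(page.getData()); // only the rows that deserialized cleanly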

View File

@ -9,7 +9,6 @@ import static org.openmetadata.service.Entity.TEST_CASE;
import static org.openmetadata.service.Entity.TEST_SUITE;
import static org.openmetadata.service.util.FullyQualifiedName.quoteName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@ -232,7 +231,7 @@ public class TestSuiteRepository extends EntityRepository<TestSuite> {
testSummary = getEntityTestCasesExecutionSummary(testCaseResultSummary);
}
return testSummary;
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Error reading aggregation query", e);
}
return null;

View File

@ -8,31 +8,32 @@ import es.org.elasticsearch.action.update.UpdateRequest;
import es.org.elasticsearch.xcontent.XContentType;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.search.indexes.ReportDataIndexes;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Processor;
@Slf4j
public class ElasticSearchDataInsightProcessor
implements Processor<BulkRequest, ResultList<ReportData>> {
public class ElasticSearchEntityTimeSeriesProcessor
implements Processor<BulkRequest, ResultList<? extends EntityTimeSeriesInterface>> {
private final StepStats stats = new StepStats();
public ElasticSearchDataInsightProcessor(int total) {
public ElasticSearchEntityTimeSeriesProcessor(int total) {
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
}
@Override
public BulkRequest process(ResultList<ReportData> input, Map<String, Object> contextData)
public BulkRequest process(
ResultList<? extends EntityTimeSeriesInterface> input, Map<String, Object> contextData)
throws SearchIndexException {
String entityType = (String) contextData.get(ENTITY_TYPE_KEY);
if (CommonUtil.nullOrEmpty(entityType)) {
@ -70,23 +71,26 @@ public class ElasticSearchDataInsightProcessor
return requests;
}
private BulkRequest buildBulkRequests(String entityType, List<ReportData> entities) {
private BulkRequest buildBulkRequests(
String entityType, List<? extends EntityTimeSeriesInterface> entities) {
BulkRequest bulkRequests = new BulkRequest();
for (ReportData reportData : entities) {
UpdateRequest request = getUpdateRequest(entityType, reportData);
for (EntityTimeSeriesInterface entity : entities) {
UpdateRequest request = getUpdateRequest(entityType, entity);
bulkRequests.add(request);
}
return bulkRequests;
}
private UpdateRequest getUpdateRequest(String entityType, ReportData reportData) {
private UpdateRequest getUpdateRequest(String entityType, EntityTimeSeriesInterface entity) {
IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType);
UpdateRequest updateRequest =
new UpdateRequest(
indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()),
reportData.getId().toString());
entity.getId().toString());
updateRequest.doc(
JsonUtils.pojoToJson(new ReportDataIndexes(reportData).buildSearchIndexDoc()),
JsonUtils.pojoToJson(
Objects.requireNonNull(
Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc())),
XContentType.JSON);
updateRequest.docAsUpsert(true);
return updateRequest;
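
Both processors (this ElasticSearch one and its OpenSearch mirror below) hinge on the same generalization inside getUpdateRequest: the hard-coded ReportDataIndexes gives way to a registry lookup, so any time-series entity with a registered search index can be upserted. Side by side, as a paraphrase of the diff rather than new API:

// Before: report data only.
// updateRequest.doc(
//     JsonUtils.pojoToJson(new ReportDataIndexes(reportData).buildSearchIndexDoc()),
//     XContentType.JSON);

// After: any EntityTimeSeriesInterface whose entityType has a SearchIndex.
updateRequest.doc(
    JsonUtils.pojoToJson(
        Objects.requireNonNull(
            Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc())),
    XContentType.JSON);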

View File

@ -5,15 +5,15 @@ import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getU
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.search.indexes.ReportDataIndexes;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
@ -23,16 +23,17 @@ import os.org.opensearch.action.update.UpdateRequest;
import os.org.opensearch.common.xcontent.XContentType;
@Slf4j
public class OpenSearchDataInsightProcessor
implements Processor<BulkRequest, ResultList<ReportData>> {
public class OpenSearchEntityTimeSeriesProcessor
implements Processor<BulkRequest, ResultList<? extends EntityTimeSeriesInterface>> {
private final StepStats stats = new StepStats();
public OpenSearchDataInsightProcessor(int total) {
public OpenSearchEntityTimeSeriesProcessor(int total) {
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
}
@Override
public BulkRequest process(ResultList<ReportData> input, Map<String, Object> contextData)
public BulkRequest process(
ResultList<? extends EntityTimeSeriesInterface> input, Map<String, Object> contextData)
throws SearchIndexException {
String entityType = (String) contextData.get(ENTITY_TYPE_KEY);
if (CommonUtil.nullOrEmpty(entityType)) {
@ -70,23 +71,26 @@ public class OpenSearchDataInsightProcessor
return requests;
}
private BulkRequest buildBulkRequests(String entityType, List<ReportData> entities) {
private BulkRequest buildBulkRequests(
String entityType, List<? extends EntityTimeSeriesInterface> entities) {
BulkRequest bulkRequests = new BulkRequest();
for (ReportData reportData : entities) {
UpdateRequest request = getUpdateRequest(entityType, reportData);
for (EntityTimeSeriesInterface entity : entities) {
UpdateRequest request = getUpdateRequest(entityType, entity);
bulkRequests.add(request);
}
return bulkRequests;
}
private UpdateRequest getUpdateRequest(String entityType, ReportData reportData) {
private UpdateRequest getUpdateRequest(String entityType, EntityTimeSeriesInterface entity) {
IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType);
UpdateRequest updateRequest =
new UpdateRequest(
indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()),
reportData.getId().toString());
entity.getId().toString());
updateRequest.doc(
JsonUtils.pojoToJson(new ReportDataIndexes(reportData).buildSearchIndexDoc()),
JsonUtils.pojoToJson(
Objects.requireNonNull(
Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc())),
XContentType.JSON);
updateRequest.docAsUpsert(true);
return updateRequest;

View File

@ -23,4 +23,12 @@ public interface Source<R> extends Stats {
List<String> getReaderErrors();
void reset();
String getEntityType();
int getBatchSize();
String getLastFailedCursor();
boolean isDone();
}
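
The four additions — getEntityType(), getBatchSize(), getLastFailedCursor(), and isDone() — let SearchIndexApp drive and resume any source through the interface alone. A minimal sketch of a generic driver written against nothing but this contract (drive and LOG are hypothetical):

// Sketch: interface-only drive loop with failure bookkeeping.
static <R> void drive(Source<R> source) {
  try {
    while (!source.isDone()) {
      R batch = source.readNext(null); // hand off to a Processor, then a Sink
    }
  } catch (SearchIndexException e) {
    // persist for the next run; the concrete sources expose setCursor to resume
    LOG.warn("Batch of {} failed at cursor {}",
        source.getBatchSize(), source.getLastFailedCursor());
  }
}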

View File

@ -1,150 +0,0 @@
/*
* Copyright 2022 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.workflows.searchIndex;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Source;
@Slf4j
public class PaginatedDataInsightSource implements Source<ResultList<ReportData>> {
private final CollectionDAO dao;
@Getter private final String entityType;
@Getter private final int batchSize;
@Getter private final List<String> readerErrors = new ArrayList<>();
@Getter private final StepStats stats = new StepStats();
private String cursor = null;
@Getter private boolean isDone = false;
public PaginatedDataInsightSource(CollectionDAO dao, String entityType, int batchSize) {
this.dao = dao;
this.entityType = entityType;
this.batchSize = batchSize;
this.stats
.withTotalRecords(
dao.reportDataTimeSeriesDao()
.listCount(new ListFilter(null).addQueryParam("entityFQNHash", entityType)))
.withSuccessRecords(0)
.withFailedRecords(0);
}
@Override
public ResultList<ReportData> readNext(Map<String, Object> contextData)
throws SearchIndexException {
if (!isDone) {
ResultList<ReportData> data = read(cursor);
cursor = data.getPaging().getAfter();
if (cursor == null) {
isDone = true;
}
return data;
}
return null;
}
@Override
public void reset() {
cursor = null;
isDone = false;
}
private ResultList<ReportData> read(String afterCursor) throws SearchIndexException {
LOG.debug("[DataInsightReader] Fetching a Batch of Size: {} ", batchSize);
ResultList<ReportData> result = null;
try {
result = getReportDataPagination(entityType, batchSize, afterCursor);
LOG.debug(
"[DataInsightReader] Batch Stats :- Submitted : {} Success: {} Failed: {}",
batchSize,
result.getData().size(),
0);
// updateStats(result.getData().size(), result.getErrors().size());
} catch (Exception ex) {
IndexingError indexingError =
new IndexingError()
.withErrorSource(IndexingError.ErrorSource.READER)
.withSubmittedCount(batchSize)
.withSuccessCount(0)
.withFailedCount(batchSize)
.withMessage("Issues in Reading A Batch For Data Insight Data.")
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(ex));
LOG.debug(
"[DataInsightReader] Failing Completely. Details : {}",
JsonUtils.pojoToJson(indexingError));
if (result != null) {
if (result.getPaging().getAfter() == null) {
isDone = true;
int recordToRead =
stats.getTotalRecords() - (stats.getSuccessRecords() + stats.getFailedRecords());
updateStats(result.getData().size(), recordToRead - result.getData().size());
} else {
updateStats(result.getData().size(), batchSize - result.getData().size());
}
} else {
updateStats(0, batchSize);
}
throw new SearchIndexException(indexingError);
}
return result;
}
public ResultList<ReportData> getReportDataPagination(String entityFQN, int limit, String after) {
int reportDataCount =
dao.reportDataTimeSeriesDao()
.listCount(new ListFilter(null).addQueryParam("entityFQNHash", entityFQN));
List<CollectionDAO.ReportDataRow> reportDataList =
dao.reportDataTimeSeriesDao()
.getAfterExtension(
entityFQN, limit + 1, after == null ? "0" : RestUtil.decodeCursor(after));
return getAfterExtensionList(reportDataList, after, limit, reportDataCount);
}
private ResultList<ReportData> getAfterExtensionList(
List<CollectionDAO.ReportDataRow> reportDataRowList, String after, int limit, int total) {
String beforeCursor;
String afterCursor = null;
beforeCursor = after == null ? null : reportDataRowList.get(0).getRowNum();
if (reportDataRowList.size() > limit) {
reportDataRowList.remove(limit);
afterCursor = reportDataRowList.get(limit - 1).getRowNum();
}
List<ReportData> reportDataList = new ArrayList<>();
for (CollectionDAO.ReportDataRow reportDataRow : reportDataRowList) {
reportDataList.add(reportDataRow.getReportData());
}
return new ResultList<>(reportDataList, new ArrayList<>(), beforeCursor, afterCursor, total);
}
@Override
public void updateStats(int currentSuccess, int currentFailed) {
getUpdatedStats(stats, currentSuccess, currentFailed);
}
}

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.schema.EntityInterface;
@ -42,7 +43,7 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
@Getter private final List<String> readerErrors = new ArrayList<>();
@Getter private final StepStats stats = new StepStats();
@Getter private String lastFailedCursor = null;
private String cursor = RestUtil.encodeCursor("0");
@Setter private String cursor = RestUtil.encodeCursor("0");
@Getter private boolean isDone = false;
public PaginatedEntitiesSource(String entityType, int batchSize, List<String> fields) {
@ -139,8 +140,4 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
public void updateStats(int currentSuccess, int currentFailed) {
getUpdatedStats(stats, currentSuccess, currentFailed);
}
public void setCursor(String cursor) {
this.cursor = cursor;
}
}

View File

@ -0,0 +1,141 @@
package org.openmetadata.service.workflows.searchIndex;
import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Source;
@Slf4j
public class PaginatedEntityTimeSeriesSource
implements Source<ResultList<? extends EntityTimeSeriesInterface>> {
@Getter private final int batchSize;
@Getter private final String entityType;
@Getter private final List<String> fields;
@Getter private final List<String> readerErrors = new ArrayList<>();
@Getter private final StepStats stats = new StepStats();
@Getter private String lastFailedCursor = null;
@Setter private String cursor = RestUtil.encodeCursor("0");
@Getter private boolean isDone = false;
public PaginatedEntityTimeSeriesSource(String entityType, int batchSize, List<String> fields) {
this.entityType = entityType;
this.batchSize = batchSize;
this.fields = fields;
this.stats
.withTotalRecords(getEntityTimeSeriesRepository().getTimeSeriesDao().listCount(getFilter()))
.withSuccessRecords(0)
.withFailedRecords(0);
}
@Override
public ResultList<? extends EntityTimeSeriesInterface> readNext(Map<String, Object> contextData)
throws SearchIndexException {
ResultList<? extends EntityTimeSeriesInterface> data = null;
if (!isDone) {
data = read(cursor);
cursor = data.getPaging().getAfter();
if (cursor == null) {
isDone = true;
}
}
return data;
}
private ResultList<? extends EntityTimeSeriesInterface> read(String cursor)
throws SearchIndexException {
LOG.debug("[PaginatedEntityTimeSeriesSource] Fetching a Batch of Size: {} ", batchSize);
EntityTimeSeriesRepository<? extends EntityTimeSeriesInterface> repository =
getEntityTimeSeriesRepository();
ResultList<? extends EntityTimeSeriesInterface> result;
ListFilter filter = getFilter();
try {
result = repository.listWithOffset(cursor, filter, batchSize, true);
if (!result.getErrors().isEmpty()) {
lastFailedCursor = this.cursor;
if (result.getPaging().getAfter() == null) {
isDone = true;
} else {
this.cursor = result.getPaging().getAfter();
}
return result;
}
LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, result.getData().size(), result.getErrors().size());
} catch (Exception e) {
lastFailedCursor = this.cursor;
int remainingRecords =
stats.getTotalRecords() - stats.getFailedRecords() - stats.getSuccessRecords();
int submittedRecords;
if (remainingRecords - batchSize <= 0) {
submittedRecords = remainingRecords;
updateStats(0, remainingRecords);
this.cursor = null;
this.isDone = true;
} else {
submittedRecords = batchSize;
String decodedCursor = RestUtil.decodeCursor(cursor);
this.cursor =
RestUtil.encodeCursor(String.valueOf(Integer.parseInt(decodedCursor) + batchSize));
updateStats(0, batchSize);
}
IndexingError indexingError =
new IndexingError()
.withErrorSource(READER)
.withSubmittedCount(submittedRecords)
.withSuccessCount(0)
.withFailedCount(submittedRecords)
.withMessage(
"Issues in Reading A Batch For Entities. No Relationship Issue , Json Processing or DB issue.")
.withLastFailedCursor(lastFailedCursor)
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
LOG.debug(indexingError.getMessage());
throw new SearchIndexException(indexingError);
}
return result;
}
@Override
public void reset() {
cursor = null;
isDone = false;
}
@Override
public void updateStats(int currentSuccess, int currentFailed) {
getUpdatedStats(stats, currentSuccess, currentFailed);
}
private ListFilter getFilter() {
ListFilter filter = new ListFilter(null);
if (ReindexingUtil.isDataInsightIndex(entityType)) {
filter.addQueryParam("entityFQNHash", entityType);
}
return filter;
}
private EntityTimeSeriesRepository<? extends EntityTimeSeriesInterface>
getEntityTimeSeriesRepository() {
if (ReindexingUtil.isDataInsightIndex(entityType)) {
return Entity.getEntityTimeSeriesRepository(Entity.ENTITY_REPORT_DATA);
} else {
return Entity.getEntityTimeSeriesRepository(entityType);
}
}
}
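
Putting the new source to work mirrors what initializeJob does above; a hedged usage sketch (afterCursor, contextData, and the processor/sink handoff are illustrative):

// Sketch: resume a time-series reindex from a saved cursor.
PaginatedEntityTimeSeriesSource source =
    new PaginatedEntityTimeSeriesSource("testCaseResolutionStatus", 100, List.of("*"));
if (!CommonUtil.nullOrEmpty(afterCursor)) {
  source.setCursor(afterCursor); // Lombok @Setter replaces the hand-written setter
}
while (!source.isDone()) {
  ResultList<? extends EntityTimeSeriesInterface> batch = source.readNext(null);
  // entityTimeSeriesProcessor.process(batch, contextData) -> searchIndexSink.write(...)
}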

View File

@ -13,6 +13,8 @@
package org.openmetadata.service.workflows.searchIndex;
import static org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp.TIME_SERIES_ENTITIES;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.util.ArrayList;
@ -29,6 +31,7 @@ import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.search.SearchRequest;
import org.openmetadata.service.util.JsonUtils;
@ -55,13 +58,19 @@ public class ReindexingUtil {
public static int getTotalRequestToProcess(Set<String> entities, CollectionDAO dao) {
int total = 0;
for (String entityType : entities) {
if (!isDataInsightIndex(entityType)) {
if (!TIME_SERIES_ENTITIES.contains(entityType)) {
EntityRepository<?> repository = Entity.getEntityRepository(entityType);
total += repository.getDao().listTotalCount();
} else {
total +=
dao.reportDataTimeSeriesDao()
.listCount(new ListFilter(null).addQueryParam("entityFQNHash", entityType));
EntityTimeSeriesRepository<?> repository;
ListFilter listFilter = new ListFilter(null);
if (isDataInsightIndex(entityType)) {
listFilter.addQueryParam("entityFQNHash", entityType);
repository = Entity.getEntityTimeSeriesRepository(Entity.ENTITY_REPORT_DATA);
} else {
repository = Entity.getEntityTimeSeriesRepository(entityType);
}
total += repository.getTimeSeriesDao().listCount(listFilter);
}
}
return total;

View File

@ -36,7 +36,8 @@
"domain",
"storedProcedure",
"storageService",
"dataProduct"
"dataProduct",
"testCaseResolutionStatus"
],
"recreateIndex": true,
"batchSize": "100",

View File

@ -13,7 +13,7 @@
import { uuid } from '../../constants/constants';
import { CustomPropertySupportedEntityList } from '../../constants/CustomProperty.constant';
import { ENTITY_PATH, EntityType } from '../../constants/Entity.interface';
import { EntityType, ENTITY_PATH } from '../../constants/Entity.interface';
import {
createAnnouncement as createAnnouncementUtil,
createInactiveAnnouncement as createInactiveAnnouncementUtil,

View File

@ -66,6 +66,7 @@ export enum EntityType {
ENTITY_REPORT_DATA = 'entityReportData',
WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA = 'webAnalyticEntityViewReportData',
WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA = 'webAnalyticUserActivityReportData',
TEST_CASE_RESOLUTION_STATUS = 'test_case_resolution_status_search_index',
EVENT_SUBSCRIPTION = 'eventsubscription',
}

View File

@ -42,6 +42,7 @@ import { SearchService } from '../generated/entity/services/searchService';
import { Team } from '../generated/entity/teams/team';
import { User } from '../generated/entity/teams/user';
import { TestCase } from '../generated/tests/testCase';
import { TestCaseResolutionStatus } from '../generated/tests/testCaseResolutionStatus';
import { TestSuite } from '../generated/tests/testSuite';
import { TagLabel } from '../generated/type/tagLabel';
import { AggregatedCostAnalysisReportDataSearchSource } from './data-insight.interface';
@ -131,6 +132,16 @@ export interface TestCaseSearchSource
} // extends EntityInterface
export interface TestSuiteSearchSource extends SearchSourceBase, TestSuite {}
export interface TestCaseResolutionStatusSearchSource
extends SearchSourceBase,
TestCaseResolutionStatus {
name: string;
displayName: string;
fullyQualifiedName: string;
serviceType: string;
description: string;
}
export interface IngestionPipelineSearchSource
extends SearchSourceBase,
IngestionPipeline {}

View File

@ -51,7 +51,8 @@
"webAnalyticUserActivityReportData",
"domain",
"storedProcedure",
"dataProduct"
"dataProduct",
"testCaseResolutionStatus"
]
},
"default": [
@ -89,7 +90,8 @@
"domain",
"storedProcedure",
"dataProduct",
"ingestionPipeline"
"ingestionPipeline",
"testCaseResolutionStatus"
],
"uniqueItems": true
},

View File

@ -188,6 +188,11 @@ export const MOCK_APPLICATION_ENTITY_STATS = {
failedRecords: 0,
successRecords: 4,
},
[EntityType.TEST_CASE_RESOLUTION_STATUS]: {
totalRecords: 4,
failedRecords: 0,
successRecords: 4,
},
};
export const MOCK_APPLICATION_ENTITY_STATS_DATA = [
@ -401,4 +406,10 @@ export const MOCK_APPLICATION_ENTITY_STATS_DATA = [
failedRecords: 0,
successRecords: 4,
},
{
name: EntityType.TEST_CASE_RESOLUTION_STATUS,
totalRecords: 4,
failedRecords: 0,
successRecords: 4,
},
];