Fixed #9289: Support Data Insight Index Refresh from Elasticsearch (#9412)

* Fixed #9289: Support Data Insight Index Refresh from Elasticsearch

* Fixed #9289: Support Data Insight Index Refresh from Elasticsearch
This commit is contained in:
Parth Panchal 2022-12-20 13:51:16 +05:30 committed by GitHub
parent ee05bb412d
commit 36e01a2b21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 33 deletions

View File

@ -37,6 +37,9 @@ import org.openmetadata.service.util.JsonUtils;
@Slf4j
public class ElasticSearchIndexDefinition {
private static final String REASON_TRACE = "Reason: [%s] , Trace : [%s]";
public static final String ENTITY_REPORT_DATA = "entityReportData";
public static final String WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA = "webAnalyticEntityViewReportData";
public static final String WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA = "webAnalyticUserActivityReportData";
private final CollectionDAO dao;
final EnumMap<ElasticSearchIndexType, ElasticSearchIndexStatus> elasticSearchIndexes =
new EnumMap<>(ElasticSearchIndexType.class);
@ -203,6 +206,12 @@ public class ElasticSearchIndexDefinition {
return ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(Entity.TAG)) {
return ElasticSearchIndexType.TAG_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(ENTITY_REPORT_DATA)) {
return ElasticSearchIndexType.ENTITY_REPORT_DATA_INDEX;
} else if (type.equalsIgnoreCase(WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA)) {
return ElasticSearchIndexType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA_INDEX;
} else if (type.equalsIgnoreCase(WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)) {
return ElasticSearchIndexType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA_INDEX;
}
throw new RuntimeException("Failed to find index doc for type " + type);
}

View File

@ -0,0 +1,23 @@
package org.openmetadata.service.elasticsearch;
import java.util.Map;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.service.util.JsonUtils;
/**
 * {@link ElasticSearchIndex} implementation that turns a single {@link ReportData} record into the
 * map that is sent to Elasticsearch as the document body.
 */
public class ReportDataIndexes implements ElasticSearchIndex {
  final ReportData reportData;

  /**
   * @param reportData the report data record this index document is built from
   */
  public ReportDataIndexes(ReportData reportData) {
    this.reportData = reportData;
  }

  /**
   * Builds the document map for the wrapped report data.
   *
   * <p>The map returned by {@code JsonUtils.getMap} is used as the base, and the {@code timestamp},
   * {@code reportDataType} and {@code data} keys are then set from the corresponding getters.
   *
   * @return the document map to index
   */
  @Override
  public Map<String, Object> buildESDoc() {
    final Map<String, Object> esDoc = JsonUtils.getMap(reportData);
    esDoc.put("timestamp", reportData.getTimestamp());
    esDoc.put("reportDataType", reportData.getReportDataType());
    esDoc.put("data", reportData.getData());
    return esDoc;
  }
}

View File

@ -3096,6 +3096,9 @@ public interface CollectionDAO {
/**
 * Selects the {@code json} column from {@code entity_extension_time_series} for the row matching
 * both the given entityFQN and extension.
 *
 * <p>Note: the first Java parameter is named {@code entityId} but is bound to {@code :entityFQN}
 * in the query.
 *
 * @param entityId value bound to {@code :entityFQN}
 * @param extension value bound to {@code :extension}
 * @return the selected {@code json} value
 */
@SqlQuery("SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension")
String getExtension(@Bind("entityFQN") String entityId, @Bind("extension") String extension);
/**
 * Selects the {@code json} column of every {@code entity_extension_time_series} row whose
 * entityFQN matches, regardless of extension.
 *
 * <p>The query has no ORDER BY or LIMIT clause, so all matching rows are returned in one list.
 *
 * @param entityFQN value bound to {@code :entityFQN}
 * @return the {@code json} values of all matching rows
 */
@SqlQuery("SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN")
List<String> getExtension(@Bind("entityFQN") String entityFQN);
@SqlQuery(
"SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension AND timestamp = :timestamp")
String getExtensionAtTimestamp(

View File

@ -43,6 +43,7 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.api.CreateEventPublisherJob.RunMode;
import org.openmetadata.schema.entity.data.Table;
@ -56,6 +57,7 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory;
import org.openmetadata.service.elasticsearch.ReportDataIndexes;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
@ -255,12 +257,33 @@ public class BuildSearchIndexResource {
}
}
/**
 * Loads every stored report data record for the given data insight type and queues it on the bulk
 * processor for (re)indexing.
 *
 * <p>The entity type is matched case-insensitively against the data insight constants declared on
 * {@link ElasticSearchIndexDefinition}; any other entity type is ignored and the method returns
 * without touching the processor or the listener.
 *
 * @param processor bulk processor that receives the update requests
 * @param listener listener whose request count is increased by the number of records loaded
 * @param entityType data insight type name to refresh
 * @param indexType Elasticsearch index the records are written to
 * @throws IOException propagated from {@code updateElasticSearchForDataInsightBatch}
 */
private synchronized void updateDataInsightBatch(
    BulkProcessor processor,
    BulkProcessorListener listener,
    String entityType,
    ElasticSearchIndexDefinition.ElasticSearchIndexType indexType)
    throws IOException {
  // Name under which the report data rows are looked up in the time series table.
  final String reportDataFqn;
  if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.ENTITY_REPORT_DATA)) {
    reportDataFqn = "EntityReportData";
  } else if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA)) {
    reportDataFqn = "WebAnalyticEntityViewReportData";
  } else if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)) {
    // Must mirror the "webAnalyticUserActivityReportData" constant; an extra "View" in this name
    // would make the lookup match no rows.
    reportDataFqn = "WebAnalyticUserActivityReportData";
  } else {
    return;
  }

  List<String> entityReportData = dao.entityExtensionTimeSeriesDao().getExtension(reportDataFqn);
  updateElasticSearchForDataInsightBatch(processor, indexType, entityType, entityReportData);
  listener.addRequests(entityReportData.size());
}
private synchronized void updateEntityBatch(
BulkProcessor processor,
BulkProcessorListener listener,
UriInfo uriInfo,
String entityType,
CreateEventPublisherJob createRequest) {
CreateEventPublisherJob createRequest)
throws IOException {
listener.allowTotalRequestUpdate();
ElasticSearchIndexDefinition.ElasticSearchIndexType indexType =
@ -273,38 +296,45 @@ public class BuildSearchIndexResource {
elasticSearchIndexDefinition.createIndex(indexType);
}
// Start fetching a list of Entities and pushing them to ES
EntityRepository<EntityInterface> entityRepository = Entity.getEntityRepository(entityType);
List<String> allowedFields = entityRepository.getAllowedFields();
String fields = String.join(",", allowedFields);
ResultList<EntityInterface> result;
String after = null;
try {
do {
if (entityType.equals(TEAM)) {
// just name and display name are needed
fields = "name,displayName";
}
result =
entityRepository.listAfter(
uriInfo,
new EntityUtil.Fields(allowedFields, fields),
new ListFilter(Include.ALL),
createRequest.getBatchSize(),
after);
listener.addRequests(result.getPaging().getTotal());
updateElasticSearchForEntityBatch(indexType, processor, entityType, result.getData());
processor.flush();
after = result.getPaging().getAfter();
} while (after != null);
} catch (Exception ex) {
LOG.error("Failed in listing all Entities of type : {}, Reason : ", entityType, ex);
FailureDetails failureDetails =
new FailureDetails()
.withContext(String.format("%s:Failure in fetching Data", entityType))
.withLastFailedReason(
String.format("Failed in listing all Entities \n Reason : %s", ExceptionUtils.getStackTrace(ex)));
listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, failureDetails, null);
// Start fetching a list of Report Data and pushing them to ES
if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.ENTITY_REPORT_DATA)
|| entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA)
|| entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)) {
updateDataInsightBatch(processor, listener, entityType, indexType);
} else {
// Start fetching a list of Entities and pushing them to ES
EntityRepository<EntityInterface> entityRepository = Entity.getEntityRepository(entityType);
List<String> allowedFields = entityRepository.getAllowedFields();
String fields = String.join(",", allowedFields);
ResultList<EntityInterface> result;
String after = null;
try {
do {
if (entityType.equals(TEAM)) {
// just name and display name are needed
fields = "name,displayName";
}
result =
entityRepository.listAfter(
uriInfo,
new EntityUtil.Fields(allowedFields, fields),
new ListFilter(Include.ALL),
createRequest.getBatchSize(),
after);
listener.addRequests(result.getPaging().getTotal());
updateElasticSearchForEntityBatch(indexType, processor, entityType, result.getData());
processor.flush();
after = result.getPaging().getAfter();
} while (after != null);
} catch (Exception ex) {
LOG.error("Failed in listing all Entities of type : {}, Reason : ", entityType, ex);
FailureDetails failureDetails =
new FailureDetails()
.withContext(String.format("%s:Failure in fetching Data", entityType))
.withLastFailedReason(
String.format("Failed in listing all Entities \n Reason : %s", ExceptionUtils.getStackTrace(ex)));
listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, failureDetails, null);
}
}
}
@ -344,6 +374,20 @@ public class BuildSearchIndexResource {
}
}
/**
 * Converts each report data JSON string into an update request and adds it to the bulk processor.
 * Entries for which no request could be built (a {@code null} result) are skipped.
 *
 * @param bulkProcessor bulk processor that receives the update requests
 * @param indexType Elasticsearch index the records are written to
 * @param entityType data insight type name, passed through to the request builder
 * @param entities report data records as JSON strings
 * @throws IOException propagated from {@code getUpdateRequest}
 */
private synchronized void updateElasticSearchForDataInsightBatch(
    BulkProcessor bulkProcessor,
    ElasticSearchIndexDefinition.ElasticSearchIndexType indexType,
    String entityType,
    List<String> entities)
    throws IOException {
  for (String reportJson : entities) {
    UpdateRequest upsert = getUpdateRequest(indexType, entityType, reportJson);
    if (upsert == null) {
      // Request creation failed for this record; move on to the next one.
      continue;
    }
    bulkProcessor.add(upsert);
  }
}
private synchronized void updateElasticSearchForEntityBatch(
ElasticSearchIndexDefinition.ElasticSearchIndexType indexType,
BulkProcessor bulkProcessor,
@ -408,4 +452,19 @@ public class BuildSearchIndexResource {
}
return null;
}
/**
 * Builds an upsert request for one report data record given as a JSON string.
 *
 * <p>The JSON is parsed outside the try block, so anything thrown by {@code JsonUtils.readValue}
 * reaches the caller. Failures while assembling the request itself are logged and turned into a
 * {@code null} return value.
 *
 * @param indexType Elasticsearch index the record is written to
 * @param entityType data insight type name, used only in the error log
 * @param entity report data record as a JSON string
 * @return the upsert request, or {@code null} if it could not be created
 * @throws IOException declared for the JSON parsing step
 */
private UpdateRequest getUpdateRequest(
    ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, String entityType, String entity)
    throws IOException {
  final ReportData parsed = JsonUtils.readValue(entity, ReportData.class);
  try {
    // The record id doubles as the Elasticsearch document id.
    String indexName = indexType.indexName;
    String docId = parsed.getId().toString();
    UpdateRequest upsert = new UpdateRequest(indexName, docId);
    String docJson = JsonUtils.pojoToJson(new ReportDataIndexes(parsed).buildESDoc());
    upsert.doc(docJson, XContentType.JSON);
    upsert.docAsUpsert(true);
    return upsert;
  } catch (Exception ex) {
    LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", indexType, entityType, ex);
    return null;
  }
}
}