diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java index ebb6feb414b..ada8d653bfa 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java @@ -37,6 +37,9 @@ import org.openmetadata.service.util.JsonUtils; @Slf4j public class ElasticSearchIndexDefinition { private static final String REASON_TRACE = "Reason: [%s] , Trace : [%s]"; + public static final String ENTITY_REPORT_DATA = "entityReportData"; + public static final String WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA = "webAnalyticEntityViewReportData"; + public static final String WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA = "webAnalyticUserActivityReportData"; private final CollectionDAO dao; final EnumMap elasticSearchIndexes = new EnumMap<>(ElasticSearchIndexType.class); @@ -203,6 +206,12 @@ public class ElasticSearchIndexDefinition { return ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX; } else if (type.equalsIgnoreCase(Entity.TAG)) { return ElasticSearchIndexType.TAG_SEARCH_INDEX; + } else if (type.equalsIgnoreCase(ENTITY_REPORT_DATA)) { + return ElasticSearchIndexType.ENTITY_REPORT_DATA_INDEX; + } else if (type.equalsIgnoreCase(WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA)) { + return ElasticSearchIndexType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA_INDEX; + } else if (type.equalsIgnoreCase(WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)) { + return ElasticSearchIndexType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA_INDEX; } throw new RuntimeException("Failed to find index doc for type " + type); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ReportDataIndexes.java b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ReportDataIndexes.java new file mode 100644 index 00000000000..00ea92e3bb8 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ReportDataIndexes.java @@ -0,0 +1,23 @@ +package org.openmetadata.service.elasticsearch; + +import java.util.Map; +import org.openmetadata.schema.analytics.ReportData; +import org.openmetadata.service.util.JsonUtils; + +public class ReportDataIndexes implements ElasticSearchIndex { + + final ReportData reportData; + + public ReportDataIndexes(ReportData reportData) { + this.reportData = reportData; + } + + @Override + public Map buildESDoc() { + Map doc = JsonUtils.getMap(reportData); + doc.put("timestamp", reportData.getTimestamp()); + doc.put("reportDataType", reportData.getReportDataType()); + doc.put("data", reportData.getData()); + return doc; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index c66899dec68..dec58a7127f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -3096,6 +3096,9 @@ public interface CollectionDAO { @SqlQuery("SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension") String getExtension(@Bind("entityFQN") String entityId, @Bind("extension") String extension); + @SqlQuery("SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN") + List getExtension(@Bind("entityFQN") String entityFQN); + @SqlQuery( "SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension AND timestamp = :timestamp") String getExtensionAtTimestamp( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticsearch/BuildSearchIndexResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticsearch/BuildSearchIndexResource.java index 3b5939ffa84..713ff5ca049 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticsearch/BuildSearchIndexResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticsearch/BuildSearchIndexResource.java @@ -43,6 +43,7 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.analytics.ReportData; import org.openmetadata.schema.api.CreateEventPublisherJob; import org.openmetadata.schema.api.CreateEventPublisherJob.RunMode; import org.openmetadata.schema.entity.data.Table; @@ -56,6 +57,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition; import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory; +import org.openmetadata.service.elasticsearch.ReportDataIndexes; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.ListFilter; @@ -255,12 +257,33 @@ public class BuildSearchIndexResource { } } + private synchronized void updateDataInsightBatch( + BulkProcessor processor, + BulkProcessorListener listener, + String entityType, + ElasticSearchIndexDefinition.ElasticSearchIndexType indexType) + throws IOException { + List entityReportData; + if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.ENTITY_REPORT_DATA)) { + entityReportData = dao.entityExtensionTimeSeriesDao().getExtension("EntityReportData"); + } else if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA)) { + entityReportData = dao.entityExtensionTimeSeriesDao().getExtension("WebAnalyticEntityViewReportData"); + } else if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)) { + entityReportData = dao.entityExtensionTimeSeriesDao().getExtension("WebAnalyticUserActivityViewReportData"); + } else { + return; + } + updateElasticSearchForDataInsightBatch(processor, indexType, entityType, entityReportData); + listener.addRequests(entityReportData.size()); + } + private synchronized void updateEntityBatch( BulkProcessor processor, BulkProcessorListener listener, UriInfo uriInfo, String entityType, - CreateEventPublisherJob createRequest) { + CreateEventPublisherJob createRequest) + throws IOException { listener.allowTotalRequestUpdate(); ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = @@ -273,38 +296,45 @@ public class BuildSearchIndexResource { elasticSearchIndexDefinition.createIndex(indexType); } - // Start fetching a list of Entities and pushing them to ES - EntityRepository entityRepository = Entity.getEntityRepository(entityType); - List allowedFields = entityRepository.getAllowedFields(); - String fields = String.join(",", allowedFields); - ResultList result; - String after = null; - try { - do { - if (entityType.equals(TEAM)) { - // just name and display name are needed - fields = "name,displayName"; - } - result = - entityRepository.listAfter( - uriInfo, - new EntityUtil.Fields(allowedFields, fields), - new ListFilter(Include.ALL), - createRequest.getBatchSize(), - after); - listener.addRequests(result.getPaging().getTotal()); - updateElasticSearchForEntityBatch(indexType, processor, entityType, result.getData()); - processor.flush(); - after = result.getPaging().getAfter(); - } while (after != null); - } catch (Exception ex) { - LOG.error("Failed in listing all Entities of type : {}, Reason : ", entityType, ex); - FailureDetails failureDetails = - new FailureDetails() - .withContext(String.format("%s:Failure in fetching Data", entityType)) - .withLastFailedReason( - String.format("Failed in listing all Entities \n Reason : %s", ExceptionUtils.getStackTrace(ex))); - listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, failureDetails, null); + // Start fetching a list of Report Data and pushing them to ES + if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.ENTITY_REPORT_DATA) + || entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA) + || entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)) { + updateDataInsightBatch(processor, listener, entityType, indexType); + } else { + // Start fetching a list of Entities and pushing them to ES + EntityRepository entityRepository = Entity.getEntityRepository(entityType); + List allowedFields = entityRepository.getAllowedFields(); + String fields = String.join(",", allowedFields); + ResultList result; + String after = null; + try { + do { + if (entityType.equals(TEAM)) { + // just name and display name are needed + fields = "name,displayName"; + } + result = + entityRepository.listAfter( + uriInfo, + new EntityUtil.Fields(allowedFields, fields), + new ListFilter(Include.ALL), + createRequest.getBatchSize(), + after); + listener.addRequests(result.getPaging().getTotal()); + updateElasticSearchForEntityBatch(indexType, processor, entityType, result.getData()); + processor.flush(); + after = result.getPaging().getAfter(); + } while (after != null); + } catch (Exception ex) { + LOG.error("Failed in listing all Entities of type : {}, Reason : ", entityType, ex); + FailureDetails failureDetails = + new FailureDetails() + .withContext(String.format("%s:Failure in fetching Data", entityType)) + .withLastFailedReason( + String.format("Failed in listing all Entities \n Reason : %s", ExceptionUtils.getStackTrace(ex))); + listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, failureDetails, null); + } } } @@ -344,6 +374,20 @@ public class BuildSearchIndexResource { } } + private synchronized void updateElasticSearchForDataInsightBatch( + BulkProcessor bulkProcessor, + ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, + String entityType, + List entities) + throws IOException { + for (String reportData : entities) { + UpdateRequest request = getUpdateRequest(indexType, entityType, reportData); + if (request != null) { + bulkProcessor.add(request); + } + } + } + private synchronized void updateElasticSearchForEntityBatch( ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, BulkProcessor bulkProcessor, @@ -408,4 +452,19 @@ public class BuildSearchIndexResource { } return null; } + + private UpdateRequest getUpdateRequest( + ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, String entityType, String entity) + throws IOException { + ReportData reportData = JsonUtils.readValue(entity, ReportData.class); + try { + UpdateRequest updateRequest = new UpdateRequest(indexType.indexName, reportData.getId().toString()); + updateRequest.doc(JsonUtils.pojoToJson(new ReportDataIndexes(reportData).buildESDoc()), XContentType.JSON); + updateRequest.docAsUpsert(true); + return updateRequest; + } catch (Exception ex) { + LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", indexType, entityType, ex); + } + return null; + } }