From a813d25efa41a54484b70b9a6092e10ec4d996c8 Mon Sep 17 00:00:00 2001 From: Parth Panchal <83201188+parthp2107@users.noreply.github.com> Date: Thu, 5 Jan 2023 11:42:54 +0530 Subject: [PATCH] Fixes#9576: Data insights page empty and reindexing fails (#9597) * Fixes#9576: Data insights page empty and reindexing fails * Fixes#9576: Data insights page empty and reindexing fails Co-authored-by: Sachin Chaurasiya --- .../service/jdbi3/CollectionDAO.java | 32 +- .../BuildSearchIndexResource.java | 439 +----------------- .../service/util/ElasticSearchIndexUtil.java | 347 ++++++++++++++ .../src/constants/elasticsearch.constant.ts | 8 +- 4 files changed, 381 insertions(+), 445 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/util/ElasticSearchIndexUtil.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 014579c6f31..2503805e6a9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -3110,19 +3110,35 @@ public interface CollectionDAO { @SqlQuery("SELECT count(*) FROM entity_extension_time_series WHERE EntityFQN = :entityFQN") int listCount(@Bind("entityFQN") String entityFQN); + @ConnectionAwareSqlQuery( + value = + "WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json " + + "FROM entity_extension_time_series WHERE EntityFQN = :entityFQN) " + + "SELECT row_num, json FROM data WHERE row_num < :before LIMIT :limit", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json " + + "FROM entity_extension_time_series WHERE EntityFQN = :entityFQN) " + + "SELECT row_num, json FROM data WHERE row_num < (:before :: integer) LIMIT :limit", + connectionType = POSTGRES) @RegisterRowMapper(ReportDataMapper.class) - @SqlQuery( - "WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json " - + "FROM entity_extension_time_series WHERE EntityFQN = :entityFQN) " - + "SELECT row_num, json FROM data WHERE row_num < :before LIMIT :limit") List getBeforeExtension( @Bind("entityFQN") String entityFQN, @Bind("limit") int limit, @Bind("before") String before); + @ConnectionAwareSqlQuery( + value = + "WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json " + + "FROM entity_extension_time_series WHERE EntityFQN = :entityFQN) " + + "SELECT row_num, json FROM data WHERE row_num > :after LIMIT :limit", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json " + + "FROM entity_extension_time_series WHERE EntityFQN = :entityFQN) " + + "SELECT row_num, json FROM data WHERE row_num > (:after :: integer) LIMIT :limit", + connectionType = POSTGRES) @RegisterRowMapper(ReportDataMapper.class) - @SqlQuery( - "WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json " - + "FROM entity_extension_time_series WHERE EntityFQN = :entityFQN) " - + "SELECT row_num, json FROM data WHERE row_num > :after LIMIT :limit") List getAfterExtension( @Bind("entityFQN") String entityFQN, @Bind("limit") int limit, @Bind("after") String after); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticsearch/BuildSearchIndexResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticsearch/BuildSearchIndexResource.java index e444fd06bfa..028e92d5bce 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticsearch/BuildSearchIndexResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/elasticsearch/BuildSearchIndexResource.java @@ -1,27 +1,9 @@ package org.openmetadata.service.resources.elasticsearch; -import static org.openmetadata.schema.analytics.ReportData.ReportDataType.ENTITY_REPORT_DATA; -import static org.openmetadata.schema.analytics.ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA; -import static org.openmetadata.schema.analytics.ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA; -import static org.openmetadata.service.Entity.TABLE; -import static org.openmetadata.service.Entity.TEAM; - import io.swagger.annotations.Api; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.responses.ApiResponse; import java.io.IOException; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; import javax.validation.Valid; import javax.ws.rs.Consumes; import javax.ws.rs.GET; @@ -35,44 +17,20 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; import javax.ws.rs.core.UriInfo; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; -import org.openmetadata.schema.EntityInterface; -import org.openmetadata.schema.analytics.ReportData; import org.openmetadata.schema.api.CreateEventPublisherJob; import org.openmetadata.schema.api.CreateEventPublisherJob.RunMode; -import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.entity.teams.User; import org.openmetadata.schema.settings.EventPublisherJob; -import org.openmetadata.schema.settings.EventPublisherJob.Status; -import org.openmetadata.schema.settings.FailureDetails; -import org.openmetadata.schema.settings.Stats; -import org.openmetadata.schema.type.Include; -import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition; -import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory; -import org.openmetadata.service.elasticsearch.ReportDataIndexes; import org.openmetadata.service.jdbi3.CollectionDAO; -import org.openmetadata.service.jdbi3.EntityRepository; -import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.UserRepository; import org.openmetadata.service.resources.Collection; import org.openmetadata.service.security.Authorizer; import org.openmetadata.service.util.ElasticSearchClientUtils; -import org.openmetadata.service.util.EntityUtil; +import org.openmetadata.service.util.ElasticSearchIndexUtil; import org.openmetadata.service.util.JsonUtils; -import org.openmetadata.service.util.RestUtil; -import org.openmetadata.service.util.ResultList; @Path("/v1/indexResource") @Api(value = "Elastic Search Collection", tags = "Elastic Search Collection") @@ -85,39 +43,25 @@ public class BuildSearchIndexResource { public static final String ELASTIC_SEARCH_ENTITY_FQN_STREAM = "eventPublisher:ElasticSearch:STREAM"; public static final String ELASTIC_SEARCH_ENTITY_FQN_BATCH = "eventPublisher:ElasticSearch:BATCH"; private RestHighLevelClient client; - private ElasticSearchIndexDefinition elasticSearchIndexDefinition; private final CollectionDAO dao; private final Authorizer authorizer; - private final ExecutorService threadScheduler; private final UserRepository userRepository; + private ElasticSearchIndexUtil elasticSearchIndexUtil; public BuildSearchIndexResource(CollectionDAO dao, Authorizer authorizer) { this.dao = dao; this.userRepository = new UserRepository(dao); this.authorizer = authorizer; - this.threadScheduler = - new ThreadPoolExecutor( - 2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy()); } public void initialize(OpenMetadataApplicationConfig config) { if (config.getElasticSearchConfiguration() != null) { this.client = ElasticSearchClientUtils.createElasticSearchClient(config.getElasticSearchConfiguration()); - this.elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(client, dao); + ElasticSearchIndexDefinition elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(client, dao); + this.elasticSearchIndexUtil = new ElasticSearchIndexUtil(dao, client, elasticSearchIndexDefinition); } } - private BulkProcessor getBulkProcessor(BulkProcessorListener listener, int bulkSize, int flushIntervalInSeconds) { - BiConsumer> bulkConsumer = - (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); - BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener, "es-reindex"); - builder.setBulkActions(bulkSize); - builder.setConcurrentRequests(2); - builder.setFlushInterval(TimeValue.timeValueSeconds(flushIntervalInSeconds)); - builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); - return builder.build(); - } - @POST @Path("/reindex") @Operation( @@ -137,9 +81,9 @@ public class BuildSearchIndexResource { User user = userRepository.getByName(null, securityContext.getUserPrincipal().getName(), userRepository.getFields("id")); if (createRequest.getRunMode() == RunMode.BATCH) { - return startReindexingBatchMode(uriInfo, user.getId(), createRequest); + return elasticSearchIndexUtil.startReindexingBatchMode(uriInfo, user.getId(), createRequest); } else { - return startReindexingStreamMode(uriInfo, user.getId(), createRequest); + return Response.status(Response.Status.BAD_REQUEST).entity("Invalid Run Mode").build(); } } @@ -177,375 +121,4 @@ public class BuildSearchIndexResource { } return Response.status(Response.Status.NOT_FOUND).entity("No Last Run.").build(); } - - private synchronized Response startReindexingStreamMode( - UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) { - // create a new Job - threadScheduler.submit(() -> this.submitStreamJob(uriInfo, startedBy, createRequest)); - return Response.status(Response.Status.OK).entity("Reindexing Started").build(); - } - - private synchronized Response startReindexingBatchMode( - UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) { - // create a new Job - threadScheduler.submit( - () -> { - try { - this.submitBatchJob(uriInfo, startedBy, createRequest); - } catch (IOException e) { - LOG.error("Reindexing Batch Job error", e); - } - }); - return Response.status(Response.Status.OK).entity("Reindexing Started").build(); - } - - private synchronized void submitStreamJob(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) { - try { - for (String entityName : createRequest.getEntities()) { - updateEntityStream(uriInfo, startedBy, entityName, createRequest); - } - - // Mark the Job end - String reindexJobString = - dao.entityExtensionTimeSeriesDao() - .getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION); - EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class); - long lastUpdateTime = latestJob.getTimestamp(); - Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); - latestJob.setTimestamp(time); - latestJob.setEndTime(time); - if (latestJob.getFailureDetails() != null) { - latestJob.setStatus(Status.ACTIVE_WITH_ERROR); - } else { - latestJob.setStatus(EventPublisherJob.Status.ACTIVE); - } - dao.entityExtensionTimeSeriesDao() - .update( - ELASTIC_SEARCH_ENTITY_FQN_STREAM, - ELASTIC_SEARCH_EXTENSION, - JsonUtils.pojoToJson(latestJob), - lastUpdateTime); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private synchronized void submitBatchJob(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) - throws IOException { - long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); - String recordString = - dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION); - EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class); - long originalLastUpdate = lastRecord.getTimestamp(); - lastRecord.setStatus(EventPublisherJob.Status.STARTING); - lastRecord.setStats(new Stats().withFailed(0).withTotal(0).withSuccess(0)); - lastRecord.setTimestamp(updateTime); - lastRecord.setEntities(createRequest.getEntities()); - dao.entityExtensionTimeSeriesDao() - .update( - ELASTIC_SEARCH_ENTITY_FQN_BATCH, - ELASTIC_SEARCH_EXTENSION, - JsonUtils.pojoToJson(lastRecord), - originalLastUpdate); - - // Update Listener for only Batch - BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(dao, startedBy); - BulkProcessor processor = - getBulkProcessor(bulkProcessorListener, createRequest.getBatchSize(), createRequest.getFlushIntervalInSec()); - - for (String entityName : createRequest.getEntities()) { - try { - updateEntityBatch(processor, bulkProcessorListener, uriInfo, entityName, createRequest); - } catch (Exception ex) { - LOG.error("Reindexing intermittent failure for entityType : {}", entityName, ex); - } - } - } - - public ResultList getReportDataPagination(String entityFQN, int limit, String before, String after) { - RestUtil.validateCursors(before, after); - int reportDataCount = dao.entityExtensionTimeSeriesDao().listCount(entityFQN); - List reportDataList; - if (before != null) { - reportDataList = - dao.entityExtensionTimeSeriesDao().getBeforeExtension(entityFQN, limit + 1, RestUtil.decodeCursor(before)); - } else { - reportDataList = - dao.entityExtensionTimeSeriesDao() - .getAfterExtension(entityFQN, limit + 1, after == null ? "" : RestUtil.decodeCursor(after)); - } - ResultList reportDataResultList; - if (before != null) { - reportDataResultList = getBeforeExtensionList(reportDataList, limit, reportDataCount); - } else { - reportDataResultList = getAfterExtensionList(reportDataList, after, limit, reportDataCount); - } - return reportDataResultList; - } - - private ResultList getBeforeExtensionList( - List reportDataRowList, int limit, int total) { - String beforeCursor = null; - String afterCursor; - if (reportDataRowList.size() > limit) { - reportDataRowList.remove(0); - beforeCursor = reportDataRowList.get(0).getRowNum(); - } - afterCursor = reportDataRowList.get(reportDataRowList.size() - 1).getRowNum(); - List reportDataList = new ArrayList<>(); - for (CollectionDAO.ReportDataRow reportDataRow : reportDataRowList) { - reportDataList.add(reportDataRow.getReportData()); - } - return getReportDataResultList(reportDataList, beforeCursor, afterCursor, total); - } - - private ResultList getAfterExtensionList( - List reportDataRowList, String after, int limit, int total) { - String beforeCursor; - String afterCursor = null; - beforeCursor = after == null ? null : reportDataRowList.get(0).getRowNum(); - if (reportDataRowList.size() > limit) { - reportDataRowList.remove(limit); - afterCursor = reportDataRowList.get(limit - 1).getRowNum(); - } - List reportDataList = new ArrayList<>(); - for (CollectionDAO.ReportDataRow reportDataRow : reportDataRowList) { - reportDataList.add(reportDataRow.getReportData()); - } - return getReportDataResultList(reportDataList, beforeCursor, afterCursor, total); - } - - private ResultList getReportDataResultList( - List queries, String before, String after, int total) { - return new ResultList<>(queries, before, after, total); - } - - private synchronized void fetchReportData( - String entityFQN, - CreateEventPublisherJob createRequest, - BulkProcessor processor, - BulkProcessorListener listener, - String entityType, - ElasticSearchIndexDefinition.ElasticSearchIndexType indexType) { - ResultList result; - String after = null; - try { - do { - result = getReportDataPagination(entityFQN, createRequest.getBatchSize(), null, after); - listener.addRequests(result.getPaging().getTotal()); - updateElasticSearchForDataInsightBatch(processor, indexType, entityType, result.getData()); - processor.flush(); - after = result.getPaging().getAfter(); - } while (after != null); - } catch (Exception ex) { - LOG.error("Failed in listing all Entities of type : {}, Reason : ", entityType, ex); - FailureDetails failureDetails = - new FailureDetails() - .withContext(String.format("%s:Failure in fetching Data", entityType)) - .withLastFailedReason( - String.format("Failed in listing all ReportData \n Reason : %s", ExceptionUtils.getStackTrace(ex))); - listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, failureDetails, null); - } - } - - private synchronized void updateEntityBatch( - BulkProcessor processor, - BulkProcessorListener listener, - UriInfo uriInfo, - String entityType, - CreateEventPublisherJob createRequest) { - listener.allowTotalRequestUpdate(); - - ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = - ElasticSearchIndexDefinition.getIndexMappingByEntityType(entityType); - - if (Boolean.TRUE.equals(createRequest.getRecreateIndex())) { - // Delete index - elasticSearchIndexDefinition.deleteIndex(indexType); - // Create index - elasticSearchIndexDefinition.createIndex(indexType); - } - - // Start fetching a list of Report Data and pushing them to ES - if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.ENTITY_REPORT_DATA)) { - fetchReportData(String.valueOf(ENTITY_REPORT_DATA), createRequest, processor, listener, entityType, indexType); - } else if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA)) { - fetchReportData( - String.valueOf(WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA), - createRequest, - processor, - listener, - entityType, - indexType); - } else if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)) { - fetchReportData( - String.valueOf(WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA), - createRequest, - processor, - listener, - entityType, - indexType); - } else { - // Start fetching a list of Entities and pushing them to ES - EntityRepository entityRepository = Entity.getEntityRepository(entityType); - List allowedFields = entityRepository.getAllowedFields(); - String fields = String.join(",", allowedFields); - ResultList result; - String after = null; - try { - do { - if (entityType.equals(TEAM)) { - // just name and display name are needed - fields = "name,displayName"; - } - result = - entityRepository.listAfter( - uriInfo, - new EntityUtil.Fields(allowedFields, fields), - new ListFilter(Include.ALL), - createRequest.getBatchSize(), - after); - listener.addRequests(result.getPaging().getTotal()); - updateElasticSearchForEntityBatch(indexType, processor, entityType, result.getData()); - processor.flush(); - after = result.getPaging().getAfter(); - } while (after != null); - } catch (Exception ex) { - LOG.error("Failed in listing all Entities of type : {}, Reason : ", entityType, ex); - FailureDetails failureDetails = - new FailureDetails() - .withContext(String.format("%s:Failure in fetching Data", entityType)) - .withLastFailedReason( - String.format("Failed in listing all Entities \n Reason : %s", ExceptionUtils.getStackTrace(ex))); - listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, failureDetails, null); - } - } - } - - private synchronized void updateEntityStream( - UriInfo uriInfo, UUID startedBy, String entityType, CreateEventPublisherJob createRequest) { - - ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = - ElasticSearchIndexDefinition.getIndexMappingByEntityType(entityType); - - if (Boolean.TRUE.equals(createRequest.getRecreateIndex())) { - // Delete index - elasticSearchIndexDefinition.deleteIndex(indexType); - // Create index - elasticSearchIndexDefinition.createIndex(indexType); - } - - // Start fetching a list of Entities and pushing them to ES - EntityRepository entityRepository = Entity.getEntityRepository(entityType); - List allowedFields = entityRepository.getAllowedFields(); - String fields = String.join(",", allowedFields); - ResultList result; - String after = null; - try { - do { - result = - entityRepository.listAfter( - uriInfo, - new EntityUtil.Fields(allowedFields, fields), - new ListFilter(Include.ALL), - createRequest.getBatchSize(), - after); - updateElasticSearchForEntityStream(entityType, result.getData()); - after = result.getPaging().getAfter(); - } while (after != null); - } catch (Exception ex) { - LOG.error("Failed in listing all Entities of type : {}, Reason {}", entityType, ex); - } - } - - private synchronized void updateElasticSearchForDataInsightBatch( - BulkProcessor bulkProcessor, - ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, - String entityType, - List entities) { - for (ReportData reportData : entities) { - UpdateRequest request = getUpdateRequest(indexType, entityType, reportData); - if (request != null) { - bulkProcessor.add(request); - } - } - } - - private synchronized void updateElasticSearchForEntityBatch( - ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, - BulkProcessor bulkProcessor, - String entityType, - List entities) { - for (EntityInterface entity : entities) { - if (entityType.equals(TABLE)) { - ((Table) entity).getColumns().forEach(table -> table.setProfile(null)); - } - UpdateRequest request = getUpdateRequest(indexType, entityType, entity); - if (request != null) { - bulkProcessor.add(request); - } - } - } - - private synchronized void updateElasticSearchForEntityStream(String entityType, List entities) - throws IOException { - String reindexJobString = - dao.entityExtensionTimeSeriesDao() - .getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION); - EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class); - Long lastUpdateTime = latestJob.getTimestamp(); - ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = - ElasticSearchIndexDefinition.getIndexMappingByEntityType(entityType); - for (EntityInterface entity : entities) { - if (entityType.equals(TABLE)) { - ((Table) entity).getColumns().forEach(table -> table.setProfile(null)); - } - FailureDetails failureDetails; - Long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); - try { - client.update(getUpdateRequest(indexType, entityType, entity), RequestOptions.DEFAULT); - } catch (IOException ex) { - failureDetails = new FailureDetails().withLastFailedAt(time).withLastFailedReason(ex.getMessage()); - latestJob.setFailureDetails(failureDetails); - latestJob.setStatus(Status.ACTIVE_WITH_ERROR); - } - latestJob.setTimestamp(time); - dao.entityExtensionTimeSeriesDao() - .update( - ELASTIC_SEARCH_ENTITY_FQN_STREAM, - ELASTIC_SEARCH_EXTENSION, - JsonUtils.pojoToJson(latestJob), - lastUpdateTime); - lastUpdateTime = time; - } - } - - private UpdateRequest getUpdateRequest( - ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, String entityType, EntityInterface entity) { - try { - UpdateRequest updateRequest = new UpdateRequest(indexType.indexName, entity.getId().toString()); - updateRequest.doc( - JsonUtils.pojoToJson( - Objects.requireNonNull(ElasticSearchIndexFactory.buildIndex(entityType, entity)).buildESDoc()), - XContentType.JSON); - updateRequest.docAsUpsert(true); - return updateRequest; - } catch (Exception ex) { - LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", indexType, entityType, ex); - } - return null; - } - - private UpdateRequest getUpdateRequest( - ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, String entityType, ReportData reportData) { - try { - UpdateRequest updateRequest = new UpdateRequest(indexType.indexName, reportData.getId().toString()); - updateRequest.doc(JsonUtils.pojoToJson(new ReportDataIndexes(reportData).buildESDoc()), XContentType.JSON); - updateRequest.docAsUpsert(true); - return updateRequest; - } catch (Exception ex) { - LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", indexType, entityType, ex); - } - return null; - } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/ElasticSearchIndexUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/ElasticSearchIndexUtil.java new file mode 100644 index 00000000000..1a8e9c883e9 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/ElasticSearchIndexUtil.java @@ -0,0 +1,347 @@ +package org.openmetadata.service.util; + +import static org.openmetadata.schema.analytics.ReportData.ReportDataType.ENTITY_REPORT_DATA; +import static org.openmetadata.schema.analytics.ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA; +import static org.openmetadata.schema.analytics.ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA; +import static org.openmetadata.service.Entity.TABLE; +import static org.openmetadata.service.Entity.TEAM; +import static org.openmetadata.service.resources.elasticsearch.BuildSearchIndexResource.ELASTIC_SEARCH_ENTITY_FQN_BATCH; +import static org.openmetadata.service.resources.elasticsearch.BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.analytics.ReportData; +import org.openmetadata.schema.api.CreateEventPublisherJob; +import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.settings.EventPublisherJob; +import org.openmetadata.schema.settings.FailureDetails; +import org.openmetadata.schema.settings.Stats; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.Entity; +import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition; +import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory; +import org.openmetadata.service.elasticsearch.ReportDataIndexes; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.EntityRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.resources.elasticsearch.BulkProcessorListener; + +@Slf4j +public class ElasticSearchIndexUtil { + + private final CollectionDAO dao; + private final ExecutorService threadScheduler; + private final RestHighLevelClient client; + private final ElasticSearchIndexDefinition elasticSearchIndexDefinition; + + public ElasticSearchIndexUtil( + CollectionDAO dao, RestHighLevelClient client, ElasticSearchIndexDefinition elasticSearchIndexDefinition) { + this.dao = dao; + this.client = client; + this.elasticSearchIndexDefinition = elasticSearchIndexDefinition; + this.threadScheduler = + new ThreadPoolExecutor( + 2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + private BulkProcessor getBulkProcessor(BulkProcessorListener listener, int bulkSize, int flushIntervalInSeconds) { + BiConsumer> bulkConsumer = + (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); + BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener, "es-reindex"); + builder.setBulkActions(bulkSize); + builder.setConcurrentRequests(2); + builder.setFlushInterval(TimeValue.timeValueSeconds(flushIntervalInSeconds)); + builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); + return builder.build(); + } + + public synchronized Response startReindexingBatchMode( + UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) { + // create a new Job + threadScheduler.submit( + () -> { + try { + this.submitBatchJob(uriInfo, startedBy, createRequest); + } catch (IOException e) { + LOG.error("Reindexing Batch Job error", e); + } + }); + return Response.status(Response.Status.OK).entity("Reindexing Started").build(); + } + + private synchronized void submitBatchJob(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) + throws IOException { + long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime(); + String recordString = + dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION); + EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class); + long originalLastUpdate = lastRecord.getTimestamp(); + lastRecord.setStatus(EventPublisherJob.Status.STARTING); + lastRecord.setStats(new Stats().withFailed(0).withTotal(0).withSuccess(0)); + lastRecord.setTimestamp(updateTime); + lastRecord.setEntities(createRequest.getEntities()); + dao.entityExtensionTimeSeriesDao() + .update( + ELASTIC_SEARCH_ENTITY_FQN_BATCH, + ELASTIC_SEARCH_EXTENSION, + JsonUtils.pojoToJson(lastRecord), + originalLastUpdate); + + // Update Listener for only Batch + BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(dao, startedBy); + BulkProcessor processor = + getBulkProcessor(bulkProcessorListener, createRequest.getBatchSize(), createRequest.getFlushIntervalInSec()); + + for (String entityName : createRequest.getEntities()) { + try { + updateEntityBatch(processor, bulkProcessorListener, uriInfo, entityName, createRequest); + } catch (Exception ex) { + LOG.error("Reindexing intermittent failure for entityType : {}", entityName, ex); + } + } + } + + public ResultList getReportDataPagination(String entityFQN, int limit, String before, String after) { + RestUtil.validateCursors(before, after); + int reportDataCount = dao.entityExtensionTimeSeriesDao().listCount(entityFQN); + List reportDataList; + if (before != null) { + reportDataList = + dao.entityExtensionTimeSeriesDao().getBeforeExtension(entityFQN, limit + 1, RestUtil.decodeCursor(before)); + } else { + reportDataList = + dao.entityExtensionTimeSeriesDao() + .getAfterExtension(entityFQN, limit + 1, after == null ? "0" : RestUtil.decodeCursor(after)); + } + ResultList reportDataResultList; + if (before != null) { + reportDataResultList = getBeforeExtensionList(reportDataList, limit, reportDataCount); + } else { + reportDataResultList = getAfterExtensionList(reportDataList, after, limit, reportDataCount); + } + return reportDataResultList; + } + + private ResultList getBeforeExtensionList( + List reportDataRowList, int limit, int total) { + String beforeCursor = null; + String afterCursor; + if (reportDataRowList.size() > limit) { + reportDataRowList.remove(0); + beforeCursor = reportDataRowList.get(0).getRowNum(); + } + afterCursor = reportDataRowList.get(reportDataRowList.size() - 1).getRowNum(); + List reportDataList = new ArrayList<>(); + for (CollectionDAO.ReportDataRow reportDataRow : reportDataRowList) { + reportDataList.add(reportDataRow.getReportData()); + } + return getReportDataResultList(reportDataList, beforeCursor, afterCursor, total); + } + + private ResultList getAfterExtensionList( + List reportDataRowList, String after, int limit, int total) { + String beforeCursor; + String afterCursor = null; + beforeCursor = after == null ? null : reportDataRowList.get(0).getRowNum(); + if (reportDataRowList.size() > limit) { + reportDataRowList.remove(limit); + afterCursor = reportDataRowList.get(limit - 1).getRowNum(); + } + List reportDataList = new ArrayList<>(); + for (CollectionDAO.ReportDataRow reportDataRow : reportDataRowList) { + reportDataList.add(reportDataRow.getReportData()); + } + return getReportDataResultList(reportDataList, beforeCursor, afterCursor, total); + } + + private ResultList getReportDataResultList( + List queries, String before, String after, int total) { + return new ResultList<>(queries, before, after, total); + } + + private synchronized void fetchReportData( + String entityFQN, + CreateEventPublisherJob createRequest, + BulkProcessor processor, + BulkProcessorListener listener, + String entityType, + ElasticSearchIndexDefinition.ElasticSearchIndexType indexType) { + ResultList result; + String after = null; + try { + do { + result = getReportDataPagination(entityFQN, createRequest.getBatchSize(), null, after); + listener.addRequests(result.getPaging().getTotal()); + updateElasticSearchForDataInsightBatch(processor, indexType, entityType, result.getData()); + processor.flush(); + after = result.getPaging().getAfter(); + } while (after != null); + } catch (Exception ex) { + LOG.error("Failed in listing all Entities of type : {}, Reason : ", entityType, ex); + FailureDetails failureDetails = + new FailureDetails() + .withContext(String.format("%s:Failure in fetching Data", entityType)) + .withLastFailedReason( + String.format("Failed in listing all ReportData \n Reason : %s", ExceptionUtils.getStackTrace(ex))); + listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, failureDetails, null); + } + } + + private synchronized void updateEntityBatch( + BulkProcessor processor, + BulkProcessorListener listener, + UriInfo uriInfo, + String entityType, + CreateEventPublisherJob createRequest) { + listener.allowTotalRequestUpdate(); + + ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = + ElasticSearchIndexDefinition.getIndexMappingByEntityType(entityType); + + if (Boolean.TRUE.equals(createRequest.getRecreateIndex())) { + // Delete index + elasticSearchIndexDefinition.deleteIndex(indexType); + // Create index + elasticSearchIndexDefinition.createIndex(indexType); + } + + // Start fetching a list of Report Data and pushing them to ES + if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.ENTITY_REPORT_DATA)) { + fetchReportData(String.valueOf(ENTITY_REPORT_DATA), createRequest, processor, listener, entityType, indexType); + } else if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA)) { + fetchReportData( + String.valueOf(WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA), + createRequest, + processor, + listener, + entityType, + indexType); + } else if (entityType.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)) { + fetchReportData( + String.valueOf(WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA), + createRequest, + processor, + listener, + entityType, + indexType); + } else { + // Start fetching a list of Entities and pushing them to ES + EntityRepository entityRepository = Entity.getEntityRepository(entityType); + List allowedFields = entityRepository.getAllowedFields(); + String fields = String.join(",", allowedFields); + ResultList result; + String after = null; + try { + do { + if (entityType.equals(TEAM)) { + // just name and display name are needed + fields = "name,displayName"; + } + result = + entityRepository.listAfter( + uriInfo, + new EntityUtil.Fields(allowedFields, fields), + new ListFilter(Include.ALL), + createRequest.getBatchSize(), + after); + listener.addRequests(result.getPaging().getTotal()); + updateElasticSearchForEntityBatch(indexType, processor, entityType, result.getData()); + processor.flush(); + after = result.getPaging().getAfter(); + } while (after != null); + } catch (Exception ex) { + LOG.error("Failed in listing all Entities of type : {}, Reason : ", entityType, ex); + FailureDetails failureDetails = + new FailureDetails() + .withContext(String.format("%s:Failure in fetching Data", entityType)) + .withLastFailedReason( + String.format("Failed in listing all Entities \n Reason : %s", ExceptionUtils.getStackTrace(ex))); + listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, failureDetails, null); + } + } + } + + private synchronized void updateElasticSearchForDataInsightBatch( + BulkProcessor bulkProcessor, + ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, + String entityType, + List entities) { + for (ReportData reportData : entities) { + UpdateRequest request = getUpdateRequest(indexType, entityType, reportData); + if (request != null) { + bulkProcessor.add(request); + } + } + } + + private synchronized void updateElasticSearchForEntityBatch( + ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, + BulkProcessor bulkProcessor, + String entityType, + List entities) { + for (EntityInterface entity : entities) { + if (entityType.equals(TABLE)) { + ((Table) entity).getColumns().forEach(table -> table.setProfile(null)); + } + UpdateRequest request = getUpdateRequest(indexType, entityType, entity); + if (request != null) { + bulkProcessor.add(request); + } + } + } + + private UpdateRequest getUpdateRequest( + ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, String entityType, EntityInterface entity) { + try { + UpdateRequest updateRequest = new UpdateRequest(indexType.indexName, entity.getId().toString()); + updateRequest.doc( + JsonUtils.pojoToJson( + Objects.requireNonNull(ElasticSearchIndexFactory.buildIndex(entityType, entity)).buildESDoc()), + XContentType.JSON); + updateRequest.docAsUpsert(true); + return updateRequest; + } catch (Exception ex) { + LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", indexType, entityType, ex); + } + return null; + } + + private UpdateRequest getUpdateRequest( + ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, String entityType, ReportData reportData) { + try { + UpdateRequest updateRequest = new UpdateRequest(indexType.indexName, reportData.getId().toString()); + updateRequest.doc(JsonUtils.pojoToJson(new ReportDataIndexes(reportData).buildESDoc()), XContentType.JSON); + updateRequest.docAsUpsert(true); + return updateRequest; + } catch (Exception ex) { + LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", indexType, entityType, ex); + } + return null; + } +} diff --git a/openmetadata-ui/src/main/resources/ui/src/constants/elasticsearch.constant.ts b/openmetadata-ui/src/main/resources/ui/src/constants/elasticsearch.constant.ts index 7475a866cd3..5590890135f 100644 --- a/openmetadata-ui/src/main/resources/ui/src/constants/elasticsearch.constant.ts +++ b/openmetadata-ui/src/main/resources/ui/src/constants/elasticsearch.constant.ts @@ -55,11 +55,11 @@ export const ELASTIC_SEARCH_INDEX_ENTITIES = [ label: t('label.data-assets-report'), }, { - value: 'webAnalyticEntityViewReport', + value: 'webAnalyticEntityViewReportData', label: t('label.web-analytics-report'), }, { - value: 'webAnalyticUserActivityReport', + value: 'webAnalyticUserActivityReportData', label: t('label.user-analytics-report'), }, ]; @@ -76,8 +76,8 @@ export const ELASTIC_SEARCH_INITIAL_VALUES = { 'glossaryTerm', 'tag', 'entityReportData', - 'webAnalyticEntityViewReport', - 'webAnalyticUserActivityReport', + 'webAnalyticEntityViewReportData', + 'webAnalyticUserActivityReportData', ], batchSize: 100, flushIntervalInSec: 30,