Es update issue fix (#10289)

* Elasticsearch update issue fix

* solved bug for searchSourceBuilder size

* added search request with batch size 50 per request

---------

Co-authored-by: Himank Mehta <himankmehta@Himanks-MacBook-Air.local>
This commit is contained in:
07Himank 2023-02-27 11:57:42 +05:30 committed by GitHub
parent d21fcad203
commit 3e7f890b3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 99 additions and 13 deletions

View File

@ -32,14 +32,19 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.query.BoolQueryBuilder;
@ -50,6 +55,8 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.entity.classification.Classification;
import org.openmetadata.schema.entity.classification.Tag;
@ -78,6 +85,7 @@ import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.UsageDetails;
import org.openmetadata.service.Entity;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition.ElasticSearchIndexType;
@ -571,10 +579,71 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
DeleteRequest deleteRequest =
new DeleteRequest(ElasticSearchIndexType.TAG_SEARCH_INDEX.indexName, event.getEntityId().toString());
deleteEntityFromElasticSearch(deleteRequest);
break;
String[] indexes =
new String[] {
ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName,
ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName,
ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName,
ElasticSearchIndexType.PIPELINE_SEARCH_INDEX.indexName,
ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName,
ElasticSearchIndexType.MLMODEL_SEARCH_INDEX.indexName
};
BulkRequest request = new BulkRequest();
SearchRequest searchRequest;
SearchResponse response;
int batchSize = 50;
int totalHits;
int currentHits = 0;
do {
searchRequest =
searchRequest(indexes, "tags.tagFQN", event.getEntityFullyQualifiedName(), batchSize, currentHits);
response = client.search(searchRequest, RequestOptions.DEFAULT);
totalHits = (int) response.getHits().getTotalHits().value;
for (SearchHit hit : response.getHits()) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
List<TagLabel> listTags = (List<TagLabel>) sourceAsMap.get("tags");
Script script = generateTagScript(listTags);
if (!script.toString().isEmpty()) {
request.add(
updateRequests(sourceAsMap.get("entityType").toString(), sourceAsMap.get("id").toString(), script));
}
}
currentHits += response.getHits().getHits().length;
} while (currentHits < totalHits);
client.bulk(request, RequestOptions.DEFAULT);
}
}
/**
 * Builds a paged search request against the given indexes.
 *
 * <p>The query is a {@code match} query of {@code value} against {@code field}. Paging is expressed with
 * {@code from}/{@code size}, and the search carries a 60 second timeout.
 *
 * @param indexes names of the indexes to search
 * @param field document field the match query is run against
 * @param value text to match in {@code field}
 * @param batchSize value passed as the search {@code size}
 * @param from value passed as the search {@code from} offset
 * @return the assembled {@link SearchRequest}; it has not been executed
 */
private SearchRequest searchRequest(String[] indexes, String field, String value, int batchSize, int from) {
  SearchSourceBuilder sourceBuilder =
      new SearchSourceBuilder()
          .query(QueryBuilders.matchQuery(field, value))
          .from(from)
          .size(batchSize)
          .timeout(new TimeValue(60, TimeUnit.SECONDS));
  return new SearchRequest(indexes).source(sourceBuilder);
}
/**
 * Creates the inline script used when updating the {@code tags} field of an indexed document.
 *
 * <p>The script receives {@code listTags} as the {@code tags} parameter, assigns it to {@code ctx._source.tags}
 * and then calls {@code removeAll(params.tags)} on that field.
 *
 * <p>NOTE(review): because the script removes the very entries it has just assigned, it looks like the field
 * ends up empty rather than losing only a single tag - confirm this is the intended outcome.
 *
 * @param listTags value exposed to the script as {@code params.tags}
 * @return an inline script in the default script language
 */
private Script generateTagScript(List<TagLabel> listTags) {
  Map<String, Object> scriptParams = new HashMap<>();
  scriptParams.put("tags", listTags);
  // Two statements, kept as separate literals so each step of the script stays readable.
  String scriptSource = "ctx._source.tags=params.tags;" + "ctx._source.tags.removeAll(params.tags);";
  return new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptSource, scriptParams);
}
/**
 * Builds a scripted update request for a single document.
 *
 * <p>The target index is looked up from {@code ElasticSearchIndexDefinition.ENTITY_TYPE_TO_INDEX_MAP} using
 * {@code entityType}; the lookup result is passed on as-is, so an unmapped entity type yields a {@code null}
 * index name.
 *
 * @param entityType key used to resolve the index name
 * @param entityId id of the document to update
 * @param script script attached to the update request
 * @return the assembled {@link UpdateRequest}; it has not been executed
 */
private UpdateRequest updateRequests(String entityType, String entityId, Script script) {
  String indexName = ElasticSearchIndexDefinition.ENTITY_TYPE_TO_INDEX_MAP.get(entityType);
  return new UpdateRequest(indexName, entityId).script(script);
}
private void updateDatabase(ChangeEvent event) throws IOException {
if (event.getEventType() == EventType.ENTITY_DELETED) {
Database database = (Database) event.getEntity();

View File

@ -11,6 +11,7 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import lombok.Builder;
import lombok.Getter;
@ -44,8 +45,17 @@ public class ElasticSearchIndexDefinition {
private final CollectionDAO dao;
final EnumMap<ElasticSearchIndexType, ElasticSearchIndexStatus> elasticSearchIndexes =
new EnumMap<>(ElasticSearchIndexType.class);
public static final HashMap<String, String> ENTITY_TYPE_TO_INDEX_MAP;
private final RestHighLevelClient client;
// Populate the entity-type -> index-name lookup once, from every ElasticSearchIndexType constant.
static {
  HashMap<String, String> indexNameByEntityType = new HashMap<>();
  for (ElasticSearchIndexType indexType : ElasticSearchIndexType.values()) {
    indexNameByEntityType.put(indexType.entityType, indexType.indexName);
  }
  ENTITY_TYPE_TO_INDEX_MAP = indexNameByEntityType;
}
public ElasticSearchIndexDefinition(RestHighLevelClient client, CollectionDAO dao) {
this.dao = dao;
this.client = client;
@ -61,26 +71,33 @@ public class ElasticSearchIndexDefinition {
}
public enum ElasticSearchIndexType {
TABLE_SEARCH_INDEX("table_search_index", "/elasticsearch/%s/table_index_mapping.json"),
TOPIC_SEARCH_INDEX("topic_search_index", "/elasticsearch/%s/topic_index_mapping.json"),
DASHBOARD_SEARCH_INDEX("dashboard_search_index", "/elasticsearch/%s/dashboard_index_mapping.json"),
PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/%s/pipeline_index_mapping.json"),
USER_SEARCH_INDEX("user_search_index", "/elasticsearch/%s/user_index_mapping.json"),
TEAM_SEARCH_INDEX("team_search_index", "/elasticsearch/%s/team_index_mapping.json"),
GLOSSARY_SEARCH_INDEX("glossary_search_index", "/elasticsearch/%s/glossary_index_mapping.json"),
MLMODEL_SEARCH_INDEX("mlmodel_search_index", "/elasticsearch/%s/mlmodel_index_mapping.json"),
TAG_SEARCH_INDEX("tag_search_index", "/elasticsearch/%s/tag_index_mapping.json"),
ENTITY_REPORT_DATA_INDEX("entity_report_data_index", "/elasticsearch/entity_report_data_index.json"),
TABLE_SEARCH_INDEX(Entity.TABLE, "table_search_index", "/elasticsearch/%s/table_index_mapping.json"),
TOPIC_SEARCH_INDEX(Entity.TOPIC, "topic_search_index", "/elasticsearch/%s/topic_index_mapping.json"),
DASHBOARD_SEARCH_INDEX(
Entity.DASHBOARD, "dashboard_search_index", "/elasticsearch/%s/dashboard_index_mapping.json"),
PIPELINE_SEARCH_INDEX(Entity.PIPELINE, "pipeline_search_index", "/elasticsearch/%s/pipeline_index_mapping.json"),
USER_SEARCH_INDEX(Entity.USER, "user_search_index", "/elasticsearch/%s/user_index_mapping.json"),
TEAM_SEARCH_INDEX(Entity.TEAM, "team_search_index", "/elasticsearch/%s/team_index_mapping.json"),
GLOSSARY_SEARCH_INDEX(Entity.GLOSSARY, "glossary_search_index", "/elasticsearch/%s/glossary_index_mapping.json"),
MLMODEL_SEARCH_INDEX(Entity.MLMODEL, "mlmodel_search_index", "/elasticsearch/%s/mlmodel_index_mapping.json"),
TAG_SEARCH_INDEX(Entity.TAG, "tag_search_index", "/elasticsearch/%s/tag_index_mapping.json"),
ENTITY_REPORT_DATA_INDEX(
ENTITY_REPORT_DATA, "entity_report_data_index", "/elasticsearch/entity_report_data_index.json"),
WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA_INDEX(
"web_analytic_entity_view_report_data_index", "/elasticsearch/web_analytic_entity_view_report_data_index.json"),
Entity.WEB_ANALYTIC_EVENT,
"web_analytic_entity_view_report_data_index",
"/elasticsearch/web_analytic_entity_view_report_data_index.json"),
WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA_INDEX(
WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA,
"web_analytic_user_activity_report_data_index",
"/elasticsearch/web_analytic_user_activity_report_data_index.json");
public final String indexName;
public final String indexMappingFile;
public final String entityType;
ElasticSearchIndexType(String indexName, String indexMappingFile) {
/**
 * Creates an index type constant.
 *
 * @param entityType entity type string stored in {@link #entityType}
 * @param indexName index name stored in {@link #indexName}
 * @param indexMappingFile mapping file path stored in {@link #indexMappingFile}
 */
ElasticSearchIndexType(String entityType, String indexName, String indexMappingFile) {
  this.indexName = indexName;
  this.indexMappingFile = indexMappingFile;
  this.entityType = entityType;
}