diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchEventPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchEventPublisher.java
index 0c40708133a..269c8955087 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchEventPublisher.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchEventPublisher.java
@@ -32,14 +32,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.engine.DocumentMissingException;
 import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -50,6 +55,8 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.openmetadata.schema.api.CreateEventPublisherJob;
 import org.openmetadata.schema.entity.classification.Classification;
 import org.openmetadata.schema.entity.classification.Tag;
@@ -78,6 +85,7 @@ import org.openmetadata.schema.type.ChangeEvent;
 import org.openmetadata.schema.type.EntityReference;
 import org.openmetadata.schema.type.EventType;
 import org.openmetadata.schema.type.FieldChange;
+import org.openmetadata.schema.type.TagLabel;
 import org.openmetadata.schema.type.UsageDetails;
 import org.openmetadata.service.Entity;
 import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition.ElasticSearchIndexType;
@@ -571,10 +579,92 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
         DeleteRequest deleteRequest =
             new DeleteRequest(ElasticSearchIndexType.TAG_SEARCH_INDEX.indexName, event.getEntityId().toString());
         deleteEntityFromElasticSearch(deleteRequest);
-        break;
+
+        // Strip the deleted tag from every entity document that still carries it.
+        String[] indexes =
+            new String[] {
+              ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName,
+              ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName,
+              ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName,
+              ElasticSearchIndexType.PIPELINE_SEARCH_INDEX.indexName,
+              ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName,
+              ElasticSearchIndexType.MLMODEL_SEARCH_INDEX.indexName
+            };
+        String tagFqn = event.getEntityFullyQualifiedName();
+        // The script depends only on the tag FQN, so one instance serves every update.
+        Script script = generateTagScript(tagFqn);
+        BulkRequest request = new BulkRequest();
+        int batchSize = 50;
+        long totalHits;
+        int currentHits = 0;
+        int pageHits;
+
+        do {
+          SearchRequest pageRequest = searchRequest(indexes, "tags.tagFQN", tagFqn, batchSize, currentHits);
+          SearchResponse response = client.search(pageRequest, RequestOptions.DEFAULT);
+          totalHits = response.getHits().getTotalHits().value;
+          for (SearchHit hit : response.getHits()) {
+            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
+            request.add(
+                updateRequests(sourceAsMap.get("entityType").toString(), sourceAsMap.get("id").toString(), script));
+          }
+          pageHits = response.getHits().getHits().length;
+          currentHits += pageHits;
+          // Also stop on an empty page, so a shrinking result set cannot keep this loop spinning.
+        } while (pageHits > 0 && currentHits < totalHits);
+        // An empty BulkRequest fails validation, so only send it when there is something to update.
+        if (request.numberOfActions() > 0) {
+          client.bulk(request, RequestOptions.DEFAULT);
+        }
     }
   }
 
+  /**
+   * Builds a search over {@code indexes} for documents whose {@code field} matches {@code value}.
+   *
+   * @param indexes names of the indexes to search
+   * @param field document field the match query runs against
+   * @param value value to match
+   * @param batchSize maximum number of hits to return for this page
+   * @param from offset of the first hit of this page
+   * @return the search request for one page of results, with a 60 second timeout
+   */
+  private SearchRequest searchRequest(String[] indexes, String field, String value, int batchSize, int from) {
+    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+    searchSourceBuilder.query(QueryBuilders.matchQuery(field, value));
+    searchSourceBuilder.from(from);
+    searchSourceBuilder.size(batchSize);
+    searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
+    return new SearchRequest(indexes).source(searchSourceBuilder);
+  }
+
+  /**
+   * Builds the update script that removes a single tag from a document's {@code tags} list.
+   *
+   * @param tagFqn fully qualified name of the tag to remove
+   * @return an inline script that drops every {@code tags} entry whose {@code tagFQN} equals {@code tagFqn}
+   */
+  private Script generateTagScript(String tagFqn) {
+    Map<String, Object> params = new HashMap<>();
+    params.put("tagFQN", tagFqn);
+    String scriptTxt =
+        "if (ctx._source.tags != null) { ctx._source.tags.removeIf(tag -> tag.tagFQN == params.tagFQN); }";
+    return new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptTxt, params);
+  }
+
+  /**
+   * Creates a scripted update for one entity document.
+   *
+   * @param entityType entity type used to look up the target index in {@code ENTITY_TYPE_TO_INDEX_MAP}
+   * @param entityId id of the document to update
+   * @param script script to run against the document
+   * @return the update request
+   */
+  private UpdateRequest updateRequests(String entityType, String entityId, Script script) {
+    return new UpdateRequest(ElasticSearchIndexDefinition.ENTITY_TYPE_TO_INDEX_MAP.get(entityType), entityId)
+        .script(script);
+  }
+
   private void updateDatabase(ChangeEvent event) throws IOException {
     if (event.getEventType() == EventType.ENTITY_DELETED) {
       Database database = (Database) event.getEntity();
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java
index ae314643dc6..785ca3c271b 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/ElasticSearchIndexDefinition.java
@@ -11,6 +11,7 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.List;
 import lombok.Builder;
 import lombok.Getter;
@@ -44,8 +45,19 @@ public class ElasticSearchIndexDefinition {
   private final CollectionDAO dao;
   final EnumMap elasticSearchIndexes =
       new EnumMap<>(ElasticSearchIndexType.class);
+
+  /** Index name for each entity type, built from the {@link ElasticSearchIndexType} constants. */
+  public static final HashMap<String, String> ENTITY_TYPE_TO_INDEX_MAP;
   private final RestHighLevelClient client;
 
+  static {
+    // Filled once at class initialization: one entry per enum constant, keyed by its entityType.
+    ENTITY_TYPE_TO_INDEX_MAP = new HashMap<>();
+    for (ElasticSearchIndexType elasticSearchIndexType : ElasticSearchIndexType.values()) {
+      ENTITY_TYPE_TO_INDEX_MAP.put(elasticSearchIndexType.entityType, elasticSearchIndexType.indexName);
+    }
+  }
+
   public ElasticSearchIndexDefinition(RestHighLevelClient client, CollectionDAO dao) {
     this.dao = dao;
     this.client = client;
@@ -61,26 +73,34 @@ public class ElasticSearchIndexDefinition {
   }
 
   public enum ElasticSearchIndexType {
-    TABLE_SEARCH_INDEX("table_search_index", "/elasticsearch/%s/table_index_mapping.json"),
-    TOPIC_SEARCH_INDEX("topic_search_index", "/elasticsearch/%s/topic_index_mapping.json"),
-    DASHBOARD_SEARCH_INDEX("dashboard_search_index", "/elasticsearch/%s/dashboard_index_mapping.json"),
-    PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/%s/pipeline_index_mapping.json"),
-    USER_SEARCH_INDEX("user_search_index", "/elasticsearch/%s/user_index_mapping.json"),
-    TEAM_SEARCH_INDEX("team_search_index", "/elasticsearch/%s/team_index_mapping.json"),
-    GLOSSARY_SEARCH_INDEX("glossary_search_index", "/elasticsearch/%s/glossary_index_mapping.json"),
-    MLMODEL_SEARCH_INDEX("mlmodel_search_index", "/elasticsearch/%s/mlmodel_index_mapping.json"),
-    TAG_SEARCH_INDEX("tag_search_index", "/elasticsearch/%s/tag_index_mapping.json"),
-    ENTITY_REPORT_DATA_INDEX("entity_report_data_index", "/elasticsearch/entity_report_data_index.json"),
+    TABLE_SEARCH_INDEX(Entity.TABLE, "table_search_index", "/elasticsearch/%s/table_index_mapping.json"),
+    TOPIC_SEARCH_INDEX(Entity.TOPIC, "topic_search_index", "/elasticsearch/%s/topic_index_mapping.json"),
+    DASHBOARD_SEARCH_INDEX(
+        Entity.DASHBOARD, "dashboard_search_index", "/elasticsearch/%s/dashboard_index_mapping.json"),
+    PIPELINE_SEARCH_INDEX(Entity.PIPELINE, "pipeline_search_index", "/elasticsearch/%s/pipeline_index_mapping.json"),
+    USER_SEARCH_INDEX(Entity.USER, "user_search_index", "/elasticsearch/%s/user_index_mapping.json"),
+    TEAM_SEARCH_INDEX(Entity.TEAM, "team_search_index", "/elasticsearch/%s/team_index_mapping.json"),
+    GLOSSARY_SEARCH_INDEX(Entity.GLOSSARY, "glossary_search_index", "/elasticsearch/%s/glossary_index_mapping.json"),
+    MLMODEL_SEARCH_INDEX(Entity.MLMODEL, "mlmodel_search_index", "/elasticsearch/%s/mlmodel_index_mapping.json"),
+    TAG_SEARCH_INDEX(Entity.TAG, "tag_search_index", "/elasticsearch/%s/tag_index_mapping.json"),
+    ENTITY_REPORT_DATA_INDEX(
+        ENTITY_REPORT_DATA, "entity_report_data_index", "/elasticsearch/entity_report_data_index.json"),
     WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA_INDEX(
-        "web_analytic_entity_view_report_data_index", "/elasticsearch/web_analytic_entity_view_report_data_index.json"),
+        Entity.WEB_ANALYTIC_EVENT,
+        "web_analytic_entity_view_report_data_index",
+        "/elasticsearch/web_analytic_entity_view_report_data_index.json"),
     WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA_INDEX(
+        WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA,
         "web_analytic_user_activity_report_data_index",
         "/elasticsearch/web_analytic_user_activity_report_data_index.json");
 
     public final String indexName;
     public final String indexMappingFile;
+    /** Entity type associated with this index; used as the key in ENTITY_TYPE_TO_INDEX_MAP. */
+    public final String entityType;
 
-    ElasticSearchIndexType(String indexName, String indexMappingFile) {
+    ElasticSearchIndexType(String entityType, String indexName, String indexMappingFile) {
+      this.entityType = entityType;
       this.indexName = indexName;
       this.indexMappingFile = indexMappingFile;
     }