diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java index a09457a9605..c105d0c4551 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java @@ -6,6 +6,7 @@ import static org.openmetadata.service.search.SearchClient.ADD_UPDATE_ENTITY_REL import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import es.co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; import es.co.elastic.clients.elasticsearch.ElasticsearchClient; import es.co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure; import es.co.elastic.clients.elasticsearch._types.ElasticsearchException; @@ -30,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; @@ -47,11 +49,17 @@ import org.openmetadata.service.search.EntityManagementClient; public class ElasticSearchEntityManager implements EntityManagementClient { private final ElasticsearchClient client; private final boolean isClientAvailable; + private ElasticsearchAsyncClient asyncClient; + private final boolean isAsyncClientAvailable; private final ObjectMapper objectMapper = new ObjectMapper(); public ElasticSearchEntityManager(ElasticsearchClient client) { this.client = client; this.isClientAvailable = client != null; + if (this.isClientAvailable) { + this.asyncClient = new ElasticsearchAsyncClient(this.client._transport()); + } + this.isAsyncClientAvailable = this.asyncClient != null; } @Override @@ -82,14 +90,15 @@ public class ElasticSearchEntityManager implements EntityManagementClient { @Override public void createEntities(String indexName, List> docsAndIds) { - if (!isClientAvailable) { - LOG.error("ElasticSearch client is not available. Cannot create entities."); + if (!isAsyncClientAvailable) { + LOG.error("ElasticSearch async client is not available. Cannot create entities."); return; } try { List operations = new ArrayList<>(); for (Map docAndId : docsAndIds) { + if (docAndId == null || docAndId.isEmpty()) continue; // skip invalid entries Map.Entry entry = docAndId.entrySet().iterator().next(); operations.add( BulkOperation.of( @@ -101,19 +110,39 @@ public class ElasticSearchEntityManager implements EntityManagementClient { .document(toJsonData(entry.getValue()))))); } - BulkResponse response = - client.bulk(b -> b.index(indexName).operations(operations).refresh(Refresh.True)); + // Execute the async bulk request + CompletableFuture future = + asyncClient.bulk(b -> b.index(indexName).operations(operations).refresh(Refresh.True)); + + // Handle response asynchronously + future.whenComplete( + (response, error) -> { + if (error != null) { + LOG.error("Failed to create entities in ElasticSearch (async)", error); + return; + } + + if (response.errors()) { + LOG.error( + "Bulk indexing to ElasticSearch encountered errors. Index: {}, Total: {}, Failed: {}", + indexName, + docsAndIds.size(), + response.items().stream().filter(item -> item.error() != null).count()); + + response.items().stream() + .filter(item -> item.error() != null) + .forEach( + item -> + LOG.error( + "Indexing failed for ID {}: {}", item.id(), item.error().reason())); + } else { + LOG.info( + "Successfully indexed {} entities to ElasticSearch (async) for index: {}", + docsAndIds.size(), + indexName); + } + }); - if (response.errors()) { - String errorMessage = - response.items().stream() - .filter(item -> item.error() != null) - .map(item -> "Failed to index document " + item.id() + ": " + item.error().reason()) - .collect(Collectors.joining("; ")); - LOG.error("Failed to create entities in ElasticSearch: {}", errorMessage); - } else { - LOG.info("Successfully created {} entities in ElasticSearch", docsAndIds.size()); - } } catch (Exception e) { LOG.error("Failed to create entities in ElasticSearch for index: {} ", indexName, e); } @@ -271,15 +300,7 @@ public class ElasticSearchEntityManager implements EntityManagementClient { inline .lang(ScriptLanguage.Painless) .source(scriptTxt) - .params( - params.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entry -> - JsonData.of( - entry - .getValue())))))))) + .params(convertToJsonDataMap(params)))))) .refresh(true)); LOG.info( @@ -514,7 +535,7 @@ public class ElasticSearchEntityManager implements EntityManagementClient { g.index(Entity.getSearchRepository().getIndexOrAliasName(indexName)).id(entityId), Map.class); - if (response.found()) { + if (response != null && response.found()) { return Response.status(Response.Status.OK).entity(response.source()).build(); } } catch (ElasticsearchException e) { @@ -540,7 +561,10 @@ public class ElasticSearchEntityManager implements EntityManagementClient { try { Map params = - Collections.singletonMap("entityRelationshipData", JsonData.of(entityRelationshipData)); + entityRelationshipData != null + ? Collections.singletonMap( + "entityRelationshipData", JsonData.of(entityRelationshipData)) + : new HashMap<>(); UpdateByQueryResponse response = client.updateByQuery( @@ -609,6 +633,7 @@ public class ElasticSearchEntityManager implements EntityManagementClient { private Map convertToJsonDataMap(Map map) { return JsonUtils.getMap(map).entrySet().stream() + .filter(entry -> entry.getValue() != null) .collect(Collectors.toMap(Map.Entry::getKey, entry -> JsonData.of(entry.getValue()))); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java index 11d9ad46df0..63c08514aa3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java @@ -15,6 +15,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; @@ -24,6 +25,7 @@ import org.openmetadata.sdk.exception.SearchIndexNotFoundException; import org.openmetadata.service.Entity; import org.openmetadata.service.search.EntityManagementClient; import os.org.opensearch.client.json.JsonData; +import os.org.opensearch.client.opensearch.OpenSearchAsyncClient; import os.org.opensearch.client.opensearch.OpenSearchClient; import os.org.opensearch.client.opensearch._types.BulkIndexByScrollFailure; import os.org.opensearch.client.opensearch._types.FieldValue; @@ -48,11 +50,17 @@ import os.org.opensearch.client.opensearch.core.bulk.BulkOperation; public class OpenSearchEntityManager implements EntityManagementClient { private final OpenSearchClient client; private final boolean isClientAvailable; + private OpenSearchAsyncClient asyncClient; + private final boolean isAsyncClientAvailable; private final ObjectMapper objectMapper = new ObjectMapper(); public OpenSearchEntityManager(OpenSearchClient client) { this.client = client; this.isClientAvailable = client != null; + if (this.isClientAvailable) { + this.asyncClient = new OpenSearchAsyncClient(this.client._transport()); + } + this.isAsyncClientAvailable = this.asyncClient != null; } @Override @@ -81,14 +89,15 @@ public class OpenSearchEntityManager implements EntityManagementClient { @Override public void createEntities(String indexName, List> docsAndIds) { - if (!isClientAvailable) { - LOG.error("OpenSearch client is not available. Cannot create entities."); + if (!isAsyncClientAvailable) { + LOG.error("OpenSearch async client is not available. Cannot create entities."); return; } try { List operations = new ArrayList<>(); for (Map docAndId : docsAndIds) { + if (docAndId == null || docAndId.isEmpty()) continue; // skip invalid entries Map.Entry entry = docAndId.entrySet().iterator().next(); operations.add( BulkOperation.of( @@ -101,29 +110,36 @@ public class OpenSearchEntityManager implements EntityManagementClient { } BulkRequest bulkRequest = BulkRequest.of(b -> b.operations(operations).refresh(Refresh.True)); - BulkResponse bulkResponse = client.bulk(bulkRequest); + // Async call using OpenSearchAsyncClient + CompletableFuture future = asyncClient.bulk(bulkRequest); - if (bulkResponse.errors()) { - LOG.error( - "Bulk indexing to OpenSearch has errors for index: {}. Total requests: {}, Errors: {}", - indexName, - docsAndIds.size(), - bulkResponse.items().stream().filter(item -> item.error() != null).count()); - bulkResponse - .items() - .forEach( - item -> { - if (item.error() != null) { - LOG.error( - "Bulk indexing error for id {}: {}", item.id(), item.error().reason()); - } - }); - } else { - LOG.info( - "Successfully indexed {} entities to OpenSearch for index: {}", - docsAndIds.size(), - indexName); - } + future.whenComplete( + (response, error) -> { + if (error != null) { + LOG.error("Failed to create entities in OpenSearch (async)", error); + return; + } + + if (response.errors()) { + LOG.error( + "Bulk indexing to OpenSearch encountered errors. Index: {}, Total: {}, Failed: {}", + indexName, + docsAndIds.size(), + response.items().stream().filter(item -> item.error() != null).count()); + + response.items().stream() + .filter(item -> item.error() != null) + .forEach( + item -> + LOG.error( + "Indexing failed for ID {}: {}", item.id(), item.error().reason())); + } else { + LOG.info( + "Successfully indexed {} entities to OpenSearch (async) for index: {}", + docsAndIds.size(), + indexName); + } + }); } catch (Exception e) { LOG.error("Failed to create entities in OpenSearch for index: {} ", indexName, e); } @@ -285,15 +301,7 @@ public class OpenSearchEntityManager implements EntityManagementClient { inline .lang(ScriptLanguage.Painless.jsonValue()) .source(scriptTxt) - .params( - params.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entry -> - JsonData.of( - entry - .getValue())))))))) + .params(convertToJsonDataMap(params)))))) .refresh(true)); LOG.info( @@ -527,7 +535,7 @@ public class OpenSearchEntityManager implements EntityManagementClient { g.index(Entity.getSearchRepository().getIndexOrAliasName(indexName)).id(entityId), Map.class); - if (response.found()) { + if (response != null && response.found()) { return Response.status(Response.Status.OK).entity(response.source()).build(); } } catch (OpenSearchException e) { @@ -553,7 +561,10 @@ public class OpenSearchEntityManager implements EntityManagementClient { try { Map params = - Collections.singletonMap("entityRelationshipData", JsonData.of(entityRelationshipData)); + entityRelationshipData != null + ? Collections.singletonMap( + "entityRelationshipData", JsonData.of(entityRelationshipData)) + : new HashMap<>(); UpdateByQueryResponse response = client.updateByQuery( @@ -620,6 +631,7 @@ public class OpenSearchEntityManager implements EntityManagementClient { private Map convertToJsonDataMap(Map map) { return JsonUtils.getMap(map).entrySet().stream() + .filter(entry -> entry.getValue() != null) .collect(Collectors.toMap(Map.Entry::getKey, entry -> JsonData.of(entry.getValue()))); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java index d44c1c02bac..ff95e691476 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java @@ -4207,18 +4207,10 @@ public abstract class EntityResourceTest response = TestUtils.get(target, Map.class, ADMIN_AUTH_HEADERS); + return response; } @Test