Added async client, fixed null obj parsing issue

This commit is contained in:
Bhanu Agrawal 2025-09-25 13:03:00 +01:00
parent 31c7b1b28e
commit e9d202c959
3 changed files with 101 additions and 72 deletions

View File

@ -6,6 +6,7 @@ import static org.openmetadata.service.search.SearchClient.ADD_UPDATE_ENTITY_REL
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import es.co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import es.co.elastic.clients.elasticsearch.ElasticsearchClient; import es.co.elastic.clients.elasticsearch.ElasticsearchClient;
import es.co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure; import es.co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure;
import es.co.elastic.clients.elasticsearch._types.ElasticsearchException; import es.co.elastic.clients.elasticsearch._types.ElasticsearchException;
@ -30,6 +31,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@ -47,11 +49,17 @@ import org.openmetadata.service.search.EntityManagementClient;
public class ElasticSearchEntityManager implements EntityManagementClient { public class ElasticSearchEntityManager implements EntityManagementClient {
private final ElasticsearchClient client; private final ElasticsearchClient client;
private final boolean isClientAvailable; private final boolean isClientAvailable;
private ElasticsearchAsyncClient asyncClient;
private final boolean isAsyncClientAvailable;
private final ObjectMapper objectMapper = new ObjectMapper(); private final ObjectMapper objectMapper = new ObjectMapper();
public ElasticSearchEntityManager(ElasticsearchClient client) { public ElasticSearchEntityManager(ElasticsearchClient client) {
this.client = client; this.client = client;
this.isClientAvailable = client != null; this.isClientAvailable = client != null;
if (this.isClientAvailable) {
this.asyncClient = new ElasticsearchAsyncClient(this.client._transport());
}
this.isAsyncClientAvailable = this.asyncClient != null;
} }
@Override @Override
@ -82,14 +90,15 @@ public class ElasticSearchEntityManager implements EntityManagementClient {
@Override @Override
public void createEntities(String indexName, List<Map<String, String>> docsAndIds) { public void createEntities(String indexName, List<Map<String, String>> docsAndIds) {
if (!isClientAvailable) { if (!isAsyncClientAvailable) {
LOG.error("ElasticSearch client is not available. Cannot create entities."); LOG.error("ElasticSearch async client is not available. Cannot create entities.");
return; return;
} }
try { try {
List<BulkOperation> operations = new ArrayList<>(); List<BulkOperation> operations = new ArrayList<>();
for (Map<String, String> docAndId : docsAndIds) { for (Map<String, String> docAndId : docsAndIds) {
if (docAndId == null || docAndId.isEmpty()) continue; // skip invalid entries
Map.Entry<String, String> entry = docAndId.entrySet().iterator().next(); Map.Entry<String, String> entry = docAndId.entrySet().iterator().next();
operations.add( operations.add(
BulkOperation.of( BulkOperation.of(
@ -101,19 +110,39 @@ public class ElasticSearchEntityManager implements EntityManagementClient {
.document(toJsonData(entry.getValue()))))); .document(toJsonData(entry.getValue())))));
} }
BulkResponse response = // Execute the async bulk request
client.bulk(b -> b.index(indexName).operations(operations).refresh(Refresh.True)); CompletableFuture<BulkResponse> future =
asyncClient.bulk(b -> b.index(indexName).operations(operations).refresh(Refresh.True));
// Handle response asynchronously
future.whenComplete(
(response, error) -> {
if (error != null) {
LOG.error("Failed to create entities in ElasticSearch (async)", error);
return;
}
if (response.errors()) {
LOG.error(
"Bulk indexing to ElasticSearch encountered errors. Index: {}, Total: {}, Failed: {}",
indexName,
docsAndIds.size(),
response.items().stream().filter(item -> item.error() != null).count());
response.items().stream()
.filter(item -> item.error() != null)
.forEach(
item ->
LOG.error(
"Indexing failed for ID {}: {}", item.id(), item.error().reason()));
} else {
LOG.info(
"Successfully indexed {} entities to ElasticSearch (async) for index: {}",
docsAndIds.size(),
indexName);
}
});
if (response.errors()) {
String errorMessage =
response.items().stream()
.filter(item -> item.error() != null)
.map(item -> "Failed to index document " + item.id() + ": " + item.error().reason())
.collect(Collectors.joining("; "));
LOG.error("Failed to create entities in ElasticSearch: {}", errorMessage);
} else {
LOG.info("Successfully created {} entities in ElasticSearch", docsAndIds.size());
}
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to create entities in ElasticSearch for index: {} ", indexName, e); LOG.error("Failed to create entities in ElasticSearch for index: {} ", indexName, e);
} }
@ -271,15 +300,7 @@ public class ElasticSearchEntityManager implements EntityManagementClient {
inline inline
.lang(ScriptLanguage.Painless) .lang(ScriptLanguage.Painless)
.source(scriptTxt) .source(scriptTxt)
.params( .params(convertToJsonDataMap(params))))))
params.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry ->
JsonData.of(
entry
.getValue()))))))))
.refresh(true)); .refresh(true));
LOG.info( LOG.info(
@ -514,7 +535,7 @@ public class ElasticSearchEntityManager implements EntityManagementClient {
g.index(Entity.getSearchRepository().getIndexOrAliasName(indexName)).id(entityId), g.index(Entity.getSearchRepository().getIndexOrAliasName(indexName)).id(entityId),
Map.class); Map.class);
if (response.found()) { if (response != null && response.found()) {
return Response.status(Response.Status.OK).entity(response.source()).build(); return Response.status(Response.Status.OK).entity(response.source()).build();
} }
} catch (ElasticsearchException e) { } catch (ElasticsearchException e) {
@ -540,7 +561,10 @@ public class ElasticSearchEntityManager implements EntityManagementClient {
try { try {
Map<String, JsonData> params = Map<String, JsonData> params =
Collections.singletonMap("entityRelationshipData", JsonData.of(entityRelationshipData)); entityRelationshipData != null
? Collections.singletonMap(
"entityRelationshipData", JsonData.of(entityRelationshipData))
: new HashMap<>();
UpdateByQueryResponse response = UpdateByQueryResponse response =
client.updateByQuery( client.updateByQuery(
@ -609,6 +633,7 @@ public class ElasticSearchEntityManager implements EntityManagementClient {
private Map<String, JsonData> convertToJsonDataMap(Map<String, Object> map) { private Map<String, JsonData> convertToJsonDataMap(Map<String, Object> map) {
return JsonUtils.getMap(map).entrySet().stream() return JsonUtils.getMap(map).entrySet().stream()
.filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, entry -> JsonData.of(entry.getValue()))); .collect(Collectors.toMap(Map.Entry::getKey, entry -> JsonData.of(entry.getValue())));
} }

View File

@ -15,6 +15,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@ -24,6 +25,7 @@ import org.openmetadata.sdk.exception.SearchIndexNotFoundException;
import org.openmetadata.service.Entity; import org.openmetadata.service.Entity;
import org.openmetadata.service.search.EntityManagementClient; import org.openmetadata.service.search.EntityManagementClient;
import os.org.opensearch.client.json.JsonData; import os.org.opensearch.client.json.JsonData;
import os.org.opensearch.client.opensearch.OpenSearchAsyncClient;
import os.org.opensearch.client.opensearch.OpenSearchClient; import os.org.opensearch.client.opensearch.OpenSearchClient;
import os.org.opensearch.client.opensearch._types.BulkIndexByScrollFailure; import os.org.opensearch.client.opensearch._types.BulkIndexByScrollFailure;
import os.org.opensearch.client.opensearch._types.FieldValue; import os.org.opensearch.client.opensearch._types.FieldValue;
@ -48,11 +50,17 @@ import os.org.opensearch.client.opensearch.core.bulk.BulkOperation;
public class OpenSearchEntityManager implements EntityManagementClient { public class OpenSearchEntityManager implements EntityManagementClient {
private final OpenSearchClient client; private final OpenSearchClient client;
private final boolean isClientAvailable; private final boolean isClientAvailable;
private OpenSearchAsyncClient asyncClient;
private final boolean isAsyncClientAvailable;
private final ObjectMapper objectMapper = new ObjectMapper(); private final ObjectMapper objectMapper = new ObjectMapper();
public OpenSearchEntityManager(OpenSearchClient client) { public OpenSearchEntityManager(OpenSearchClient client) {
this.client = client; this.client = client;
this.isClientAvailable = client != null; this.isClientAvailable = client != null;
if (this.isClientAvailable) {
this.asyncClient = new OpenSearchAsyncClient(this.client._transport());
}
this.isAsyncClientAvailable = this.asyncClient != null;
} }
@Override @Override
@ -81,14 +89,15 @@ public class OpenSearchEntityManager implements EntityManagementClient {
@Override @Override
public void createEntities(String indexName, List<Map<String, String>> docsAndIds) { public void createEntities(String indexName, List<Map<String, String>> docsAndIds) {
if (!isClientAvailable) { if (!isAsyncClientAvailable) {
LOG.error("OpenSearch client is not available. Cannot create entities."); LOG.error("OpenSearch async client is not available. Cannot create entities.");
return; return;
} }
try { try {
List<BulkOperation> operations = new ArrayList<>(); List<BulkOperation> operations = new ArrayList<>();
for (Map<String, String> docAndId : docsAndIds) { for (Map<String, String> docAndId : docsAndIds) {
if (docAndId == null || docAndId.isEmpty()) continue; // skip invalid entries
Map.Entry<String, String> entry = docAndId.entrySet().iterator().next(); Map.Entry<String, String> entry = docAndId.entrySet().iterator().next();
operations.add( operations.add(
BulkOperation.of( BulkOperation.of(
@ -101,29 +110,36 @@ public class OpenSearchEntityManager implements EntityManagementClient {
} }
BulkRequest bulkRequest = BulkRequest.of(b -> b.operations(operations).refresh(Refresh.True)); BulkRequest bulkRequest = BulkRequest.of(b -> b.operations(operations).refresh(Refresh.True));
BulkResponse bulkResponse = client.bulk(bulkRequest); // Async call using OpenSearchAsyncClient
CompletableFuture<BulkResponse> future = asyncClient.bulk(bulkRequest);
if (bulkResponse.errors()) { future.whenComplete(
LOG.error( (response, error) -> {
"Bulk indexing to OpenSearch has errors for index: {}. Total requests: {}, Errors: {}", if (error != null) {
indexName, LOG.error("Failed to create entities in OpenSearch (async)", error);
docsAndIds.size(), return;
bulkResponse.items().stream().filter(item -> item.error() != null).count()); }
bulkResponse
.items() if (response.errors()) {
.forEach( LOG.error(
item -> { "Bulk indexing to OpenSearch encountered errors. Index: {}, Total: {}, Failed: {}",
if (item.error() != null) { indexName,
LOG.error( docsAndIds.size(),
"Bulk indexing error for id {}: {}", item.id(), item.error().reason()); response.items().stream().filter(item -> item.error() != null).count());
}
}); response.items().stream()
} else { .filter(item -> item.error() != null)
LOG.info( .forEach(
"Successfully indexed {} entities to OpenSearch for index: {}", item ->
docsAndIds.size(), LOG.error(
indexName); "Indexing failed for ID {}: {}", item.id(), item.error().reason()));
} } else {
LOG.info(
"Successfully indexed {} entities to OpenSearch (async) for index: {}",
docsAndIds.size(),
indexName);
}
});
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to create entities in OpenSearch for index: {} ", indexName, e); LOG.error("Failed to create entities in OpenSearch for index: {} ", indexName, e);
} }
@ -285,15 +301,7 @@ public class OpenSearchEntityManager implements EntityManagementClient {
inline inline
.lang(ScriptLanguage.Painless.jsonValue()) .lang(ScriptLanguage.Painless.jsonValue())
.source(scriptTxt) .source(scriptTxt)
.params( .params(convertToJsonDataMap(params))))))
params.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry ->
JsonData.of(
entry
.getValue()))))))))
.refresh(true)); .refresh(true));
LOG.info( LOG.info(
@ -527,7 +535,7 @@ public class OpenSearchEntityManager implements EntityManagementClient {
g.index(Entity.getSearchRepository().getIndexOrAliasName(indexName)).id(entityId), g.index(Entity.getSearchRepository().getIndexOrAliasName(indexName)).id(entityId),
Map.class); Map.class);
if (response.found()) { if (response != null && response.found()) {
return Response.status(Response.Status.OK).entity(response.source()).build(); return Response.status(Response.Status.OK).entity(response.source()).build();
} }
} catch (OpenSearchException e) { } catch (OpenSearchException e) {
@ -553,7 +561,10 @@ public class OpenSearchEntityManager implements EntityManagementClient {
try { try {
Map<String, JsonData> params = Map<String, JsonData> params =
Collections.singletonMap("entityRelationshipData", JsonData.of(entityRelationshipData)); entityRelationshipData != null
? Collections.singletonMap(
"entityRelationshipData", JsonData.of(entityRelationshipData))
: new HashMap<>();
UpdateByQueryResponse response = UpdateByQueryResponse response =
client.updateByQuery( client.updateByQuery(
@ -620,6 +631,7 @@ public class OpenSearchEntityManager implements EntityManagementClient {
private Map<String, JsonData> convertToJsonDataMap(Map<String, Object> map) { private Map<String, JsonData> convertToJsonDataMap(Map<String, Object> map) {
return JsonUtils.getMap(map).entrySet().stream() return JsonUtils.getMap(map).entrySet().stream()
.filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, entry -> JsonData.of(entry.getValue()))); .collect(Collectors.toMap(Map.Entry::getKey, entry -> JsonData.of(entry.getValue())));
} }

View File

@ -4207,18 +4207,10 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
getResource( getResource(
String.format( String.format(
"search/get/%s/doc/%s", indexMapping.getIndexName(null), entityId.toString())); "search/get/%s/doc/%s", indexMapping.getIndexName(null), entityId.toString()));
String result = TestUtils.get(target, String.class, ADMIN_AUTH_HEADERS);
GetResponse response = null; // Get the document directly as a Map from the REST API response
try { Map<String, Object> response = TestUtils.get(target, Map.class, ADMIN_AUTH_HEADERS);
NamedXContentRegistry registry = new NamedXContentRegistry(getDefaultNamedXContents()); return response;
XContentParser parser =
JsonXContent.jsonXContent.createParser(
registry, DeprecationHandler.IGNORE_DEPRECATIONS, result);
response = GetResponse.fromXContent(parser);
} catch (Exception e) {
System.out.println("exception " + e);
}
return response.getSourceAsMap();
} }
@Test @Test