Fix Indexing Issues (#20686)

This commit is contained in:
Mohit Yadav 2025-04-08 16:27:43 +05:30 committed by GitHub
parent 327c9f8503
commit 6970b11677
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 16 additions and 10 deletions

View File

@ -221,7 +221,8 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
searchRepository.updateEntity(pipeline);
searchRepository
.getSearchClient()
.reindexAcrossIndices("lineage.pipeline.fullyQualifiedName", pipeline.getEntityReference());
.reindexAcrossIndices(
"upstreamLineage.pipeline.fullyQualifiedName", pipeline.getEntityReference());
return new RestUtil.PutResponse<>(
Response.Status.OK,

View File

@ -49,6 +49,7 @@ public interface SearchIndex {
"changeDescription",
"incrementalChangeDescription",
"upstreamLineage.pipeline.changeDescription",
"upstreamLineage.pipeline.incrementalChangeDescription",
"connection");
public static final SearchClient searchClient = Entity.getSearchRepository().getSearchClient();

View File

@ -507,9 +507,7 @@ public class OpenSearchClient implements SearchClient {
}
searchSourceBuilder.timeout(new TimeValue(30, TimeUnit.SECONDS));
if (Boolean.TRUE.equals(request.getExplain())) {
searchSourceBuilder.explain(true);
}
try {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
builder.addHeader("Content-Type", "application/json");
@ -1077,7 +1075,7 @@ public class OpenSearchClient implements SearchClient {
searchRequest.source(searchSourceBuilder.size(1000));
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
Map<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
tempMap.keySet().removeAll(FIELDS_TO_REMOVE);
responseMap.put("entity", tempMap);
}
@ -1138,7 +1136,7 @@ public class OpenSearchClient implements SearchClient {
searchRequest.source(searchSourceBuilder.size(1000));
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
Map<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
tempMap.keySet().removeAll(FIELDS_TO_REMOVE);
responseMap.put("entity", tempMap);
}
@ -1186,6 +1184,7 @@ public class OpenSearchClient implements SearchClient {
return responseMap;
}
@Override
public Response searchSchemaEntityRelationship(
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException {
@ -1288,7 +1287,7 @@ public class OpenSearchClient implements SearchClient {
Map<String, Object> node = allNodes.get(nodeFailureId);
if (node != null) {
node.keySet().removeAll(FIELDS_TO_REMOVE);
node.remove("lineage");
node.remove("upstreamLineage");
nodes.add(node);
}
}
@ -1318,9 +1317,7 @@ public class OpenSearchClient implements SearchClient {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(
QueryBuilders.termQuery(
"lineage.fromEntity.fqnHash.keyword", FullyQualifiedName.buildHash(fqn)))
.must(QueryBuilders.termQuery("fqnHash.keyword", FullyQualifiedName.buildHash(fqn)))
.must(QueryBuilders.termQuery("deleted", !nullOrEmpty(deleted) && deleted)));
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
@ -1400,6 +1397,7 @@ public class OpenSearchClient implements SearchClient {
return Response.status(OK).entity(response).build();
}
@Override
public Response aggregate(String index, String fieldName, String value, String query)
throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@ -2377,6 +2375,12 @@ public class OpenSearchClient implements SearchClient {
if (sslContext != null) {
httpAsyncClientBuilder.setSSLContext(sslContext);
}
// Enable TCP keep alive strategy
if (esConfig.getKeepAliveTimeoutSecs() != null
&& esConfig.getKeepAliveTimeoutSecs() > 0) {
httpAsyncClientBuilder.setKeepAliveStrategy(
(response, context) -> esConfig.getKeepAliveTimeoutSecs() * 1000);
}
return httpAsyncClientBuilder;
});
}