diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index ba3b19c43c0..4cc45a5f1b9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -1257,56 +1257,76 @@ public class ElasticSearchClient implements SearchClient { Set visitedFQN = new HashSet<>(); Set> edges = new HashSet<>(); Set> nodes = new HashSet<>(); - es.org.elasticsearch.action.search.SearchRequest searchRequest = - new es.org.elasticsearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); - es.org.elasticsearch.index.query.BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - boolQueryBuilder.should( - QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn))); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); - searchSourceBuilder.query(boolQueryBuilder); - if (CommonUtil.nullOrEmpty(deleted)) { - searchSourceBuilder.query( + Object[] searchAfter = null; + long processedRecords = 0; + long totalRecords = -1; + while (totalRecords != processedRecords) { + es.org.elasticsearch.action.search.SearchRequest searchRequest = + new es.org.elasticsearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + es.org.elasticsearch.index.query.BoolQueryBuilder boolQueryBuilder = + QueryBuilders.boolQuery(); + boolQueryBuilder.should( QueryBuilders.boolQuery() - .must(boolQueryBuilder) - .must(QueryBuilders.termQuery("deleted", deleted))); - } - buildSearchSourceFilter(queryFilter, searchSourceBuilder); - searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - for (var hit : searchResponse.getHits().getHits()) { - List> lineage = - (List>) hit.getSourceAsMap().get("lineage"); - HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); - nodes.add(tempMap); - for (Map lin : lineage) { - HashMap fromEntity = (HashMap) lin.get("fromEntity"); - HashMap toEntity = (HashMap) lin.get("toEntity"); - HashMap pipeline = (HashMap) lin.get("pipeline"); - if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) { - edges.add(lin); - getLineage( - fromEntity.get("fqn"), - visitedFQN, - upstreamDepth, - edges, - nodes, - queryFilter, - "lineage.toEntity.fqn.keyword", - deleted); - getLineage( - toEntity.get("fqn"), - visitedFQN, - downstreamDepth, - edges, - nodes, - queryFilter, - "lineage.fromEntity.fqn.keyword", - deleted); + .must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn))); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); + FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName"); + searchSourceBuilder.sort(sortBuilder); + searchSourceBuilder.query(boolQueryBuilder); + if (searchAfter != null) { + searchSourceBuilder.searchAfter(searchAfter); + } + if (CommonUtil.nullOrEmpty(deleted)) { + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(boolQueryBuilder) + .must(QueryBuilders.termQuery("deleted", deleted))); + } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); + searchRequest.source(searchSourceBuilder); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + + for (var hit : searchResponse.getHits().getHits()) { + List> lineage = + (List>) hit.getSourceAsMap().get("lineage"); + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + nodes.add(tempMap); + for (Map lin : lineage) { + HashMap fromEntity = (HashMap) lin.get("fromEntity"); + HashMap toEntity = (HashMap) lin.get("toEntity"); + HashMap pipeline = (HashMap) lin.get("pipeline"); + if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) { + edges.add(lin); + getLineage( + fromEntity.get("fqn"), + visitedFQN, + upstreamDepth, + edges, + nodes, + queryFilter, + "lineage.toEntity.fqn.keyword", + deleted); + getLineage( + toEntity.get("fqn"), + visitedFQN, + downstreamDepth, + edges, + nodes, + queryFilter, + "lineage.fromEntity.fqn.keyword", + deleted); + } } } + totalRecords = searchResponse.getHits().getTotalHits().value; + int currentHits = searchResponse.getHits().getHits().length; + processedRecords += currentHits; + if (currentHits > 0) { + searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues(); + } else { + searchAfter = null; + } } getLineage( fqn, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index 9c47af1e50f..56f1abe8beb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -1256,57 +1256,75 @@ public class OpenSearchClient implements SearchClient { Set> edges = new HashSet<>(); Set> nodes = new HashSet<>(); responseMap.put("entity", null); - os.org.opensearch.action.search.SearchRequest searchRequest = - new os.org.opensearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - boolQueryBuilder.should( - QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn))); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); - searchSourceBuilder.query(boolQueryBuilder); - if (CommonUtil.nullOrEmpty(deleted)) { - searchSourceBuilder.query( + Object[] searchAfter = null; + long processedRecords = 0; + long totalRecords = -1; + while (totalRecords != processedRecords) { + os.org.opensearch.action.search.SearchRequest searchRequest = + new os.org.opensearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.should( QueryBuilders.boolQuery() - .must(boolQueryBuilder) - .must(QueryBuilders.termQuery("deleted", deleted))); - } - buildSearchSourceFilter(queryFilter, searchSourceBuilder); + .must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn))); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); + FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName"); + searchSourceBuilder.sort(sortBuilder); + searchSourceBuilder.query(boolQueryBuilder); + if (searchAfter != null) { + searchSourceBuilder.searchAfter(searchAfter); + } + if (CommonUtil.nullOrEmpty(deleted)) { + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(boolQueryBuilder) + .must(QueryBuilders.termQuery("deleted", deleted))); + } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); - searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - for (var hit : searchResponse.getHits().getHits()) { - List> lineage = - (List>) hit.getSourceAsMap().get("lineage"); - HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); - nodes.add(tempMap); - for (Map lin : lineage) { - HashMap fromEntity = (HashMap) lin.get("fromEntity"); - HashMap toEntity = (HashMap) lin.get("toEntity"); - HashMap pipeline = (HashMap) lin.get("pipeline"); - if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) { - edges.add(lin); - getLineage( - fromEntity.get("fqn"), - visitedFQN, - upstreamDepth, - edges, - nodes, - queryFilter, - "lineage.toEntity.fqn.keyword", - deleted); - getLineage( - toEntity.get("fqn"), - visitedFQN, - downstreamDepth, - edges, - nodes, - queryFilter, - "lineage.fromEntity.fqn.keyword", - deleted); + searchRequest.source(searchSourceBuilder); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + List> lineage = + (List>) hit.getSourceAsMap().get("lineage"); + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + nodes.add(tempMap); + for (Map lin : lineage) { + HashMap fromEntity = (HashMap) lin.get("fromEntity"); + HashMap toEntity = (HashMap) lin.get("toEntity"); + HashMap pipeline = (HashMap) lin.get("pipeline"); + if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) { + edges.add(lin); + getLineage( + fromEntity.get("fqn"), + visitedFQN, + upstreamDepth, + edges, + nodes, + queryFilter, + "lineage.toEntity.fqn.keyword", + deleted); + getLineage( + toEntity.get("fqn"), + visitedFQN, + downstreamDepth, + edges, + nodes, + queryFilter, + "lineage.fromEntity.fqn.keyword", + deleted); + } } } + totalRecords = searchResponse.getHits().getTotalHits().value; + int currentHits = searchResponse.getHits().getHits().length; + processedRecords += currentHits; + if (currentHits > 0) { + searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues(); + } else { + searchAfter = null; + } } getLineage( fqn,