MINOR: Implement pagination in pipeline lineage render (#19295)

This commit is contained in:
Mayur Singal 2025-01-09 15:21:04 +05:30 committed by GitHub
parent 4cad5762ad
commit 8f27ed0455
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 132 additions and 94 deletions

View File

@ -1257,56 +1257,76 @@ public class ElasticSearchClient implements SearchClient {
Set<String> visitedFQN = new HashSet<>(); Set<String> visitedFQN = new HashSet<>();
Set<Map<String, Object>> edges = new HashSet<>(); Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>(); Set<Map<String, Object>> nodes = new HashSet<>();
es.org.elasticsearch.action.search.SearchRequest searchRequest = Object[] searchAfter = null;
new es.org.elasticsearch.action.search.SearchRequest( long processedRecords = 0;
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); long totalRecords = -1;
es.org.elasticsearch.index.query.BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); while (totalRecords != processedRecords) {
boolQueryBuilder.should( es.org.elasticsearch.action.search.SearchRequest searchRequest =
QueryBuilders.boolQuery() new es.org.elasticsearch.action.search.SearchRequest(
.must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn))); Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); es.org.elasticsearch.index.query.BoolQueryBuilder boolQueryBuilder =
searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); QueryBuilders.boolQuery();
searchSourceBuilder.query(boolQueryBuilder); boolQueryBuilder.should(
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery() QueryBuilders.boolQuery()
.must(boolQueryBuilder) .must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn)));
.must(QueryBuilders.termQuery("deleted", deleted))); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
} searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new));
buildSearchSourceFilter(queryFilter, searchSourceBuilder); FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName");
searchRequest.source(searchSourceBuilder); searchSourceBuilder.sort(sortBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); searchSourceBuilder.query(boolQueryBuilder);
for (var hit : searchResponse.getHits().getHits()) { if (searchAfter != null) {
List<Map<String, Object>> lineage = searchSourceBuilder.searchAfter(searchAfter);
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage"); }
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); if (CommonUtil.nullOrEmpty(deleted)) {
nodes.add(tempMap); searchSourceBuilder.query(
for (Map<String, Object> lin : lineage) { QueryBuilders.boolQuery()
HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity"); .must(boolQueryBuilder)
HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity"); .must(QueryBuilders.termQuery("deleted", deleted)));
HashMap<String, String> pipeline = (HashMap<String, String>) lin.get("pipeline"); }
if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) { buildSearchSourceFilter(queryFilter, searchSourceBuilder);
edges.add(lin); searchRequest.source(searchSourceBuilder);
getLineage( SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
fromEntity.get("fqn"),
visitedFQN, for (var hit : searchResponse.getHits().getHits()) {
upstreamDepth, List<Map<String, Object>> lineage =
edges, (List<Map<String, Object>>) hit.getSourceAsMap().get("lineage");
nodes, HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
queryFilter, nodes.add(tempMap);
"lineage.toEntity.fqn.keyword", for (Map<String, Object> lin : lineage) {
deleted); HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
getLineage( HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
toEntity.get("fqn"), HashMap<String, String> pipeline = (HashMap<String, String>) lin.get("pipeline");
visitedFQN, if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) {
downstreamDepth, edges.add(lin);
edges, getLineage(
nodes, fromEntity.get("fqn"),
queryFilter, visitedFQN,
"lineage.fromEntity.fqn.keyword", upstreamDepth,
deleted); edges,
nodes,
queryFilter,
"lineage.toEntity.fqn.keyword",
deleted);
getLineage(
toEntity.get("fqn"),
visitedFQN,
downstreamDepth,
edges,
nodes,
queryFilter,
"lineage.fromEntity.fqn.keyword",
deleted);
}
} }
} }
totalRecords = searchResponse.getHits().getTotalHits().value;
int currentHits = searchResponse.getHits().getHits().length;
processedRecords += currentHits;
if (currentHits > 0) {
searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues();
} else {
searchAfter = null;
}
} }
getLineage( getLineage(
fqn, fqn,

View File

@ -1256,57 +1256,75 @@ public class OpenSearchClient implements SearchClient {
Set<Map<String, Object>> edges = new HashSet<>(); Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>(); Set<Map<String, Object>> nodes = new HashSet<>();
responseMap.put("entity", null); responseMap.put("entity", null);
os.org.opensearch.action.search.SearchRequest searchRequest = Object[] searchAfter = null;
new os.org.opensearch.action.search.SearchRequest( long processedRecords = 0;
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); long totalRecords = -1;
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); while (totalRecords != processedRecords) {
boolQueryBuilder.should( os.org.opensearch.action.search.SearchRequest searchRequest =
QueryBuilders.boolQuery() new os.org.opensearch.action.search.SearchRequest(
.must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn))); Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); boolQueryBuilder.should(
searchSourceBuilder.query(boolQueryBuilder);
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery() QueryBuilders.boolQuery()
.must(boolQueryBuilder) .must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn)));
.must(QueryBuilders.termQuery("deleted", deleted))); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
} searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new));
buildSearchSourceFilter(queryFilter, searchSourceBuilder); FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName");
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(boolQueryBuilder);
if (searchAfter != null) {
searchSourceBuilder.searchAfter(searchAfter);
}
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(boolQueryBuilder)
.must(QueryBuilders.termQuery("deleted", deleted)));
}
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
searchRequest.source(searchSourceBuilder); searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) { for (var hit : searchResponse.getHits().getHits()) {
List<Map<String, Object>> lineage = List<Map<String, Object>> lineage =
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage"); (List<Map<String, Object>>) hit.getSourceAsMap().get("lineage");
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
nodes.add(tempMap); nodes.add(tempMap);
for (Map<String, Object> lin : lineage) { for (Map<String, Object> lin : lineage) {
HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity"); HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity"); HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
HashMap<String, String> pipeline = (HashMap<String, String>) lin.get("pipeline"); HashMap<String, String> pipeline = (HashMap<String, String>) lin.get("pipeline");
if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) { if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) {
edges.add(lin); edges.add(lin);
getLineage( getLineage(
fromEntity.get("fqn"), fromEntity.get("fqn"),
visitedFQN, visitedFQN,
upstreamDepth, upstreamDepth,
edges, edges,
nodes, nodes,
queryFilter, queryFilter,
"lineage.toEntity.fqn.keyword", "lineage.toEntity.fqn.keyword",
deleted); deleted);
getLineage( getLineage(
toEntity.get("fqn"), toEntity.get("fqn"),
visitedFQN, visitedFQN,
downstreamDepth, downstreamDepth,
edges, edges,
nodes, nodes,
queryFilter, queryFilter,
"lineage.fromEntity.fqn.keyword", "lineage.fromEntity.fqn.keyword",
deleted); deleted);
}
} }
} }
totalRecords = searchResponse.getHits().getTotalHits().value;
int currentHits = searchResponse.getHits().getHits().length;
processedRecords += currentHits;
if (currentHits > 0) {
searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues();
} else {
searchAfter = null;
}
} }
getLineage( getLineage(
fqn, fqn,