mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-03 20:27:50 +00:00 
			
		
		
		
	feat(usagestats): Optimize elasticsearch query for usage stats aggregations (#8333)
Co-authored-by: Indy Prentice <indy@Indys-MacBook-Pro.local>
This commit is contained in:
		
							parent
							
								
									653dc7312a
								
							
						
					
					
						commit
						a679e661c6
					
				@ -19,6 +19,7 @@ import com.linkedin.timeseries.GenericTable;
 | 
			
		||||
import com.linkedin.timeseries.GroupingBucket;
 | 
			
		||||
import com.linkedin.timeseries.GroupingBucketType;
 | 
			
		||||
import com.linkedin.timeseries.TimeWindowSize;
 | 
			
		||||
import com.linkedin.util.Pair;
 | 
			
		||||
import java.time.ZonedDateTime;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Arrays;
 | 
			
		||||
@ -34,13 +35,13 @@ import org.elasticsearch.action.search.SearchResponse;
 | 
			
		||||
import org.elasticsearch.client.RequestOptions;
 | 
			
		||||
import org.elasticsearch.client.RestHighLevelClient;
 | 
			
		||||
import org.elasticsearch.index.query.BoolQueryBuilder;
 | 
			
		||||
import org.elasticsearch.index.query.QueryBuilders;
 | 
			
		||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
 | 
			
		||||
import org.elasticsearch.search.aggregations.AggregationBuilders;
 | 
			
		||||
import org.elasticsearch.search.aggregations.Aggregations;
 | 
			
		||||
import org.elasticsearch.search.aggregations.BucketOrder;
 | 
			
		||||
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders;
 | 
			
		||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 | 
			
		||||
import org.elasticsearch.search.aggregations.bucket.filter.ParsedFilter;
 | 
			
		||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 | 
			
		||||
import org.elasticsearch.search.aggregations.metrics.ParsedCardinality;
 | 
			
		||||
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
 | 
			
		||||
@ -51,7 +52,6 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class ESAggregatedStatsDAO {
 | 
			
		||||
  private static final String ES_FILTERED_STATS = "filtered_stats";
 | 
			
		||||
  private static final String ES_AGGREGATION_PREFIX = "agg_";
 | 
			
		||||
  private static final String ES_TERMS_AGGREGATION_PREFIX = "terms_";
 | 
			
		||||
  private static final String ES_MAX_AGGREGATION_PREFIX = "max_";
 | 
			
		||||
@ -124,7 +124,7 @@ public class ESAggregatedStatsDAO {
 | 
			
		||||
      }
 | 
			
		||||
    } else if (curLevel < lastLevel) {
 | 
			
		||||
      //(Recursive-case): We are still processing the nested group-by multi-bucket aggregations.
 | 
			
		||||
      // For each bucket, add the key to the row and recurse-down for full row construction.
 | 
			
		||||
      // For each bucket, add the key to the row and recur down for full row construction.
 | 
			
		||||
      GroupingBucket curGroupingBucket = groupingBuckets.get(curLevel);
 | 
			
		||||
      String curGroupingBucketAggName = getGroupingBucketAggName(curGroupingBucket);
 | 
			
		||||
      MultiBucketsAggregation nestedMBAgg = lowestAggs.get(curGroupingBucketAggName);
 | 
			
		||||
@ -135,7 +135,7 @@ public class ESAggregatedStatsDAO {
 | 
			
		||||
        } else {
 | 
			
		||||
          row.push(b.getKeyAsString());
 | 
			
		||||
        }
 | 
			
		||||
        // Recurse down
 | 
			
		||||
        // Recur down
 | 
			
		||||
        rowGenHelper(b.getAggregations(), curLevel + 1, lastLevel, rows, row, groupingBuckets, aggregationSpecs,
 | 
			
		||||
            aspectSpec);
 | 
			
		||||
        // Remove the row value we have added for this level.
 | 
			
		||||
@ -343,21 +343,24 @@ public class ESAggregatedStatsDAO {
 | 
			
		||||
 | 
			
		||||
    // Setup the filter query builder using the input filter provided.
 | 
			
		||||
    final BoolQueryBuilder filterQueryBuilder = ESUtils.buildFilterQuery(filter, true);
 | 
			
		||||
    // Create the high-level aggregation builder with the filter.
 | 
			
		||||
    final AggregationBuilder filteredAggBuilder = AggregationBuilders.filter(ES_FILTERED_STATS, filterQueryBuilder);
 | 
			
		||||
 | 
			
		||||
    AspectSpec aspectSpec = getTimeseriesAspectSpec(entityName, aspectName);
 | 
			
		||||
    // Build and attach the grouping aggregations
 | 
			
		||||
    final AggregationBuilder baseAggregationForMembers =
 | 
			
		||||
        makeGroupingAggregationBuilder(aspectSpec, filteredAggBuilder, groupingBuckets);
 | 
			
		||||
    final Pair<AggregationBuilder, AggregationBuilder> topAndBottomAggregations =
 | 
			
		||||
        makeGroupingAggregationBuilder(aspectSpec, null, groupingBuckets);
 | 
			
		||||
    AggregationBuilder rootAggregationBuilder = topAndBottomAggregations.getFirst();
 | 
			
		||||
    AggregationBuilder mostNested = topAndBottomAggregations.getSecond();
 | 
			
		||||
 | 
			
		||||
    // Add the aggregations for members.
 | 
			
		||||
    for (AggregationSpec aggregationSpec : aggregationSpecs) {
 | 
			
		||||
      addAggregationBuildersFromAggregationSpec(aspectSpec, baseAggregationForMembers, aggregationSpec);
 | 
			
		||||
      addAggregationBuildersFromAggregationSpec(aspectSpec, mostNested, aggregationSpec);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
 | 
			
		||||
    searchSourceBuilder.aggregation(filteredAggBuilder);
 | 
			
		||||
    searchSourceBuilder.aggregation(rootAggregationBuilder);
 | 
			
		||||
    searchSourceBuilder.query(QueryBuilders.boolQuery().must(filterQueryBuilder));
 | 
			
		||||
 | 
			
		||||
    searchSourceBuilder.size(0);
 | 
			
		||||
 | 
			
		||||
    final SearchRequest searchRequest = new SearchRequest();
 | 
			
		||||
    searchRequest.source(searchSourceBuilder);
 | 
			
		||||
@ -411,36 +414,43 @@ public class ESAggregatedStatsDAO {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private AggregationBuilder makeGroupingAggregationBuilder(AspectSpec aspectSpec,
 | 
			
		||||
      AggregationBuilder baseAggregationBuilder, GroupingBucket[] groupingBuckets) {
 | 
			
		||||
  private Pair<AggregationBuilder, AggregationBuilder> makeGroupingAggregationBuilder(AspectSpec aspectSpec,
 | 
			
		||||
      @Nullable AggregationBuilder baseAggregationBuilder, @Nullable GroupingBucket[] groupingBuckets) {
 | 
			
		||||
 | 
			
		||||
    AggregationBuilder firstAggregationBuilder = baseAggregationBuilder;
 | 
			
		||||
    AggregationBuilder lastAggregationBuilder = baseAggregationBuilder;
 | 
			
		||||
    for (int i = 0; i < groupingBuckets.length; ++i) {
 | 
			
		||||
      AggregationBuilder curAggregationBuilder = null;
 | 
			
		||||
      GroupingBucket curGroupingBucket = groupingBuckets[i];
 | 
			
		||||
      if (curGroupingBucket.getType() == GroupingBucketType.DATE_GROUPING_BUCKET) {
 | 
			
		||||
        // Process the date grouping bucket using 'date-histogram' aggregation.
 | 
			
		||||
        if (!curGroupingBucket.getKey().equals(ES_FIELD_TIMESTAMP)) {
 | 
			
		||||
          throw new IllegalArgumentException("Date Grouping bucket is not:" + ES_FIELD_TIMESTAMP);
 | 
			
		||||
    if (groupingBuckets != null) {
 | 
			
		||||
      for (GroupingBucket curGroupingBucket : groupingBuckets) {
 | 
			
		||||
        AggregationBuilder curAggregationBuilder = null;
 | 
			
		||||
        if (curGroupingBucket.getType() == GroupingBucketType.DATE_GROUPING_BUCKET) {
 | 
			
		||||
          // Process the date grouping bucket using 'date-histogram' aggregation.
 | 
			
		||||
          if (!curGroupingBucket.getKey().equals(ES_FIELD_TIMESTAMP)) {
 | 
			
		||||
            throw new IllegalArgumentException("Date Grouping bucket is not:" + ES_FIELD_TIMESTAMP);
 | 
			
		||||
          }
 | 
			
		||||
          curAggregationBuilder = AggregationBuilders.dateHistogram(ES_AGG_TIMESTAMP)
 | 
			
		||||
              .field(ES_FIELD_TIMESTAMP)
 | 
			
		||||
              .calendarInterval(getHistogramInterval(curGroupingBucket.getTimeWindowSize()));
 | 
			
		||||
        } else if (curGroupingBucket.getType() == GroupingBucketType.STRING_GROUPING_BUCKET) {
 | 
			
		||||
          // Process the string grouping bucket using the 'terms' aggregation.
 | 
			
		||||
          // The field can be Keyword, Numeric, ip, boolean, or binary.
 | 
			
		||||
          String fieldName = ESUtils.toKeywordField(curGroupingBucket.getKey(), true);
 | 
			
		||||
          DataSchema.Type fieldType = getGroupingBucketKeyType(aspectSpec, curGroupingBucket);
 | 
			
		||||
          curAggregationBuilder = AggregationBuilders.terms(getGroupingBucketAggName(curGroupingBucket))
 | 
			
		||||
              .field(fieldName)
 | 
			
		||||
              .size(MAX_TERM_BUCKETS)
 | 
			
		||||
              .order(BucketOrder.aggregation("_key", true));
 | 
			
		||||
        }
 | 
			
		||||
        curAggregationBuilder = AggregationBuilders.dateHistogram(ES_AGG_TIMESTAMP)
 | 
			
		||||
            .field(ES_FIELD_TIMESTAMP)
 | 
			
		||||
            .calendarInterval(getHistogramInterval(curGroupingBucket.getTimeWindowSize()));
 | 
			
		||||
      } else if (curGroupingBucket.getType() == GroupingBucketType.STRING_GROUPING_BUCKET) {
 | 
			
		||||
        // Process the string grouping bucket using the 'terms' aggregation.
 | 
			
		||||
        // The field can be Keyword, Numeric, ip, boolean, or binary.
 | 
			
		||||
        String fieldName = ESUtils.toKeywordField(curGroupingBucket.getKey(), true);
 | 
			
		||||
        DataSchema.Type fieldType = getGroupingBucketKeyType(aspectSpec, curGroupingBucket);
 | 
			
		||||
        curAggregationBuilder = AggregationBuilders.terms(getGroupingBucketAggName(curGroupingBucket))
 | 
			
		||||
            .field(fieldName)
 | 
			
		||||
            .size(MAX_TERM_BUCKETS)
 | 
			
		||||
            .order(BucketOrder.aggregation("_key", true));
 | 
			
		||||
        if (firstAggregationBuilder == null) {
 | 
			
		||||
          firstAggregationBuilder = curAggregationBuilder;
 | 
			
		||||
        }
 | 
			
		||||
        if (lastAggregationBuilder != null) {
 | 
			
		||||
          lastAggregationBuilder.subAggregation(curAggregationBuilder);
 | 
			
		||||
        }
 | 
			
		||||
        lastAggregationBuilder = curAggregationBuilder;
 | 
			
		||||
      }
 | 
			
		||||
      lastAggregationBuilder.subAggregation(curAggregationBuilder);
 | 
			
		||||
      lastAggregationBuilder = curAggregationBuilder;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return lastAggregationBuilder;
 | 
			
		||||
    return Pair.of(firstAggregationBuilder, lastAggregationBuilder);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private GenericTable generateResponseFromElastic(SearchResponse searchResponse, GroupingBucket[] groupingBuckets,
 | 
			
		||||
@ -459,10 +469,8 @@ public class ESAggregatedStatsDAO {
 | 
			
		||||
    List<StringArray> rows = new ArrayList<>();
 | 
			
		||||
 | 
			
		||||
    Aggregations aggregations = searchResponse.getAggregations();
 | 
			
		||||
    ParsedFilter filterAgg = aggregations.get(ES_FILTERED_STATS);
 | 
			
		||||
    Stack<String> rowAcc = new Stack<>();
 | 
			
		||||
    // 3.1 Do a DFS of the aggregation tree and generate the rows.
 | 
			
		||||
    rowGenHelper(filterAgg.getAggregations(), 0, groupingBuckets.length, rows, rowAcc,
 | 
			
		||||
    rowGenHelper(aggregations, 0, groupingBuckets.length, rows, rowAcc,
 | 
			
		||||
        ImmutableList.copyOf(groupingBuckets), ImmutableList.copyOf(aggregationSpecs), aspectSpec);
 | 
			
		||||
 | 
			
		||||
    if (!rowAcc.isEmpty()) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user