Minor: Query Cost Table Aggregation Endpoint (#20270)

This commit is contained in:
Mayur Singal 2025-03-17 11:33:50 +05:30 committed by GitHub
parent 912e5bc74e
commit d30fd90096
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 716 additions and 26 deletions

View File

@ -1,24 +1,25 @@
query,cost
"select * from shopify.raw_product_catalog",10
"select * from shopify.raw_product_catalog",12
"select comments, products from shopify.raw_product_catalog",2
"select comments, products from shopify.raw_product_catalog",3
"select comments, products from shopify.raw_product_catalog",9
"select cust.customer_id, fact_order.order_id from dim_customer cust join fact_order on cust.customer_id = fact_order.customer_id",0.222
"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.234
"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.5
"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.65
"select case when net_sales > 100 then 'high' else 'low' end as map_sales from shopify.fact_sale",2
"select ROW_NUMBER() OVER (PARTITION BY sale_id, customer_id) AS sale_row from shopify.fact_sale",5
"select * from shopify.raw_customer",19
"select * from shopify.raw_customer",19
"select * from shopify.raw_customer",18
"select * from shopify.raw_customer",17
"select * from shopify.raw_customer",16
"select * from shopify.raw_customer",20
"select * from shopify.raw_customer",21
"select * from shopify.raw_customer",22
"select * from shopify.raw_customer",15
"select * from shopify.raw_customer",12
"create table shopify.dim_address_clean as select address_id, shop_id, first_name, last_name, address1 as address, company, city, region, zip, country, phone from shopify.dim_address",0.5
"create table shopify.dim_address_clean as select address_id, shop_id, first_name, last_name, address1 as address, company, city, region, zip, country, phone from shopify.dim_address",0.5
query,cost,user
"select * from shopify.raw_product_catalog",10,admin
"select * from shopify.raw_product_catalog",12,admin
"select comments, products from shopify.raw_product_catalog",2,
"select comments, products from shopify.raw_product_catalog",3,
"select comments, products from shopify.raw_product_catalog",9,
"select cust.customer_id, fact_order.order_id from dim_customer cust join fact_order on cust.customer_id = fact_order.customer_id",0.222,
"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.234,
"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.5,
"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.65,
"select case when net_sales > 100 then 'high' else 'low' end as map_sales from shopify.fact_sale",2,
"select ROW_NUMBER() OVER (PARTITION BY sale_id, customer_id) AS sale_row from shopify.fact_sale",5,
"select * from shopify.raw_customer",19,admin
"select * from shopify.raw_customer",19,admin
"select * from shopify.raw_customer",18,admin
"select * from shopify.raw_customer",17,admin
"select * from shopify.raw_customer",16,admin
"select * from shopify.raw_customer",20,admin
"select * from shopify.raw_customer",21,admin
"select * from shopify.raw_customer",22,admin
"select * from shopify.raw_customer",15,admin
"select * from shopify.raw_customer",12,admin
"create table shopify.dim_address_clean as select address_id, shop_id, first_name, last_name, address1 as address, company, city, region, zip, country, phone from shopify.dim_address",0.5,admin
"create table shopify.dim_address_clean as select address_id, shop_id, first_name, last_name, address1 as address, company, city, region, zip, country, phone from shopify.dim_address",0.5,admin
"select * from shopify.raw_customer limit 1",12,admin

View File

@ -93,6 +93,7 @@ class SampleUsageSource(UsageSource):
serviceName=self.config.serviceName,
databaseSchema="shopify",
cost=row.get("cost"),
userName=row.get("user"),
)
for row in self.query_logs
]

View File

@ -77,7 +77,7 @@ class QueryParserTest(TestCase):
"dim_customer": 4,
"fact_order": 4,
"shopify.fact_sale": 5,
"shopify.raw_customer": 10,
"shopify.raw_customer": 11,
}
config_dict = json.loads(config)
config_dict["source"]["serviceConnection"]["config"]["connectionOptions"][

View File

@ -1,6 +1,8 @@
package org.openmetadata.service.jdbi3;
import java.io.IOException;
import org.openmetadata.schema.entity.data.QueryCostRecord;
import org.openmetadata.schema.entity.data.QueryCostSearchResult;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.query.QueryCostResource;
@ -13,4 +15,8 @@ public class QueryCostRepository extends EntityTimeSeriesRepository<QueryCostRec
QueryCostRecord.class,
Entity.QUERY_COST_RECORD);
}
public QueryCostSearchResult getQueryCostAggData(String serviceName) throws IOException {
return searchRepository.getQueryCostRecords(serviceName);
}
}

View File

@ -253,7 +253,7 @@ public class QueryRepository extends EntityRepository<Query> {
"users",
USER,
original.getUsers(),
updated.getUsers(),
updated.getUsers() == null ? new ArrayList<>() : updated.getUsers(),
Relationship.USES,
Entity.QUERY,
original.getId());

View File

@ -6,6 +6,7 @@ import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.io.IOException;
import java.util.UUID;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
@ -21,8 +22,10 @@ import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.openmetadata.schema.entity.data.CreateQueryCostRecord;
import org.openmetadata.schema.entity.data.QueryCostRecord;
import org.openmetadata.schema.entity.data.QueryCostSearchResult;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.QueryCostRepository;
import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.resources.EntityTimeSeriesResource;
@ -106,4 +109,32 @@ public class QueryCostResource
mapper.createToEntity(createQueryCostRecord, securityContext.getUserPrincipal().getName());
return create(queryCostRecord, queryCostRecord.getQueryReference().getFullyQualifiedName());
}
@GET
@Path("/service/{serviceName}")
@Operation(
operationId = "getQueryCostByService",
summary = "Get Query Cost By Service",
description = "Get Query Cost By Service",
responses = {
@ApiResponse(
responseCode = "200",
description = "Create query cost record",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CreateQueryCostRecord.class)))
})
public QueryCostSearchResult getQueryCostAggData(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@PathParam("serviceName") String serviceName)
throws IOException {
OperationContext operationContext =
new OperationContext(Entity.QUERY, MetadataOperation.VIEW_QUERIES);
ListFilter filter = new ListFilter(null);
ResourceContext resourceContext = filter.getResourceContext(Entity.QUERY);
authorizer.authorize(securityContext, operationContext, resourceContext);
return repository.getQueryCostAggData(serviceName);
}
}

View File

@ -14,6 +14,7 @@ import org.openmetadata.service.util.EntityUtil;
public class QueryMapper implements EntityMapper<Query, CreateQuery> {
@Override
public Query createToEntity(CreateQuery create, String user) {
return copy(new Query(), create, user)
.withQuery(create.getQuery())
.withChecksum(EntityUtil.hash(create.getQuery()))
@ -23,6 +24,7 @@ public class QueryMapper implements EntityMapper<Query, CreateQuery> {
.withUsers(getEntityReferences(USER, create.getUsers()))
.withQueryUsedIn(EntityUtil.populateEntityReferences(create.getQueryUsedIn()))
.withQueryDate(create.getQueryDate())
.withUsedBy(create.getUsedBy())
.withTriggeredBy(create.getTriggeredBy())
.withProcessedLineage(create.getProcessedLineage());
}

View File

@ -17,6 +17,7 @@ import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList;
import org.openmetadata.schema.entity.data.QueryCostSearchResult;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.tests.DataQualityReport;
import org.openmetadata.schema.type.EntityReference;
@ -348,4 +349,6 @@ public interface SearchClient {
Object getClient();
SearchHealthStatus getSearchHealthStatus() throws IOException;
QueryCostSearchResult getQueryCostRecords(String serviceName) throws IOException;
}

View File

@ -72,6 +72,7 @@ import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.entity.classification.Tag;
import org.openmetadata.schema.entity.data.QueryCostSearchResult;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.tests.DataQualityReport;
import org.openmetadata.schema.tests.TestSuite;
@ -1214,4 +1215,8 @@ public class SearchRepository {
new ImmutablePair<>("entityRelationship.docId.keyword", relationDocId),
new ImmutablePair<>(String.format(REMOVE_ENTITY_RELATIONSHIP, relationDocId), null));
}
public QueryCostSearchResult getQueryCostRecords(String serviceName) throws IOException {
return searchClient.getQueryCostRecords(serviceName);
}
}

View File

@ -162,6 +162,7 @@ import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList;
import org.openmetadata.schema.dataInsight.custom.FormulaHolder;
import org.openmetadata.schema.entity.data.EntityHierarchy;
import org.openmetadata.schema.entity.data.QueryCostSearchResult;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.tests.DataQualityReport;
@ -200,6 +201,7 @@ import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.Elas
import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.ElasticSearchMostViewedEntitiesAggregator;
import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.ElasticSearchPageViewsByEntitiesAggregator;
import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.ElasticSearchUnusedAssetsAggregator;
import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.QueryCostRecordsAggregator;
import org.openmetadata.service.search.elasticsearch.queries.ElasticQueryBuilder;
import org.openmetadata.service.search.elasticsearch.queries.ElasticQueryBuilderFactory;
import org.openmetadata.service.search.indexes.APIEndpointIndex;
@ -2500,6 +2502,14 @@ public class ElasticSearchClient implements SearchClient {
return null;
}
public QueryCostSearchResult getQueryCostRecords(String serviceName) throws IOException {
QueryCostRecordsAggregator queryCostRecordsAggregator = new QueryCostRecordsAggregator();
es.org.elasticsearch.action.search.SearchRequest searchRequest =
queryCostRecordsAggregator.getQueryCostRecords(serviceName);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
return queryCostRecordsAggregator.parseQueryCostResponse(searchResponse);
}
private static AggregationBuilder buildQueryAggregation(
DataInsightChartResult.DataInsightChartType dataInsightChartName)
throws IllegalArgumentException {

View File

@ -0,0 +1,209 @@
package org.openmetadata.service.search.elasticsearch.dataInsightAggregators;
import es.org.elasticsearch.action.search.SearchAction;
import es.org.elasticsearch.action.search.SearchRequest;
import es.org.elasticsearch.action.search.SearchRequestBuilder;
import es.org.elasticsearch.action.search.SearchResponse;
import es.org.elasticsearch.index.query.BoolQueryBuilder;
import es.org.elasticsearch.index.query.QueryBuilders;
import es.org.elasticsearch.script.Script;
import es.org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import es.org.elasticsearch.search.aggregations.AggregationBuilders;
import es.org.elasticsearch.search.aggregations.BucketOrder;
import es.org.elasticsearch.search.aggregations.PipelineAggregatorBuilders;
import es.org.elasticsearch.search.aggregations.bucket.terms.Terms;
import es.org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import es.org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import es.org.elasticsearch.search.aggregations.metrics.Stats;
import es.org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
import es.org.elasticsearch.search.aggregations.metrics.Sum;
import es.org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import es.org.elasticsearch.search.aggregations.metrics.TopHits;
import es.org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import es.org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.openmetadata.schema.entity.data.OverallStats;
import org.openmetadata.schema.entity.data.QueryCostSearchResult;
import org.openmetadata.schema.entity.data.QueryDetails;
import org.openmetadata.schema.entity.data.QueryGroup;
import org.openmetadata.schema.entity.data.QueryHolder;
public class QueryCostRecordsAggregator {
public static SearchRequest getQueryCostRecords(String serviceName) {
SearchRequest searchRequest;
AbstractAggregationBuilder aggregationBuilder;
// Create search request builder
SearchRequestBuilder searchRequestBuilder =
new SearchRequestBuilder(null, SearchAction.INSTANCE);
searchRequestBuilder.setSize(0);
// Create query groups aggregation with size 10 and order by total_cost
TermsAggregationBuilder queryGroupsAgg =
AggregationBuilders.terms("query_groups")
.field("query.query")
.size(10)
.order(BucketOrder.aggregation("total_cost", false));
// Add sub-aggregations to query_groups
// Users aggregation
TermsAggregationBuilder usersAgg =
AggregationBuilders.terms("users").field("query.usedBy.keyword").size(10);
queryGroupsAgg.subAggregation(usersAgg);
// Total cost aggregation
SumAggregationBuilder totalCostAgg = AggregationBuilders.sum("total_cost").field("cost");
queryGroupsAgg.subAggregation(totalCostAgg);
// Total count aggregation
SumAggregationBuilder totalCountAgg = AggregationBuilders.sum("total_count").field("count");
queryGroupsAgg.subAggregation(totalCountAgg);
// Total duration aggregation
SumAggregationBuilder totalDurationAgg =
AggregationBuilders.sum("total_duration").field("totalDuration");
queryGroupsAgg.subAggregation(totalDurationAgg);
// Average duration aggregation (bucket script)
Map<String, String> bucketsPathMap = new HashMap<>();
bucketsPathMap.put("total_duration", "total_duration");
bucketsPathMap.put("total_count", "total_count");
BucketScriptPipelineAggregationBuilder avgDurationAgg =
PipelineAggregatorBuilders.bucketScript(
"avg_duration",
bucketsPathMap,
new Script("params.total_duration / params.total_count"));
queryGroupsAgg.subAggregation(avgDurationAgg);
// Query details aggregation (top hits)
TopHitsAggregationBuilder queryDetailsAgg =
AggregationBuilders.topHits("query_details")
.size(1)
.fetchSource(new String[] {"query.*"}, null);
queryGroupsAgg.subAggregation(queryDetailsAgg);
// set query size to 10
queryGroupsAgg.size(10);
queryGroupsAgg.order(BucketOrder.aggregation("total_cost", false));
// Overall totals aggregation
StatsAggregationBuilder overallTotalsAgg =
AggregationBuilders.stats("overall_totals").field("cost");
// Total execution count aggregation
SumAggregationBuilder totalExecutionCountAgg =
AggregationBuilders.sum("total_execution_count").field("count");
// Add all top-level aggregations to the search request
searchRequestBuilder.addAggregation(queryGroupsAgg);
searchRequestBuilder.addAggregation(overallTotalsAgg);
searchRequestBuilder.addAggregation(totalExecutionCountAgg);
// If serviceName is provided, add a filter
if (serviceName != null && !serviceName.isEmpty()) {
BoolQueryBuilder boolQuery =
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("service.name.keyword", serviceName));
searchRequestBuilder.setQuery(boolQuery);
}
// Build the search request
searchRequest = searchRequestBuilder.request().indices("query_cost_record_search_index");
return searchRequest;
}
public static QueryCostSearchResult parseQueryCostResponse(SearchResponse response) {
List<QueryGroup> queryGroups = new ArrayList<>();
// Get the query_groups aggregation
Terms queryGroupsAgg = response.getAggregations().get("query_groups");
// Process each query group
for (Terms.Bucket bucket : queryGroupsAgg.getBuckets()) {
String queryText = bucket.getKeyAsString();
// Get users
Terms usersAgg = bucket.getAggregations().get("users");
List<String> users =
usersAgg.getBuckets().stream()
.map(Terms.Bucket::getKeyAsString)
.collect(Collectors.toList());
// Get metrics
double totalCost = ((Sum) bucket.getAggregations().get("total_cost")).getValue();
long totalCount = (long) ((Sum) bucket.getAggregations().get("total_count")).getValue();
double totalDuration = ((Sum) bucket.getAggregations().get("total_duration")).getValue();
// Get avg_duration using a more generic approach
double avgDuration;
Object avgDurationAgg = bucket.getAggregations().get("avg_duration");
if (avgDurationAgg instanceof NumericMetricsAggregation.SingleValue) {
// This should work for most implementations
avgDuration = ((NumericMetricsAggregation.SingleValue) avgDurationAgg).value();
} else {
// Fallback: calculate it ourselves if the aggregation result can't be accessed
avgDuration = totalCount > 0 ? totalDuration / totalCount : 0;
}
// Get query details
TopHits queryDetailsHits = bucket.getAggregations().get("query_details");
Map<String, Object> detailsMap = queryDetailsHits.getHits().getHits()[0].getSourceAsMap();
// Create QueryDetails object
QueryDetails queryDetails = new QueryDetails();
// Extract query information if available
if (detailsMap.containsKey("query")) {
// Create a QueryHolder object instead of using the Map directly
QueryHolder query = new QueryHolder();
@SuppressWarnings("unchecked")
Map<String, Object> queryMap = (Map<String, Object>) detailsMap.get("query");
// Add all properties from queryMap to the Query__1 object as additional properties
for (Map.Entry<String, Object> entry : queryMap.entrySet()) {
query.withAdditionalProperty(entry.getKey(), entry.getValue());
}
queryDetails.withQuery(query);
}
// Add any other fields from detailsMap to queryDetails
for (Map.Entry<String, Object> entry : detailsMap.entrySet()) {
if (!entry.getKey().equals("query")) {
queryDetails.withAdditionalProperty(entry.getKey(), entry.getValue());
}
}
QueryGroup queryGroup =
new QueryGroup()
.withQueryText(queryText)
.withUsers(users)
.withTotalCost(totalCost)
.withTotalCount((int) totalCount)
.withTotalDuration(totalDuration)
.withAvgDuration(avgDuration)
.withQueryDetails(queryDetails);
queryGroups.add(queryGroup);
}
// Get overall stats
Stats overallTotals = response.getAggregations().get("overall_totals");
Sum totalExecutionCount = response.getAggregations().get("total_execution_count");
OverallStats overallStats =
new OverallStats()
.withTotalCost(overallTotals.getSum())
.withMinCost(overallTotals.getMin())
.withMaxCost(overallTotals.getMax())
.withAvgCost(overallTotals.getAvg())
.withTotalExecutionCount((int) totalExecutionCount.getValue());
return new QueryCostSearchResult().withQueryGroups(queryGroups).withOverallStats(overallStats);
}
}

View File

@ -80,6 +80,7 @@ import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList;
import org.openmetadata.schema.dataInsight.custom.FormulaHolder;
import org.openmetadata.schema.entity.data.EntityHierarchy;
import org.openmetadata.schema.entity.data.QueryCostSearchResult;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.tests.DataQualityReport;
@ -138,6 +139,7 @@ import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSear
import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSearchMostViewedEntitiesAggregator;
import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSearchPageViewsByEntitiesAggregator;
import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSearchUnusedAssetsAggregator;
import org.openmetadata.service.search.opensearch.dataInsightAggregator.QueryCostRecordsAggregator;
import org.openmetadata.service.search.opensearch.queries.OpenSearchQueryBuilder;
import org.openmetadata.service.search.opensearch.queries.OpenSearchQueryBuilderFactory;
import org.openmetadata.service.search.queries.OMQueryBuilder;
@ -2771,4 +2773,14 @@ public class OpenSearchClient implements SearchClient {
return new SearchHealthStatus(UNHEALTHY_STATUS);
}
}
@Override
public QueryCostSearchResult getQueryCostRecords(String serviceName) throws IOException {
QueryCostRecordsAggregator queryCostRecordsAggregator = new QueryCostRecordsAggregator();
os.org.opensearch.action.search.SearchRequest searchRequest =
queryCostRecordsAggregator.getQueryCostRecords(serviceName);
os.org.opensearch.action.search.SearchResponse searchResponse =
client.search(searchRequest, RequestOptions.DEFAULT);
return queryCostRecordsAggregator.parseQueryCostResponse(searchResponse);
}
}

View File

@ -0,0 +1,209 @@
package org.openmetadata.service.search.opensearch.dataInsightAggregator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.openmetadata.schema.entity.data.OverallStats;
import org.openmetadata.schema.entity.data.QueryCostSearchResult;
import org.openmetadata.schema.entity.data.QueryDetails;
import org.openmetadata.schema.entity.data.QueryGroup;
import org.openmetadata.schema.entity.data.QueryHolder;
import os.org.opensearch.action.search.SearchAction;
import os.org.opensearch.action.search.SearchRequest;
import os.org.opensearch.action.search.SearchRequestBuilder;
import os.org.opensearch.action.search.SearchResponse;
import os.org.opensearch.index.query.BoolQueryBuilder;
import os.org.opensearch.index.query.QueryBuilders;
import os.org.opensearch.script.Script;
import os.org.opensearch.search.aggregations.AbstractAggregationBuilder;
import os.org.opensearch.search.aggregations.AggregationBuilders;
import os.org.opensearch.search.aggregations.BucketOrder;
import os.org.opensearch.search.aggregations.PipelineAggregatorBuilders;
import os.org.opensearch.search.aggregations.bucket.terms.Terms;
import os.org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import os.org.opensearch.search.aggregations.metrics.NumericMetricsAggregation;
import os.org.opensearch.search.aggregations.metrics.Stats;
import os.org.opensearch.search.aggregations.metrics.StatsAggregationBuilder;
import os.org.opensearch.search.aggregations.metrics.Sum;
import os.org.opensearch.search.aggregations.metrics.SumAggregationBuilder;
import os.org.opensearch.search.aggregations.metrics.TopHits;
import os.org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import os.org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
public class QueryCostRecordsAggregator {
public static SearchRequest getQueryCostRecords(String serviceName) {
SearchRequest searchRequest;
AbstractAggregationBuilder aggregationBuilder;
// Create search request builder
SearchRequestBuilder searchRequestBuilder =
new SearchRequestBuilder(null, SearchAction.INSTANCE);
searchRequestBuilder.setSize(0);
// Create query groups aggregation with size 10 and order by total_cost
TermsAggregationBuilder queryGroupsAgg =
AggregationBuilders.terms("query_groups")
.field("query.query")
.size(10)
.order(BucketOrder.aggregation("total_cost", false));
// Add sub-aggregations to query_groups
// Users aggregation
TermsAggregationBuilder usersAgg =
AggregationBuilders.terms("users").field("query.usedBy.keyword").size(10);
queryGroupsAgg.subAggregation(usersAgg);
// Total cost aggregation
SumAggregationBuilder totalCostAgg = AggregationBuilders.sum("total_cost").field("cost");
queryGroupsAgg.subAggregation(totalCostAgg);
// Total count aggregation
SumAggregationBuilder totalCountAgg = AggregationBuilders.sum("total_count").field("count");
queryGroupsAgg.subAggregation(totalCountAgg);
// Total duration aggregation
SumAggregationBuilder totalDurationAgg =
AggregationBuilders.sum("total_duration").field("totalDuration");
queryGroupsAgg.subAggregation(totalDurationAgg);
// Average duration aggregation (bucket script)
Map<String, String> bucketsPathMap = new HashMap<>();
bucketsPathMap.put("total_duration", "total_duration");
bucketsPathMap.put("total_count", "total_count");
BucketScriptPipelineAggregationBuilder avgDurationAgg =
PipelineAggregatorBuilders.bucketScript(
"avg_duration",
bucketsPathMap,
new Script("params.total_duration / params.total_count"));
queryGroupsAgg.subAggregation(avgDurationAgg);
// Query details aggregation (top hits)
TopHitsAggregationBuilder queryDetailsAgg =
AggregationBuilders.topHits("query_details")
.size(1)
.fetchSource(new String[] {"query.*"}, null);
queryGroupsAgg.subAggregation(queryDetailsAgg);
// set query size to 10
queryGroupsAgg.size(10);
queryGroupsAgg.order(BucketOrder.aggregation("total_cost", false));
// Overall totals aggregation
StatsAggregationBuilder overallTotalsAgg =
AggregationBuilders.stats("overall_totals").field("cost");
// Total execution count aggregation
SumAggregationBuilder totalExecutionCountAgg =
AggregationBuilders.sum("total_execution_count").field("count");
// Add all top-level aggregations to the search request
searchRequestBuilder.addAggregation(queryGroupsAgg);
searchRequestBuilder.addAggregation(overallTotalsAgg);
searchRequestBuilder.addAggregation(totalExecutionCountAgg);
// If serviceName is provided, add a filter
if (serviceName != null && !serviceName.isEmpty()) {
BoolQueryBuilder boolQuery =
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("service.name.keyword", serviceName));
searchRequestBuilder.setQuery(boolQuery);
}
// Build the search request
searchRequest = searchRequestBuilder.request().indices("query_cost_record_search_index");
return searchRequest;
}
public static QueryCostSearchResult parseQueryCostResponse(SearchResponse response) {
List<QueryGroup> queryGroups = new ArrayList<>();
// Get the query_groups aggregation
Terms queryGroupsAgg = response.getAggregations().get("query_groups");
// Process each query group
for (Terms.Bucket bucket : queryGroupsAgg.getBuckets()) {
String queryText = bucket.getKeyAsString();
// Get users
Terms usersAgg = bucket.getAggregations().get("users");
List<String> users =
usersAgg.getBuckets().stream()
.map(Terms.Bucket::getKeyAsString)
.collect(Collectors.toList());
// Get metrics
double totalCost = ((Sum) bucket.getAggregations().get("total_cost")).getValue();
long totalCount = (long) ((Sum) bucket.getAggregations().get("total_count")).getValue();
double totalDuration = ((Sum) bucket.getAggregations().get("total_duration")).getValue();
// Get avg_duration using a more generic approach
double avgDuration;
Object avgDurationAgg = bucket.getAggregations().get("avg_duration");
if (avgDurationAgg instanceof NumericMetricsAggregation.SingleValue) {
// This should work for most implementations
avgDuration = ((NumericMetricsAggregation.SingleValue) avgDurationAgg).value();
} else {
// Fallback: calculate it ourselves if the aggregation result can't be accessed
avgDuration = totalCount > 0 ? totalDuration / totalCount : 0;
}
// Get query details
TopHits queryDetailsHits = bucket.getAggregations().get("query_details");
Map<String, Object> detailsMap = queryDetailsHits.getHits().getHits()[0].getSourceAsMap();
// Create QueryDetails object
QueryDetails queryDetails = new QueryDetails();
// Extract query information if available
if (detailsMap.containsKey("query")) {
// Create a QueryHolder object instead of using the Map directly
QueryHolder query = new QueryHolder();
@SuppressWarnings("unchecked")
Map<String, Object> queryMap = (Map<String, Object>) detailsMap.get("query");
// Add all properties from queryMap to the Query__1 object as additional properties
for (Map.Entry<String, Object> entry : queryMap.entrySet()) {
query.withAdditionalProperty(entry.getKey(), entry.getValue());
}
queryDetails.withQuery(query);
}
// Add any other fields from detailsMap to queryDetails
for (Map.Entry<String, Object> entry : detailsMap.entrySet()) {
if (!entry.getKey().equals("query")) {
queryDetails.withAdditionalProperty(entry.getKey(), entry.getValue());
}
}
QueryGroup queryGroup =
new QueryGroup()
.withQueryText(queryText)
.withUsers(users)
.withTotalCost(totalCost)
.withTotalCount((int) totalCount)
.withTotalDuration(totalDuration)
.withAvgDuration(avgDuration)
.withQueryDetails(queryDetails);
queryGroups.add(queryGroup);
}
// Get overall stats
Stats overallTotals = response.getAggregations().get("overall_totals");
Sum totalExecutionCount = response.getAggregations().get("total_execution_count");
OverallStats overallStats =
new OverallStats()
.withTotalCost(overallTotals.getSum())
.withMinCost(overallTotals.getMin())
.withMaxCost(overallTotals.getMax())
.withAvgCost(overallTotals.getAvg())
.withTotalExecutionCount((int) totalExecutionCount.getValue());
return new QueryCostSearchResult().withQueryGroups(queryGroups).withOverallStats(overallStats);
}
}

View File

@ -0,0 +1,103 @@
{
"$id": "https://open-metadata.org/schema/entity/data/queryCostSearchResult.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "QueryCostSearchResult",
"description": "Query Cost Search Result",
"type": "object",
"javaType": "org.openmetadata.schema.entity.data.QueryCostSearchResult",
"definitions": {
"queryDetails": {
"type": "object",
"javaType": "org.openmetadata.schema.entity.data.QueryDetails",
"properties": {
"query": {
"type": "object",
"javaType": "org.openmetadata.schema.entity.data.QueryHolder",
"additionalProperties": true,
"description": "Query information"
}
},
"additionalProperties": true,
"description": "Details about the query"
},
"queryGroup": {
"type": "object",
"javaType": "org.openmetadata.schema.entity.data.QueryGroup",
"properties": {
"queryText": {
"type": "string",
"description": "The text of the query"
},
"users": {
"type": "array",
"items": {
"type": "string"
},
"description": "List of users who executed the query"
},
"totalCost": {
"type": "number",
"description": "Total cost of all query executions"
},
"totalCount": {
"type": "integer",
"description": "Total number of query executions"
},
"totalDuration": {
"type": "number",
"description": "Total duration of all query executions"
},
"avgDuration": {
"type": "number",
"description": "Average duration per query execution"
},
"queryDetails": {
"$ref": "#/definitions/queryDetails",
"description": "Additional query details"
}
},
"required": ["queryText", "users", "totalCost", "totalCount", "totalDuration", "avgDuration", "queryDetails"]
},
"overallStats": {
"type": "object",
"javaType": "org.openmetadata.schema.entity.data.OverallStats",
"properties": {
"totalCost": {
"type": "number",
"description": "Total cost across all queries"
},
"minCost": {
"type": "number",
"description": "Minimum cost among all queries"
},
"maxCost": {
"type": "number",
"description": "Maximum cost among all queries"
},
"avgCost": {
"type": "number",
"description": "Average cost across all queries"
},
"totalExecutionCount": {
"type": "integer",
"description": "Total number of query executions"
}
},
"required": ["totalCost", "minCost", "maxCost", "avgCost", "totalExecutionCount"]
}
},
"properties": {
"queryGroups": {
"type": "array",
"items": {
"$ref": "#/definitions/queryGroup"
},
"description": "List of query groups with their metrics"
},
"overallStats": {
"$ref": "#/definitions/overallStats",
"description": "Overall statistics across all queries"
}
},
"required": ["queryGroups", "overallStats"]
}

View File

@ -0,0 +1,98 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Query Cost Search Result
*/
export interface QueryCostSearchResult {
/**
* Overall statistics across all queries
*/
overallStats: OverallStats;
/**
* List of query groups with their metrics
*/
queryGroups: QueryGroup[];
[property: string]: any;
}
/**
* Overall statistics across all queries
*/
export interface OverallStats {
/**
* Average cost across all queries
*/
avgCost: number;
/**
* Maximum cost among all queries
*/
maxCost: number;
/**
* Minimum cost among all queries
*/
minCost: number;
/**
* Total cost across all queries
*/
totalCost: number;
/**
* Total number of query executions
*/
totalExecutionCount: number;
[property: string]: any;
}
export interface QueryGroup {
/**
* Average duration per query execution
*/
avgDuration: number;
/**
* Additional query details
*/
queryDetails: QueryDetails;
/**
* The text of the query
*/
queryText: string;
/**
* Total cost of all query executions
*/
totalCost: number;
/**
* Total number of query executions
*/
totalCount: number;
/**
* Total duration of all query executions
*/
totalDuration: number;
/**
* List of users who executed the query
*/
users: string[];
[property: string]: any;
}
/**
* Additional query details
*
* Details about the query
*/
export interface QueryDetails {
/**
* Query information
*/
query?: { [key: string]: any };
[property: string]: any;
}