diff --git a/ingestion/examples/sample_data/datasets/query_log b/ingestion/examples/sample_data/datasets/query_log index 6d98dc2032a..0185fc10675 100644 --- a/ingestion/examples/sample_data/datasets/query_log +++ b/ingestion/examples/sample_data/datasets/query_log @@ -1,24 +1,25 @@ -query,cost -"select * from shopify.raw_product_catalog",10 -"select * from shopify.raw_product_catalog",12 -"select comments, products from shopify.raw_product_catalog",2 -"select comments, products from shopify.raw_product_catalog",3 -"select comments, products from shopify.raw_product_catalog",9 -"select cust.customer_id, fact_order.order_id from dim_customer cust join fact_order on cust.customer_id = fact_order.customer_id",0.222 -"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.234 -"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.5 -"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.65 -"select case when net_sales > 100 then 'high' else 'low' end as map_sales from shopify.fact_sale",2 -"select ROW_NUMBER() OVER (PARTITION BY sale_id, customer_id) AS sale_row from shopify.fact_sale",5 -"select * from shopify.raw_customer",19 -"select * from shopify.raw_customer",19 -"select * from shopify.raw_customer",18 -"select * from shopify.raw_customer",17 -"select * from shopify.raw_customer",16 -"select * from shopify.raw_customer",20 -"select * from shopify.raw_customer",21 -"select * from shopify.raw_customer",22 -"select * from shopify.raw_customer",15 -"select * from shopify.raw_customer",12 -"create table 
shopify.dim_address_clean as select address_id, shop_id, first_name, last_name, address1 as address, company, city, region, zip, country, phone from shopify.dim_address",0.5 -"create table shopify.dim_address_clean as select address_id, shop_id, first_name, last_name, address1 as address, company, city, region, zip, country, phone from shopify.dim_address",0.5 \ No newline at end of file +query,cost,user +"select * from shopify.raw_product_catalog",10,admin +"select * from shopify.raw_product_catalog",12,admin +"select comments, products from shopify.raw_product_catalog",2, +"select comments, products from shopify.raw_product_catalog",3, +"select comments, products from shopify.raw_product_catalog",9, +"select cust.customer_id, fact_order.order_id from dim_customer cust join fact_order on cust.customer_id = fact_order.customer_id",0.222, +"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.234, +"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.5, +"select sale.sale_id, cust.customer_id, fact_order.order_ir from shopify.fact_sale sale join dim_customer cust on sale.customer_id = cust.customer_id join fact_order on fact_order.order_id = sale.order_id",0.65, +"select case when net_sales > 100 then 'high' else 'low' end as map_sales from shopify.fact_sale",2, +"select ROW_NUMBER() OVER (PARTITION BY sale_id, customer_id) AS sale_row from shopify.fact_sale",5, +"select * from shopify.raw_customer",19,admin +"select * from shopify.raw_customer",19,admin +"select * from shopify.raw_customer",18,admin +"select * from shopify.raw_customer",17,admin +"select * from shopify.raw_customer",16,admin +"select * from shopify.raw_customer",20,admin +"select * from 
shopify.raw_customer",21,admin +"select * from shopify.raw_customer",22,admin +"select * from shopify.raw_customer",15,admin +"select * from shopify.raw_customer",12,admin +"create table shopify.dim_address_clean as select address_id, shop_id, first_name, last_name, address1 as address, company, city, region, zip, country, phone from shopify.dim_address",0.5,admin +"create table shopify.dim_address_clean as select address_id, shop_id, first_name, last_name, address1 as address, company, city, region, zip, country, phone from shopify.dim_address",0.5,admin +"select * from shopify.raw_customer limit 1",12,admin \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/database/sample_usage.py b/ingestion/src/metadata/ingestion/source/database/sample_usage.py index 8e98d72d87b..e569e38a5b0 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_usage.py @@ -93,6 +93,7 @@ class SampleUsageSource(UsageSource): serviceName=self.config.serviceName, databaseSchema="shopify", cost=row.get("cost"), + userName=row.get("user"), ) for row in self.query_logs ] diff --git a/ingestion/tests/unit/test_sample_usage.py b/ingestion/tests/unit/test_sample_usage.py index 1bc68c50dd6..df87eb21ef2 100644 --- a/ingestion/tests/unit/test_sample_usage.py +++ b/ingestion/tests/unit/test_sample_usage.py @@ -77,7 +77,7 @@ class QueryParserTest(TestCase): "dim_customer": 4, "fact_order": 4, "shopify.fact_sale": 5, - "shopify.raw_customer": 10, + "shopify.raw_customer": 11, } config_dict = json.loads(config) config_dict["source"]["serviceConnection"]["config"]["connectionOptions"][ diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/QueryCostRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/QueryCostRepository.java index c579802cd22..7a13f60deac 100644 --- 
a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/QueryCostRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/QueryCostRepository.java @@ -1,6 +1,8 @@ package org.openmetadata.service.jdbi3; +import java.io.IOException; import org.openmetadata.schema.entity.data.QueryCostRecord; +import org.openmetadata.schema.entity.data.QueryCostSearchResult; import org.openmetadata.service.Entity; import org.openmetadata.service.resources.query.QueryCostResource; @@ -13,4 +15,8 @@ public class QueryCostRepository extends EntityTimeSeriesRepository { "users", USER, original.getUsers(), - updated.getUsers(), + updated.getUsers() == null ? new ArrayList<>() : updated.getUsers(), Relationship.USES, Entity.QUERY, original.getId()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryCostResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryCostResource.java index 8e3e15bee82..0fc71406cb0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryCostResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryCostResource.java @@ -6,6 +6,7 @@ import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; +import java.io.IOException; import java.util.UUID; import javax.validation.Valid; import javax.ws.rs.Consumes; @@ -21,8 +22,10 @@ import javax.ws.rs.core.SecurityContext; import javax.ws.rs.core.UriInfo; import org.openmetadata.schema.entity.data.CreateQueryCostRecord; import org.openmetadata.schema.entity.data.QueryCostRecord; +import org.openmetadata.schema.entity.data.QueryCostSearchResult; import org.openmetadata.schema.type.MetadataOperation; import org.openmetadata.service.Entity; +import 
org.openmetadata.service.jdbi3.ListFilter;
 import org.openmetadata.service.jdbi3.QueryCostRepository;
 import org.openmetadata.service.resources.Collection;
 import org.openmetadata.service.resources.EntityTimeSeriesResource;
@@ -106,4 +109,42 @@ public class QueryCostResource
         mapper.createToEntity(createQueryCostRecord, securityContext.getUserPrincipal().getName());
     return create(queryCostRecord, queryCostRecord.getQueryReference().getFullyQualifiedName());
   }
+
+  /**
+   * Returns the aggregated query cost data for one service.
+   *
+   * <p>Authorizes the caller for {@code VIEW_QUERIES} on the query entity, then delegates to the
+   * repository.
+   *
+   * @param serviceName service name taken from the request path
+   * @return the aggregation result produced by the repository
+   * @throws IOException propagated from the repository call
+   */
+  @GET
+  @Path("/service/{serviceName}")
+  @Operation(
+      operationId = "getQueryCostByService",
+      summary = "Get Query Cost By Service",
+      description = "Get Query Cost By Service",
+      responses = {
+        @ApiResponse(
+            responseCode = "200",
+            description = "Query cost aggregation for the service",
+            content =
+                @Content(
+                    mediaType = "application/json",
+                    schema = @Schema(implementation = QueryCostSearchResult.class)))
+      })
+  public QueryCostSearchResult getQueryCostAggData(
+      @Context UriInfo uriInfo,
+      @Context SecurityContext securityContext,
+      @PathParam("serviceName") String serviceName)
+      throws IOException {
+    OperationContext operationContext =
+        new OperationContext(Entity.QUERY, MetadataOperation.VIEW_QUERIES);
+    ListFilter filter = new ListFilter(null);
+    ResourceContext resourceContext = filter.getResourceContext(Entity.QUERY);
+    authorizer.authorize(securityContext, operationContext, resourceContext);
+    return repository.getQueryCostAggData(serviceName);
+  }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryMapper.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryMapper.java
index 581e8167978..cf5ea0b3d09 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryMapper.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryMapper.java
@@ -14,6 +14,7 @@ import org.openmetadata.service.util.EntityUtil;
 public class QueryMapper implements EntityMapper {
@Override public Query createToEntity(CreateQuery create, String user) { + return copy(new Query(), create, user) .withQuery(create.getQuery()) .withChecksum(EntityUtil.hash(create.getQuery())) @@ -23,6 +24,7 @@ public class QueryMapper implements EntityMapper { .withUsers(getEntityReferences(USER, create.getUsers())) .withQueryUsedIn(EntityUtil.populateEntityReferences(create.getQueryUsedIn())) .withQueryDate(create.getQueryDate()) + .withUsedBy(create.getUsedBy()) .withTriggeredBy(create.getTriggeredBy()) .withProcessedLineage(create.getProcessedLineage()); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index 26b730f5d00..8ca36a24e55 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -17,6 +17,7 @@ import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList; +import org.openmetadata.schema.entity.data.QueryCostSearchResult; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; import org.openmetadata.schema.tests.DataQualityReport; import org.openmetadata.schema.type.EntityReference; @@ -348,4 +349,6 @@ public interface SearchClient { Object getClient(); SearchHealthStatus getSearchHealthStatus() throws IOException; + + QueryCostSearchResult getQueryCostRecords(String serviceName) throws IOException; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index d3a3447533a..176f53c5f3e 
100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java
@@ -72,6 +72,7 @@ import org.openmetadata.schema.api.lineage.SearchLineageRequest;
 import org.openmetadata.schema.api.lineage.SearchLineageResult;
 import org.openmetadata.schema.dataInsight.DataInsightChartResult;
 import org.openmetadata.schema.entity.classification.Tag;
+import org.openmetadata.schema.entity.data.QueryCostSearchResult;
 import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
 import org.openmetadata.schema.tests.DataQualityReport;
 import org.openmetadata.schema.tests.TestSuite;
@@ -1214,4 +1215,10 @@ public class SearchRepository {
         new ImmutablePair<>("entityRelationship.docId.keyword", relationDocId),
         new ImmutablePair<>(String.format(REMOVE_ENTITY_RELATIONSHIP, relationDocId), null));
   }
+
+  /** Hands the query cost lookup for {@code serviceName} to the configured search client. */
+  public QueryCostSearchResult getQueryCostRecords(String serviceName) throws IOException {
+    QueryCostSearchResult queryCostRecords = searchClient.getQueryCostRecords(serviceName);
+    return queryCostRecords;
+  }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java
index 54eeb70ca94..af2c9b7f1de 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java
@@ -162,6 +162,7 @@ import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
 import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList;
 import org.openmetadata.schema.dataInsight.custom.FormulaHolder;
 import org.openmetadata.schema.entity.data.EntityHierarchy;
+import org.openmetadata.schema.entity.data.QueryCostSearchResult;
 import
org.openmetadata.schema.entity.data.Table;
 import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
 import org.openmetadata.schema.tests.DataQualityReport;
@@ -200,6 +201,7 @@ import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.Elas
 import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.ElasticSearchMostViewedEntitiesAggregator;
 import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.ElasticSearchPageViewsByEntitiesAggregator;
 import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.ElasticSearchUnusedAssetsAggregator;
+import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.QueryCostRecordsAggregator;
 import org.openmetadata.service.search.elasticsearch.queries.ElasticQueryBuilder;
 import org.openmetadata.service.search.elasticsearch.queries.ElasticQueryBuilderFactory;
 import org.openmetadata.service.search.indexes.APIEndpointIndex;
@@ -2500,6 +2502,22 @@ public class ElasticSearchClient implements SearchClient {
     return null;
   }
 
+  /**
+   * Runs the query cost aggregation for the given service and converts the response.
+   *
+   * @param serviceName service name passed through to the aggregator's request builder
+   * @return the parsed aggregation result
+   * @throws IOException propagated from the search client call
+   */
+  @Override
+  public QueryCostSearchResult getQueryCostRecords(String serviceName) throws IOException {
+    // The aggregator methods are static, so call them on the class rather than on an instance.
+    es.org.elasticsearch.action.search.SearchRequest searchRequest =
+        QueryCostRecordsAggregator.getQueryCostRecords(serviceName);
+    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+    return QueryCostRecordsAggregator.parseQueryCostResponse(searchResponse);
+  }
+
   private static AggregationBuilder buildQueryAggregation(
       DataInsightChartResult.DataInsightChartType dataInsightChartName)
       throws IllegalArgumentException {
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/dataInsightAggregators/QueryCostRecordsAggregator.java
b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/dataInsightAggregators/QueryCostRecordsAggregator.java
new file mode 100644
index 00000000000..f54f29f65cc
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/dataInsightAggregators/QueryCostRecordsAggregator.java
@@ -0,0 +1,214 @@
+package org.openmetadata.service.search.elasticsearch.dataInsightAggregators;
+
+import es.org.elasticsearch.action.search.SearchAction;
+import es.org.elasticsearch.action.search.SearchRequest;
+import es.org.elasticsearch.action.search.SearchRequestBuilder;
+import es.org.elasticsearch.action.search.SearchResponse;
+import es.org.elasticsearch.index.query.BoolQueryBuilder;
+import es.org.elasticsearch.index.query.QueryBuilders;
+import es.org.elasticsearch.script.Script;
+import es.org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
+import es.org.elasticsearch.search.aggregations.AggregationBuilders;
+import es.org.elasticsearch.search.aggregations.BucketOrder;
+import es.org.elasticsearch.search.aggregations.PipelineAggregatorBuilders;
+import es.org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import es.org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import es.org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
+import es.org.elasticsearch.search.aggregations.metrics.Stats;
+import es.org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
+import es.org.elasticsearch.search.aggregations.metrics.Sum;
+import es.org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
+import es.org.elasticsearch.search.aggregations.metrics.TopHits;
+import es.org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
+import es.org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.openmetadata.schema.entity.data.OverallStats;
+import org.openmetadata.schema.entity.data.QueryCostSearchResult;
+import org.openmetadata.schema.entity.data.QueryDetails;
+import org.openmetadata.schema.entity.data.QueryGroup;
+import org.openmetadata.schema.entity.data.QueryHolder;
+
+/**
+ * Builds and parses the search aggregation behind the query cost report: the most expensive
+ * query texts with their users, cost, execution count and duration, plus overall cost totals.
+ */
+public class QueryCostRecordsAggregator {
+
+  private static final String QUERY_COST_INDEX = "query_cost_record_search_index";
+  private static final int QUERY_GROUP_LIMIT = 10;
+  private static final int USER_LIMIT = 10;
+
+  // Guarded so a bucket whose summed count is 0 yields 0 instead of a NaN/Infinity average.
+  private static final String AVG_DURATION_SCRIPT =
+      "params.total_count > 0 ? params.total_duration / params.total_count : 0.0";
+
+  /**
+   * Creates the aggregation-only search request against the query cost record index.
+   *
+   * @param serviceName when non-empty, restricts the search to documents whose {@code
+   *     service.name.keyword} equals this value; when null or empty, no filter is applied
+   * @return search request returning no hits, only the query group and overall aggregations
+   */
+  public static SearchRequest getQueryCostRecords(String serviceName) {
+    SearchRequestBuilder searchRequestBuilder =
+        new SearchRequestBuilder(null, SearchAction.INSTANCE);
+    // Only aggregation results are read, so no documents need to be returned.
+    searchRequestBuilder.setSize(0);
+
+    // One bucket per query text, keeping the buckets with the highest summed cost.
+    TermsAggregationBuilder queryGroupsAgg =
+        AggregationBuilders.terms("query_groups")
+            .field("query.query")
+            .size(QUERY_GROUP_LIMIT)
+            .order(BucketOrder.aggregation("total_cost", false));
+
+    TermsAggregationBuilder usersAgg =
+        AggregationBuilders.terms("users").field("query.usedBy.keyword").size(USER_LIMIT);
+    queryGroupsAgg.subAggregation(usersAgg);
+
+    SumAggregationBuilder totalCostAgg = AggregationBuilders.sum("total_cost").field("cost");
+    queryGroupsAgg.subAggregation(totalCostAgg);
+
+    SumAggregationBuilder totalCountAgg = AggregationBuilders.sum("total_count").field("count");
+    queryGroupsAgg.subAggregation(totalCountAgg);
+
+    SumAggregationBuilder totalDurationAgg =
+        AggregationBuilders.sum("total_duration").field("totalDuration");
+    queryGroupsAgg.subAggregation(totalDurationAgg);
+
+    // Average duration per bucket = summed duration / summed execution count.
+    Map<String, String> bucketsPathMap = new HashMap<>();
+    bucketsPathMap.put("total_duration", "total_duration");
+    bucketsPathMap.put("total_count", "total_count");
+    BucketScriptPipelineAggregationBuilder avgDurationAgg =
+        PipelineAggregatorBuilders.bucketScript(
+            "avg_duration", bucketsPathMap, new Script(AVG_DURATION_SCRIPT));
+    queryGroupsAgg.subAggregation(avgDurationAgg);
+
+    // A single representative document per bucket, limited to its "query" fields.
+    TopHitsAggregationBuilder queryDetailsAgg =
+        AggregationBuilders.topHits("query_details")
+            .size(1)
+            .fetchSource(new String[] {"query.*"}, null);
+    queryGroupsAgg.subAggregation(queryDetailsAgg);
+
+    // Totals across every matching document, independent of the top query groups.
+    StatsAggregationBuilder overallTotalsAgg =
+        AggregationBuilders.stats("overall_totals").field("cost");
+    SumAggregationBuilder totalExecutionCountAgg =
+        AggregationBuilders.sum("total_execution_count").field("count");
+
+    searchRequestBuilder.addAggregation(queryGroupsAgg);
+    searchRequestBuilder.addAggregation(overallTotalsAgg);
+    searchRequestBuilder.addAggregation(totalExecutionCountAgg);
+
+    if (serviceName != null && !serviceName.isEmpty()) {
+      BoolQueryBuilder boolQuery =
+          QueryBuilders.boolQuery()
+              .must(QueryBuilders.termQuery("service.name.keyword", serviceName));
+      searchRequestBuilder.setQuery(boolQuery);
+    }
+
+    return searchRequestBuilder.request().indices(QUERY_COST_INDEX);
+  }
+
+  /**
+   * Converts the response of {@link #getQueryCostRecords(String)} into the API result.
+   *
+   * @param response search response containing the {@code query_groups}, {@code overall_totals}
+   *     and {@code total_execution_count} aggregations
+   * @return the query groups in response order together with the overall statistics
+   */
+  public static QueryCostSearchResult parseQueryCostResponse(SearchResponse response) {
+    List<QueryGroup> queryGroups = new ArrayList<>();
+    Terms queryGroupsAgg = response.getAggregations().get("query_groups");
+    for (Terms.Bucket bucket : queryGroupsAgg.getBuckets()) {
+      queryGroups.add(toQueryGroup(bucket));
+    }
+
+    Stats overallTotals = response.getAggregations().get("overall_totals");
+    Sum totalExecutionCount = response.getAggregations().get("total_execution_count");
+
+    // With no matching documents the stats min/max/avg may not be finite numbers; report 0.
+    boolean hasCostData = overallTotals.getCount() > 0;
+    OverallStats overallStats =
+        new OverallStats()
+            .withTotalCost(hasCostData ? overallTotals.getSum() : 0.0)
+            .withMinCost(hasCostData ? overallTotals.getMin() : 0.0)
+            .withMaxCost(hasCostData ? overallTotals.getMax() : 0.0)
+            .withAvgCost(hasCostData ? overallTotals.getAvg() : 0.0)
+            .withTotalExecutionCount((int) totalExecutionCount.getValue());
+
+    return new QueryCostSearchResult().withQueryGroups(queryGroups).withOverallStats(overallStats);
+  }
+
+  /** Maps one {@code query_groups} bucket to a {@link QueryGroup}. */
+  private static QueryGroup toQueryGroup(Terms.Bucket bucket) {
+    Terms usersAgg = bucket.getAggregations().get("users");
+    List<String> users =
+        usersAgg.getBuckets().stream()
+            .map(Terms.Bucket::getKeyAsString)
+            .collect(Collectors.toList());
+
+    double totalCost = ((Sum) bucket.getAggregations().get("total_cost")).getValue();
+    long totalCount = (long) ((Sum) bucket.getAggregations().get("total_count")).getValue();
+    double totalDuration = ((Sum) bucket.getAggregations().get("total_duration")).getValue();
+
+    // Prefer the bucket-script value; compute it locally when the response does not carry one.
+    double avgDuration;
+    Object avgDurationAgg = bucket.getAggregations().get("avg_duration");
+    if (avgDurationAgg instanceof NumericMetricsAggregation.SingleValue) {
+      avgDuration = ((NumericMetricsAggregation.SingleValue) avgDurationAgg).value();
+    } else {
+      avgDuration = totalCount > 0 ? totalDuration / totalCount : 0;
+    }
+
+    TopHits queryDetailsHits = bucket.getAggregations().get("query_details");
+
+    return new QueryGroup()
+        .withQueryText(bucket.getKeyAsString())
+        .withUsers(users)
+        .withTotalCost(totalCost)
+        .withTotalCount((int) totalCount)
+        .withTotalDuration(totalDuration)
+        .withAvgDuration(avgDuration)
+        .withQueryDetails(toQueryDetails(queryDetailsHits));
+  }
+
+  /**
+   * Builds the query details from the first top hit of a bucket.
+   *
+   * <p>Returns an empty {@link QueryDetails} when the bucket carries no hit or no source, rather
+   * than failing on the missing first element.
+   */
+  private static QueryDetails toQueryDetails(TopHits topHits) {
+    QueryDetails queryDetails = new QueryDetails();
+    if (topHits == null || topHits.getHits().getHits().length == 0) {
+      return queryDetails;
+    }
+    Map<String, Object> detailsMap = topHits.getHits().getHits()[0].getSourceAsMap();
+    if (detailsMap == null) {
+      return queryDetails;
+    }
+
+    for (Map.Entry<String, Object> entry : detailsMap.entrySet()) {
+      if (!"query".equals(entry.getKey())) {
+        queryDetails.withAdditionalProperty(entry.getKey(), entry.getValue());
+      } else if (entry.getValue() instanceof Map) {
+        // Copy the nested query document field by field into the generated holder type.
+        @SuppressWarnings("unchecked")
+        Map<String, Object> queryMap = (Map<String, Object>) entry.getValue();
+        QueryHolder query = new QueryHolder();
+        for (Map.Entry<String, Object> queryEntry : queryMap.entrySet()) {
+          query.withAdditionalProperty(queryEntry.getKey(), queryEntry.getValue());
+        }
+        queryDetails.withQuery(query);
+      }
+    }
+    return queryDetails;
+  }
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java
index 1691e223f64..85114ab91ef 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java
@@ -80,6 +80,7 @@ import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
 import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList;
 import org.openmetadata.schema.dataInsight.custom.FormulaHolder;
 import org.openmetadata.schema.entity.data.EntityHierarchy;
+import org.openmetadata.schema.entity.data.QueryCostSearchResult;
 import org.openmetadata.schema.entity.data.Table;
 import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
 import org.openmetadata.schema.tests.DataQualityReport;
@@ -138,6 +139,7 @@ import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSear
 import
org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSearchMostViewedEntitiesAggregator;
 import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSearchPageViewsByEntitiesAggregator;
 import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSearchUnusedAssetsAggregator;
+import org.openmetadata.service.search.opensearch.dataInsightAggregator.QueryCostRecordsAggregator;
 import org.openmetadata.service.search.opensearch.queries.OpenSearchQueryBuilder;
 import org.openmetadata.service.search.opensearch.queries.OpenSearchQueryBuilderFactory;
 import org.openmetadata.service.search.queries.OMQueryBuilder;
@@ -2771,4 +2773,21 @@ public class OpenSearchClient implements SearchClient {
       return new SearchHealthStatus(UNHEALTHY_STATUS);
     }
   }
+
+  /**
+   * Runs the query cost aggregation for the given service and converts the response.
+   *
+   * @param serviceName service name passed through to the aggregator's request builder
+   * @return the parsed aggregation result
+   * @throws IOException propagated from the search client call
+   */
+  @Override
+  public QueryCostSearchResult getQueryCostRecords(String serviceName) throws IOException {
+    // The aggregator methods are static, so call them on the class rather than on an instance.
+    os.org.opensearch.action.search.SearchRequest searchRequest =
+        QueryCostRecordsAggregator.getQueryCostRecords(serviceName);
+    os.org.opensearch.action.search.SearchResponse searchResponse =
+        client.search(searchRequest, RequestOptions.DEFAULT);
+    return QueryCostRecordsAggregator.parseQueryCostResponse(searchResponse);
+  }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/dataInsightAggregator/QueryCostRecordsAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/dataInsightAggregator/QueryCostRecordsAggregator.java
new file mode 100644
index 00000000000..214c0974990
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/dataInsightAggregator/QueryCostRecordsAggregator.java
@@ -0,0 +1,214 @@
+package org.openmetadata.service.search.opensearch.dataInsightAggregator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.openmetadata.schema.entity.data.OverallStats;
+import org.openmetadata.schema.entity.data.QueryCostSearchResult;
+import org.openmetadata.schema.entity.data.QueryDetails;
+import org.openmetadata.schema.entity.data.QueryGroup;
+import org.openmetadata.schema.entity.data.QueryHolder;
+import os.org.opensearch.action.search.SearchAction;
+import os.org.opensearch.action.search.SearchRequest;
+import os.org.opensearch.action.search.SearchRequestBuilder;
+import os.org.opensearch.action.search.SearchResponse;
+import os.org.opensearch.index.query.BoolQueryBuilder;
+import os.org.opensearch.index.query.QueryBuilders;
+import os.org.opensearch.script.Script;
+import os.org.opensearch.search.aggregations.AbstractAggregationBuilder;
+import os.org.opensearch.search.aggregations.AggregationBuilders;
+import os.org.opensearch.search.aggregations.BucketOrder;
+import os.org.opensearch.search.aggregations.PipelineAggregatorBuilders;
+import os.org.opensearch.search.aggregations.bucket.terms.Terms;
+import os.org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import os.org.opensearch.search.aggregations.metrics.NumericMetricsAggregation;
+import os.org.opensearch.search.aggregations.metrics.Stats;
+import os.org.opensearch.search.aggregations.metrics.StatsAggregationBuilder;
+import os.org.opensearch.search.aggregations.metrics.Sum;
+import os.org.opensearch.search.aggregations.metrics.SumAggregationBuilder;
+import os.org.opensearch.search.aggregations.metrics.TopHits;
+import os.org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder;
+import os.org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
+
+/**
+ * Builds and parses the search aggregation behind the query cost report: the most expensive
+ * query texts with their users, cost, execution count and duration, plus overall cost totals.
+ */
+public class QueryCostRecordsAggregator {
+
+  private static final String QUERY_COST_INDEX = "query_cost_record_search_index";
+  private static final int QUERY_GROUP_LIMIT = 10;
+  private static final int USER_LIMIT = 10;
+
+  // Guarded so a bucket whose summed count is 0 yields 0 instead of a NaN/Infinity average.
+  private static final String AVG_DURATION_SCRIPT =
+      "params.total_count > 0 ? params.total_duration / params.total_count : 0.0";
+
+  /**
+   * Creates the aggregation-only search request against the query cost record index.
+   *
+   * @param serviceName when non-empty, restricts the search to documents whose {@code
+   *     service.name.keyword} equals this value; when null or empty, no filter is applied
+   * @return search request returning no hits, only the query group and overall aggregations
+   */
+  public static SearchRequest getQueryCostRecords(String serviceName) {
+    SearchRequestBuilder searchRequestBuilder =
+        new SearchRequestBuilder(null, SearchAction.INSTANCE);
+    // Only aggregation results are read, so no documents need to be returned.
+    searchRequestBuilder.setSize(0);
+
+    // One bucket per query text, keeping the buckets with the highest summed cost.
+    TermsAggregationBuilder queryGroupsAgg =
+        AggregationBuilders.terms("query_groups")
+            .field("query.query")
+            .size(QUERY_GROUP_LIMIT)
+            .order(BucketOrder.aggregation("total_cost", false));
+
+    TermsAggregationBuilder usersAgg =
+        AggregationBuilders.terms("users").field("query.usedBy.keyword").size(USER_LIMIT);
+    queryGroupsAgg.subAggregation(usersAgg);
+
+    SumAggregationBuilder totalCostAgg = AggregationBuilders.sum("total_cost").field("cost");
+    queryGroupsAgg.subAggregation(totalCostAgg);
+
+    SumAggregationBuilder totalCountAgg = AggregationBuilders.sum("total_count").field("count");
+    queryGroupsAgg.subAggregation(totalCountAgg);
+
+    SumAggregationBuilder totalDurationAgg =
+        AggregationBuilders.sum("total_duration").field("totalDuration");
+    queryGroupsAgg.subAggregation(totalDurationAgg);
+
+    // Average duration per bucket = summed duration / summed execution count.
+    Map<String, String> bucketsPathMap = new HashMap<>();
+    bucketsPathMap.put("total_duration", "total_duration");
+    bucketsPathMap.put("total_count", "total_count");
+    BucketScriptPipelineAggregationBuilder avgDurationAgg =
+        PipelineAggregatorBuilders.bucketScript(
+            "avg_duration", bucketsPathMap, new Script(AVG_DURATION_SCRIPT));
+    queryGroupsAgg.subAggregation(avgDurationAgg);
+
+    // A single representative document per bucket, limited to its "query" fields.
+    TopHitsAggregationBuilder queryDetailsAgg =
+        AggregationBuilders.topHits("query_details")
+            .size(1)
+            .fetchSource(new String[] {"query.*"}, null);
+    queryGroupsAgg.subAggregation(queryDetailsAgg);
+
+    // Totals across every matching document, independent of the top query groups.
+    StatsAggregationBuilder overallTotalsAgg =
+        AggregationBuilders.stats("overall_totals").field("cost");
+    SumAggregationBuilder totalExecutionCountAgg =
+        AggregationBuilders.sum("total_execution_count").field("count");
+
+    searchRequestBuilder.addAggregation(queryGroupsAgg);
+    searchRequestBuilder.addAggregation(overallTotalsAgg);
+    searchRequestBuilder.addAggregation(totalExecutionCountAgg);
+
+    if (serviceName != null && !serviceName.isEmpty()) {
+      BoolQueryBuilder boolQuery =
+          QueryBuilders.boolQuery()
+              .must(QueryBuilders.termQuery("service.name.keyword", serviceName));
+      searchRequestBuilder.setQuery(boolQuery);
+    }
+
+    return searchRequestBuilder.request().indices(QUERY_COST_INDEX);
+  }
+
+  /**
+   * Converts the response of {@link #getQueryCostRecords(String)} into the API result.
+   *
+   * @param response search response containing the {@code query_groups}, {@code overall_totals}
+   *     and {@code total_execution_count} aggregations
+   * @return the query groups in response order together with the overall statistics
+   */
+  public static QueryCostSearchResult parseQueryCostResponse(SearchResponse response) {
+    List<QueryGroup> queryGroups = new ArrayList<>();
+    Terms queryGroupsAgg = response.getAggregations().get("query_groups");
+    for (Terms.Bucket bucket : queryGroupsAgg.getBuckets()) {
+      queryGroups.add(toQueryGroup(bucket));
+    }
+
+    Stats overallTotals = response.getAggregations().get("overall_totals");
+    Sum totalExecutionCount = response.getAggregations().get("total_execution_count");
+
+    // With no matching documents the stats min/max/avg may not be finite numbers; report 0.
+    boolean hasCostData = overallTotals.getCount() > 0;
+    OverallStats overallStats =
+        new OverallStats()
+            .withTotalCost(hasCostData ? overallTotals.getSum() : 0.0)
+            .withMinCost(hasCostData ? overallTotals.getMin() : 0.0)
+            .withMaxCost(hasCostData ? overallTotals.getMax() : 0.0)
+            .withAvgCost(hasCostData ? overallTotals.getAvg() : 0.0)
+            .withTotalExecutionCount((int) totalExecutionCount.getValue());
+
+    return new QueryCostSearchResult().withQueryGroups(queryGroups).withOverallStats(overallStats);
+  }
+
+  /** Maps one {@code query_groups} bucket to a {@link QueryGroup}. */
+  private static QueryGroup toQueryGroup(Terms.Bucket bucket) {
+    Terms usersAgg = bucket.getAggregations().get("users");
+    List<String> users =
+        usersAgg.getBuckets().stream()
+            .map(Terms.Bucket::getKeyAsString)
+            .collect(Collectors.toList());
+
+    double totalCost = ((Sum) bucket.getAggregations().get("total_cost")).getValue();
+    long totalCount = (long) ((Sum) bucket.getAggregations().get("total_count")).getValue();
+    double totalDuration = ((Sum) bucket.getAggregations().get("total_duration")).getValue();
+
+    // Prefer the bucket-script value; compute it locally when the response does not carry one.
+    double avgDuration;
+    Object avgDurationAgg = bucket.getAggregations().get("avg_duration");
+    if (avgDurationAgg instanceof NumericMetricsAggregation.SingleValue) {
+      avgDuration = ((NumericMetricsAggregation.SingleValue) avgDurationAgg).value();
+    } else {
+      avgDuration = totalCount > 0 ? totalDuration / totalCount : 0;
+    }
+
+    TopHits queryDetailsHits = bucket.getAggregations().get("query_details");
+
+    return new QueryGroup()
+        .withQueryText(bucket.getKeyAsString())
+        .withUsers(users)
+        .withTotalCost(totalCost)
+        .withTotalCount((int) totalCount)
+        .withTotalDuration(totalDuration)
+        .withAvgDuration(avgDuration)
+        .withQueryDetails(toQueryDetails(queryDetailsHits));
+  }
+
+  /**
+   * Builds the query details from the first top hit of a bucket.
+   *
+   * <p>Returns an empty {@link QueryDetails} when the bucket carries no hit or no source, rather
+   * than failing on the missing first element.
+   */
+  private static QueryDetails toQueryDetails(TopHits topHits) {
+    QueryDetails queryDetails = new QueryDetails();
+    if (topHits == null || topHits.getHits().getHits().length == 0) {
+      return queryDetails;
+    }
+    Map<String, Object> detailsMap = topHits.getHits().getHits()[0].getSourceAsMap();
+    if (detailsMap == null) {
+      return queryDetails;
+    }
+
+    for (Map.Entry<String, Object> entry : detailsMap.entrySet()) {
+      if (!"query".equals(entry.getKey())) {
+        queryDetails.withAdditionalProperty(entry.getKey(), entry.getValue());
+      } else if (entry.getValue() instanceof Map) {
+        // Copy the nested query document field by field into the generated holder type.
+        @SuppressWarnings("unchecked")
+        Map<String, Object> queryMap = (Map<String, Object>) entry.getValue();
+        QueryHolder query = new QueryHolder();
+        for (Map.Entry<String, Object> queryEntry : queryMap.entrySet()) {
+          query.withAdditionalProperty(queryEntry.getKey(), queryEntry.getValue());
+        }
+        queryDetails.withQuery(query);
+      }
+    }
+    return queryDetails;
+  }
+}
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/queryCostSearchResult.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/queryCostSearchResult.json
new file mode 100644
index 00000000000..1e0645edce0
--- /dev/null
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/queryCostSearchResult.json
@@ -0,0 +1,103 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/data/queryCostSearchResult.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "QueryCostSearchResult",
+  "description": "Query Cost Search Result",
+  "type": "object",
+  "javaType": "org.openmetadata.schema.entity.data.QueryCostSearchResult",
+  "definitions": {
+    "queryDetails": {
+      "type": "object",
+      "javaType": "org.openmetadata.schema.entity.data.QueryDetails",
+      "properties": {
+        "query": {
+          "type": "object",
+          "javaType": "org.openmetadata.schema.entity.data.QueryHolder",
+          "additionalProperties": true,
+          "description": "Query information"
+        }
+      },
+      "additionalProperties": true,
+      "description": "Details about the query"
+    },
+    "queryGroup": {
+      "type": "object",
+      "javaType": "org.openmetadata.schema.entity.data.QueryGroup",
+      "properties": {
+        "queryText": {
+          "type": "string",
+          "description": "The text of the query"
+        },
+        "users": {
+          "type": "array",
+          "items": {
+            "type": "string"
+          },
+          "description": "List of users who executed the query"
+        },
+        "totalCost": {
+          "type": "number",
+          "description": "Total cost of all query executions"
+        },
+        "totalCount": {
+          "type": "integer",
+          "description": "Total number of query executions"
+        },
+        "totalDuration": {
+          "type": "number",
+          "description": "Total duration of all query executions"
+        },
+        "avgDuration": {
+          "type":
"number", + "description": "Average duration per query execution" + }, + "queryDetails": { + "$ref": "#/definitions/queryDetails", + "description": "Additional query details" + } + }, + "required": ["queryText", "users", "totalCost", "totalCount", "totalDuration", "avgDuration", "queryDetails"] + }, + "overallStats": { + "type": "object", + "javaType": "org.openmetadata.schema.entity.data.OverallStats", + "properties": { + "totalCost": { + "type": "number", + "description": "Total cost across all queries" + }, + "minCost": { + "type": "number", + "description": "Minimum cost among all queries" + }, + "maxCost": { + "type": "number", + "description": "Maximum cost among all queries" + }, + "avgCost": { + "type": "number", + "description": "Average cost across all queries" + }, + "totalExecutionCount": { + "type": "integer", + "description": "Total number of query executions" + } + }, + "required": ["totalCost", "minCost", "maxCost", "avgCost", "totalExecutionCount"] + } + }, + "properties": { + "queryGroups": { + "type": "array", + "items": { + "$ref": "#/definitions/queryGroup" + }, + "description": "List of query groups with their metrics" + }, + "overallStats": { + "$ref": "#/definitions/overallStats", + "description": "Overall statistics across all queries" + } + }, + "required": ["queryGroups", "overallStats"] +} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/queryCostSearchResult.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/queryCostSearchResult.ts new file mode 100644 index 00000000000..0f6034f5830 --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/queryCostSearchResult.ts @@ -0,0 +1,98 @@ +/* + * Copyright 2025 Collate. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Query Cost Search Result + */ +export interface QueryCostSearchResult { + /** + * Overall statistics across all queries + */ + overallStats: OverallStats; + /** + * List of query groups with their metrics + */ + queryGroups: QueryGroup[]; + [property: string]: any; +} + +/** + * Overall statistics across all queries + */ +export interface OverallStats { + /** + * Average cost across all queries + */ + avgCost: number; + /** + * Maximum cost among all queries + */ + maxCost: number; + /** + * Minimum cost among all queries + */ + minCost: number; + /** + * Total cost across all queries + */ + totalCost: number; + /** + * Total number of query executions + */ + totalExecutionCount: number; + [property: string]: any; +} + +export interface QueryGroup { + /** + * Average duration per query execution + */ + avgDuration: number; + /** + * Additional query details + */ + queryDetails: QueryDetails; + /** + * The text of the query + */ + queryText: string; + /** + * Total cost of all query executions + */ + totalCost: number; + /** + * Total number of query executions + */ + totalCount: number; + /** + * Total duration of all query executions + */ + totalDuration: number; + /** + * List of users who executed the query + */ + users: string[]; + [property: string]: any; +} + +/** + * Additional query details + * + * Details about the query + */ +export interface QueryDetails { + /** + * Query information + */ + query?: { [key: string]: any }; + [property: string]: any; +}