mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-05 20:17:07 +00:00
[GEN-976] Generic Aggregation Endpoint for Data Quality Reports (#17181)
* fix: added parser for aggregation queryParams * feat: added generic aggregation logic * feat: added DQ generic aggregation endpoint * fix: typo in resources description
This commit is contained in:
parent
0e3cf02618
commit
ab7b1a444c
@ -9,8 +9,10 @@ import static org.openmetadata.service.Entity.TEST_CASE;
|
||||
import static org.openmetadata.service.Entity.TEST_SUITE;
|
||||
import static org.openmetadata.service.util.FullyQualifiedName.quoteName;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import javax.json.JsonArray;
|
||||
@ -20,6 +22,7 @@ import javax.ws.rs.core.SecurityContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.schema.entity.data.Table;
|
||||
import org.openmetadata.schema.tests.DataQualityReport;
|
||||
import org.openmetadata.schema.tests.ResultSummary;
|
||||
import org.openmetadata.schema.tests.TestSuite;
|
||||
import org.openmetadata.schema.tests.type.ColumnTestSummaryDefinition;
|
||||
@ -31,6 +34,7 @@ import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.resources.dqtests.TestSuiteResource;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.search.SearchClient;
|
||||
import org.openmetadata.service.search.SearchIndexUtils;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
@ -226,6 +230,12 @@ public class TestSuiteRepository extends EntityRepository<TestSuite> {
|
||||
.orElse(testSummary);
|
||||
}
|
||||
|
||||
public DataQualityReport getDataQualityReport(String q, String aggQuery, String index)
|
||||
throws IOException {
|
||||
Map<String, Object> aggregationString = SearchIndexUtils.buildAggregationString(aggQuery);
|
||||
return searchRepository.genericAggregation(q, index, aggregationString);
|
||||
}
|
||||
|
||||
public TestSummary getTestSummary(UUID testSuiteId) {
|
||||
JsonObject aggregationJson = JsonUtils.readJson(EXECUTION_SUMMARY_AGGS).asJsonObject();
|
||||
try {
|
||||
|
||||
@ -91,7 +91,7 @@ public class TestCaseResource extends EntityResource<TestCase, TestCaseRepositor
|
||||
|
||||
static final String FIELDS = "owner,testSuite,testDefinition,testSuites,incidentId,domain,tags";
|
||||
static final String SEARCH_FIELDS_EXCLUDE =
|
||||
"testPlatforms,table,database,databaseSchema,service,testSuite,dataQualityDimension";
|
||||
"testPlatforms,table,database,databaseSchema,service,testSuite,dataQualityDimension,testCaseType";
|
||||
|
||||
@Override
|
||||
public TestCase addHref(UriInfo uriInfo, TestCase test) {
|
||||
|
||||
@ -16,6 +16,7 @@ import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import javax.json.JsonPatch;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.validation.Valid;
|
||||
import javax.validation.constraints.Max;
|
||||
import javax.validation.constraints.Min;
|
||||
@ -40,6 +41,7 @@ import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.data.RestoreEntity;
|
||||
import org.openmetadata.schema.api.tests.CreateTestSuite;
|
||||
import org.openmetadata.schema.entity.data.Table;
|
||||
import org.openmetadata.schema.tests.DataQualityReport;
|
||||
import org.openmetadata.schema.tests.TestSuite;
|
||||
import org.openmetadata.schema.tests.type.TestSummary;
|
||||
import org.openmetadata.schema.type.EntityHistory;
|
||||
@ -460,6 +462,7 @@ public class TestSuiteResource extends EntityResource<TestSuite, TestSuiteReposi
|
||||
public TestSummary getTestsExecutionSummary(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Context HttpServletResponse response,
|
||||
@Parameter(
|
||||
description = "get summary for a specific test suite",
|
||||
schema = @Schema(type = "String", format = "uuid"))
|
||||
@ -469,9 +472,64 @@ public class TestSuiteResource extends EntityResource<TestSuite, TestSuiteReposi
|
||||
OperationContext operationContext =
|
||||
new OperationContext(Entity.TABLE, MetadataOperation.VIEW_TESTS);
|
||||
authorizer.authorize(securityContext, operationContext, resourceContext);
|
||||
// Set the deprecation header based on draft specification from IETF
|
||||
// https://datatracker.ietf.org/doc/html/draft-ietf-httpapi-deprecation-header-02
|
||||
response.setHeader("Deprecation", "Monday, October 30, 2024");
|
||||
response.setHeader(
|
||||
"Link", "api/v1/dataQuality/testSuites/dataQualityReport; rel=\"alternate\"");
|
||||
return repository.getTestSummary(testSuiteId);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/dataQualityReport")
|
||||
@Operation(
|
||||
operationId = "getDataQualityReport",
|
||||
summary = "Get Data Quality Report",
|
||||
description =
|
||||
"""
|
||||
Use the search service to perform data quality aggregation. You can use the `q` parameter to filter the results.
|
||||
the `aggregationQuery` is of the form `bucketName=<bucketName>:aggType=<aggType>:field=<field>`. You can sperate aggregation
|
||||
query with a comma `,` to perform nested aggregations.
|
||||
For example, `bucketName=table:aggType=terms:field=databaseName,bucketName=<bucketName>:aggType=<aggType>:field=<field>`
|
||||
""",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "Data Quality Report Results",
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = DataQualityReport.class)))
|
||||
})
|
||||
public DataQualityReport getDataQualityReport(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(
|
||||
description = "Search query to filter the aggregation results",
|
||||
schema = @Schema(type = "String"))
|
||||
@QueryParam("q")
|
||||
String query,
|
||||
@Parameter(
|
||||
description = "Aggregation query to perform aggregation on the search results",
|
||||
schema = @Schema(type = "String"))
|
||||
@QueryParam("aggregationQuery")
|
||||
String aggregationQuery,
|
||||
@Parameter(
|
||||
description = "Index to perform the aggregation against",
|
||||
schema = @Schema(type = "String"))
|
||||
@QueryParam("index")
|
||||
String index)
|
||||
throws IOException {
|
||||
ResourceContext<?> resourceContext = getResourceContext();
|
||||
OperationContext operationContext =
|
||||
new OperationContext(entityType, MetadataOperation.VIEW_TESTS);
|
||||
authorizer.authorize(securityContext, operationContext, resourceContext);
|
||||
if (nullOrEmpty(aggregationQuery) || nullOrEmpty(index)) {
|
||||
throw new IllegalArgumentException("aggregationQuery and index are required parameters");
|
||||
}
|
||||
return repository.getDataQualityReport(query, aggregationQuery, index);
|
||||
}
|
||||
|
||||
@POST
|
||||
@Operation(
|
||||
operationId = "createLogicalTestSuite",
|
||||
|
||||
@ -18,6 +18,7 @@ import lombok.Getter;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
|
||||
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
|
||||
import org.openmetadata.schema.tests.DataQualityReport;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.service.exception.CustomExceptionMessage;
|
||||
import org.openmetadata.service.search.models.IndexMapping;
|
||||
@ -122,6 +123,9 @@ public interface SearchClient {
|
||||
|
||||
JsonObject aggregate(String query, String index, JsonObject aggregationJson) throws IOException;
|
||||
|
||||
/**
 * Runs a generic aggregation against {@code index} and returns it as a data quality report.
 *
 * @param query optional JSON search query used to filter documents; the implementations shown
 *     in this change skip the filter when it is {@code null}
 * @param index name of the index (or alias) to aggregate against
 * @param aggregationMetadata parsed aggregation description; the implementations shown in this
 *     change read the {@code "aggregationStr"} and {@code "aggregationMapList"} entries
 * @return the aggregation results as a {@link DataQualityReport}
 * @throws IOException declared for failures while executing the search
 */
DataQualityReport genericAggregation(
|
||||
String query, String index, Map<String, Object> aggregationMetadata) throws IOException;
|
||||
|
||||
Response suggest(SearchRequest request) throws IOException;
|
||||
|
||||
void createEntity(String indexName, String docId, String doc);
|
||||
|
||||
@ -1,11 +1,19 @@
|
||||
package org.openmetadata.service.search;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.json.JsonArray;
|
||||
import javax.json.JsonObject;
|
||||
import org.openmetadata.schema.tests.DataQualityReport;
|
||||
import org.openmetadata.schema.tests.Datum;
|
||||
import org.openmetadata.schema.tests.type.DataQualityReportMetadata;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
|
||||
@ -55,6 +63,223 @@ public final class SearchIndexUtils {
|
||||
currentMap.remove(lastKey);
|
||||
}
|
||||
|
||||
private static void handleLeafTermsAggregation(
|
||||
JsonObject aggregationResults, List<Datum> reportData, Map<String, String> nodeData) {
|
||||
Optional<String> docCount = Optional.ofNullable(aggregationResults.get("doc_count").toString());
|
||||
docCount.ifPresentOrElse(
|
||||
s -> nodeData.put("document_count", s), () -> nodeData.put("document_count", null));
|
||||
|
||||
Datum datum = new Datum();
|
||||
for (Map.Entry<String, String> entry : nodeData.entrySet()) {
|
||||
datum.withAdditionalProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
reportData.add(datum);
|
||||
}
|
||||
|
||||
private static void handleLeafMetricsAggregation(
|
||||
JsonObject aggregationResults,
|
||||
List<Datum> reportData,
|
||||
Map<String, String> nodeData,
|
||||
String metric) {
|
||||
Optional<String> val = Optional.ofNullable(aggregationResults.getString("value_as_string"));
|
||||
val.ifPresentOrElse(s -> nodeData.put(metric, s), () -> nodeData.put(metric, null));
|
||||
|
||||
Datum datum = new Datum();
|
||||
for (Map.Entry<String, String> entry : nodeData.entrySet()) {
|
||||
datum.withAdditionalProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
reportData.add(datum);
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the metadata for the aggregation results. We'll use the metadata to build the report and
|
||||
* to traverse the aggregation tree. 3 types of metadata are returned:
|
||||
* 1. dimensions: the list of dimensions
|
||||
* 2. metrics: the list of metrics
|
||||
* 3. keys: the list of keys to traverse the aggregation tree
|
||||
*
|
||||
* @param aggregationMapList the list of aggregations
|
||||
* @return the metadata
|
||||
*/
|
||||
private static DataQualityReportMetadata getAggregationMetadata(
|
||||
List<List<Map<String, String>>> aggregationMapList) {
|
||||
DataQualityReportMetadata metadata = new DataQualityReportMetadata();
|
||||
List<String> dimensions = new ArrayList<>();
|
||||
List<String> metrics = new ArrayList<>();
|
||||
List<String> keys = new ArrayList<>();
|
||||
|
||||
for (List<Map<String, String>> aggregationsMap : aggregationMapList) {
|
||||
for (int j = 0; j < aggregationsMap.size(); j++) {
|
||||
Map<String, String> aggregationMap = aggregationsMap.get(j);
|
||||
String aggType = aggregationMap.get("aggType");
|
||||
String field = aggregationMap.get("field");
|
||||
|
||||
boolean isLeaf = j == aggregationsMap.size() - 1;
|
||||
if (isLeaf) {
|
||||
// leaf aggregation
|
||||
if (!aggType.contains("term")) {
|
||||
metrics.add(field);
|
||||
} else {
|
||||
dimensions.add(field);
|
||||
metrics.add("document_count");
|
||||
}
|
||||
} else {
|
||||
dimensions.add(field);
|
||||
}
|
||||
String formattedAggType = aggType.contains("term") ? "s%s".formatted(aggType) : aggType;
|
||||
keys.add("%s#%s".formatted(formattedAggType, aggregationMap.get("bucketName")));
|
||||
}
|
||||
}
|
||||
|
||||
metadata.withKeys(keys).withDimensions(dimensions).withMetrics(metrics);
|
||||
|
||||
return metadata;
|
||||
}
|
||||
|
||||
/*
|
||||
* Traverse the aggregation results and build the report data. Note that the method supports
|
||||
* n levels of nested aggregations, but does not support sibling aggregations.
|
||||
*
|
||||
* @Param aggregationResults the aggregation results
|
||||
* @Param reportData the report data
|
||||
* @Param nodeData the node data
|
||||
* @Param keys the keys to traverse the aggregation tree
|
||||
* @Param metric the metric to add to the report data
|
||||
* @Param dimensions the dimensions to add to the report data
|
||||
* @return the report data
|
||||
*/
|
||||
private static void traverseAggregationResults(
|
||||
JsonObject aggregationResults,
|
||||
List<Datum> reportData,
|
||||
Map<String, String> nodeData,
|
||||
List<String> keys,
|
||||
String metric,
|
||||
List<String> dimensions) {
|
||||
|
||||
if (keys.isEmpty()) {
|
||||
// We are in the leaf of the term aggregation. We'll add the count of documents as the metric
|
||||
handleLeafTermsAggregation(aggregationResults, reportData, nodeData);
|
||||
return;
|
||||
}
|
||||
|
||||
String currentKey =
|
||||
keys.get(0); // The current key represent the node in the aggregation tree (i.e. the current
|
||||
// bucket)
|
||||
Optional<JsonObject> aggregation =
|
||||
Optional.ofNullable(SearchClient.getAggregationObject(aggregationResults, currentKey));
|
||||
|
||||
aggregation.ifPresent(
|
||||
agg -> {
|
||||
Optional<JsonArray> buckets =
|
||||
Optional.ofNullable(SearchClient.getAggregationBuckets(agg));
|
||||
if (buckets.isEmpty()) {
|
||||
// If the current node in the aggregation tree does not have further bucket
|
||||
// it means we are in the leaf of the metric aggregation. We'll add the metric
|
||||
handleLeafMetricsAggregation(agg, reportData, nodeData, metric);
|
||||
} else {
|
||||
buckets
|
||||
.get()
|
||||
.forEach(
|
||||
bucket -> {
|
||||
JsonObject bucketObject = (JsonObject) bucket;
|
||||
Optional<String> bucketKey = Optional.of(bucketObject.getString("key"));
|
||||
|
||||
bucketKey.ifPresentOrElse(
|
||||
s -> nodeData.put(dimensions.get(0), s),
|
||||
() -> nodeData.put(dimensions.get(0), null));
|
||||
|
||||
// Traverse the next level of the aggregation tree.
|
||||
// Dimensions and keys represent the same level in the tree.
|
||||
// They are used for different purpose (i.e. dimensions are used to
|
||||
// generate the report while the keys are used to traverse the aggregation
|
||||
// tree)
|
||||
traverseAggregationResults(
|
||||
bucketObject,
|
||||
reportData,
|
||||
nodeData,
|
||||
keys.subList(1, keys.size()),
|
||||
metric,
|
||||
dimensions.subList(1, dimensions.size()));
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static DataQualityReport parseAggregationResults(
|
||||
Optional<JsonObject> aggregationResults, List<List<Map<String, String>>> aggregationMapList) {
|
||||
DataQualityReportMetadata metadata = getAggregationMetadata(aggregationMapList);
|
||||
List<Datum> reportData = new ArrayList<>();
|
||||
|
||||
aggregationResults.ifPresent(
|
||||
jsonObject ->
|
||||
traverseAggregationResults(
|
||||
jsonObject,
|
||||
reportData,
|
||||
new HashMap<>(),
|
||||
metadata.getKeys(),
|
||||
metadata.getMetrics().get(0),
|
||||
metadata.getDimensions()));
|
||||
|
||||
DataQualityReport report = new DataQualityReport();
|
||||
return report.withMetadata(metadata).withData(reportData);
|
||||
}
|
||||
|
||||
/*
|
||||
* Build the aggregation string for the given aggregation
|
||||
*
|
||||
* @param aggregation the aggregation to build the string for.
|
||||
* The aggregation string is in the form
|
||||
* `bucketName:aggType:key=value,bucketName:aggType:key=value;bucketName:aggType:key=value`
|
||||
* where `,` represents a nested aggregation and `;` represents a sibling aggregation
|
||||
* NOTE: As of 07/25/2024 sibling aggregation parsing and processing has not been added
|
||||
* @return the aggregation string
|
||||
*/
|
||||
public static Map<String, Object> buildAggregationString(String aggregation) {
|
||||
Map<String, Object> metadata = new HashMap<>();
|
||||
|
||||
StringBuilder aggregationString = new StringBuilder();
|
||||
String[] siblings = aggregation.split(";");
|
||||
List<List<Map<String, String>>> aggregationsMapList = new ArrayList<>();
|
||||
|
||||
for (String sibling : siblings) {
|
||||
List<Map<String, String>> aggregationsMap = new ArrayList<>();
|
||||
String[] nested = sibling.split(",");
|
||||
for (int i = 0; i < nested.length; i++) {
|
||||
Map<String, String> aggregationMap = new HashMap<>();
|
||||
String[] parts = nested[i].split(":");
|
||||
|
||||
for (int j = 0; j < parts.length; j++) {
|
||||
String part = parts[j];
|
||||
String[] kvPairs = part.split("=");
|
||||
if (kvPairs[0].equals("field")) {
|
||||
aggregationString
|
||||
.append("\"")
|
||||
.append(kvPairs[0])
|
||||
.append("\":\"")
|
||||
.append(kvPairs[1])
|
||||
.append("\"");
|
||||
aggregationString.append("}");
|
||||
} else {
|
||||
aggregationString.append("\"").append(kvPairs[1]).append("\":{");
|
||||
}
|
||||
aggregationMap.put(kvPairs[0], kvPairs[1]);
|
||||
}
|
||||
if (i < nested.length - 1) {
|
||||
aggregationString.append(",\"aggs\":{");
|
||||
}
|
||||
aggregationsMap.add(aggregationMap);
|
||||
}
|
||||
// nested aggregations will add the "aggs" key if nested.length > 1, hence *2
|
||||
aggregationString.append("}".repeat(((nested.length - 1) * 2) + 1));
|
||||
aggregationsMapList.add(aggregationsMap);
|
||||
}
|
||||
metadata.put("aggregationStr", aggregationString.toString());
|
||||
metadata.put("aggregationMapList", aggregationsMapList);
|
||||
return metadata;
|
||||
}
|
||||
|
||||
public static List<TagLabel> parseTags(List<TagLabel> tags) {
|
||||
if (tags == null) {
|
||||
return Collections.emptyList();
|
||||
|
||||
@ -56,6 +56,7 @@ import org.openmetadata.schema.EntityTimeSeriesInterface;
|
||||
import org.openmetadata.schema.analytics.ReportData;
|
||||
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
|
||||
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
|
||||
import org.openmetadata.schema.tests.DataQualityReport;
|
||||
import org.openmetadata.schema.tests.TestSuite;
|
||||
import org.openmetadata.schema.type.ChangeDescription;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
@ -765,6 +766,11 @@ public class SearchRepository {
|
||||
return searchClient.aggregate(query, index, aggregationJson);
|
||||
}
|
||||
|
||||
public DataQualityReport genericAggregation(
|
||||
String query, String index, Map<String, Object> aggregationMetadata) throws IOException {
|
||||
return searchClient.genericAggregation(query, index, aggregationMetadata);
|
||||
}
|
||||
|
||||
public Response suggest(SearchRequest request) throws IOException {
|
||||
return searchClient.suggest(request);
|
||||
}
|
||||
|
||||
@ -89,6 +89,7 @@ import es.org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramIn
|
||||
import es.org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude;
|
||||
import es.org.elasticsearch.search.aggregations.bucket.terms.Terms;
|
||||
import es.org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
|
||||
import es.org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
|
||||
import es.org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
|
||||
import es.org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
|
||||
import es.org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
@ -117,6 +118,7 @@ import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -138,6 +140,7 @@ import org.openmetadata.schema.DataInsightInterface;
|
||||
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
|
||||
import org.openmetadata.schema.entity.data.EntityHierarchy__1;
|
||||
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
|
||||
import org.openmetadata.schema.tests.DataQualityReport;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.sdk.exception.SearchException;
|
||||
@ -146,6 +149,7 @@ import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface;
|
||||
import org.openmetadata.service.jdbi3.DataInsightChartRepository;
|
||||
import org.openmetadata.service.search.SearchClient;
|
||||
import org.openmetadata.service.search.SearchIndexUtils;
|
||||
import org.openmetadata.service.search.SearchRequest;
|
||||
import org.openmetadata.service.search.SearchSortFilter;
|
||||
import org.openmetadata.service.search.UpdateSearchEventsConstant;
|
||||
@ -955,6 +959,12 @@ public class ElasticSearchClient implements SearchClient {
|
||||
AggregationBuilders.terms(key).field(termAggregation.getString("field"));
|
||||
aggregationBuilders.add(termsAggregationBuilder);
|
||||
break;
|
||||
case "avg":
|
||||
JsonObject avgAggregation = aggregation.getJsonObject(aggregationType);
|
||||
AvgAggregationBuilder avgAggregationBuilder =
|
||||
AggregationBuilders.avg(key).field(avgAggregation.getString("field"));
|
||||
aggregationBuilders.add(avgAggregationBuilder);
|
||||
break;
|
||||
case "nested":
|
||||
JsonObject nestedAggregation = aggregation.getJsonObject("nested");
|
||||
AggregationBuilder nestedAggregationBuilder =
|
||||
@ -992,6 +1002,45 @@ public class ElasticSearchClient implements SearchClient {
|
||||
return aggregationBuilders;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataQualityReport genericAggregation(
|
||||
String query, String index, Map<String, Object> aggregationMetadata) throws IOException {
|
||||
String aggregationStr = (String) aggregationMetadata.get("aggregationStr");
|
||||
JsonObject aggregationObj = JsonUtils.readJson("{%s}".formatted(aggregationStr)).asJsonObject();
|
||||
List<AggregationBuilder> aggregationBuilder = buildAggregation(aggregationObj);
|
||||
|
||||
// Create search request
|
||||
es.org.elasticsearch.action.search.SearchRequest searchRequest =
|
||||
new es.org.elasticsearch.action.search.SearchRequest(
|
||||
Entity.getSearchRepository().getIndexOrAliasName(index));
|
||||
|
||||
// Create search source builder
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
if (query != null) {
|
||||
XContentParser queryParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, query);
|
||||
QueryBuilder parsedQuery = SearchSourceBuilder.fromXContent(queryParser).query();
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery().must(parsedQuery);
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
}
|
||||
searchSourceBuilder.size(0).timeout(new TimeValue(30, TimeUnit.SECONDS));
|
||||
|
||||
for (AggregationBuilder aggregation : aggregationBuilder) {
|
||||
searchSourceBuilder.aggregation(aggregation);
|
||||
}
|
||||
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
String response = client.search(searchRequest, RequestOptions.DEFAULT).toString();
|
||||
JsonObject jsonResponse = JsonUtils.readJson(response).asJsonObject();
|
||||
Optional<JsonObject> aggregationResults =
|
||||
Optional.ofNullable(jsonResponse.getJsonObject("aggregations"));
|
||||
return SearchIndexUtils.parseAggregationResults(
|
||||
aggregationResults,
|
||||
(List<List<Map<String, String>>>) aggregationMetadata.get("aggregationMapList"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonObject aggregate(String query, String index, JsonObject aggregationJson)
|
||||
throws IOException {
|
||||
|
||||
@ -54,6 +54,7 @@ public record TestCaseIndex(TestCase testCase) implements SearchIndex {
|
||||
doc.put("testPlatforms", testDefinition.getTestPlatforms());
|
||||
doc.put("dataQualityDimension", testDefinition.getDataQualityDimension());
|
||||
doc.put("followers", SearchIndexUtils.parseFollowers(testCase.getFollowers()));
|
||||
doc.put("testCaseType", testDefinition.getEntityType());
|
||||
setParentRelationships(doc, testCase);
|
||||
return doc;
|
||||
}
|
||||
|
||||
@ -43,6 +43,7 @@ import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -64,6 +65,7 @@ import org.openmetadata.schema.DataInsightInterface;
|
||||
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
|
||||
import org.openmetadata.schema.entity.data.EntityHierarchy__1;
|
||||
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
|
||||
import org.openmetadata.schema.tests.DataQualityReport;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.sdk.exception.SearchException;
|
||||
@ -72,6 +74,7 @@ import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface;
|
||||
import org.openmetadata.service.jdbi3.DataInsightChartRepository;
|
||||
import org.openmetadata.service.search.SearchClient;
|
||||
import org.openmetadata.service.search.SearchIndexUtils;
|
||||
import org.openmetadata.service.search.SearchRequest;
|
||||
import org.openmetadata.service.search.SearchSortFilter;
|
||||
import org.openmetadata.service.search.indexes.ContainerIndex;
|
||||
@ -1006,6 +1009,45 @@ public class OpenSearchClient implements SearchClient {
|
||||
return aggregationBuilders;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataQualityReport genericAggregation(
|
||||
String query, String index, Map<String, Object> aggregationMetadata) throws IOException {
|
||||
String aggregationStr = (String) aggregationMetadata.get("aggregationStr");
|
||||
JsonObject aggregationObj = JsonUtils.readJson("{%s}".formatted(aggregationStr)).asJsonObject();
|
||||
List<AggregationBuilder> aggregationBuilder = buildAggregation(aggregationObj);
|
||||
|
||||
// Create search request
|
||||
os.org.opensearch.action.search.SearchRequest searchRequest =
|
||||
new os.org.opensearch.action.search.SearchRequest(
|
||||
Entity.getSearchRepository().getIndexOrAliasName(index));
|
||||
|
||||
// Create search source builder
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
if (query != null) {
|
||||
XContentParser queryParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, query);
|
||||
QueryBuilder parsedQuery = SearchSourceBuilder.fromXContent(queryParser).query();
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery().must(parsedQuery);
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
}
|
||||
searchSourceBuilder.size(0).timeout(new TimeValue(30, TimeUnit.SECONDS));
|
||||
|
||||
for (AggregationBuilder aggregation : aggregationBuilder) {
|
||||
searchSourceBuilder.aggregation(aggregation);
|
||||
}
|
||||
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
String response = client.search(searchRequest, RequestOptions.DEFAULT).toString();
|
||||
JsonObject jsonResponse = JsonUtils.readJson(response).asJsonObject();
|
||||
Optional<JsonObject> aggregationResults =
|
||||
Optional.ofNullable(jsonResponse.getJsonObject("aggregations"));
|
||||
return SearchIndexUtils.parseAggregationResults(
|
||||
aggregationResults,
|
||||
(List<List<Map<String, String>>>) aggregationMetadata.get("aggregationMapList"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonObject aggregate(String query, String index, JsonObject aggregationJson)
|
||||
throws IOException {
|
||||
|
||||
@ -152,20 +152,14 @@
|
||||
"description": {
|
||||
"type": "text"
|
||||
},
|
||||
"entityType": {
|
||||
"type": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"testPlatforms": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"testCaseType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"entityType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
|
||||
@ -175,20 +175,14 @@
|
||||
"type": "text",
|
||||
"analyzer": "om_analyzer_jp"
|
||||
},
|
||||
"entityType": {
|
||||
"type": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"testPlatforms": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"testCaseType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"entityType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
|
||||
@ -198,20 +198,14 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"entityType": {
|
||||
"type": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"testPlatforms": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"testCaseType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"entityType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
|
||||
@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOf;
|
||||
import static org.openmetadata.service.Entity.FIELD_OWNER;
|
||||
import static org.openmetadata.service.Entity.TAG;
|
||||
import static org.openmetadata.service.Entity.getSearchRepository;
|
||||
import static org.openmetadata.service.util.EntityUtil.fieldAdded;
|
||||
import static org.openmetadata.service.util.EntityUtil.fieldUpdated;
|
||||
import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS;
|
||||
@ -49,6 +50,7 @@ import org.openmetadata.schema.api.data.CreateSearchIndex;
|
||||
import org.openmetadata.schema.api.services.CreateSearchService;
|
||||
import org.openmetadata.schema.entity.data.SearchIndex;
|
||||
import org.openmetadata.schema.entity.services.SearchService;
|
||||
import org.openmetadata.schema.tests.DataQualityReport;
|
||||
import org.openmetadata.schema.type.ChangeDescription;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.SearchIndexDataType;
|
||||
@ -59,6 +61,8 @@ import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.resources.EntityResourceTest;
|
||||
import org.openmetadata.service.resources.services.SearchServiceResourceTest;
|
||||
import org.openmetadata.service.search.SearchIndexUtils;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
import org.openmetadata.service.util.TestUtils;
|
||||
@ -408,6 +412,107 @@ public class SearchIndexResourceTest extends EntityResourceTest<SearchIndex, Cre
|
||||
listEntities(null, ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testBuildAggregationString(TestInfo testInfo) {
|
||||
String aggregationString = "bucketName=my-agg-name:aggType=terms:field=my-field";
|
||||
String expectedAggregationString = "\"my-agg-name\":{\"terms\":{\"field\":\"my-field\"}}";
|
||||
Map<String, Object> actualAggregationstring =
|
||||
SearchIndexUtils.buildAggregationString(aggregationString);
|
||||
assertEquals(expectedAggregationString, actualAggregationstring.get("aggregationStr"));
|
||||
|
||||
// Nested Aggregation (1 level)
|
||||
aggregationString =
|
||||
"bucketName=entityLinks:aggType=terms:field=entityLinks.nonNormalized,bucketName=status_counts:aggType=terms:field=testCaseResults.testCaseStatus";
|
||||
expectedAggregationString =
|
||||
"\"entityLinks\":{\"terms\":{\"field\":\"entityLinks.nonNormalized\"},\"aggs\":{\"status_counts\":{\"terms\":{\"field\":\"testCaseResults.testCaseStatus\"}}}}";
|
||||
actualAggregationstring = SearchIndexUtils.buildAggregationString(aggregationString);
|
||||
assertEquals(expectedAggregationString, actualAggregationstring.get("aggregationStr"));
|
||||
|
||||
// Nested Aggregation (2 levels)
|
||||
aggregationString =
|
||||
"bucketName=entityLinks:aggType=terms:field=entityLinks.nonNormalized,bucketName=statusCount:aggType=terms:field=testCaseResults.testCaseStatus,bucketName=owner:aggType=terms:field=testSuite.owner";
|
||||
expectedAggregationString =
|
||||
"\"entityLinks\":{\"terms\":{\"field\":\"entityLinks.nonNormalized\"},\"aggs\":{\"statusCount\":{\"terms\":{\"field\":\"testCaseResults.testCaseStatus\"},\"aggs\":{\"owner\":{\"terms\":{\"field\":\"testSuite.owner\"}}}}}}";
|
||||
actualAggregationstring = SearchIndexUtils.buildAggregationString(aggregationString);
|
||||
assertEquals(expectedAggregationString, actualAggregationstring.get("aggregationStr"));
|
||||
|
||||
// Metric Aggregation
|
||||
aggregationString =
|
||||
"bucketName=entityLinks:aggType=terms:field=entityLinks.nonNormalized,bucketName=minPrice:aggType=min:field=price.adjusted";
|
||||
actualAggregationstring = SearchIndexUtils.buildAggregationString(aggregationString);
|
||||
expectedAggregationString =
|
||||
"\"entityLinks\":{\"terms\":{\"field\":\"entityLinks.nonNormalized\"},\"aggs\":{\"minPrice\":{\"min\":{\"field\":\"price.adjusted\"}}}}";
|
||||
assertEquals(expectedAggregationString, actualAggregationstring.get("aggregationStr"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testNewAggregation(TestInfo testInfo) throws IOException {
|
||||
DataQualityReport dataQualityReport = new DataQualityReport();
|
||||
SearchRepository searchRepository = getSearchRepository();
|
||||
String query =
|
||||
"{\"query\":{\"bool\":{\"should\":[{\"wildcard\":{\"fullyQualifiedName\":{\"value\":\"*tableForExecutableTestSuite\"}}},{\"wildcard\":{\"fullyQualifiedName\":{\"value\":\"*tableForExecutableTestSuiteTwo\"}}}]}}}";
|
||||
|
||||
String aggregationQuery =
|
||||
"bucketName=fqn:aggType=terms:field=fullyQualifiedName,bucketName=avgTime:aggType=avg:field=updatedAt";
|
||||
Map<String, Object> aggregationString =
|
||||
SearchIndexUtils.buildAggregationString(aggregationQuery);
|
||||
dataQualityReport = searchRepository.genericAggregation(query, "table", aggregationString);
|
||||
dataQualityReport
|
||||
.getData()
|
||||
.forEach(
|
||||
(datum) -> {
|
||||
Map<String, String> m = datum.getAdditionalProperties();
|
||||
assertTrue(m.keySet().containsAll(List.of("fullyQualifiedName", "updatedAt")));
|
||||
});
|
||||
|
||||
aggregationQuery =
|
||||
"bucketName=fqn:aggType=terms:field=fullyQualifiedName,bucketName=owner:aggType=terms:field=owner.name";
|
||||
aggregationString = SearchIndexUtils.buildAggregationString(aggregationQuery);
|
||||
dataQualityReport = searchRepository.genericAggregation(query, "table", aggregationString);
|
||||
dataQualityReport
|
||||
.getData()
|
||||
.forEach(
|
||||
(datum) -> {
|
||||
Map<String, String> m = datum.getAdditionalProperties();
|
||||
assertTrue(m.keySet().containsAll(List.of("fullyQualifiedName", "owner.name")));
|
||||
});
|
||||
|
||||
aggregationQuery =
|
||||
"bucketName=fqn:aggType=terms:field=fullyQualifiedName,bucketName=owner:aggType=terms:field=owner.name,bucketName=avgTime:aggType=avg:field=updatedAt";
|
||||
aggregationString = SearchIndexUtils.buildAggregationString(aggregationQuery);
|
||||
dataQualityReport = searchRepository.genericAggregation(query, "table", aggregationString);
|
||||
dataQualityReport
|
||||
.getData()
|
||||
.forEach(
|
||||
(datum) -> {
|
||||
Map<String, String> m = datum.getAdditionalProperties();
|
||||
assertTrue(
|
||||
m.keySet().containsAll(List.of("fullyQualifiedName", "owner.name", "updatedAt")));
|
||||
});
|
||||
|
||||
aggregationQuery = "bucketName=avgTime:aggType=avg:field=updatedAt";
|
||||
aggregationString = SearchIndexUtils.buildAggregationString(aggregationQuery);
|
||||
dataQualityReport = searchRepository.genericAggregation(query, "table", aggregationString);
|
||||
dataQualityReport
|
||||
.getData()
|
||||
.forEach(
|
||||
(datum) -> {
|
||||
Map<String, String> m = datum.getAdditionalProperties();
|
||||
assertTrue(m.keySet().containsAll(List.of("updatedAt")));
|
||||
});
|
||||
|
||||
aggregationQuery = "bucketName=fqn:aggType=terms:field=fullyQualifiedName";
|
||||
aggregationString = SearchIndexUtils.buildAggregationString(aggregationQuery);
|
||||
dataQualityReport = searchRepository.genericAggregation(query, "table", aggregationString);
|
||||
dataQualityReport
|
||||
.getData()
|
||||
.forEach(
|
||||
(datum) -> {
|
||||
Map<String, String> m = datum.getAdditionalProperties();
|
||||
assertTrue(m.keySet().containsAll(List.of("fullyQualifiedName")));
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchIndex validateGetWithDifferentFields(SearchIndex searchIndex, boolean byName)
|
||||
throws HttpResponseException {
|
||||
|
||||
@ -0,0 +1,54 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/tests/dataQualityReport.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "DataQualityReport",
|
||||
"description": "Data Quality report and aggregation model.",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.tests.DataQualityReport",
|
||||
"definitions": {
|
||||
"dataQualityReportMetadata": {
|
||||
"description": "Schema to capture data quality reports and aggregation data.",
|
||||
"javaType": "org.openmetadata.schema.tests.type.DataQualityReportMetadata",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"keys": {
|
||||
"description": "Keys to identify the data quality report.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"dimensions": {
|
||||
"description": "Dimensions to capture the data quality report.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"metrics": {
|
||||
"description": "Metrics to capture the data quality report.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"metadata": {
|
||||
"description": "Metadata for the data quality report.",
|
||||
"$ref": "#/definitions/dataQualityReportMetadata"
|
||||
},
|
||||
"data": {
|
||||
"description": "Data for the data quality report.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"additionalProperties": {"type": "string"}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["metadata", "data"],
|
||||
"additionalProperties": false
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user