GEN-1380 - Implement support for date_histogram aggregation (#17738)

* feat: indexed test case results

* feat: added indexation logic for test case results

* style: ran java linting

* fix: IDE warnings

* chore: added test case results migration

* style: ran java linting

* fix: postgres migration column json ref

* empty commit to trigger queued

* feat: add support for date_histogram aggregation

* style: ran java linting

* chore: clean up conflicts

* fix: move from iterator loop to forEach

* fix: removed comment

* style: ran java linting
This commit is contained in:
Teddy 2024-09-12 13:11:30 +02:00 committed by GitHub
parent 3282b057d8
commit e066929ff7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 107 additions and 15 deletions

View File

@ -4,13 +4,17 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.json.JsonArray;
import javax.json.JsonNumber;
import javax.json.JsonObject;
import javax.json.JsonString;
import javax.json.JsonValue;
import org.openmetadata.schema.tests.DataQualityReport;
import org.openmetadata.schema.tests.Datum;
import org.openmetadata.schema.tests.type.DataQualityReportMetadata;
@ -191,10 +195,17 @@ public final class SearchIndexUtils {
.forEach(
bucket -> {
JsonObject bucketObject = (JsonObject) bucket;
Optional<String> bucketKey = Optional.of(bucketObject.getString("key"));
Optional<JsonValue> val = Optional.of(bucketObject.get("key"));
bucketKey.ifPresentOrElse(
s -> nodeData.put(dimensions.get(0), s),
val.ifPresentOrElse(
s -> {
switch (s.getValueType()) {
case NUMBER -> nodeData.put(
dimensions.get(0), String.valueOf((JsonNumber) s));
default -> nodeData.put(
dimensions.get(0), ((JsonString) s).getString());
}
},
() -> nodeData.put(dimensions.get(0), null));
// Traverse the next level of the aggregation tree.
@ -257,21 +268,36 @@ public final class SearchIndexUtils {
Map<String, String> aggregationMap = new HashMap<>();
String[] parts = nested[i].split(":");
for (String part : parts) {
String[] kvPairs = part.split("=");
if (kvPairs[0].equals("field")) {
aggregationString
.append("\"")
.append(kvPairs[0])
.append("\":\"")
.append(kvPairs[1])
.append("\"");
Iterator<String> partsIterator = Arrays.stream(parts).iterator();
while (partsIterator.hasNext()) {
String part = partsIterator.next();
if (!partsIterator.hasNext()) {
// last element = key=value pairs of the aggregation
String[] subParts = part.split("&");
Arrays.stream(subParts)
.forEach(
subPart -> {
String[] kvPairs = subPart.split("=");
aggregationString
.append("\"")
.append(kvPairs[0])
.append("\":\"")
.append(kvPairs[1])
.append("\"");
aggregationMap.put(kvPairs[0], kvPairs[1]);
// add comma if not the last element
if (Arrays.asList(subParts).indexOf(subPart) < subParts.length - 1)
aggregationString.append(",");
});
aggregationString.append("}");
} else {
String[] kvPairs = part.split("=");
aggregationString.append("\"").append(kvPairs[1]).append("\":{");
aggregationMap.put(kvPairs[0], kvPairs[1]);
}
aggregationMap.put(kvPairs[0], kvPairs[1]);
}
if (i < nested.length - 1) {
aggregationString.append(",\"aggs\":{");
}

View File

@ -1009,6 +1009,15 @@ public class ElasticSearchClient implements SearchClient {
AggregationBuilders.avg(key).field(avgAggregation.getString("field"));
aggregationBuilders.add(avgAggregationBuilder);
break;
// date_histogram: builds a date-histogram aggregation named `key` from the
// "field" and "calendar_interval" entries of the aggregation's JSON object
// and appends it to aggregationBuilders.
// NOTE(review): both entries are read with getString and no default; presumably
// a request that omits calendar_interval (e.g. one sending only fixed_interval)
// fails here rather than being ignored — confirm against callers.
case "date_histogram":
JsonObject dateHistogramAggregation = aggregation.getJsonObject(aggregationType);
String calendarInterval = dateHistogramAggregation.getString("calendar_interval");
DateHistogramAggregationBuilder dateHistogramAggregationBuilder =
AggregationBuilders.dateHistogram(key)
.field(dateHistogramAggregation.getString("field"))
// The raw interval expression (e.g. "1d") is passed through unvalidated.
.calendarInterval(new DateHistogramInterval(calendarInterval));
aggregationBuilders.add(dateHistogramAggregationBuilder);
break;
case "nested":
JsonObject nestedAggregation = aggregation.getJsonObject("nested");
AggregationBuilder nestedAggregationBuilder =

View File

@ -28,6 +28,7 @@ import static org.openmetadata.common.utils.CommonUtil.listOf;
import static org.openmetadata.schema.type.ColumnDataType.BIGINT;
import static org.openmetadata.schema.type.MetadataOperation.EDIT_TESTS;
import static org.openmetadata.service.Entity.ADMIN_USER_NAME;
import static org.openmetadata.service.Entity.getSearchRepository;
import static org.openmetadata.service.exception.CatalogExceptionMessage.permissionNotAllowed;
import static org.openmetadata.service.jdbi3.TestCaseRepository.FAILED_ROWS_SAMPLE_EXTENSION;
import static org.openmetadata.service.security.SecurityUtil.authHeaders;
@ -75,6 +76,7 @@ import org.openmetadata.schema.api.tests.CreateTestCaseResolutionStatus;
import org.openmetadata.schema.api.tests.CreateTestSuite;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.tests.DataQualityReport;
import org.openmetadata.schema.tests.ResultSummary;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.TestCaseParameterValue;
@ -101,6 +103,8 @@ import org.openmetadata.service.resources.EntityResourceTest;
import org.openmetadata.service.resources.databases.TableResourceTest;
import org.openmetadata.service.resources.feeds.FeedResourceTest;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.search.SearchIndexUtils;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.indexes.TestCaseIndex;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.util.JsonUtils;
@ -2751,6 +2755,37 @@ public class TestCaseResourceTest extends EntityResourceTest<TestCase, CreateTes
assertFalse(testCase.getUseDynamicAssertion());
}
@Test
void aggregate_testCaseResults(TestInfo testInfo) throws IOException, ParseException {
  // Arrange: one test case on the table column, then nine successful results
  // stamped on consecutive days (2021-09-01 .. 2021-09-09).
  SearchRepository repository = getSearchRepository();
  CreateTestCase request = createRequest(testInfo);
  TestCaseParameterValue missingCount =
      new TestCaseParameterValue().withValue("100").withName("missingCountValue");
  request
      .withEntityLink(TABLE_COLUMN_LINK)
      .withTestSuite(TEST_SUITE1.getFullyQualifiedName())
      .withTestDefinition(TEST_DEFINITION3.getFullyQualifiedName())
      .withParameterValues(List.of(missingCount));
  TestCase created = createAndCheckEntity(request, ADMIN_AUTH_HEADERS);

  for (int day = 1; day <= 9; day++) {
    String date = String.format("2021-09-0%s", day);
    TestCaseResult dailyResult =
        new TestCaseResult()
            .withResult("tested")
            .withTestCaseStatus(TestCaseStatus.Success)
            .withTimestamp(TestUtils.dateToTimestamp(date));
    putTestCaseResult(created.getFullyQualifiedName(), dailyResult, ADMIN_AUTH_HEADERS);
  }

  // Act: daily date histogram over the result timestamps, with a terms
  // sub-aggregation on the test definition's data quality dimension.
  String query =
      "bucketName=dates:aggType=date_histogram:field=timestamp&calendar_interval=1d,bucketName=dimesion:aggType=terms:field=testDefinition.dataQualityDimension";
  Map<String, Object> aggregation = SearchIndexUtils.buildAggregationString(query);
  DataQualityReport report = repository.genericAggregation(null, "testCaseResult", aggregation);

  // Assert: only checks that the aggregation produced a data section.
  assertNotNull(report.getData());
}
@Test
void createTestCaseResults_wrongTs(TestInfo testInfo) throws IOException, HttpResponseException {
CreateTestCase create = createRequest(testInfo);

View File

@ -447,6 +447,22 @@ public class SearchIndexResourceTest extends EntityResourceTest<SearchIndex, Cre
expectedAggregationString =
"\"entityLinks\":{\"terms\":{\"field\":\"entityLinks.nonNormalized\"},\"aggs\":{\"minPrice\":{\"min\":{\"field\":\"price.adjusted\"}}}}";
assertEquals(expectedAggregationString, actualAggregationstring.get("aggregationStr"));
// Date histogram aggregation
aggregationString =
"bucketName=dates:aggType=date_histogram:field=timestamp&calendar_interval=2d";
actualAggregationstring = SearchIndexUtils.buildAggregationString(aggregationString);
expectedAggregationString =
"\"dates\":{\"date_histogram\":{\"field\":\"timestamp\",\"calendar_interval\":\"2d\"}}";
assertEquals(expectedAggregationString, actualAggregationstring.get("aggregationStr"));
// Date histogram aggregation with sub aggregation
aggregationString =
"bucketName=dates:aggType=date_histogram:field=timestamp&calendar_interval=2d,bucketName=minPrice:aggType=min:field=price.adjusted";
actualAggregationstring = SearchIndexUtils.buildAggregationString(aggregationString);
expectedAggregationString =
"\"dates\":{\"date_histogram\":{\"field\":\"timestamp\",\"calendar_interval\":\"2d\"},\"aggs\":{\"minPrice\":{\"min\":{\"field\":\"price.adjusted\"}}}}";
assertEquals(expectedAggregationString, actualAggregationstring.get("aggregationStr"));
}
@Test
@ -515,6 +531,12 @@ public class SearchIndexResourceTest extends EntityResourceTest<SearchIndex, Cre
Map<String, String> m = datum.getAdditionalProperties();
assertTrue(m.containsKey("fullyQualifiedName"));
});
aggregationQuery =
"bucketName=dates:aggType=date_histogram:field=timestamp&calendar_interval=1d,bucketName=dimesion:aggType=terms:field=testDefinition.dataQualityDimension";
aggregationString = SearchIndexUtils.buildAggregationString(aggregationQuery);
dataQualityReport =
searchRepository.genericAggregation(null, "testCaseResult", aggregationString);
}
@Override

View File

@ -176,10 +176,10 @@ class ServiceBaseClass {
.getByLabel('Ingestions')
.getByTestId('loader')
.waitFor({ state: 'detached' });
// need manual wait to settle down the deployed pipeline, before triggering the pipeline
await page.waitForTimeout(2000);
await page.getByTestId('more-actions').first().click();
await page.getByTestId('run-button').click();