Minor fix test suite reindex regression (#22572)

* fix: added fetcher to set testSuite fields

* fix: entityTimeSeries reindex regression

* fix: added tests checking before after reindex

---------

Co-authored-by: Aniket Katkar <aniketkatkar97@gmail.com>
This commit is contained in:
Teddy 2025-07-25 14:51:23 +02:00 committed by GitHub
parent 4e3740bbad
commit 4f700e9b98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 190 additions and 8 deletions

View File

@ -206,7 +206,7 @@ public class ElasticSearchBulkSink implements BulkSink {
if (!entities.isEmpty() && entities.get(0) instanceof EntityTimeSeriesInterface) {
List<EntityTimeSeriesInterface> tsEntities = (List<EntityTimeSeriesInterface>) entities;
for (EntityTimeSeriesInterface entity : tsEntities) {
addTimeSeriesEntity(entity, indexName);
addTimeSeriesEntity(entity, indexName, entityType);
}
} else {
List<EntityInterface> entityInterfaces = (List<EntityInterface>) entities;
@ -256,8 +256,10 @@ public class ElasticSearchBulkSink implements BulkSink {
}
}
private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName) {
String json = JsonUtils.pojoToJson(entity);
private void addTimeSeriesEntity(
EntityTimeSeriesInterface entity, String indexName, String entityType) {
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
String json = JsonUtils.pojoToJson(searchIndexDoc);
String docId = entity.getId().toString();
IndexRequest indexRequest =

View File

@ -206,7 +206,7 @@ public class OpenSearchBulkSink implements BulkSink {
if (!entities.isEmpty() && entities.getFirst() instanceof EntityTimeSeriesInterface) {
List<EntityTimeSeriesInterface> tsEntities = (List<EntityTimeSeriesInterface>) entities;
for (EntityTimeSeriesInterface entity : tsEntities) {
addTimeSeriesEntity(entity, indexName);
addTimeSeriesEntity(entity, indexName, entityType);
}
} else {
List<EntityInterface> entityInterfaces = (List<EntityInterface>) entities;
@ -256,8 +256,10 @@ public class OpenSearchBulkSink implements BulkSink {
}
}
private void addTimeSeriesEntity(EntityTimeSeriesInterface entity, String indexName) {
String json = JsonUtils.pojoToJson(entity);
private void addTimeSeriesEntity(
EntityTimeSeriesInterface entity, String indexName, String entityType) {
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
String json = JsonUtils.pojoToJson(searchIndexDoc);
String docId = entity.getId().toString();
IndexRequest indexRequest =

View File

@ -117,13 +117,15 @@ public class TestSuiteRepository extends EntityRepository<TestSuite> {
UPDATE_FIELDS);
quoteFqn = false;
supportsSearch = true;
fieldFetchers.put("summary", this::fetchAndSetTestCaseResultSummary);
fieldFetchers.put("pipelines", this::fetchAndSetIngestionPipelines);
}
@Override
public void setFields(TestSuite entity, EntityUtil.Fields fields) {
entity.setPipelines(
fields.contains("pipelines") ? getIngestionPipelines(entity) : entity.getPipelines());
entity.setTests(fields.contains(UPDATE_FIELDS) ? getTestCases(entity) : entity.getTests());
entity.setTests(fields.contains("tests") ? getTestCases(entity) : entity.getTests());
entity.setTestCaseResultSummary(
fields.contains("summary")
? getResultSummary(entity.getId())
@ -169,7 +171,9 @@ public class TestSuiteRepository extends EntityRepository<TestSuite> {
}
var testSuiteIds = testSuites.stream().map(ts -> ts.getId().toString()).toList();
var records =
daoCollection.relationshipDAO().findFromBatch(testSuiteIds, Relationship.HAS.ordinal());
daoCollection
.relationshipDAO()
.findToBatch(testSuiteIds, Relationship.CONTAINS.ordinal(), TEST_SUITE, TEST_CASE);
if (records.isEmpty()) {
return Map.of();
}
@ -411,6 +415,40 @@ public class TestSuiteRepository extends EntityRepository<TestSuite> {
}
}
private void fetchAndSetTestCaseResultSummary(
List<TestSuite> testSuites, EntityUtil.Fields fields) {
if (!fields.contains("summary") || testSuites == null || testSuites.isEmpty()) {
return;
}
Map<UUID, List<ResultSummary>> testCaseResultSummaryMap =
testSuites.stream()
.collect(
Collectors.toMap(
TestSuite::getId, testSuite -> getResultSummary(testSuite.getId())));
Map<UUID, TestSummary> testSummaryMap =
testCaseResultSummaryMap.entrySet().stream()
.collect(
Collectors.toMap(Map.Entry::getKey, entry -> getTestSummary(entry.getValue())));
setFieldFromMap(
true, testSuites, testCaseResultSummaryMap, TestSuite::setTestCaseResultSummary);
setFieldFromMap(true, testSuites, testSummaryMap, TestSuite::setSummary);
}
protected void fetchAndSetIngestionPipelines(List<TestSuite> entities, EntityUtil.Fields fields) {
if (!fields.contains("pipelines") || entities == null || entities.isEmpty()) {
return;
}
Map<UUID, List<EntityReference>> ingestionPipelineMap =
entities.stream()
.collect(Collectors.toMap(EntityInterface::getId, this::getIngestionPipelines));
setFieldFromMap(true, entities, ingestionPipelineMap, TestSuite::setPipelines);
}
@SneakyThrows
private List<ResultSummary> getResultSummary(UUID testSuiteId) {
List<ResultSummary> resultSummaries = new ArrayList<>();

View File

@ -35,6 +35,9 @@ public class ElasticTermsAggregations implements ElasticAggregations {
IncludeExclude includeExclude = new IncludeExclude(includes, null);
termsAggregationBuilder.includeExclude(includeExclude);
}
if (params.get("missing") != null) {
termsAggregationBuilder.missing(params.get("missing"));
}
setElasticAggregationBuilder(termsAggregationBuilder);
}

View File

@ -35,6 +35,9 @@ public class OpenTermsAggregations implements OpenAggregations {
IncludeExclude includeExclude = new IncludeExclude(includes, null);
termsAggregationBuilder.includeExclude(includeExclude);
}
if (params.get("missing") != null) {
termsAggregationBuilder.missing(params.get("missing"));
}
setElasticAggregationBuilder(termsAggregationBuilder);
}

View File

@ -3,6 +3,7 @@ package org.openmetadata.service.resources.dqtests;
import static jakarta.ws.rs.core.Response.Status.BAD_REQUEST;
import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -50,6 +51,7 @@ import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.metadataIngestion.TestSuitePipeline;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.TestSuite;
import org.openmetadata.schema.tests.type.TestCaseResult;
import org.openmetadata.schema.tests.type.TestCaseStatus;
import org.openmetadata.schema.tests.type.TestSummary;
import org.openmetadata.schema.type.Column;
@ -65,6 +67,7 @@ import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.resources.services.ingestionpipelines.IngestionPipelineResourceTest;
import org.openmetadata.service.resources.teams.TeamResourceTest;
import org.openmetadata.service.resources.teams.UserResourceTest;
import org.openmetadata.service.security.SecurityUtil;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.util.TestUtils;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
@ -1030,4 +1033,135 @@ public class TestSuiteResourceTest extends EntityResourceTest<TestSuite, CreateT
public void assertFieldChange(String fieldName, Object expected, Object actual) {
assertCommonFieldChange(fieldName, expected, actual);
}
@Test
void test_testSuiteReindexConsistency(TestInfo test)
throws IOException, InterruptedException, ParseException {
// 1. Create a table and test suite for it
TableResourceTest tableResourceTest = new TableResourceTest();
CreateTable tableReq =
tableResourceTest
.createRequest(test)
.withColumns(
List.of(
new Column()
.withName(C1)
.withDisplayName("c1")
.withDataType(ColumnDataType.VARCHAR)
.withDataLength(10)));
Table table = tableResourceTest.createEntity(tableReq, ADMIN_AUTH_HEADERS);
CreateTestSuite createTestSuite = createRequest(table.getFullyQualifiedName());
TestSuite testSuite = createBasicTestSuite(createTestSuite, ADMIN_AUTH_HEADERS);
// 2. Add a new test case to the table
TestCaseResourceTest testCaseResourceTest = new TestCaseResourceTest();
CreateTestCase createTestCase =
testCaseResourceTest.createRequest(
"test_reindex_consistency_" + test.getDisplayName(),
new MessageParser.EntityLink(Entity.TABLE, table.getFullyQualifiedName()));
TestCase testCase =
testCaseResourceTest.createAndCheckEntity(createTestCase, ADMIN_AUTH_HEADERS);
// 3. Ingest test case results for this test case
CreateTestCaseResult createTestCaseResult =
new CreateTestCaseResult()
.withResult("tested")
.withTestCaseStatus(TestCaseStatus.Success)
.withTimestamp(TestUtils.dateToTimestamp("2021-09-09"));
testCaseResourceTest.postTestCaseResult(
testCase.getFullyQualifiedName(), createTestCaseResult, ADMIN_AUTH_HEADERS);
// 3a. Verify test case results are available via search endpoint before reindex
Map<String, String> testCaseResultsQueryParams = new HashMap<>();
testCaseResultsQueryParams.put("testCaseId", testCase.getId().toString());
testCaseResultsQueryParams.put("fields", "testCase,testDefinition,testCaseStatus");
ResultList<TestCaseResult> testCaseResultsBeforeReindex =
testCaseResourceTest.listTestCaseResultsFromSearch(
testCaseResultsQueryParams, 10, 0, "/testCaseResults/search/list", ADMIN_AUTH_HEADERS);
assertNotNull(testCaseResultsBeforeReindex);
assertFalse(testCaseResultsBeforeReindex.getData().isEmpty());
TestCaseResult testCaseResultBeforeReindex = testCaseResultsBeforeReindex.getData().getFirst();
// 4. Fetch the test suite linked to the table using the search endpoint (before reindex)
Map<String, String> queryParams = new HashMap<>();
queryParams.put("fullyQualifiedName", testSuite.getFullyQualifiedName());
queryParams.put("fields", "tests,testCaseResultSummary");
ResultList<TestSuite> testSuitesBeforeReindex =
listEntitiesFromSearch(queryParams, 10, 0, ADMIN_AUTH_HEADERS);
assertNotNull(testSuitesBeforeReindex);
assertFalse(testSuitesBeforeReindex.getData().isEmpty());
TestSuite testSuiteBeforeReindex = testSuitesBeforeReindex.getData().getFirst();
// 5. Trigger an Elasticsearch index refresh for the test suite entity (simulating a reindex)
postTriggerSearchIndexingApp(ADMIN_AUTH_HEADERS);
// Wait for reindexing to complete
Thread.sleep(5000);
// 6. Fetch the test suite again using the search endpoint (after reindex)
ResultList<TestSuite> testSuitesAfterReindex =
listEntitiesFromSearch(queryParams, 10, 0, ADMIN_AUTH_HEADERS);
assertNotNull(testSuitesAfterReindex);
assertTrue(testSuitesAfterReindex.getData().size() > 0);
TestSuite testSuiteAfterReindex = testSuitesAfterReindex.getData().get(0);
// 6a. Verify test case results are still available via search endpoint after reindex
ResultList<TestCaseResult> testCaseResultsAfterReindex =
testCaseResourceTest.listTestCaseResultsFromSearch(
testCaseResultsQueryParams, 10, 0, "/testCaseResults/search/list", ADMIN_AUTH_HEADERS);
assertNotNull(testCaseResultsAfterReindex);
assertFalse(testCaseResultsAfterReindex.getData().isEmpty());
TestCaseResult testCaseResultAfterReindex = testCaseResultsAfterReindex.getData().getFirst();
// Compare test case results before and after reindex
assertEquals(
testCaseResultBeforeReindex.getTestCaseStatus(),
testCaseResultAfterReindex.getTestCaseStatus());
assertEquals(testCaseResultBeforeReindex.getResult(), testCaseResultAfterReindex.getResult());
assertEquals(
testCaseResultBeforeReindex.getTimestamp(), testCaseResultAfterReindex.getTimestamp());
assertNotNull(testCaseResultAfterReindex.getTestCase());
assertEquals(testCase.getId(), testCaseResultAfterReindex.getTestCase().getId());
// Compare testDefinition
assertEquals(
testCaseResultBeforeReindex.getTestDefinition(),
testCaseResultAfterReindex.getTestDefinition());
// 7. Compare the test suite results from before and after the reindex to ensure they are
// identical
assertEquals(testSuiteBeforeReindex.getId(), testSuiteAfterReindex.getId());
assertEquals(testSuiteBeforeReindex.getName(), testSuiteAfterReindex.getName());
assertEquals(
testSuiteBeforeReindex.getFullyQualifiedName(),
testSuiteAfterReindex.getFullyQualifiedName());
assertEquals(testSuiteBeforeReindex.getDescription(), testSuiteAfterReindex.getDescription());
// Assert that every test ID is in both before and after testSuite
for (EntityReference testRef : testSuiteBeforeReindex.getTests()) {
assertTrue(
testSuiteAfterReindex.getTests().stream()
.allMatch(t -> t.getId().equals(testRef.getId())),
"Test ID " + testRef.getId() + " should be present in testSuite after reindex");
}
// Compare test cases
assertEquals(testSuiteBeforeReindex.getTests().size(), testSuiteAfterReindex.getTests().size());
if (testSuiteBeforeReindex.getTests() != null && testSuiteAfterReindex.getTests() != null) {
verifyTestCases(testSuiteBeforeReindex.getTests(), testSuiteAfterReindex.getTests());
}
// Compare test case result summaries
assertEquals(
testSuiteBeforeReindex.getTestCaseResultSummary(),
testSuiteAfterReindex.getTestCaseResultSummary());
}
private void postTriggerSearchIndexingApp(Map<String, String> authHeaders) throws IOException {
WebTarget target = getResource("apps/trigger").path("SearchIndexingApplication");
Response response =
SecurityUtil.addHeaders(target, authHeaders)
.post(jakarta.ws.rs.client.Entity.json(Map.of()));
TestUtils.readResponse(response, Response.Status.OK.getStatusCode());
}
}