ISSUE #1052 - Implement List entities with test suites repo logic (#19461)

* fix: centralized listWithOffset logic

* feat: dq app config + list entities with tests logic

* fix: test case
Teddy 2025-01-22 08:08:07 +01:00 committed by GitHub
parent 5064602dc8
commit cacfabd9ce
12 changed files with 316 additions and 62 deletions

View File

@@ -4650,6 +4650,37 @@ public interface CollectionDAO {
return listAfter(
getTableName(), mySqlCondition, postgresCondition, limit, afterName, afterId, groupBy);
}
@SqlQuery(
"SELECT json FROM <table> tn\n"
+ "INNER JOIN (SELECT DISTINCT fromId FROM entity_relationship er\n"
+ "<cond> AND toEntity = 'testSuite' and fromEntity = :entityType) er ON fromId = tn.id\n"
+ "LIMIT :limit OFFSET :offset;")
List<String> listEntitiesWithTestSuite(
@Define("table") String table,
@BindMap Map<String, ?> params,
@Define("cond") String cond,
@Bind("entityType") String entityType,
@Bind("limit") int limit,
@Bind("offset") int offset);
default List<String> listEntitiesWithTestsuite(
ListFilter filter, String table, String entityType, int limit, int offset) {
return listEntitiesWithTestSuite(
table, filter.getQueryParams(), filter.getCondition(), entityType, limit, offset);
}
@SqlQuery(
"SELECT COUNT(DISTINCT fromId) FROM entity_relationship er\n"
+ "<cond> AND toEntity = 'testSuite' and fromEntity = :entityType;")
Integer countEntitiesWithTestSuite(
@BindMap Map<String, ?> params,
@Define("cond") String cond,
@Bind("entityType") String entityType);
default Integer countEntitiesWithTestsuite(ListFilter filter, String entityType) {
return countEntitiesWithTestSuite(filter.getQueryParams(), filter.getCondition(), entityType);
}
}
interface TestCaseDAO extends EntityDAO<TestCase> {

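As a quick, hedged illustration of how these new TestSuiteDAO methods are meant to be driven from repository code (the surrounding variables are assumed to be in scope of an EntityRepository subclass, as in the repository changes further down):

// Illustrative sketch only: `dao`, `daoCollection` and `entityType` come from EntityRepository.
CollectionDAO.TestSuiteDAO testSuiteDAO = daoCollection.testSuiteDAO();
ListFilter filter = new ListFilter();
// Count of distinct entities of this type that have a relationship to a testSuite.
int total = testSuiteDAO.countEntitiesWithTestsuite(filter, entityType);
// One page of raw JSON rows; <table> and <cond> are injected through the JDBI @Define bindings.
List<String> jsons =
    testSuiteDAO.listEntitiesWithTestsuite(filter, dao.getTableName(), entityType, 10, 0);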
View File

@@ -67,6 +67,9 @@ import static org.openmetadata.service.util.EntityUtil.nextMajorVersion;
import static org.openmetadata.service.util.EntityUtil.nextVersion;
import static org.openmetadata.service.util.EntityUtil.objectMatch;
import static org.openmetadata.service.util.EntityUtil.tagLabelMatch;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getAfterOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getBeforeOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getOffset;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -180,11 +183,13 @@ import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ListWithOffsetFunction;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.RestUtil.DeleteResponse;
import org.openmetadata.service.util.RestUtil.PatchResponse;
import org.openmetadata.service.util.RestUtil.PutResponse;
import org.openmetadata.service.util.ResultList;
import software.amazon.awssdk.utils.Either;
/**
* This is the base class used by Entity Resources to perform READ and WRITE operations to the backend database to
@@ -767,40 +772,6 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
}
public final ResultList<T> listAfterWithSkipFailure(
UriInfo uriInfo, Fields fields, ListFilter filter, int limitParam, String after) {
List<EntityError> errors = new ArrayList<>();
List<T> entities = new ArrayList<>();
int beforeOffset = Integer.parseInt(RestUtil.decodeCursor(after));
int currentOffset = beforeOffset;
int total = dao.listCount(filter);
if (limitParam > 0) {
List<String> jsons = dao.listAfter(filter, limitParam, currentOffset);
for (String json : jsons) {
T parsedEntity = JsonUtils.readValue(json, entityClass);
try {
T entity = setFieldsInternal(parsedEntity, fields);
setInheritedFields(entity, fields);
clearFieldsInternal(entity, fields);
entities.add(withHref(uriInfo, entity));
} catch (Exception e) {
clearFieldsInternal(parsedEntity, fields);
EntityError entityError =
new EntityError().withMessage(e.getMessage()).withEntity(parsedEntity);
errors.add(entityError);
LOG.error("[ListForIndexing] Failed for Entity : {}", entityError);
}
}
currentOffset = currentOffset + limitParam;
String newAfter = currentOffset > total ? null : String.valueOf(currentOffset);
return getResultList(entities, errors, String.valueOf(beforeOffset), newAfter, total);
} else {
// limit == 0 , return total count of entity.
return getResultList(entities, errors, null, null, total);
}
}
@SuppressWarnings("unchecked")
Map<String, String> parseCursorMap(String param) {
Map<String, String> cursorMap;
@@ -897,6 +868,45 @@ public abstract class EntityRepository<T extends EntityInterface> {
new EntityHistory().withEntityType(entityType).withVersions(versions), offset + limit);
}
public final ResultList<T> listWithOffset(
ListWithOffsetFunction<ListFilter, Integer, Integer, List<String>> callable,
Function<ListFilter, Integer> countCallable,
ListFilter filter,
Integer limitParam,
String offset,
boolean skipErrors,
Fields fields,
UriInfo uriInfo) {
List<T> entities = new ArrayList<>();
List<EntityError> errors = new ArrayList<>();
Integer total = countCallable.apply(filter);
int offsetInt = getOffset(offset);
String afterOffset = getAfterOffset(offsetInt, limitParam, total);
String beforeOffset = getBeforeOffset(offsetInt, limitParam);
if (limitParam > 0) {
List<String> jsons = callable.apply(filter, limitParam, offsetInt);
Iterator<Either<T, EntityError>> iterator = serializeJsons(jsons, fields, uriInfo);
while (iterator.hasNext()) {
Either<T, EntityError> either = iterator.next();
if (either.right().isPresent()) {
if (!skipErrors) {
throw new RuntimeException(either.right().get().getMessage());
} else {
errors.add(either.right().get());
LOG.error("[List] Failed for Entity : {}", either.right().get());
}
} else {
entities.add(either.left().get());
}
}
return getResultList(entities, errors, beforeOffset, afterOffset, total);
} else {
return getResultList(entities, errors, null, null, total);
}
}
public final EntityHistory listVersions(UUID id) {
T latest = setFieldsInternal(find(id, ALL), putFields);
setInheritedFields(latest, putFields);
@@ -1580,6 +1590,23 @@ public abstract class EntityRepository<T extends EntityInterface> {
.listBetweenTimestampsByOrder(fqn, extension, startTs, endTs, orderBy);
}
@Transaction
public ResultList<T> getEntitiesWithTestSuite(
ListFilter filter, Integer limit, String offset, EntityUtil.Fields fields) {
CollectionDAO.TestSuiteDAO testSuiteDAO = daoCollection.testSuiteDAO();
return listWithOffset(
(filterParam, limitParam, offsetParam) ->
testSuiteDAO.listEntitiesWithTestsuite(
filterParam, dao.getTableName(), entityType, limitParam, offsetParam),
(filterParam) -> testSuiteDAO.countEntitiesWithTestsuite(filterParam, entityType),
filter,
limit,
offset,
false,
fields,
null);
}
@Transaction
public final void deleteExtensionAtTimestamp(String fqn, String extension, Long timestamp) {
daoCollection.entityExtensionTimeSeriesDao().deleteAtTimestamp(fqn, extension, timestamp);
@@ -3925,4 +3952,36 @@ public abstract class EntityRepository<T extends EntityInterface> {
private List<String> entityListToStrings(List<T> entities) {
return entities.stream().map(EntityInterface::getId).map(UUID::toString).toList();
}
private Iterator<Either<T, EntityError>> serializeJsons(
List<String> jsons, Fields fields, UriInfo uriInfo) {
return new Iterator<>() {
private final Iterator<String> iterator = jsons.iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public Either<T, EntityError> next() {
String json = iterator.next();
T entity = JsonUtils.readValue(json, entityClass);
try {
setFieldsInternal(entity, fields);
setInheritedFields(entity, fields);
clearFieldsInternal(entity, fields);
if (!nullOrEmpty(uriInfo)) {
entity = withHref(uriInfo, entity);
}
return Either.left(entity);
} catch (Exception e) {
clearFieldsInternal(entity, fields);
EntityError entityError =
new EntityError().withMessage(e.getMessage()).withEntity(entity);
return Either.right(entityError);
}
}
};
}
}

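Taken together, the new listWithOffset and getEntitiesWithTestSuite methods give callers offset-based paging with before/after cursors. A rough usage sketch, assuming the returned after cursor round-trips through the same encode/decode helpers the rest of the paging code uses:

// Hypothetical paging loop over all tables that have a test suite attached.
TableRepository repository = (TableRepository) Entity.getEntityRepository(Entity.TABLE);
EntityUtil.Fields fields = new EntityUtil.Fields(Set.of("testSuite"));
List<Table> tablesWithTests = new ArrayList<>();
String cursor = null; // a null offset is treated as 0 by JdbiUtils.getOffset
do {
  ResultList<Table> page =
      repository.getEntitiesWithTestSuite(new ListFilter(), 25, cursor, fields);
  tablesWithTests.addAll(page.getData());
  cursor = page.getPaging().getAfter(); // becomes null once offset + limit >= total
} while (cursor != null);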
View File

@@ -3,6 +3,9 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.getEntityFields;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getAfterOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getBeforeOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getOffset;
import java.beans.IntrospectionException;
import java.io.IOException;
@@ -316,26 +319,6 @@ public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInter
timeSeriesDao.deleteById(id);
}
private String getAfterOffset(int offsetInt, int limit, int total) {
int afterOffset = offsetInt + limit;
// If afterOffset is greater than total, then set it to null to indicate end of list
return afterOffset >= total ? null : String.valueOf(afterOffset);
}
private String getBeforeOffset(int offsetInt, int limit) {
int beforeOffsetInt = offsetInt - limit;
// If offset is negative, then set it to 0 if you pass offset 4 and limit 10, then the previous
// page will be at offset 0
if (beforeOffsetInt < 0) beforeOffsetInt = 0;
// if offsetInt is 0 (i.e. either no offset or offset is 0), then set it to null as there is no
// previous page
return (offsetInt == 0) ? null : String.valueOf(beforeOffsetInt);
}
private int getOffset(String offset) {
return offset != null ? Integer.parseInt(RestUtil.decodeCursor(offset)) : 0;
}
private Map<String, List<?>> getEntityList(List<String> jsons, boolean skipErrors) {
List<T> entityList = new ArrayList<>();
List<EntityError> errors = new ArrayList<>();

View File

@@ -0,0 +1,14 @@
package org.openmetadata.service.util;
import java.util.Objects;
import java.util.function.Function;
@FunctionalInterface
public interface ListWithOffsetFunction<A, B, C, R> {
R apply(A a, B b, C c);
default <V> ListWithOffsetFunction<A, B, C, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (A a, B b, C c) -> after.apply(apply(a, b, c));
}
}

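A minimal sketch of how this three-argument interface is bound in practice; EntityRepository passes method references such as dao::listAfter, and andThen can post-process a page:

// `dao` stands in for any EntityDAO<T>; the lambda mirrors the dao::listAfter method reference.
ListWithOffsetFunction<ListFilter, Integer, Integer, List<String>> pageFn =
    (filter, limit, offset) -> dao.listAfter(filter, limit, offset);
// andThen composes a post-processing step, e.g. reducing the page to its row count.
ListWithOffsetFunction<ListFilter, Integer, Integer, Integer> pageCount = pageFn.andThen(List::size);
int firstPageSize = pageCount.apply(new ListFilter(), 10, 0);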
View File

@@ -9,6 +9,7 @@ import org.jdbi.v3.core.statement.SqlStatements;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;
import org.jdbi.v3.sqlobject.SqlObjects;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
import org.openmetadata.service.util.RestUtil;
public class JdbiUtils {
@@ -52,4 +53,24 @@ public class JdbiUtils {
return jdbiInstance;
}
public static int getOffset(String offset) {
return offset != null ? Integer.parseInt(RestUtil.decodeCursor(offset)) : 0;
}
public static String getAfterOffset(int offsetInt, int limit, int total) {
int afterOffset = offsetInt + limit;
// If afterOffset is greater than total, then set it to null to indicate end of list
return afterOffset >= total ? null : String.valueOf(afterOffset);
}
public static String getBeforeOffset(int offsetInt, int limit) {
int beforeOffsetInt = offsetInt - limit;
// If offset is negative, then set it to 0 if you pass offset 4 and limit 10, then the previous
// page will be at offset 0
if (beforeOffsetInt < 0) beforeOffsetInt = 0;
// if offsetInt is 0 (i.e. either no offset or offset is 0), then set it to null as there is no
// previous page
return (offsetInt == 0) ? null : String.valueOf(beforeOffsetInt);
}
}

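A small worked example of the cursor arithmetic these helpers centralize (cursors are base64-encoded, so the "MA==" used by the new table test decodes to offset 0):

int offset = JdbiUtils.getOffset("MA==");                 // 0; a null cursor also maps to 0
String after = JdbiUtils.getAfterOffset(offset, 10, 25);  // "10": next page starts at offset 10
String before = JdbiUtils.getBeforeOffset(offset, 10);    // null: already on the first page
String lastAfter = JdbiUtils.getAfterOffset(20, 10, 25);  // null: 20 + 10 >= 25, no further page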
View File

@@ -29,6 +29,7 @@ import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.EntityDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.util.RestUtil;
@@ -96,9 +97,17 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
ResultList<? extends EntityInterface> result;
try {
EntityDAO<?> entityDAO = entityRepository.getDao();
result =
entityRepository.listAfterWithSkipFailure(
null, Entity.getFields(entityType, fields), filter, batchSize, cursor);
entityRepository.listWithOffset(
entityDAO::listAfter,
entityDAO::listCount,
filter,
batchSize,
cursor,
true,
Entity.getFields(entityType, fields),
null);
if (!result.getErrors().isEmpty()) {
lastFailedCursor = this.cursor.get();
if (result.getPaging().getAfter() == null) {
@@ -154,9 +163,17 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
ResultList<? extends EntityInterface> result;
try {
EntityDAO<?> entityDAO = entityRepository.getDao();
result =
entityRepository.listAfterWithSkipFailure(
null, Entity.getFields(entityType, fields), filter, batchSize, currentCursor);
entityRepository.listWithOffset(
entityDAO::listAfter,
entityDAO::listCount,
filter,
batchSize,
currentCursor,
true,
Entity.getFields(entityType, fields),
null);
LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, result.getData().size(), result.getErrors().size());

View File

@@ -48,7 +48,13 @@
"mappings": {
"properties": {
"id": {
"type": "text"
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",

View File

@@ -80,6 +80,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -149,6 +150,8 @@ import org.openmetadata.schema.type.TagLabel.LabelType;
import org.openmetadata.schema.type.csv.CsvImportResult;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.TableRepository;
import org.openmetadata.service.jdbi3.TableRepository.TableCsv;
import org.openmetadata.service.resources.EntityResourceTest;
import org.openmetadata.service.resources.databases.TableResource.TableList;
@@ -2266,6 +2269,46 @@ public class TableResourceTest extends EntityResourceTest<Table, CreateTable> {
schemaTest.assertOwnershipInheritanceOverride(schema, createSchema.withOwners(null), USER2_REF);
}
@Test
void test_listTablesWithTestSuite(TestInfo test) throws IOException {
CreateDatabase createDb = dbTest.createRequest(test).withOwners(Lists.newArrayList(USER1_REF));
Database db = dbTest.createEntity(createDb, ADMIN_AUTH_HEADERS);
CreateDatabaseSchema createSchema =
schemaTest.createRequest(test).withDatabase(db.getFullyQualifiedName());
DatabaseSchema schema = schemaTest.createEntity(createSchema, ADMIN_AUTH_HEADERS);
CreateTable createTable =
createRequest(test).withDatabaseSchema(schema.getFullyQualifiedName());
Table table = createEntity(createTable, ADMIN_AUTH_HEADERS);
CreateTestSuite createTestSuite =
testSuiteResourceTest.createRequest(table.getFullyQualifiedName());
TestSuite testSuite =
testSuiteResourceTest.createBasicTestSuite(createTestSuite, ADMIN_AUTH_HEADERS);
CreateTestCase createTestCase =
testCaseResourceTest
.createRequest(test)
.withEntityLink(String.format("<#E::table::%s>", table.getFullyQualifiedName()))
.withTestSuite(testSuite.getFullyQualifiedName())
.withTestDefinition(TEST_DEFINITION2.getFullyQualifiedName());
TestCase testCase = testCaseResourceTest.createEntity(createTestCase, ADMIN_AUTH_HEADERS);
TableRepository tableRepository = (TableRepository) Entity.getEntityRepository(TABLE);
ResultList<Table> allTables = listEntities(null, ADMIN_AUTH_HEADERS);
ResultList<Table> tablesWithTestSuite =
tableRepository.getEntitiesWithTestSuite(
new ListFilter(),
10,
"MA==",
new Fields(
Set.of(
"tags", "testSuite", "columns", "table.tableProfile", "table.columnProfile")));
// Ensure the number of tables with test suite is less than the total number of tables
assertTrue(allTables.getData().size() > tablesWithTestSuite.getData().size());
}
@Test
void test_domainInheritance(TestInfo test) throws IOException {
// Domain is inherited from databaseService > database > databaseSchema > table
@@ -2441,7 +2484,7 @@ public class TableResourceTest extends EntityResourceTest<Table, CreateTable> {
queryParams.put("limit", "100");
ResultList<Table> tables = listEntities(queryParams, ADMIN_AUTH_HEADERS);
assertEquals(4, tables.getData().size());
assertEquals(5, tables.getData().size());
assertNotNull(tables.getData().get(0).getTestSuite());
}

View File

@@ -13,6 +13,9 @@
{
"$ref": "external/automatorAppConfig.json"
},
{
"$ref": "external/slackAppTokenConfiguration.json"
},
{
"$ref": "internal/dataInsightsAppConfig.json"
},
@@ -23,7 +26,7 @@
"$ref": "internal/searchIndexingAppConfig.json"
},
{
"$ref": "external/slackAppTokenConfiguration.json"
"$ref": "internal/collateAIQualityAgentAppConfig.json"
},
{
"$ref": "internal/dataRetentionConfiguration.json"

View File

@@ -0,0 +1,37 @@
{
"$id": "https://open-metadata.org/schema/entity/applications/configuration/external/collateAIQualityAgentAppConfig.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "CollateAIQualityAgentAppConfig.json",
"description": "Configuration for the Collate AI Quality Agent.",
"type": "object",
"javaType": "org.openmetadata.schema.entity.app.internal.CollateAIQualityAgentAppConfig",
"definitions": {
"collateAIQualityAgentAppType": {
"description": "Application type.",
"type": "string",
"enum": ["CollateAIQualityAgent"],
"default": "CollateAIQualityAgent"
}
},
"properties": {
"type": {
"title": "Application Type",
"description": "Application Type",
"$ref": "#/definitions/collateAIQualityAgentAppType",
"default": "CollateAIQualityAgent"
},
"filter": {
"title": "Filter",
"description": "Query filter to be passed to ES. E.g., `{\"query\":{\"bool\":{\"must\":[{\"bool\":{\"should\":[{\"term\":{\"domain.displayName.keyword\":\"DG Anim\"}}]}}]}}}`. This is the same payload as in the Explore page.",
"type": "string"
},
"active": {
"title": "Active",
"description": "Whether the suggested tests should be active or not upon suggestion",
"type": "boolean",
"default": false
}
},
"additionalProperties": false,
"required": ["filter"]
}

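Assuming the fluent withX setters that jsonschema2pojo generates for the other app config schemas (not shown in this diff), a minimal config instance satisfying the required filter field might look like:

// Hypothetical construction of the generated POJO; only `filter` is mandatory.
CollateAIQualityAgentAppConfig config =
    new CollateAIQualityAgentAppConfig()
        .withFilter("{\"query\":{\"bool\":{\"must\":[{\"term\":{\"entityType\":\"table\"}}]}}}")
        .withActive(false); // suggested tests stay inactive until reviewed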
View File

@@ -0,0 +1,40 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Configuration for the Collate AI Quality Agent.
*/
export interface CollateAIQualityAgentAppConfig {
/**
* Whether the suggested tests should be active or not upon suggestion
*/
active?: boolean;
/**
* Query filter to be passed to ES. E.g.,
* `{"query":{"bool":{"must":[{"bool":{"should":[{"term":{"domain.displayName.keyword":"DG
* Anim"}}]}}]}}}`. This is the same payload as in the Explore page.
*/
filter: string;
/**
* Application Type
*/
type?: CollateAIQualityAgentAppType;
}
/**
* Application Type
*
* Application type.
*/
export enum CollateAIQualityAgentAppType {
CollateAIQualityAgent = "CollateAIQualityAgent",
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at