mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-29 19:35:56 +00:00
MINOR - createMany endpoint for test case resource (#19340)
* feat: createMany endpoint for test case resource * test: createMany tests for test cases * fix: change ops context to test case + copy nullified object
This commit is contained in:
parent
195857acac
commit
084d2efa6c
@ -0,0 +1,12 @@
|
||||
package org.openmetadata.service.exception;
|
||||
|
||||
import javax.ws.rs.core.Response;
|
||||
import org.openmetadata.sdk.exception.WebServiceException;
|
||||
|
||||
public class BulkLimitException extends WebServiceException {
|
||||
private static final String BATCH_ERROR_TYPE = "BULK_LIMIT_EXCEPTION";
|
||||
|
||||
public BulkLimitException(String message) {
|
||||
super(Response.Status.REQUEST_ENTITY_TOO_LARGE, BATCH_ERROR_TYPE, message);
|
||||
}
|
||||
}
|
@ -714,6 +714,24 @@ public interface CollectionDAO {
|
||||
@Bind("jsonSchema") String jsonSchema,
|
||||
@Bind("json") String json);
|
||||
|
||||
@Transaction
|
||||
@ConnectionAwareSqlBatch(
|
||||
value =
|
||||
"REPLACE INTO entity_extension(id, extension, jsonSchema, json) "
|
||||
+ "VALUES (:id, :extension, :jsonSchema, :json)",
|
||||
connectionType = MYSQL)
|
||||
@ConnectionAwareSqlBatch(
|
||||
value =
|
||||
"INSERT INTO entity_extension(id, extension, jsonSchema, json) "
|
||||
+ "VALUES (:id, :extension, :jsonSchema, (:json :: jsonb)) "
|
||||
+ "ON CONFLICT (id, extension) DO UPDATE SET jsonSchema = EXCLUDED.jsonSchema, json = EXCLUDED.json",
|
||||
connectionType = POSTGRES)
|
||||
void insertMany(
|
||||
@BindUUID("id") List<UUID> id,
|
||||
@Bind("extension") List<String> extension,
|
||||
@Bind("jsonSchema") String jsonSchema,
|
||||
@Bind("json") List<String> json);
|
||||
|
||||
@ConnectionAwareSqlUpdate(
|
||||
value = "UPDATE entity_extension SET json = :json where (json -> '$.id') = :id",
|
||||
connectionType = MYSQL)
|
||||
|
@ -26,13 +26,16 @@ import lombok.SneakyThrows;
|
||||
import org.jdbi.v3.sqlobject.customizer.Bind;
|
||||
import org.jdbi.v3.sqlobject.customizer.BindMap;
|
||||
import org.jdbi.v3.sqlobject.customizer.Define;
|
||||
import org.jdbi.v3.sqlobject.statement.BatchChunkSize;
|
||||
import org.jdbi.v3.sqlobject.statement.SqlQuery;
|
||||
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlBatch;
|
||||
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery;
|
||||
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
@ -71,6 +74,22 @@ public interface EntityDAO<T extends EntityInterface> {
|
||||
@BindFQN("nameHashColumnValue") String nameHashColumnValue,
|
||||
@Bind("json") String json);
|
||||
|
||||
/** Common queries for all entities implemented here. Do not override. */
|
||||
@Transaction
|
||||
@ConnectionAwareSqlBatch(
|
||||
value = "INSERT INTO <table> (<nameHashColumn>, json) VALUES (:nameHashColumnValue, :json)",
|
||||
connectionType = MYSQL)
|
||||
@ConnectionAwareSqlBatch(
|
||||
value =
|
||||
"INSERT INTO <table> (<nameHashColumn>, json) VALUES (:nameHashColumnValue, :json :: jsonb)",
|
||||
connectionType = POSTGRES)
|
||||
@BatchChunkSize(100)
|
||||
void insertMany(
|
||||
@Define("table") String table,
|
||||
@Define("nameHashColumn") String nameHashColumn,
|
||||
@BindFQN("nameHashColumnValue") List<String> nameHashColumnValue,
|
||||
@Bind("json") List<String> json);
|
||||
|
||||
@ConnectionAwareSqlUpdate(
|
||||
value =
|
||||
"UPDATE <table> SET json = :json, <nameHashColumn> = :nameHashColumnValue WHERE id = :id",
|
||||
@ -366,6 +385,16 @@ public interface EntityDAO<T extends EntityInterface> {
|
||||
insert(getTableName(), getNameHashColumn(), fqn, JsonUtils.pojoToJson(entity));
|
||||
}
|
||||
|
||||
/** Default methods that interfaces with implementation. Don't override */
|
||||
default void insertMany(List<EntityInterface> entities) {
|
||||
List<String> fqns = entities.stream().map(EntityInterface::getFullyQualifiedName).toList();
|
||||
insertMany(
|
||||
getTableName(),
|
||||
getNameHashColumn(),
|
||||
fqns,
|
||||
entities.stream().map(JsonUtils::pojoToJson).toList());
|
||||
}
|
||||
|
||||
default void insert(String nameHash, EntityInterface entity, String fqn) {
|
||||
insert(getTableName(), nameHash, fqn, JsonUtils.pojoToJson(entity));
|
||||
}
|
||||
|
@ -75,6 +75,7 @@ import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.util.concurrent.UncheckedExecutionException;
|
||||
import com.google.gson.Gson;
|
||||
import com.networknt.schema.JsonSchema;
|
||||
import com.networknt.schema.ValidationMessage;
|
||||
import java.io.IOException;
|
||||
@ -421,6 +422,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
*/
|
||||
protected abstract void storeEntity(T entity, boolean update);
|
||||
|
||||
protected void storeEntities(List<T> entities) {
|
||||
// Nothing to do here. This method is overridden in the child class if required
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is called to store all the relationships of an entity. It is expected that all relationships are
|
||||
* already validated and completely setup before this method is called and no validation of relationships is required.
|
||||
@ -442,6 +447,13 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
protected void setInheritedFields(List<T> entities, Fields fields) {
|
||||
for (T entity : entities) {
|
||||
setInheritedFields(entity, fields);
|
||||
}
|
||||
}
|
||||
|
||||
protected final void addServiceRelationship(T entity, EntityReference service) {
|
||||
if (service != null) {
|
||||
addRelationship(
|
||||
@ -893,6 +905,14 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
return new EntityHistory().withEntityType(entityType).withVersions(allVersions);
|
||||
}
|
||||
|
||||
public final List<T> createMany(UriInfo uriInfo, List<T> entities) {
|
||||
for (T e : entities) {
|
||||
prepareInternal(e, false);
|
||||
}
|
||||
List<T> createdEntities = createManyEntities(entities);
|
||||
return createdEntities;
|
||||
}
|
||||
|
||||
public final T create(UriInfo uriInfo, T entity) {
|
||||
entity = withHref(uriInfo, createInternal(entity));
|
||||
return entity;
|
||||
@ -920,6 +940,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
storeRelationships(entity);
|
||||
}
|
||||
|
||||
public final void storeRelationshipsInternal(List<T> entity) {
|
||||
entity.forEach(this::storeRelationshipsInternal);
|
||||
}
|
||||
|
||||
public final T setFieldsInternal(T entity, Fields fields) {
|
||||
entity.setOwners(fields.contains(FIELD_OWNERS) ? getOwners(entity) : entity.getOwners());
|
||||
entity.setTags(fields.contains(FIELD_TAGS) ? getTags(entity) : entity.getTags());
|
||||
@ -973,6 +997,12 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
}
|
||||
}
|
||||
|
||||
protected void postCreate(List<T> entities) {
|
||||
if (supportsSearch) {
|
||||
searchRepository.createEntities((List<EntityInterface>) entities);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
protected void postUpdate(T original, T updated) {
|
||||
if (supportsSearch) {
|
||||
@ -1426,25 +1456,41 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
return entity;
|
||||
}
|
||||
|
||||
@Transaction
|
||||
private List<T> createManyEntities(List<T> entities) {
|
||||
storeEntities(entities);
|
||||
storeExtensions(entities);
|
||||
// TODO: [START] Optimize the below ops to store all in one go
|
||||
storeRelationshipsInternal(entities);
|
||||
setInheritedFields(entities, new Fields(allowedFields));
|
||||
// TODO: [END]
|
||||
postCreate(entities);
|
||||
return entities;
|
||||
}
|
||||
|
||||
private void nullifyEntityFields(T entity) {
|
||||
entity.setHref(null);
|
||||
entity.setOwners(null);
|
||||
entity.setChildren(null);
|
||||
entity.setTags(null);
|
||||
entity.setDomain(null);
|
||||
entity.setDataProducts(null);
|
||||
entity.setFollowers(null);
|
||||
entity.setExperts(null);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
protected void store(T entity, boolean update) {
|
||||
// Don't store owner, database, href and tags as JSON. Build it on the fly based on
|
||||
// relationships
|
||||
entity.withHref(null);
|
||||
List<EntityReference> owners = entity.getOwners();
|
||||
entity.setOwners(null);
|
||||
List<EntityReference> children = entity.getChildren();
|
||||
entity.setChildren(null);
|
||||
List<TagLabel> tags = entity.getTags();
|
||||
entity.setTags(null);
|
||||
EntityReference domain = entity.getDomain();
|
||||
entity.setDomain(null);
|
||||
List<EntityReference> dataProducts = entity.getDataProducts();
|
||||
entity.setDataProducts(null);
|
||||
List<EntityReference> followers = entity.getFollowers();
|
||||
entity.setFollowers(null);
|
||||
List<EntityReference> experts = entity.getExperts();
|
||||
entity.setExperts(null);
|
||||
nullifyEntityFields(entity);
|
||||
|
||||
if (update) {
|
||||
dao.update(entity.getId(), entity.getFullyQualifiedName(), JsonUtils.pojoToJson(entity));
|
||||
@ -1465,6 +1511,37 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
entity.setExperts(experts);
|
||||
}
|
||||
|
||||
protected void storeMany(List<T> entities) {
|
||||
List<EntityInterface> nullifiedEntities = new ArrayList<>();
|
||||
Gson gson = new Gson();
|
||||
for (T entity : entities) {
|
||||
// Don't store owner, database, href and tags as JSON. Build it on the fly based on
|
||||
// relationships
|
||||
List<EntityReference> owners = entity.getOwners();
|
||||
List<EntityReference> children = entity.getChildren();
|
||||
List<TagLabel> tags = entity.getTags();
|
||||
EntityReference domain = entity.getDomain();
|
||||
List<EntityReference> dataProducts = entity.getDataProducts();
|
||||
List<EntityReference> followers = entity.getFollowers();
|
||||
List<EntityReference> experts = entity.getExperts();
|
||||
nullifyEntityFields(entity);
|
||||
|
||||
String jsonCopy = gson.toJson(entity);
|
||||
nullifiedEntities.add(gson.fromJson(jsonCopy, entityClass));
|
||||
|
||||
// Restore the relationships
|
||||
entity.setOwners(owners);
|
||||
entity.setChildren(children);
|
||||
entity.setTags(tags);
|
||||
entity.setDomain(domain);
|
||||
entity.setDataProducts(dataProducts);
|
||||
entity.setFollowers(followers);
|
||||
entity.setExperts(experts);
|
||||
}
|
||||
|
||||
dao.insertMany(nullifiedEntities);
|
||||
}
|
||||
|
||||
@Transaction
|
||||
protected void storeTimeSeries(
|
||||
String fqn, String extension, String jsonSchema, String entityJson) {
|
||||
@ -1665,6 +1742,23 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
}
|
||||
}
|
||||
|
||||
public final void storeExtensions(List<T> entities) {
|
||||
List<UUID> entityIds = new ArrayList<>();
|
||||
List<String> fieldFQNs = new ArrayList<>();
|
||||
List<String> jsons = new ArrayList<>();
|
||||
for (EntityInterface entity : entities) {
|
||||
JsonNode jsonNode = JsonUtils.valueToTree(entity.getExtension());
|
||||
Iterator<Entry<String, JsonNode>> customFields = jsonNode.fields();
|
||||
while (customFields.hasNext()) {
|
||||
Entry<String, JsonNode> entry = customFields.next();
|
||||
fieldFQNs.add(TypeRegistry.getCustomPropertyFQN(entityType, entry.getKey()));
|
||||
jsons.add(JsonUtils.pojoToJson(entry.getValue()));
|
||||
entityIds.add(entity.getId());
|
||||
}
|
||||
}
|
||||
storeCustomProperties(entityIds, fieldFQNs, jsons);
|
||||
}
|
||||
|
||||
public final void removeExtension(EntityInterface entity) {
|
||||
JsonNode jsonNode = JsonUtils.valueToTree(entity.getExtension());
|
||||
Iterator<Entry<String, JsonNode>> customFields = jsonNode.fields();
|
||||
@ -1681,6 +1775,11 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
.insert(entity.getId(), fieldFQN, "customFieldSchema", JsonUtils.pojoToJson(value));
|
||||
}
|
||||
|
||||
private void storeCustomProperties(
|
||||
List<UUID> uuids, List<String> fieldFQNs, List<String> values) {
|
||||
daoCollection.entityExtensionDAO().insertMany(uuids, fieldFQNs, "customFieldSchema", values);
|
||||
}
|
||||
|
||||
private void removeCustomProperty(EntityInterface entity, String fieldName) {
|
||||
String fieldFQN = TypeRegistry.getCustomPropertyFQN(entityType, fieldName);
|
||||
daoCollection.entityExtensionDAO().delete(entity.getId(), fieldFQN);
|
||||
@ -2640,7 +2739,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
* version goes to v-1 and new version v0 replaces v1 for the entity.
|
||||
* </ol>
|
||||
*
|
||||
* @see TableRepository.TableUpdater#entitySpecificUpdate() for example.
|
||||
* @see TableRepository.TableUpdater::entitySpecificUpdate() for example.
|
||||
*/
|
||||
public class EntityUpdater {
|
||||
private static volatile long sessionTimeoutMillis = 10L * 60 * 1000; // 10 minutes
|
||||
|
@ -17,6 +17,7 @@ import static org.openmetadata.service.Entity.populateEntityFieldTags;
|
||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound;
|
||||
import static org.openmetadata.service.security.mask.PIIMasker.maskSampleData;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@ -258,6 +259,35 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
|
||||
.withTestCaseResult(testCaseResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeEntities(List<TestCase> testCases) {
|
||||
List<TestCase> testCasesToStore = new ArrayList<>();
|
||||
Gson gson = new Gson();
|
||||
for (TestCase testCase : testCases) {
|
||||
EntityReference testSuite = testCase.getTestSuite();
|
||||
EntityReference testDefinition = testCase.getTestDefinition();
|
||||
TestCaseResult testCaseResult = testCase.getTestCaseResult();
|
||||
List<TestSuite> testSuites = testCase.getTestSuites();
|
||||
|
||||
String jsonCopy =
|
||||
gson.toJson(
|
||||
testCase
|
||||
.withTestSuite(null)
|
||||
.withTestSuites(null)
|
||||
.withTestDefinition(null)
|
||||
.withTestCaseResult(null));
|
||||
testCasesToStore.add(gson.fromJson(jsonCopy, TestCase.class));
|
||||
|
||||
// restore the relationships
|
||||
testCase
|
||||
.withTestSuite(testSuite)
|
||||
.withTestSuites(testSuites)
|
||||
.withTestDefinition(testDefinition)
|
||||
.withTestCaseResult(testCaseResult);
|
||||
}
|
||||
storeMany(testCasesToStore);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeRelationships(TestCase test) {
|
||||
EntityLink entityLink = EntityLink.parse(test.getEntityLink());
|
||||
|
@ -5,6 +5,7 @@ import javax.ws.rs.core.SecurityContext;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.openmetadata.schema.system.LimitsConfig;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.exception.BulkLimitException;
|
||||
import org.openmetadata.service.security.policyevaluator.OperationContext;
|
||||
import org.openmetadata.service.security.policyevaluator.ResourceContextInterface;
|
||||
|
||||
@ -16,6 +17,13 @@ public interface Limits {
|
||||
ResourceContextInterface resourceContext,
|
||||
OperationContext operationContext);
|
||||
|
||||
default void enforceBulkSizeLimit(String entityType, int bulkSize) {
|
||||
if (bulkSize > 100) {
|
||||
throw new BulkLimitException(
|
||||
"Bulk size limit per request reached for entity type: " + entityType);
|
||||
}
|
||||
}
|
||||
|
||||
LimitsConfig getLimitsConfig();
|
||||
|
||||
Response getLimitsForaFeature(String entityType, boolean cache);
|
||||
|
@ -14,6 +14,7 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
@ -652,6 +653,62 @@ public class TestCaseResource extends EntityResource<TestCase, TestCaseRepositor
|
||||
return Response.created(test.getHref()).entity(test).build();
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("/createMany")
|
||||
@Operation(
|
||||
operationId = "createManyTestCase",
|
||||
summary = "Create multiple test cases at once",
|
||||
description = "Create multiple test cases at once up to a limit of 100 per request.",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "The test",
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = TestCase.class))),
|
||||
@ApiResponse(responseCode = "400", description = "Bad request"),
|
||||
@ApiResponse(
|
||||
responseCode = "413",
|
||||
description = "Request entity too large (more than 100 test cases)")
|
||||
})
|
||||
public Response createMany(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Valid List<CreateTestCase> createTestCases) {
|
||||
List<TestCase> testCases = new ArrayList<>();
|
||||
Set<String> entityLinks =
|
||||
createTestCases.stream().map(CreateTestCase::getEntityLink).collect(Collectors.toSet());
|
||||
Set<String> testSuites =
|
||||
createTestCases.stream().map(CreateTestCase::getTestSuite).collect(Collectors.toSet());
|
||||
|
||||
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.CREATE);
|
||||
|
||||
entityLinks.forEach(
|
||||
link -> {
|
||||
EntityLink entityLink = EntityLink.parse(link);
|
||||
ResourceContextInterface resourceContext =
|
||||
TestCaseResourceContext.builder().entityLink(entityLink).build();
|
||||
authorizer.authorize(securityContext, operationContext, resourceContext);
|
||||
});
|
||||
|
||||
testSuites.forEach(repository::isTestSuiteBasic);
|
||||
limits.enforceBulkSizeLimit(entityType, createTestCases.size());
|
||||
|
||||
createTestCases.forEach(
|
||||
create -> {
|
||||
TestCase test =
|
||||
mapper.createToEntity(create, securityContext.getUserPrincipal().getName());
|
||||
limits.enforceLimits(
|
||||
securityContext,
|
||||
new CreateResourceContext<>(entityType, test),
|
||||
new OperationContext(Entity.TEST_CASE, MetadataOperation.EDIT_TESTS));
|
||||
testCases.add(test);
|
||||
});
|
||||
repository.createMany(uriInfo, testCases);
|
||||
return Response.ok(testCases).build();
|
||||
}
|
||||
|
||||
@PATCH
|
||||
@Path("/{id}")
|
||||
@Operation(
|
||||
|
@ -256,6 +256,8 @@ public interface SearchClient {
|
||||
|
||||
void createEntity(String indexName, String docId, String doc);
|
||||
|
||||
void createEntities(String indexName, List<Map<String, String>> docsAndIds) throws IOException;
|
||||
|
||||
void createTimeSeriesEntity(String indexName, String docId, String doc);
|
||||
|
||||
void updateEntity(String indexName, String docId, Map<String, Object> doc, String scriptTxt);
|
||||
|
@ -298,6 +298,31 @@ public class SearchRepository {
|
||||
}
|
||||
}
|
||||
|
||||
public void createEntities(List<EntityInterface> entities) {
|
||||
if (!nullOrEmpty(entities)) {
|
||||
// All entities in the list are of the same type
|
||||
String entityType = entities.get(0).getEntityReference().getType();
|
||||
IndexMapping indexMapping = entityIndexMap.get(entityType);
|
||||
List<Map<String, String>> docs = new ArrayList<>();
|
||||
for (EntityInterface entity : entities) {
|
||||
SearchIndex index = searchIndexFactory.buildIndex(entityType, entity);
|
||||
String doc = JsonUtils.pojoToJson(index.buildSearchIndexDoc());
|
||||
docs.add(Collections.singletonMap(entity.getId().toString(), doc));
|
||||
}
|
||||
|
||||
try {
|
||||
searchClient.createEntities(indexMapping.getIndexName(clusterAlias), docs);
|
||||
} catch (Exception ie) {
|
||||
LOG.error(
|
||||
"Issue in Creating entities document for entityType [{}]. Reason[{}], Cause[{}], Stack [{}]",
|
||||
entityType,
|
||||
ie.getMessage(),
|
||||
ie.getCause(),
|
||||
ExceptionUtils.getStackTrace(ie));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void createTimeSeriesEntity(EntityTimeSeriesInterface entity) {
|
||||
if (entity != null) {
|
||||
String entityType;
|
||||
|
@ -38,6 +38,7 @@ import static org.openmetadata.service.util.FullyQualifiedName.getParentFQN;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import es.org.elasticsearch.ElasticsearchStatusException;
|
||||
import es.org.elasticsearch.action.ActionListener;
|
||||
import es.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import es.org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import es.org.elasticsearch.action.bulk.BulkRequest;
|
||||
@ -45,6 +46,7 @@ import es.org.elasticsearch.action.bulk.BulkResponse;
|
||||
import es.org.elasticsearch.action.delete.DeleteRequest;
|
||||
import es.org.elasticsearch.action.get.GetRequest;
|
||||
import es.org.elasticsearch.action.get.GetResponse;
|
||||
import es.org.elasticsearch.action.index.IndexRequest;
|
||||
import es.org.elasticsearch.action.search.SearchResponse;
|
||||
import es.org.elasticsearch.action.support.WriteRequest;
|
||||
import es.org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
@ -1953,6 +1955,42 @@ public class ElasticSearchClient implements SearchClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createEntities(String indexName, List<Map<String, String>> docsAndIds)
|
||||
throws IOException {
|
||||
if (isClientAvailable) {
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
for (Map<String, String> docAndId : docsAndIds) {
|
||||
Map.Entry<String, String> entry = docAndId.entrySet().iterator().next();
|
||||
IndexRequest indexRequest =
|
||||
new IndexRequest(indexName)
|
||||
.id(entry.getKey())
|
||||
.source(entry.getValue(), XContentType.JSON);
|
||||
bulkRequest.add(indexRequest);
|
||||
}
|
||||
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
ActionListener<BulkResponse> listener =
|
||||
new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkItemResponses) {
|
||||
if (bulkItemResponses.hasFailures()) {
|
||||
LOG.error(
|
||||
"Failed to create entities in ElasticSearch: {}",
|
||||
bulkItemResponses.buildFailureMessage());
|
||||
} else {
|
||||
LOG.debug("Successfully created {} entities in ElasticSearch", docsAndIds.size());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
LOG.error("Failed to create entities in ElasticSearch", e);
|
||||
}
|
||||
};
|
||||
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createTimeSeriesEntity(String indexName, String docId, String doc) {
|
||||
if (isClientAvailable) {
|
||||
|
@ -134,6 +134,7 @@ import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
|
||||
import os.org.opensearch.OpenSearchException;
|
||||
import os.org.opensearch.OpenSearchStatusException;
|
||||
import os.org.opensearch.action.ActionListener;
|
||||
import os.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import os.org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import os.org.opensearch.action.bulk.BulkRequest;
|
||||
@ -1938,6 +1939,40 @@ public class OpenSearchClient implements SearchClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createEntities(String indexName, List<Map<String, String>> docsAndIds)
|
||||
throws IOException {
|
||||
if (isClientAvailable) {
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
for (Map<String, String> docAndId : docsAndIds) {
|
||||
Map.Entry<String, String> entry = docAndId.entrySet().iterator().next();
|
||||
UpdateRequest updateRequest = new UpdateRequest(indexName, entry.getKey());
|
||||
updateRequest.doc(entry.getValue(), XContentType.JSON);
|
||||
bulkRequest.add(updateRequest);
|
||||
}
|
||||
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
ActionListener<BulkResponse> listener =
|
||||
new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkItemResponses) {
|
||||
if (bulkItemResponses.hasFailures()) {
|
||||
LOG.error(
|
||||
"Failed to create entities in ElasticSearch: {}",
|
||||
bulkItemResponses.buildFailureMessage());
|
||||
} else {
|
||||
LOG.debug("Successfully created {} entities in ElasticSearch", docsAndIds.size());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
LOG.error("Failed to create entities in ElasticSearch", e);
|
||||
}
|
||||
};
|
||||
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createTimeSeriesEntity(String indexName, String docId, String doc) {
|
||||
if (isClientAvailable) {
|
||||
|
@ -2512,6 +2512,44 @@ public class TestCaseResourceTest extends EntityResourceTest<TestCase, CreateTes
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void test_createMany(TestInfo test) throws HttpResponseException {
|
||||
List<CreateTestCase> createTestCases = new ArrayList<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
CreateTestCase createTestCase = createRequest(test, i);
|
||||
if (i % 2 == 0) {
|
||||
createTestCase.withTestSuite(TEST_SUITE1.getFullyQualifiedName());
|
||||
} else {
|
||||
createTestCase.withTestSuite(TEST_SUITE2.getFullyQualifiedName());
|
||||
}
|
||||
createTestCases.add(createTestCase);
|
||||
}
|
||||
List<Map<String, Object>> testCases = createManyTestCases(createTestCases);
|
||||
for (Map<String, Object> testCase : testCases) {
|
||||
TestCase storedTestCase =
|
||||
getTestCase(
|
||||
(String) testCase.get("fullyQualifiedName"),
|
||||
Map.of("fields", "testSuite,testDefinition"),
|
||||
ADMIN_AUTH_HEADERS);
|
||||
CreateTestCase createTestCase =
|
||||
createTestCases.stream()
|
||||
.filter(t -> t.getName().equals(storedTestCase.getName()))
|
||||
.findFirst()
|
||||
.get();
|
||||
validateCreatedEntity(storedTestCase, createTestCase, ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
for (Map<String, Object> testCase : testCases) {
|
||||
String entityLink = (String) testCase.get("entityLink");
|
||||
ResultList<TestCase> testCasesFromSearch =
|
||||
listEntitiesFromSearch(Map.of("entityLink", entityLink), 100, 0, ADMIN_AUTH_HEADERS);
|
||||
testCasesFromSearch.getData().stream()
|
||||
.filter(t -> t.getId().toString().equals(testCase.get("id")))
|
||||
.findFirst()
|
||||
.orElseThrow();
|
||||
}
|
||||
}
|
||||
|
||||
// Test utils methods
|
||||
|
||||
public ResultList<TestCaseResult> listTestCaseResultsFromSearch(
|
||||
@ -2889,7 +2927,6 @@ public class TestCaseResourceTest extends EntityResourceTest<TestCase, CreateTes
|
||||
assertEquals(request.getEntityLink(), createdEntity.getEntityLink());
|
||||
assertReference(request.getTestSuite(), createdEntity.getTestSuite());
|
||||
assertReference(request.getTestDefinition(), createdEntity.getTestDefinition());
|
||||
assertReference(request.getTestSuite(), createdEntity.getTestSuite());
|
||||
assertEquals(request.getParameterValues(), createdEntity.getParameterValues());
|
||||
}
|
||||
|
||||
@ -2988,6 +3025,14 @@ public class TestCaseResourceTest extends EntityResourceTest<TestCase, CreateTes
|
||||
ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
private List<Map<String, Object>> createManyTestCases(List<CreateTestCase> createTestCases)
|
||||
throws HttpResponseException {
|
||||
String pathUrl = "/createMany/";
|
||||
WebTarget target = getCollection().path(pathUrl);
|
||||
return TestUtils.post(
|
||||
target, createTestCases, List.class, OK.getStatusCode(), ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
private ResultList<TestCaseResolutionStatus> getTestCaseFailureStatus(
|
||||
int limit, String offset, Boolean latest, Long startTs, Long endTs, String testCaseFqn)
|
||||
throws HttpResponseException {
|
||||
|
@ -314,10 +314,16 @@ public final class TestUtils {
|
||||
|
||||
public static <K> void post(WebTarget target, K request, Map<String, String> headers)
|
||||
throws HttpResponseException {
|
||||
post(target, request, Status.CREATED.getStatusCode(), headers);
|
||||
}
|
||||
|
||||
public static <K> void post(
|
||||
WebTarget target, K request, int expectedStatus, Map<String, String> headers)
|
||||
throws HttpResponseException {
|
||||
Entity<K> entity =
|
||||
(request == null) ? null : Entity.entity(request, MediaType.APPLICATION_JSON);
|
||||
Response response = SecurityUtil.addHeaders(target, headers).post(entity);
|
||||
readResponse(response, Status.CREATED.getStatusCode());
|
||||
readResponse(response, expectedStatus);
|
||||
}
|
||||
|
||||
public static <T, K> T post(
|
||||
|
Loading…
x
Reference in New Issue
Block a user