diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/exception/BulkLimitException.java b/openmetadata-service/src/main/java/org/openmetadata/service/exception/BulkLimitException.java new file mode 100644 index 00000000000..050395aabe2 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/exception/BulkLimitException.java @@ -0,0 +1,12 @@ +package org.openmetadata.service.exception; + +import javax.ws.rs.core.Response; +import org.openmetadata.sdk.exception.WebServiceException; + +public class BulkLimitException extends WebServiceException { + private static final String BATCH_ERROR_TYPE = "BULK_LIMIT_EXCEPTION"; + + public BulkLimitException(String message) { + super(Response.Status.REQUEST_ENTITY_TOO_LARGE, BATCH_ERROR_TYPE, message); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 169f687f045..4c77797df00 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -714,6 +714,24 @@ public interface CollectionDAO { @Bind("jsonSchema") String jsonSchema, @Bind("json") String json); + @Transaction + @ConnectionAwareSqlBatch( + value = + "REPLACE INTO entity_extension(id, extension, jsonSchema, json) " + + "VALUES (:id, :extension, :jsonSchema, :json)", + connectionType = MYSQL) + @ConnectionAwareSqlBatch( + value = + "INSERT INTO entity_extension(id, extension, jsonSchema, json) " + + "VALUES (:id, :extension, :jsonSchema, (:json :: jsonb)) " + + "ON CONFLICT (id, extension) DO UPDATE SET jsonSchema = EXCLUDED.jsonSchema, json = EXCLUDED.json", + connectionType = POSTGRES) + void insertMany( + @BindUUID("id") List id, + @Bind("extension") List extension, + @Bind("jsonSchema") String jsonSchema, + @Bind("json") List json); + @ConnectionAwareSqlUpdate( value = "UPDATE entity_extension SET json = :json where (json -> '$.id') = :id", connectionType = MYSQL) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java index 67d5a62078a..4816f5f09b8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java @@ -26,13 +26,16 @@ import lombok.SneakyThrows; import org.jdbi.v3.sqlobject.customizer.Bind; import org.jdbi.v3.sqlobject.customizer.BindMap; import org.jdbi.v3.sqlobject.customizer.Define; +import org.jdbi.v3.sqlobject.statement.BatchChunkSize; import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlUpdate; +import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.type.Include; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.CatalogExceptionMessage; import org.openmetadata.service.exception.EntityNotFoundException; +import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlBatch; import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery; import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate; import org.openmetadata.service.util.FullyQualifiedName; @@ -71,6 +74,22 @@ public interface EntityDAO { @BindFQN("nameHashColumnValue") String nameHashColumnValue, @Bind("json") String json); + /** Common queries for all entities implemented here. Do not override. */ + @Transaction + @ConnectionAwareSqlBatch( + value = "INSERT INTO (, json) VALUES (:nameHashColumnValue, :json)", + connectionType = MYSQL) + @ConnectionAwareSqlBatch( + value = + "INSERT INTO
(, json) VALUES (:nameHashColumnValue, :json :: jsonb)", + connectionType = POSTGRES) + @BatchChunkSize(100) + void insertMany( + @Define("table") String table, + @Define("nameHashColumn") String nameHashColumn, + @BindFQN("nameHashColumnValue") List nameHashColumnValue, + @Bind("json") List json); + @ConnectionAwareSqlUpdate( value = "UPDATE
SET json = :json, = :nameHashColumnValue WHERE id = :id", @@ -366,6 +385,16 @@ public interface EntityDAO { insert(getTableName(), getNameHashColumn(), fqn, JsonUtils.pojoToJson(entity)); } + /** Default methods that interfaces with implementation. Don't override */ + default void insertMany(List entities) { + List fqns = entities.stream().map(EntityInterface::getFullyQualifiedName).toList(); + insertMany( + getTableName(), + getNameHashColumn(), + fqns, + entities.stream().map(JsonUtils::pojoToJson).toList()); + } + default void insert(String nameHash, EntityInterface entity, String fqn) { insert(getTableName(), nameHash, fqn, JsonUtils.pojoToJson(entity)); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index 03273955aff..036eee1f3d6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -75,6 +75,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.util.concurrent.UncheckedExecutionException; +import com.google.gson.Gson; import com.networknt.schema.JsonSchema; import com.networknt.schema.ValidationMessage; import java.io.IOException; @@ -421,6 +422,10 @@ public abstract class EntityRepository { */ protected abstract void storeEntity(T entity, boolean update); + protected void storeEntities(List entities) { + // Nothing to do here. This method is overridden in the child class if required + } + /** * This method is called to store all the relationships of an entity. It is expected that all relationships are * already validated and completely setup before this method is called and no validation of relationships is required. @@ -442,6 +447,13 @@ public abstract class EntityRepository { } } + @SuppressWarnings("unused") + protected void setInheritedFields(List entities, Fields fields) { + for (T entity : entities) { + setInheritedFields(entity, fields); + } + } + protected final void addServiceRelationship(T entity, EntityReference service) { if (service != null) { addRelationship( @@ -893,6 +905,14 @@ public abstract class EntityRepository { return new EntityHistory().withEntityType(entityType).withVersions(allVersions); } + public final List createMany(UriInfo uriInfo, List entities) { + for (T e : entities) { + prepareInternal(e, false); + } + List createdEntities = createManyEntities(entities); + return createdEntities; + } + public final T create(UriInfo uriInfo, T entity) { entity = withHref(uriInfo, createInternal(entity)); return entity; @@ -920,6 +940,10 @@ public abstract class EntityRepository { storeRelationships(entity); } + public final void storeRelationshipsInternal(List entity) { + entity.forEach(this::storeRelationshipsInternal); + } + public final T setFieldsInternal(T entity, Fields fields) { entity.setOwners(fields.contains(FIELD_OWNERS) ? getOwners(entity) : entity.getOwners()); entity.setTags(fields.contains(FIELD_TAGS) ? getTags(entity) : entity.getTags()); @@ -973,6 +997,12 @@ public abstract class EntityRepository { } } + protected void postCreate(List entities) { + if (supportsSearch) { + searchRepository.createEntities((List) entities); + } + } + @SuppressWarnings("unused") protected void postUpdate(T original, T updated) { if (supportsSearch) { @@ -1426,25 +1456,41 @@ public abstract class EntityRepository { return entity; } + @Transaction + private List createManyEntities(List entities) { + storeEntities(entities); + storeExtensions(entities); + // TODO: [START] Optimize the below ops to store all in one go + storeRelationshipsInternal(entities); + setInheritedFields(entities, new Fields(allowedFields)); + // TODO: [END] + postCreate(entities); + return entities; + } + + private void nullifyEntityFields(T entity) { + entity.setHref(null); + entity.setOwners(null); + entity.setChildren(null); + entity.setTags(null); + entity.setDomain(null); + entity.setDataProducts(null); + entity.setFollowers(null); + entity.setExperts(null); + } + @Transaction protected void store(T entity, boolean update) { // Don't store owner, database, href and tags as JSON. Build it on the fly based on // relationships - entity.withHref(null); List owners = entity.getOwners(); - entity.setOwners(null); List children = entity.getChildren(); - entity.setChildren(null); List tags = entity.getTags(); - entity.setTags(null); EntityReference domain = entity.getDomain(); - entity.setDomain(null); List dataProducts = entity.getDataProducts(); - entity.setDataProducts(null); List followers = entity.getFollowers(); - entity.setFollowers(null); List experts = entity.getExperts(); - entity.setExperts(null); + nullifyEntityFields(entity); if (update) { dao.update(entity.getId(), entity.getFullyQualifiedName(), JsonUtils.pojoToJson(entity)); @@ -1465,6 +1511,37 @@ public abstract class EntityRepository { entity.setExperts(experts); } + protected void storeMany(List entities) { + List nullifiedEntities = new ArrayList<>(); + Gson gson = new Gson(); + for (T entity : entities) { + // Don't store owner, database, href and tags as JSON. Build it on the fly based on + // relationships + List owners = entity.getOwners(); + List children = entity.getChildren(); + List tags = entity.getTags(); + EntityReference domain = entity.getDomain(); + List dataProducts = entity.getDataProducts(); + List followers = entity.getFollowers(); + List experts = entity.getExperts(); + nullifyEntityFields(entity); + + String jsonCopy = gson.toJson(entity); + nullifiedEntities.add(gson.fromJson(jsonCopy, entityClass)); + + // Restore the relationships + entity.setOwners(owners); + entity.setChildren(children); + entity.setTags(tags); + entity.setDomain(domain); + entity.setDataProducts(dataProducts); + entity.setFollowers(followers); + entity.setExperts(experts); + } + + dao.insertMany(nullifiedEntities); + } + @Transaction protected void storeTimeSeries( String fqn, String extension, String jsonSchema, String entityJson) { @@ -1665,6 +1742,23 @@ public abstract class EntityRepository { } } + public final void storeExtensions(List entities) { + List entityIds = new ArrayList<>(); + List fieldFQNs = new ArrayList<>(); + List jsons = new ArrayList<>(); + for (EntityInterface entity : entities) { + JsonNode jsonNode = JsonUtils.valueToTree(entity.getExtension()); + Iterator> customFields = jsonNode.fields(); + while (customFields.hasNext()) { + Entry entry = customFields.next(); + fieldFQNs.add(TypeRegistry.getCustomPropertyFQN(entityType, entry.getKey())); + jsons.add(JsonUtils.pojoToJson(entry.getValue())); + entityIds.add(entity.getId()); + } + } + storeCustomProperties(entityIds, fieldFQNs, jsons); + } + public final void removeExtension(EntityInterface entity) { JsonNode jsonNode = JsonUtils.valueToTree(entity.getExtension()); Iterator> customFields = jsonNode.fields(); @@ -1681,6 +1775,11 @@ public abstract class EntityRepository { .insert(entity.getId(), fieldFQN, "customFieldSchema", JsonUtils.pojoToJson(value)); } + private void storeCustomProperties( + List uuids, List fieldFQNs, List values) { + daoCollection.entityExtensionDAO().insertMany(uuids, fieldFQNs, "customFieldSchema", values); + } + private void removeCustomProperty(EntityInterface entity, String fieldName) { String fieldFQN = TypeRegistry.getCustomPropertyFQN(entityType, fieldName); daoCollection.entityExtensionDAO().delete(entity.getId(), fieldFQN); @@ -2640,7 +2739,7 @@ public abstract class EntityRepository { * version goes to v-1 and new version v0 replaces v1 for the entity. * * - * @see TableRepository.TableUpdater#entitySpecificUpdate() for example. + * @see TableRepository.TableUpdater::entitySpecificUpdate() for example. */ public class EntityUpdater { private static volatile long sessionTimeoutMillis = 10L * 60 * 1000; // 10 minutes diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseRepository.java index c1015ddf940..2a73c791122 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseRepository.java @@ -17,6 +17,7 @@ import static org.openmetadata.service.Entity.populateEntityFieldTags; import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound; import static org.openmetadata.service.security.mask.PIIMasker.maskSampleData; +import com.google.gson.Gson; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -258,6 +259,35 @@ public class TestCaseRepository extends EntityRepository { .withTestCaseResult(testCaseResult); } + @Override + public void storeEntities(List testCases) { + List testCasesToStore = new ArrayList<>(); + Gson gson = new Gson(); + for (TestCase testCase : testCases) { + EntityReference testSuite = testCase.getTestSuite(); + EntityReference testDefinition = testCase.getTestDefinition(); + TestCaseResult testCaseResult = testCase.getTestCaseResult(); + List testSuites = testCase.getTestSuites(); + + String jsonCopy = + gson.toJson( + testCase + .withTestSuite(null) + .withTestSuites(null) + .withTestDefinition(null) + .withTestCaseResult(null)); + testCasesToStore.add(gson.fromJson(jsonCopy, TestCase.class)); + + // restore the relationships + testCase + .withTestSuite(testSuite) + .withTestSuites(testSuites) + .withTestDefinition(testDefinition) + .withTestCaseResult(testCaseResult); + } + storeMany(testCasesToStore); + } + @Override public void storeRelationships(TestCase test) { EntityLink entityLink = EntityLink.parse(test.getEntityLink()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/limits/Limits.java b/openmetadata-service/src/main/java/org/openmetadata/service/limits/Limits.java index b1a9a8a70dd..dd5f6a423c8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/limits/Limits.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/limits/Limits.java @@ -5,6 +5,7 @@ import javax.ws.rs.core.SecurityContext; import org.jdbi.v3.core.Jdbi; import org.openmetadata.schema.system.LimitsConfig; import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.exception.BulkLimitException; import org.openmetadata.service.security.policyevaluator.OperationContext; import org.openmetadata.service.security.policyevaluator.ResourceContextInterface; @@ -16,6 +17,13 @@ public interface Limits { ResourceContextInterface resourceContext, OperationContext operationContext); + default void enforceBulkSizeLimit(String entityType, int bulkSize) { + if (bulkSize > 100) { + throw new BulkLimitException( + "Bulk size limit per request reached for entity type: " + entityType); + } + } + LimitsConfig getLimitsConfig(); Response getLimitsForaFeature(String entityType, boolean cache); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResource.java index 9f277a24994..9c5f374d989 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResource.java @@ -14,6 +14,7 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.UUID; @@ -652,6 +653,62 @@ public class TestCaseResource extends EntityResource createTestCases) { + List testCases = new ArrayList<>(); + Set entityLinks = + createTestCases.stream().map(CreateTestCase::getEntityLink).collect(Collectors.toSet()); + Set testSuites = + createTestCases.stream().map(CreateTestCase::getTestSuite).collect(Collectors.toSet()); + + OperationContext operationContext = new OperationContext(entityType, MetadataOperation.CREATE); + + entityLinks.forEach( + link -> { + EntityLink entityLink = EntityLink.parse(link); + ResourceContextInterface resourceContext = + TestCaseResourceContext.builder().entityLink(entityLink).build(); + authorizer.authorize(securityContext, operationContext, resourceContext); + }); + + testSuites.forEach(repository::isTestSuiteBasic); + limits.enforceBulkSizeLimit(entityType, createTestCases.size()); + + createTestCases.forEach( + create -> { + TestCase test = + mapper.createToEntity(create, securityContext.getUserPrincipal().getName()); + limits.enforceLimits( + securityContext, + new CreateResourceContext<>(entityType, test), + new OperationContext(Entity.TEST_CASE, MetadataOperation.EDIT_TESTS)); + testCases.add(test); + }); + repository.createMany(uriInfo, testCases); + return Response.ok(testCases).build(); + } + @PATCH @Path("/{id}") @Operation( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index d764b11a8b0..b739ebcb9a7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -256,6 +256,8 @@ public interface SearchClient { void createEntity(String indexName, String docId, String doc); + void createEntities(String indexName, List> docsAndIds) throws IOException; + void createTimeSeriesEntity(String indexName, String docId, String doc); void updateEntity(String indexName, String docId, Map doc, String scriptTxt); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 015aa7639fa..943a23e593f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -298,6 +298,31 @@ public class SearchRepository { } } + public void createEntities(List entities) { + if (!nullOrEmpty(entities)) { + // All entities in the list are of the same type + String entityType = entities.get(0).getEntityReference().getType(); + IndexMapping indexMapping = entityIndexMap.get(entityType); + List> docs = new ArrayList<>(); + for (EntityInterface entity : entities) { + SearchIndex index = searchIndexFactory.buildIndex(entityType, entity); + String doc = JsonUtils.pojoToJson(index.buildSearchIndexDoc()); + docs.add(Collections.singletonMap(entity.getId().toString(), doc)); + } + + try { + searchClient.createEntities(indexMapping.getIndexName(clusterAlias), docs); + } catch (Exception ie) { + LOG.error( + "Issue in Creating entities document for entityType [{}]. Reason[{}], Cause[{}], Stack [{}]", + entityType, + ie.getMessage(), + ie.getCause(), + ExceptionUtils.getStackTrace(ie)); + } + } + } + public void createTimeSeriesEntity(EntityTimeSeriesInterface entity) { if (entity != null) { String entityType; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index 3813221fe59..82fa864163a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -38,6 +38,7 @@ import static org.openmetadata.service.util.FullyQualifiedName.getParentFQN; import com.fasterxml.jackson.databind.JsonNode; import es.org.elasticsearch.ElasticsearchStatusException; +import es.org.elasticsearch.action.ActionListener; import es.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import es.org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import es.org.elasticsearch.action.bulk.BulkRequest; @@ -45,6 +46,7 @@ import es.org.elasticsearch.action.bulk.BulkResponse; import es.org.elasticsearch.action.delete.DeleteRequest; import es.org.elasticsearch.action.get.GetRequest; import es.org.elasticsearch.action.get.GetResponse; +import es.org.elasticsearch.action.index.IndexRequest; import es.org.elasticsearch.action.search.SearchResponse; import es.org.elasticsearch.action.support.WriteRequest; import es.org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -1953,6 +1955,42 @@ public class ElasticSearchClient implements SearchClient { } } + @Override + public void createEntities(String indexName, List> docsAndIds) + throws IOException { + if (isClientAvailable) { + BulkRequest bulkRequest = new BulkRequest(); + for (Map docAndId : docsAndIds) { + Map.Entry entry = docAndId.entrySet().iterator().next(); + IndexRequest indexRequest = + new IndexRequest(indexName) + .id(entry.getKey()) + .source(entry.getValue(), XContentType.JSON); + bulkRequest.add(indexRequest); + } + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + ActionListener listener = + new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (bulkItemResponses.hasFailures()) { + LOG.error( + "Failed to create entities in ElasticSearch: {}", + bulkItemResponses.buildFailureMessage()); + } else { + LOG.debug("Successfully created {} entities in ElasticSearch", docsAndIds.size()); + } + } + + @Override + public void onFailure(Exception e) { + LOG.error("Failed to create entities in ElasticSearch", e); + } + }; + client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); + } + } + @Override public void createTimeSeriesEntity(String indexName, String docId, String doc) { if (isClientAvailable) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index a6baeed0fac..994f82260e1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -134,6 +134,7 @@ import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; import os.org.opensearch.OpenSearchException; import os.org.opensearch.OpenSearchStatusException; +import os.org.opensearch.action.ActionListener; import os.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import os.org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import os.org.opensearch.action.bulk.BulkRequest; @@ -1938,6 +1939,40 @@ public class OpenSearchClient implements SearchClient { } } + @Override + public void createEntities(String indexName, List> docsAndIds) + throws IOException { + if (isClientAvailable) { + BulkRequest bulkRequest = new BulkRequest(); + for (Map docAndId : docsAndIds) { + Map.Entry entry = docAndId.entrySet().iterator().next(); + UpdateRequest updateRequest = new UpdateRequest(indexName, entry.getKey()); + updateRequest.doc(entry.getValue(), XContentType.JSON); + bulkRequest.add(updateRequest); + } + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + ActionListener listener = + new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (bulkItemResponses.hasFailures()) { + LOG.error( + "Failed to create entities in ElasticSearch: {}", + bulkItemResponses.buildFailureMessage()); + } else { + LOG.debug("Successfully created {} entities in ElasticSearch", docsAndIds.size()); + } + } + + @Override + public void onFailure(Exception e) { + LOG.error("Failed to create entities in ElasticSearch", e); + } + }; + client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); + } + } + @Override public void createTimeSeriesEntity(String indexName, String docId, String doc) { if (isClientAvailable) { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/dqtests/TestCaseResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/dqtests/TestCaseResourceTest.java index 0b8a2c3d4a6..d707822538b 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/dqtests/TestCaseResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/dqtests/TestCaseResourceTest.java @@ -2512,6 +2512,44 @@ public class TestCaseResourceTest extends EntityResourceTest createTestCases = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + CreateTestCase createTestCase = createRequest(test, i); + if (i % 2 == 0) { + createTestCase.withTestSuite(TEST_SUITE1.getFullyQualifiedName()); + } else { + createTestCase.withTestSuite(TEST_SUITE2.getFullyQualifiedName()); + } + createTestCases.add(createTestCase); + } + List> testCases = createManyTestCases(createTestCases); + for (Map testCase : testCases) { + TestCase storedTestCase = + getTestCase( + (String) testCase.get("fullyQualifiedName"), + Map.of("fields", "testSuite,testDefinition"), + ADMIN_AUTH_HEADERS); + CreateTestCase createTestCase = + createTestCases.stream() + .filter(t -> t.getName().equals(storedTestCase.getName())) + .findFirst() + .get(); + validateCreatedEntity(storedTestCase, createTestCase, ADMIN_AUTH_HEADERS); + } + + for (Map testCase : testCases) { + String entityLink = (String) testCase.get("entityLink"); + ResultList testCasesFromSearch = + listEntitiesFromSearch(Map.of("entityLink", entityLink), 100, 0, ADMIN_AUTH_HEADERS); + testCasesFromSearch.getData().stream() + .filter(t -> t.getId().toString().equals(testCase.get("id"))) + .findFirst() + .orElseThrow(); + } + } + // Test utils methods public ResultList listTestCaseResultsFromSearch( @@ -2889,7 +2927,6 @@ public class TestCaseResourceTest extends EntityResourceTest> createManyTestCases(List createTestCases) + throws HttpResponseException { + String pathUrl = "/createMany/"; + WebTarget target = getCollection().path(pathUrl); + return TestUtils.post( + target, createTestCases, List.class, OK.getStatusCode(), ADMIN_AUTH_HEADERS); + } + private ResultList getTestCaseFailureStatus( int limit, String offset, Boolean latest, Long startTs, Long endTs, String testCaseFqn) throws HttpResponseException { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java b/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java index 534315afa9b..9e4a3cc25d0 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/util/TestUtils.java @@ -314,10 +314,16 @@ public final class TestUtils { public static void post(WebTarget target, K request, Map headers) throws HttpResponseException { + post(target, request, Status.CREATED.getStatusCode(), headers); + } + + public static void post( + WebTarget target, K request, int expectedStatus, Map headers) + throws HttpResponseException { Entity entity = (request == null) ? null : Entity.entity(request, MediaType.APPLICATION_JSON); Response response = SecurityUtil.addHeaders(target, headers).post(entity); - readResponse(response, Status.CREATED.getStatusCode()); + readResponse(response, expectedStatus); } public static T post(