diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index 2783d51ef98..2ffea16c9a5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -195,6 +195,7 @@ import org.openmetadata.service.exception.EntityLockedException; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.exception.PreconditionFailedException; import org.openmetadata.service.exception.UnhandledServerException; +import org.openmetadata.service.formatter.util.FormatterUtil; import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord; import org.openmetadata.service.jdbi3.CollectionDAO.EntityVersionPair; import org.openmetadata.service.jdbi3.CollectionDAO.ExtensionRecord; @@ -6729,6 +6730,49 @@ public abstract class EntityRepository { return update(uriInfo, original, updated, userName, null); } + private void createChangeEventForBulkOperation(T entity, EventType eventType, String userName) { + try { + if (eventType.equals(ENTITY_NO_CHANGE)) { + return; + } + + ChangeEvent changeEvent = + FormatterUtil.createChangeEventForEntity(userName, eventType, entity); + + if (changeEvent.getEntity() != null) { + Object entityObject = changeEvent.getEntity(); + changeEvent = copyChangeEvent(changeEvent); + changeEvent.setEntity(JsonUtils.pojoToMaskedJson(entityObject)); + } + + LOG.debug( + "Recording change event for bulk operation {}:{}:{}:{}", + changeEvent.getTimestamp(), + changeEvent.getEntityId(), + changeEvent.getEventType(), + changeEvent.getEntityType()); + + Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); + } catch (Exception e) { + LOG.error("Failed to create change event for bulk operation", e); + } + } + + private static ChangeEvent copyChangeEvent(ChangeEvent changeEvent) { + return new ChangeEvent() + .withId(changeEvent.getId()) + .withEventType(changeEvent.getEventType()) + .withEntityId(changeEvent.getEntityId()) + .withEntityType(changeEvent.getEntityType()) + .withUserName(changeEvent.getUserName()) + .withImpersonatedBy(changeEvent.getImpersonatedBy()) + .withTimestamp(changeEvent.getTimestamp()) + .withChangeDescription(changeEvent.getChangeDescription()) + .withCurrentVersion(changeEvent.getCurrentVersion()) + .withPreviousVersion(changeEvent.getPreviousVersion()) + .withEntityFullyQualifiedName(changeEvent.getEntityFullyQualifiedName()); + } + public BulkOperationResult bulkCreateOrUpdateEntities( UriInfo uriInfo, List entities, String userName) { @@ -6745,11 +6789,13 @@ public abstract class EntityRepository { CompletableFuture.runAsync( () -> { try { - bulkCreateOrUpdateEntity(uriInfo, entity, userName); + PutResponse putResponse = bulkCreateOrUpdateEntity(uriInfo, entity, userName); successRequests.add( new BulkResponse() .withRequest(entity.getFullyQualifiedName()) .withStatus(Status.OK.getStatusCode())); + createChangeEventForBulkOperation( + putResponse.getEntity(), putResponse.getChangeType(), userName); } catch (Exception e) { LOG.warn("Failed to process entity in bulk operation", e); failedRequests.add( diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/databases/TableResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/databases/TableResourceTest.java index bec054dd272..04f7e505702 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/databases/TableResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/databases/TableResourceTest.java @@ -50,6 +50,9 @@ import static org.openmetadata.schema.type.ColumnDataType.STRING; import static org.openmetadata.schema.type.ColumnDataType.STRUCT; import static org.openmetadata.schema.type.ColumnDataType.TIMESTAMP; import static org.openmetadata.schema.type.ColumnDataType.VARCHAR; +import static org.openmetadata.schema.type.EventType.ENTITY_CREATED; +import static org.openmetadata.schema.type.EventType.ENTITY_FIELDS_CHANGED; +import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED; import static org.openmetadata.service.Entity.FIELD_OWNERS; import static org.openmetadata.service.Entity.FIELD_TAGS; import static org.openmetadata.service.Entity.TABLE; @@ -5955,4 +5958,101 @@ public class TableResourceTest extends EntityResourceTest { // Cleanup deleteEntity(table.getId(), false, true, ADMIN_AUTH_HEADERS); } + + @Test + void test_bulkCreateOrUpdate_generatesChangeEvents(TestInfo test) throws IOException { + int tableCount = 3; + List createRequests = new ArrayList<>(); + + for (int i = 0; i < tableCount; i++) { + String tableName = getEntityName(test, i); + CreateTable create = + createRequest(tableName).withDescription("Table " + i + " for bulk creation test"); + createRequests.add(create); + } + + WebTarget target = getResource("tables/bulk"); + BulkOperationResult result = + TestUtils.put(target, createRequests, BulkOperationResult.class, OK, ADMIN_AUTH_HEADERS); + + assertNotNull(result); + assertEquals(tableCount, result.getNumberOfRowsProcessed()); + assertEquals(tableCount, result.getNumberOfRowsPassed()); + assertEquals(0, result.getNumberOfRowsFailed()); + + ResultList changeEvents = + getChangeEvents(TABLE, null, null, null, ADMIN_AUTH_HEADERS); + assertNotNull(changeEvents); + assertNotNull(changeEvents.getData()); + + long bulkCreatedEventCount = + changeEvents.getData().stream() + .filter( + event -> + event.getEntityType().equals(TABLE) + && event.getEventType().equals(ENTITY_CREATED) + && event.getUserName().equals("admin")) + .count(); + + assertTrue( + bulkCreatedEventCount >= tableCount, + "Expected at least " + + tableCount + + " change events for bulk created tables, but found " + + bulkCreatedEventCount); + + for (CreateTable createRequest : createRequests) { + Optional tableEvent = + changeEvents.getData().stream() + .filter( + event -> + event.getEntityType().equals(TABLE) + && event.getEntityFullyQualifiedName() != null + && event.getEntityFullyQualifiedName().contains(createRequest.getName())) + .findFirst(); + + assertTrue( + tableEvent.isPresent(), "Change event not found for table: " + createRequest.getName()); + assertEquals(ENTITY_CREATED, tableEvent.get().getEventType()); + assertEquals("admin", tableEvent.get().getUserName()); + } + + // Update the descriptions for bulk update test + for (int i = 0; i < createRequests.size(); i++) { + CreateTable request = createRequests.get(i); + createRequests.set( + i, + createRequest(request.getName()).withDescription("Updated description for table " + i)); + } + + target = getResource("tables/bulk"); + result = + TestUtils.put(target, createRequests, BulkOperationResult.class, OK, ADMIN_AUTH_HEADERS); + + assertNotNull(result); + assertEquals(tableCount, result.getNumberOfRowsProcessed()); + assertEquals(tableCount, result.getNumberOfRowsPassed()); + assertEquals(0, result.getNumberOfRowsFailed()); + + changeEvents = getChangeEvents(null, TABLE, null, null, ADMIN_AUTH_HEADERS); + assertNotNull(changeEvents); + assertNotNull(changeEvents.getData()); + + long bulkUpdatedEventCount = + changeEvents.getData().stream() + .filter( + event -> + event.getEntityType().equals(TABLE) + && (event.getEventType().equals(ENTITY_UPDATED) + || event.getEventType().equals(ENTITY_FIELDS_CHANGED)) + && event.getUserName().equals("admin")) + .count(); + + assertTrue( + bulkUpdatedEventCount >= tableCount, + "Expected at least " + + tableCount + + " change events for bulk updated tables, but found " + + bulkUpdatedEventCount); + } }