Bulk apis change events (#24394)

* add change event for bulk apis

* add change event for bulk apis

* fix tests

* fix tests
This commit is contained in:
Sriharsha Chintalapani 2025-11-19 18:36:10 -08:00 committed by GitHub
parent 5db13f8937
commit de416f4c08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 147 additions and 1 deletions

View File

@ -195,6 +195,7 @@ import org.openmetadata.service.exception.EntityLockedException;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.exception.PreconditionFailedException;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.formatter.util.FormatterUtil;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityVersionPair;
import org.openmetadata.service.jdbi3.CollectionDAO.ExtensionRecord;
@ -6729,6 +6730,49 @@ public abstract class EntityRepository<T extends EntityInterface> {
return update(uriInfo, original, updated, userName, null);
}
private void createChangeEventForBulkOperation(T entity, EventType eventType, String userName) {
try {
if (eventType.equals(ENTITY_NO_CHANGE)) {
return;
}
ChangeEvent changeEvent =
FormatterUtil.createChangeEventForEntity(userName, eventType, entity);
if (changeEvent.getEntity() != null) {
Object entityObject = changeEvent.getEntity();
changeEvent = copyChangeEvent(changeEvent);
changeEvent.setEntity(JsonUtils.pojoToMaskedJson(entityObject));
}
LOG.debug(
"Recording change event for bulk operation {}:{}:{}:{}",
changeEvent.getTimestamp(),
changeEvent.getEntityId(),
changeEvent.getEventType(),
changeEvent.getEntityType());
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
} catch (Exception e) {
LOG.error("Failed to create change event for bulk operation", e);
}
}
private static ChangeEvent copyChangeEvent(ChangeEvent changeEvent) {
return new ChangeEvent()
.withId(changeEvent.getId())
.withEventType(changeEvent.getEventType())
.withEntityId(changeEvent.getEntityId())
.withEntityType(changeEvent.getEntityType())
.withUserName(changeEvent.getUserName())
.withImpersonatedBy(changeEvent.getImpersonatedBy())
.withTimestamp(changeEvent.getTimestamp())
.withChangeDescription(changeEvent.getChangeDescription())
.withCurrentVersion(changeEvent.getCurrentVersion())
.withPreviousVersion(changeEvent.getPreviousVersion())
.withEntityFullyQualifiedName(changeEvent.getEntityFullyQualifiedName());
}
public BulkOperationResult bulkCreateOrUpdateEntities(
UriInfo uriInfo, List<T> entities, String userName) {
@ -6745,11 +6789,13 @@ public abstract class EntityRepository<T extends EntityInterface> {
CompletableFuture.runAsync(
() -> {
try {
bulkCreateOrUpdateEntity(uriInfo, entity, userName);
PutResponse<T> putResponse = bulkCreateOrUpdateEntity(uriInfo, entity, userName);
successRequests.add(
new BulkResponse()
.withRequest(entity.getFullyQualifiedName())
.withStatus(Status.OK.getStatusCode()));
createChangeEventForBulkOperation(
putResponse.getEntity(), putResponse.getChangeType(), userName);
} catch (Exception e) {
LOG.warn("Failed to process entity in bulk operation", e);
failedRequests.add(

View File

@ -50,6 +50,9 @@ import static org.openmetadata.schema.type.ColumnDataType.STRING;
import static org.openmetadata.schema.type.ColumnDataType.STRUCT;
import static org.openmetadata.schema.type.ColumnDataType.TIMESTAMP;
import static org.openmetadata.schema.type.ColumnDataType.VARCHAR;
import static org.openmetadata.schema.type.EventType.ENTITY_CREATED;
import static org.openmetadata.schema.type.EventType.ENTITY_FIELDS_CHANGED;
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.service.Entity.FIELD_OWNERS;
import static org.openmetadata.service.Entity.FIELD_TAGS;
import static org.openmetadata.service.Entity.TABLE;
@ -5955,4 +5958,101 @@ public class TableResourceTest extends EntityResourceTest<Table, CreateTable> {
// Cleanup
deleteEntity(table.getId(), false, true, ADMIN_AUTH_HEADERS);
}
@Test
void test_bulkCreateOrUpdate_generatesChangeEvents(TestInfo test) throws IOException {
int tableCount = 3;
List<CreateTable> createRequests = new ArrayList<>();
for (int i = 0; i < tableCount; i++) {
String tableName = getEntityName(test, i);
CreateTable create =
createRequest(tableName).withDescription("Table " + i + " for bulk creation test");
createRequests.add(create);
}
WebTarget target = getResource("tables/bulk");
BulkOperationResult result =
TestUtils.put(target, createRequests, BulkOperationResult.class, OK, ADMIN_AUTH_HEADERS);
assertNotNull(result);
assertEquals(tableCount, result.getNumberOfRowsProcessed());
assertEquals(tableCount, result.getNumberOfRowsPassed());
assertEquals(0, result.getNumberOfRowsFailed());
ResultList<ChangeEvent> changeEvents =
getChangeEvents(TABLE, null, null, null, ADMIN_AUTH_HEADERS);
assertNotNull(changeEvents);
assertNotNull(changeEvents.getData());
long bulkCreatedEventCount =
changeEvents.getData().stream()
.filter(
event ->
event.getEntityType().equals(TABLE)
&& event.getEventType().equals(ENTITY_CREATED)
&& event.getUserName().equals("admin"))
.count();
assertTrue(
bulkCreatedEventCount >= tableCount,
"Expected at least "
+ tableCount
+ " change events for bulk created tables, but found "
+ bulkCreatedEventCount);
for (CreateTable createRequest : createRequests) {
Optional<ChangeEvent> tableEvent =
changeEvents.getData().stream()
.filter(
event ->
event.getEntityType().equals(TABLE)
&& event.getEntityFullyQualifiedName() != null
&& event.getEntityFullyQualifiedName().contains(createRequest.getName()))
.findFirst();
assertTrue(
tableEvent.isPresent(), "Change event not found for table: " + createRequest.getName());
assertEquals(ENTITY_CREATED, tableEvent.get().getEventType());
assertEquals("admin", tableEvent.get().getUserName());
}
// Update the descriptions for bulk update test
for (int i = 0; i < createRequests.size(); i++) {
CreateTable request = createRequests.get(i);
createRequests.set(
i,
createRequest(request.getName()).withDescription("Updated description for table " + i));
}
target = getResource("tables/bulk");
result =
TestUtils.put(target, createRequests, BulkOperationResult.class, OK, ADMIN_AUTH_HEADERS);
assertNotNull(result);
assertEquals(tableCount, result.getNumberOfRowsProcessed());
assertEquals(tableCount, result.getNumberOfRowsPassed());
assertEquals(0, result.getNumberOfRowsFailed());
changeEvents = getChangeEvents(null, TABLE, null, null, ADMIN_AUTH_HEADERS);
assertNotNull(changeEvents);
assertNotNull(changeEvents.getData());
long bulkUpdatedEventCount =
changeEvents.getData().stream()
.filter(
event ->
event.getEntityType().equals(TABLE)
&& (event.getEventType().equals(ENTITY_UPDATED)
|| event.getEventType().equals(ENTITY_FIELDS_CHANGED))
&& event.getUserName().equals("admin"))
.count();
assertTrue(
bulkUpdatedEventCount >= tableCount,
"Expected at least "
+ tableCount
+ " change events for bulk updated tables, but found "
+ bulkUpdatedEventCount);
}
}