diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChangeEventRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChangeEventRepository.java index 6dd3b916945..76185c10c16 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChangeEventRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChangeEventRepository.java @@ -23,10 +23,8 @@ import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.List; import org.jdbi.v3.sqlobject.transaction.Transaction; -import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; import org.openmetadata.catalog.type.ChangeEvent; import org.openmetadata.catalog.util.JsonUtils; -import org.openmetadata.catalog.util.ResultList; public class ChangeEventRepository { private final CollectionDAO dao; @@ -36,7 +34,7 @@ public class ChangeEventRepository { } @Transaction - public ResultList list( + public List list( long timestamp, List entityCreatedList, List entityUpdatedList, List entityDeletedList) throws IOException, GeneralSecurityException { List jsons = new ArrayList<>(); @@ -49,6 +47,6 @@ public class ChangeEventRepository { for (String json : jsons) { changeEvents.add(JsonUtils.readValue(json, ChangeEvent.class)); } - return new ChangeEventList(changeEvents, null, null, changeEvents.size()); // TODO + return changeEvents; } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventResource.java index e8c3346633a..8dc13810d1d 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/EventResource.java @@ -40,6 +40,7 @@ import org.openmetadata.catalog.jdbi3.CollectionDAO; import org.openmetadata.catalog.resources.Collection; import org.openmetadata.catalog.security.Authorizer; import org.openmetadata.catalog.type.ChangeEvent; +import org.openmetadata.catalog.util.EntityUtil; import org.openmetadata.catalog.util.ResultList; @Path("/v1/events") @@ -117,6 +118,8 @@ public class EventResource { List entityCreatedList = EntityList.getEntityList("entityCreated", entityCreated); List entityUpdatedList = EntityList.getEntityList("entityUpdated", entityUpdated); List entityDeletedList = EntityList.getEntityList("entityDeleted", entityDeleted); - return dao.list(timestamp, entityCreatedList, entityUpdatedList, entityDeletedList); + List events = dao.list(timestamp, entityCreatedList, entityUpdatedList, entityDeletedList); + events.sort(EntityUtil.compareChangeEvent); // Sort change events based on time + return new ChangeEventList(events, null, null, events.size()); // TODO } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java index e22e0764e5c..9a16d782de2 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java @@ -45,6 +45,7 @@ import org.openmetadata.catalog.jdbi3.CollectionDAO.TeamDAO; import org.openmetadata.catalog.jdbi3.CollectionDAO.UsageDAO; import org.openmetadata.catalog.jdbi3.CollectionDAO.UserDAO; import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink; +import org.openmetadata.catalog.type.ChangeEvent; import org.openmetadata.catalog.type.Column; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EventFilter; @@ -80,6 +81,7 @@ public final class EntityUtil { public static final Comparator compareFieldChange = Comparator.comparing(FieldChange::getName); public static final Comparator compareTableConstraint = Comparator.comparing(TableConstraint::getConstraintType); + public static final Comparator compareChangeEvent = Comparator.comparing(ChangeEvent::getTimestamp); // // Matchers used for matching two items in a list diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java index 213c703dd71..7541ac806d0 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java @@ -266,9 +266,7 @@ public class WebhookResourceTest extends EntityResourceTest callbackEvents = details.getEvents(); assertNotNull(callbackEvents); assertNotNull(callbackEvents.peek()); - List actualEvents = - getChangeEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), ADMIN_AUTH_HEADERS).getData(); - waitAndCheckForEvents(actualEvents, callbackEvents, 15, 250); + waitAndCheckForEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), callbackEvents, 15, 250); assertWebhookStatusSuccess("healthy"); } @@ -279,15 +277,13 @@ public class WebhookResourceTest extends EntityResourceTest callbackEvents = webhookCallbackResource.getEntityCallbackEvents(EventType.ENTITY_CREATED, entity); long timestamp = callbackEvents.get(0).getTimestamp(); - List events = getChangeEvents(entity, null, null, timestamp, ADMIN_AUTH_HEADERS).getData(); - waitAndCheckForEvents(callbackEvents, events, 30, 100); + waitAndCheckForEvents(entity, null, null, timestamp, callbackEvents, 30, 100); // For the entity all the webhooks registered for updated events have the right number of events callbackEvents = webhookCallbackResource.getEntityCallbackEvents(EventType.ENTITY_UPDATED, entity); // Use previous date if no update events timestamp = callbackEvents.size() > 0 ? callbackEvents.get(0).getTimestamp() : timestamp; - events = getChangeEvents(null, entity, null, timestamp, ADMIN_AUTH_HEADERS).getData(); - waitAndCheckForEvents(callbackEvents, events, 30, 100); + waitAndCheckForEvents(null, entity, null, timestamp, callbackEvents, 30, 100); // TODO add delete event support } @@ -311,9 +307,7 @@ public class WebhookResourceTest extends EntityResourceTest callbackEvents = details.getEvents(); assertNotNull(callbackEvents.peek()); - List actualEvents = - getChangeEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), ADMIN_AUTH_HEADERS).getData(); - waitAndCheckForEvents(actualEvents, callbackEvents, 30, 100); + waitAndCheckForEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), callbackEvents, 30, 100); // Check all webhook status assertWebhookStatusSuccess("slowServer"); @@ -356,13 +350,23 @@ public class WebhookResourceTest extends EntityResourceTest expected, Collection received, int iteration, long sleepMillis) - throws InterruptedException { + String entityCreated, + String entityUpdated, + String entityDeleted, + long timestamp, + Collection received, + int iteration, + long sleepMillis) + throws InterruptedException, HttpResponseException { int i = 0; + List expected = + getChangeEvents(entityCreated, entityUpdated, entityDeleted, timestamp, ADMIN_AUTH_HEADERS).getData(); while (expected.size() < received.size() && i < iteration) { Thread.sleep(sleepMillis); i++; } + // Refresh the expected events again by getting list of events to compare with webhook received events + expected = getChangeEvents(entityCreated, entityUpdated, entityDeleted, timestamp, ADMIN_AUTH_HEADERS).getData(); if (expected.size() != received.size()) { expected.forEach( c1 ->