mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-25 17:04:54 +00:00
parent
dfebc85c4e
commit
ab0a4f5e36
@ -23,10 +23,8 @@ import java.security.GeneralSecurityException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
|
||||
import org.openmetadata.catalog.type.ChangeEvent;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import org.openmetadata.catalog.util.ResultList;
|
||||
|
||||
public class ChangeEventRepository {
|
||||
private final CollectionDAO dao;
|
||||
@ -36,7 +34,7 @@ public class ChangeEventRepository {
|
||||
}
|
||||
|
||||
@Transaction
|
||||
public ResultList<ChangeEvent> list(
|
||||
public List<ChangeEvent> list(
|
||||
long timestamp, List<String> entityCreatedList, List<String> entityUpdatedList, List<String> entityDeletedList)
|
||||
throws IOException, GeneralSecurityException {
|
||||
List<String> jsons = new ArrayList<>();
|
||||
@ -49,6 +47,6 @@ public class ChangeEventRepository {
|
||||
for (String json : jsons) {
|
||||
changeEvents.add(JsonUtils.readValue(json, ChangeEvent.class));
|
||||
}
|
||||
return new ChangeEventList(changeEvents, null, null, changeEvents.size()); // TODO
|
||||
return changeEvents;
|
||||
}
|
||||
}
|
||||
|
@ -40,6 +40,7 @@ import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.catalog.resources.Collection;
|
||||
import org.openmetadata.catalog.security.Authorizer;
|
||||
import org.openmetadata.catalog.type.ChangeEvent;
|
||||
import org.openmetadata.catalog.util.EntityUtil;
|
||||
import org.openmetadata.catalog.util.ResultList;
|
||||
|
||||
@Path("/v1/events")
|
||||
@ -117,6 +118,8 @@ public class EventResource {
|
||||
List<String> entityCreatedList = EntityList.getEntityList("entityCreated", entityCreated);
|
||||
List<String> entityUpdatedList = EntityList.getEntityList("entityUpdated", entityUpdated);
|
||||
List<String> entityDeletedList = EntityList.getEntityList("entityDeleted", entityDeleted);
|
||||
return dao.list(timestamp, entityCreatedList, entityUpdatedList, entityDeletedList);
|
||||
List<ChangeEvent> events = dao.list(timestamp, entityCreatedList, entityUpdatedList, entityDeletedList);
|
||||
events.sort(EntityUtil.compareChangeEvent); // Sort change events based on time
|
||||
return new ChangeEventList(events, null, null, events.size()); // TODO
|
||||
}
|
||||
}
|
||||
|
@ -45,6 +45,7 @@ import org.openmetadata.catalog.jdbi3.CollectionDAO.TeamDAO;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO.UsageDAO;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO.UserDAO;
|
||||
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.catalog.type.ChangeEvent;
|
||||
import org.openmetadata.catalog.type.Column;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.type.EventFilter;
|
||||
@ -80,6 +81,7 @@ public final class EntityUtil {
|
||||
public static final Comparator<FieldChange> compareFieldChange = Comparator.comparing(FieldChange::getName);
|
||||
public static final Comparator<TableConstraint> compareTableConstraint =
|
||||
Comparator.comparing(TableConstraint::getConstraintType);
|
||||
public static final Comparator<ChangeEvent> compareChangeEvent = Comparator.comparing(ChangeEvent::getTimestamp);
|
||||
|
||||
//
|
||||
// Matchers used for matching two items in a list
|
||||
|
@ -266,9 +266,7 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = details.getEvents();
|
||||
assertNotNull(callbackEvents);
|
||||
assertNotNull(callbackEvents.peek());
|
||||
List<ChangeEvent> actualEvents =
|
||||
getChangeEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), ADMIN_AUTH_HEADERS).getData();
|
||||
waitAndCheckForEvents(actualEvents, callbackEvents, 15, 250);
|
||||
waitAndCheckForEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), callbackEvents, 15, 250);
|
||||
assertWebhookStatusSuccess("healthy");
|
||||
}
|
||||
|
||||
@ -279,15 +277,13 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
List<ChangeEvent> callbackEvents =
|
||||
webhookCallbackResource.getEntityCallbackEvents(EventType.ENTITY_CREATED, entity);
|
||||
long timestamp = callbackEvents.get(0).getTimestamp();
|
||||
List<ChangeEvent> events = getChangeEvents(entity, null, null, timestamp, ADMIN_AUTH_HEADERS).getData();
|
||||
waitAndCheckForEvents(callbackEvents, events, 30, 100);
|
||||
waitAndCheckForEvents(entity, null, null, timestamp, callbackEvents, 30, 100);
|
||||
|
||||
// For the entity all the webhooks registered for updated events have the right number of events
|
||||
callbackEvents = webhookCallbackResource.getEntityCallbackEvents(EventType.ENTITY_UPDATED, entity);
|
||||
// Use previous date if no update events
|
||||
timestamp = callbackEvents.size() > 0 ? callbackEvents.get(0).getTimestamp() : timestamp;
|
||||
events = getChangeEvents(null, entity, null, timestamp, ADMIN_AUTH_HEADERS).getData();
|
||||
waitAndCheckForEvents(callbackEvents, events, 30, 100);
|
||||
waitAndCheckForEvents(null, entity, null, timestamp, callbackEvents, 30, 100);
|
||||
|
||||
// TODO add delete event support
|
||||
}
|
||||
@ -311,9 +307,7 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = details.getEvents();
|
||||
assertNotNull(callbackEvents.peek());
|
||||
|
||||
List<ChangeEvent> actualEvents =
|
||||
getChangeEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), ADMIN_AUTH_HEADERS).getData();
|
||||
waitAndCheckForEvents(actualEvents, callbackEvents, 30, 100);
|
||||
waitAndCheckForEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), callbackEvents, 30, 100);
|
||||
|
||||
// Check all webhook status
|
||||
assertWebhookStatusSuccess("slowServer");
|
||||
@ -356,13 +350,23 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
}
|
||||
|
||||
public void waitAndCheckForEvents(
|
||||
Collection<ChangeEvent> expected, Collection<ChangeEvent> received, int iteration, long sleepMillis)
|
||||
throws InterruptedException {
|
||||
String entityCreated,
|
||||
String entityUpdated,
|
||||
String entityDeleted,
|
||||
long timestamp,
|
||||
Collection<ChangeEvent> received,
|
||||
int iteration,
|
||||
long sleepMillis)
|
||||
throws InterruptedException, HttpResponseException {
|
||||
int i = 0;
|
||||
List<ChangeEvent> expected =
|
||||
getChangeEvents(entityCreated, entityUpdated, entityDeleted, timestamp, ADMIN_AUTH_HEADERS).getData();
|
||||
while (expected.size() < received.size() && i < iteration) {
|
||||
Thread.sleep(sleepMillis);
|
||||
i++;
|
||||
}
|
||||
// Refresh the expected events again by getting list of events to compare with webhook received events
|
||||
expected = getChangeEvents(entityCreated, entityUpdated, entityDeleted, timestamp, ADMIN_AUTH_HEADERS).getData();
|
||||
if (expected.size() != received.size()) {
|
||||
expected.forEach(
|
||||
c1 ->
|
||||
|
Loading…
x
Reference in New Issue
Block a user