Fixes #1919 Fix flaky tests related to webhook (#1920)

This commit is contained in:
Suresh Srinivas 2021-12-24 17:35:42 -08:00 committed by GitHub
parent f5c5acda66
commit 1b44e0480a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 157 additions and 155 deletions

View File

@ -0,0 +1,8 @@
--
-- Change timestamp column precision to include microseconds
--
ALTER TABLE change_event
DROP INDEX dateTime,
DROP COLUMN dateTime,
ADD COLUMN dateTime TIMESTAMP(6) GENERATED ALWAYS AS (STR_TO_DATE(json ->> '$.dateTime', '%Y-%m-%dT%T.%fZ')) NOT NULL AFTER username,
ADD INDEX (dateTime);

View File

@ -13,22 +13,23 @@
package org.openmetadata.catalog.jdbi3;
import static org.openmetadata.catalog.type.EventType.ENTITY_CREATED;
import static org.openmetadata.catalog.type.EventType.ENTITY_DELETED;
import static org.openmetadata.catalog.type.EventType.ENTITY_UPDATED;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.ResultList;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import static org.openmetadata.catalog.type.EventType.ENTITY_CREATED;
import static org.openmetadata.catalog.type.EventType.ENTITY_DELETED;
import static org.openmetadata.catalog.type.EventType.ENTITY_UPDATED;
public class ChangeEventRepository {
private final CollectionDAO dao;
@ -41,15 +42,15 @@ public class ChangeEventRepository {
Date date, List<String> entityCreatedList, List<String> entityUpdatedList, List<String> entityDeletedList)
throws IOException, GeneralSecurityException {
List<String> jsons = new ArrayList<>();
jsons.addAll(dao.changeEventDAO().list(ENTITY_CREATED.value(), entityCreatedList, date.getTime()));
jsons.addAll(dao.changeEventDAO().list(ENTITY_UPDATED.value(), entityUpdatedList, date.getTime()));
jsons.addAll(dao.changeEventDAO().list(ENTITY_DELETED.value(), entityDeletedList, date.getTime()));
String dateParam = RestUtil.DATE_TIME_FORMAT.format(date);
jsons.addAll(dao.changeEventDAO().list(ENTITY_CREATED.value(), entityCreatedList, dateParam));
jsons.addAll(dao.changeEventDAO().list(ENTITY_UPDATED.value(), entityUpdatedList, dateParam));
jsons.addAll(dao.changeEventDAO().list(ENTITY_DELETED.value(), entityDeletedList, dateParam));
List<ChangeEvent> changeEvents = new ArrayList<>();
for (String json : jsons) {
changeEvents.add(JsonUtils.readValue(json, ChangeEvent.class));
}
changeEvents.sort(
Comparator.comparing((ChangeEvent changeEvent) -> changeEvent.getDateTime().getTime()).reversed());
return new ChangeEventList(changeEvents, null, null, changeEvents.size()); // TODO
}
}

View File

@ -13,10 +13,6 @@
package org.openmetadata.catalog.jdbi3;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.sqlobject.CreateSqlObject;
@ -75,6 +71,11 @@ import org.openmetadata.catalog.type.UsageStats;
import org.openmetadata.catalog.type.Webhook;
import org.openmetadata.catalog.util.EntityUtil;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
public interface CollectionDAO {
@CreateSqlObject
DatabaseDAO databaseDAO();
@ -1111,7 +1112,7 @@ public interface CollectionDAO {
@SqlUpdate("INSERT INTO change_event (json) VALUES (:json)")
void insert(@Bind("json") String json);
default List<String> list(String eventType, List<String> entityTypes, long dateTime) {
default List<String> list(String eventType, List<String> entityTypes, String dateTime) {
if (entityTypes == null || entityTypes.isEmpty()) {
return listWithoutEntityFilter(eventType, dateTime);
}
@ -1119,21 +1120,17 @@ public interface CollectionDAO {
}
@SqlQuery(
"SELECT json FROM change_event WHERE "
+ "eventType = :eventType AND "
+ "(entityType IN (<entityTypes>)) AND "
+ "dateTime >= :dateTime "
+ "ORDER BY dateTime DESC")
"SELECT json FROM change_event WHERE "
+ "eventType = :eventType AND (entityType IN (<entityTypes>)) AND dateTime >= :dateTime "
+ "ORDER BY dateTime ASC")
List<String> listWithEntityFilter(
@Bind("eventType") String eventType,
@BindList("entityTypes") List<String> entityTypes,
@Bind("dateTime") long dateTime);
@Bind("dateTime") String dateTime);
@SqlQuery(
"SELECT json FROM change_event WHERE "
+ "eventType = :eventType AND "
+ "dateTime >= :dateTime "
+ "ORDER BY dateTime DESC")
List<String> listWithoutEntityFilter(@Bind("eventType") String eventType, @Bind("dateTime") long dateTime);
@SqlQuery("SELECT json FROM change_event WHERE "
+ "eventType = :eventType AND dateTime >= :dateTime "
+ "ORDER BY dateTime ASC")
List<String> listWithoutEntityFilter(@Bind("eventType") String eventType, @Bind("dateTime") String dateTime);
}
}

View File

@ -118,35 +118,37 @@ public class WebhookRepository extends EntityRepository<Webhook> {
}
public void addWebhookPublisher(Webhook webhook) {
if (webhook.getEnabled()) { // Only add webhook that is enabled
WebhookPublisher publisher = new WebhookPublisher(webhook);
BatchEventProcessor<ChangeEventHolder> processor = EventPubSub.addEventHandler(publisher);
publisher.setProcessor(processor);
webhookPublisherMap.put(webhook.getId(), publisher);
LOG.info("Webhook subscription started for {}", webhook.getName());
if (!webhook.getEnabled()) { // Only add webhook that is enabled
return;
}
WebhookPublisher publisher = new WebhookPublisher(webhook);
BatchEventProcessor<ChangeEventHolder> processor = EventPubSub.addEventHandler(publisher);
publisher.setProcessor(processor);
webhookPublisherMap.put(webhook.getId(), publisher);
LOG.info("Webhook subscription started for {}", webhook.getName());
}
public void updateWebhookPublisher(Webhook webhook) throws InterruptedException {
if (webhook.getEnabled()) { // Only add webhook that is enabled
// If there was a previous webhook either in disabled state or stopped due
// to errors, update it and restart subscription
// to errors, update it and restart publishing
WebhookPublisher previousPublisher = getPublisher(webhook.getId());
if (previousPublisher == null) {
addWebhookPublisher(webhook);
return;
}
// Update the previousPublisher
// Update the existing publisher
Status status = previousPublisher.getWebhook().getStatus();
previousPublisher.updateWebhook(webhook);
if (status != Status.SUCCESS && status != Status.AWAITING_RETRY) {
// Restart the publisher
// Restart the previously stopped publisher (in states notStarted, error, retryLimitReached)
BatchEventProcessor<ChangeEventHolder> processor = EventPubSub.addEventHandler(previousPublisher);
previousPublisher.setProcessor(processor);
LOG.info("Webhook publisher restarted for {}", webhook.getName());
}
} else {
// Remove the webhook that may be enabled currently
// Remove the webhook publisher
deleteWebhookPublisher(webhook.getId());
}
}

View File

@ -155,10 +155,10 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
@BeforeAll
public static void setup(TestInfo test) throws URISyntaxException, IOException {
webhookCallbackResource.clearAllEvents();
webhookCallbackResource.clearEvents();
WebhookResourceTest webhookResourceTest = new WebhookResourceTest();
webhookResourceTest.startWebhookSubscription();
webhookResourceTest.createWebhooks();
UserResourceTest userResourceTest = new UserResourceTest();
USER1 = UserResourceTest.createUser(userResourceTest.create(test), authHeaders("test@open-metadata.org"));
USER_OWNER1 = new EntityReference().withId(USER1.getId()).withType("user");
@ -244,10 +244,7 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
@AfterAll
public static void afterAllTests() throws Exception {
// EventPubSub.shutdown();
// Ensure webhooks are in the right state
new WebhookResourceTest().validateWebhookEvents();
// APP.getEnvironment().getApplicationContext().getServer().stop();
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -1000,7 +997,11 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
changeEvents = getChangeEvents(null, null, null, updateTime, authHeaders);
}
assertTrue(changeEvents.getData().size() > 0);
// Wait for change event to be recorded
if (changeEvents.getData().size() == 0) {
continue;
}
for (ChangeEvent event : changeEvents.getData()) {
if (event.getDateTime().getTime() == updateTime.getTime()) {
changeEvent = event;

View File

@ -23,6 +23,17 @@ public class WebhookCallbackResource {
private final ConcurrentLinkedQueue<ChangeEvent> changeEvents = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<ChangeEvent> changeEventsSlowServer = new ConcurrentLinkedQueue<>();
/** Webhook endpoint that immediately responds to callback. The events received are collected in a queue */
@POST
public Response receiveEvent(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
changeEvents.addAll(events.getData());
return Response.ok().build();
}
public ConcurrentLinkedQueue<ChangeEvent> getEvents() { return changeEvents; }
public void clearEvents() { changeEvents.clear(); }
/** Webhook endpoint that immediately responds to callback. This only counts the number of events received */
@POST
@Path("/counter")
public Response receiveEventCount(
@ -30,7 +41,9 @@ public class WebhookCallbackResource {
counter.incrementAndGet();
return Response.ok().build();
}
public int getCount() { return counter.get(); }
/** Webhook endpoint that immediately responds to callback. The events received are ignored */
@POST
@Path("/ignore")
public Response receiveEventIgnore(
@ -38,38 +51,36 @@ public class WebhookCallbackResource {
return Response.ok().build();
}
@POST
public Response receiveEvent(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
changeEvents.addAll(events.getData());
return Response.ok().build();
}
/** Webhook endpoint that responds to callback with 1 second delay. The events received are collected in a queue */
@POST
@Path("/slowServer")
public Response receiveEventWithDelay(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
changeEventsSlowServer.addAll(events.getData());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
changeEventsSlowServer.addAll(events.getData());
return Response.ok().build();
}
public ConcurrentLinkedQueue<ChangeEvent> getEventsSlowServer() { return changeEventsSlowServer; }
public void clearEventsSlowServer() { changeEventsSlowServer.clear(); }
/** Webhook endpoint that responds to callback with 15 seconds delay. The events received are collected in a queue */
@POST
@Path("/timeout")
public Response receiveEventWithTimeout(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
try {
Thread.sleep(11 * 1000);
Thread.sleep(15 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Response.ok().build();
}
/** Webhook endpoint that responds to callback with 300 Moved Permanently response */
@POST
@Path("/300")
public Response receiveEvent300(
@ -77,6 +88,7 @@ public class WebhookCallbackResource {
return Response.status(Response.Status.MOVED_PERMANENTLY).build();
}
/** Webhook endpoint that responds to callback with 400 Bad request response */
@POST
@Path("/400")
public Response receiveEvent400(
@ -84,31 +96,11 @@ public class WebhookCallbackResource {
return Response.status(Response.Status.BAD_REQUEST).build();
}
/** Webhook endpoint that responds to callback with 500 Internal server error response */
@POST
@Path("/500")
public Response receiveEvent500(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
public ConcurrentLinkedQueue<ChangeEvent> getEvents() {
return changeEvents;
}
public ConcurrentLinkedQueue<ChangeEvent> getEventsSlowServer() {
return changeEventsSlowServer;
}
public void clearAllEvents() {
changeEvents.clear();
changeEventsSlowServer.clear();
}
public int getCount() {
return counter.get();
}
public void resetCount() {
counter.set(0);
}
}

View File

@ -13,19 +13,8 @@
package org.openmetadata.catalog.resources.events;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.ws.rs.core.Response;
import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@ -43,6 +32,20 @@ import org.openmetadata.catalog.type.Webhook.Status;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.TestUtils;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
public class WebhookResourceTest extends EntityResourceTest<Webhook> {
public static List<EventFilter> ALL_EVENTS_FILTER = new ArrayList<>();
@ -57,11 +60,6 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
supportsPatch = false;
}
@BeforeAll
public static void setup(TestInfo test) throws IOException, URISyntaxException {
EntityResourceTest.setup(test);
}
@Test
public void post_webhookEnabledStateChange() throws URISyntaxException, IOException, InterruptedException {
// Disabled webhook will not start webhook publisher
@ -151,89 +149,92 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
@Override
public void assertFieldChange(String fieldName, Object expected, Object actual) throws IOException {}
public void createWebhooks() throws IOException, URISyntaxException {
/**
* Before a test for every entity resource, create a webhook subscription. At the end of the test, ensure all events
* are delivered over web subscription comparing it with number of events stored in the system.
*/
public void startWebhookSubscription() throws IOException, URISyntaxException {
// Valid webhook callback
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
CreateWebhook createWebhook =
createRequest("validWebhook", "validWebhook", "", null).withEndPoint(URI.create(baseUri));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback that responds slowly with 5 seconds delay
createWebhook.withName("slowServer").withEndPoint(URI.create(baseUri + "/slowServer"));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback that responds slowly with after 12 seconds (beyond connection + read response timeout)
createWebhook.withName("callbackTimeout").withEndPoint(URI.create(baseUri + "/timeout"));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback that responds with 300 error
createWebhook.withName("callbackResponse300").withEndPoint(URI.create(baseUri + "/300"));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback that responds with 400 error
createWebhook.withName("callbackResponse400").withEndPoint(URI.create(baseUri + "/400"));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback that responds with 400 error
createWebhook.withName("callbackResponse500").withEndPoint(URI.create(baseUri + "/500"));
createEntity(createWebhook, adminAuthHeaders());
// Webhook callback with invalid endpoint URI
createWebhook.withName("invalidEndpoint").withEndPoint(URI.create("http://invalidUnknownHost"));
createEntity(createWebhook, adminAuthHeaders());
}
/**
* At the end of the test, ensure all events are delivered over web subscription comparing it with number of events
* stored in the system.
*/
public void validateWebhookEvents() throws HttpResponseException, InterruptedException {
// Check the healthy callback server received all the change events
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = webhookCallbackResource.getEvents();
List<ChangeEvent> actualEvents =
getChangeEvents(null, null, null, callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData();
int iteration = 0;
while (callbackEvents.size() < actualEvents.size() && iteration < 100) {
Thread.sleep(10);
while (callbackEvents.size() < actualEvents.size() && iteration < 10) {
Thread.sleep(100);
iteration++;
}
assertEquals(actualEvents.size(), callbackEvents.size());
assertWebhookStatusSuccess("validWebhook");
}
// TODO enable this test
// Check the slow callback server received all the change events
callbackEvents = webhookCallbackResource.getEventsSlowServer();
actualEvents = getChangeEvents(null, null, null, callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData();
iteration = 0;
while (callbackEvents.size() < actualEvents.size() - 1 && iteration < 300) {
Thread.sleep(10);
@Test
public void testDifferentTypesOfWebhooks() throws IOException, InterruptedException, URISyntaxException {
Thread.sleep(1000);
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
// Create multiple webhooks each with different type of response to callback
createWebhook("slowServer", baseUri + "/slowServer"); // Callback response 1 second slower
createWebhook("callbackTimeout", baseUri + "/timeout"); // Callback response 12 seconds slower
createWebhook("callbackResponse300", baseUri + "/300"); // 3xx response
createWebhook("callbackResponse400", baseUri + "/400"); // 4xx response
createWebhook("callbackResponse500", baseUri + "/500"); // 5xx response
createWebhook("invalidEndpoint", "http://invalidUnknownHost"); // Invalid URL
// Now check state of webhooks created
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = webhookCallbackResource.getEventsSlowServer();
int iteration = 0;
while(callbackEvents.size() == 0 && iteration < 100) {
Thread.sleep(100);
iteration++;
}
assertEquals(actualEvents.size() - 1, callbackEvents.size());
webhookCallbackResource.clearAllEvents();
assertNotNull(callbackEvents.peek());
List<ChangeEvent> actualEvents =
getChangeEvents(null, null, null, callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData();
iteration = 0;
while (callbackEvents.size() < actualEvents.size() - 1 && iteration < 30) {
Thread.sleep(100);
iteration++;
}
assertEquals(actualEvents.size(), callbackEvents.size());
webhookCallbackResource.clearEventsSlowServer();
// Check all webhook status
Webhook webhook = getEntityByName("validWebhook", "", adminAuthHeaders());
assertWebhookStatusSuccess("slowServer");
assertWebhookStatus("callbackResponse300", Status.ERROR, 301, "Moved Permanently");
assertWebhookStatus("callbackResponse400", Status.AWAITING_RETRY, 400, "Bad Request");
assertWebhookStatus("callbackResponse500", Status.AWAITING_RETRY, 500, "Internal Server Error");
assertWebhookStatus("invalidEndpoint", Status.ERROR, null, "UnknownHostException");
}
public void createWebhook(String name, String uri) throws URISyntaxException, IOException {
CreateWebhook createWebhook = createRequest(name, "", "", null).withEndPoint(URI.create(uri));
createAndCheckEntity(createWebhook, adminAuthHeaders());
}
public void assertWebhookStatusSuccess(String name) throws HttpResponseException {
Webhook webhook = getEntityByName(name, "", adminAuthHeaders());
assertEquals(Status.SUCCESS, webhook.getStatus());
assertNull(webhook.getFailureDetails());
}
webhook = getEntityByName("slowServer", "", adminAuthHeaders());
assertEquals(Status.SUCCESS, webhook.getStatus());
assertNull(webhook.getFailureDetails());
webhook = getEntityByName("callbackResponse300", "", adminAuthHeaders());
assertEquals(Status.ERROR, webhook.getStatus());
assertEquals(301, webhook.getFailureDetails().getLastFailedStatusCode());
assertEquals("Moved Permanently", webhook.getFailureDetails().getLastFailedReason());
webhook = getEntityByName("callbackResponse400", "", adminAuthHeaders());
assertEquals(Status.AWAITING_RETRY, webhook.getStatus());
assertEquals(400, webhook.getFailureDetails().getLastFailedStatusCode());
assertEquals("Bad Request", webhook.getFailureDetails().getLastFailedReason());
webhook = getEntityByName("callbackResponse500", "", adminAuthHeaders());
assertEquals(Status.AWAITING_RETRY, webhook.getStatus());
assertEquals(500, webhook.getFailureDetails().getLastFailedStatusCode());
assertEquals("Internal Server Error", webhook.getFailureDetails().getLastFailedReason());
webhook = getEntityByName("invalidEndpoint", "", adminAuthHeaders());
assertEquals(Status.ERROR, webhook.getStatus());
assertNull(webhook.getFailureDetails().getLastFailedStatusCode());
assertEquals("UnknownHostException", webhook.getFailureDetails().getLastFailedReason());
public void assertWebhookStatus(String name, Status status, Integer statusCode, String failedReason)
throws HttpResponseException {
Webhook webhook = getEntityByName(name, "", adminAuthHeaders());
assertEquals(status, webhook.getStatus());
assertEquals(statusCode, webhook.getFailureDetails().getLastFailedStatusCode());
assertEquals(failedReason, webhook.getFailureDetails().getLastFailedReason());
}
}