Fixes #2027 - Fix intermittent Webhook test failures (#2030)

This commit is contained in:
Suresh Srinivas 2022-01-04 09:57:11 -08:00 committed by GitHub
parent 36d518fa2d
commit d2473c8ca0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 115 additions and 128 deletions

View File

@ -168,10 +168,9 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
@BeforeAll
public void setup(TestInfo test) throws URISyntaxException, IOException {
webhookCallbackResource.clearEvents();
webhookCallbackResource.clearEntityCallbackCount();
WebhookResourceTest webhookResourceTest = new WebhookResourceTest();
webhookResourceTest.startWebhookSubscription();
new WebhookResourceTest().startWebhookEntitySubscriptions(entityName);
webhookResourceTest.startWebhookEntitySubscriptions(entityName);
UserResourceTest userResourceTest = new UserResourceTest();
USER1 = UserResourceTest.createUser(userResourceTest.create(test), authHeaders("test@open-metadata.org"));
@ -1147,8 +1146,6 @@ public abstract class EntityResourceTest<T> extends CatalogApplicationTest {
throws IOException {
expectedList.sort(EntityUtil.compareFieldChange);
actualList.sort(EntityUtil.compareFieldChange);
System.out.println("XXX " + expectedList);
System.out.println("XXX " + actualList);
assertEquals(expectedList.size(), actualList.size());
for (int i = 0; i < expectedList.size(); i++) {

View File

@ -1,13 +1,11 @@
package org.openmetadata.catalog.resources.events;
import io.swagger.annotations.Api;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@ -18,109 +16,50 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** REST resource used for webhook callback tests. */
@Path("v1/test/webhook")
@Api(value = "Topic data asset collection", tags = "Topic data asset collection")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class WebhookCallbackResource {
public static final Logger LOG = LoggerFactory.getLogger(WebhookCallbackResource.class);
private final AtomicInteger counter = new AtomicInteger();
private volatile long counterStartTime;
private volatile long counterLatestTime;
private final ConcurrentLinkedQueue<ChangeEvent> changeEvents = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<ChangeEvent> changeEventsSlowServer = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<String, EventDetails> eventMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<ChangeEvent>> entityCallbackMap = new ConcurrentHashMap<>();
/** Webhook endpoint that immediately responds to callback. The events received are collected in a queue */
/**
* Webhook endpoint that immediately responds to callback. The events received are collected in a queue per testName
*/
@POST
public Response receiveEvent(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
changeEvents.addAll(events.getData());
return Response.ok().build();
}
public ConcurrentLinkedQueue<ChangeEvent> getEvents() {
return changeEvents;
}
public void clearEvents() {
changeEvents.clear();
}
/** Webhook endpoint that immediately responds to callback. This only counts the number of events received */
@POST
@Path("/counter")
@Path("/{testName}")
public Response receiveEventCount(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
if (counter.get() == 0) {
counterStartTime = events.getData().get(0).getDateTime().getTime();
}
counterLatestTime = events.getData().get(0).getDateTime().getTime();
counter.incrementAndGet();
LOG.info("callback /counter received event. Current count {}", counter.get());
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@PathParam("testName") String testName,
EventResource.ChangeEventList events) {
addEventDetails(testName, events);
return Response.ok().build();
}
public int getCount() {
return counter.get();
}
public void resetCount() {
counter.set(0);
}
public long getCounterStartTime() {
return counterStartTime;
}
public long getCounterLatestTime() {
return counterLatestTime;
}
/** Webhook endpoint that immediately responds to callback. The events received are ignored */
/** Webhook endpoint that responds to callback with 1 seconds delay. The events received are collected in a queue */
@POST
@Path("/ignore")
public Response receiveEventIgnore(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
LOG.info("callback /ignore received event. Current count {}", counter.get());
return Response.ok().build();
}
/** Webhook endpoint that responds to callback with 1 second delay. The events received are collected in a queue */
@POST
@Path("/slowServer")
@Path("/simulate/slowServer")
public Response receiveEventWithDelay(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
changeEventsSlowServer.addAll(events.getData());
LOG.info("callback /slowServer received event. Current count {}", changeEventsSlowServer.size());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
addEventDetails("simulate-slowServer", events);
return Response.ok().build();
}
public ConcurrentLinkedQueue<ChangeEvent> getEventsSlowServer() {
return changeEventsSlowServer;
}
public void clearEventsSlowServer() {
changeEventsSlowServer.clear();
}
/** Webhook endpoint that responds to callback with 15 seconds delay. The events received are collected in a queue */
@POST
@Path("/timeout")
@Path("/simulate/timeout")
public Response receiveEventWithTimeout(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
LOG.info("callback /timeout received event");
addEventDetails("simulate-timeout", events);
try {
Thread.sleep(15 * 1000);
} catch (InterruptedException e) {
@ -131,34 +70,34 @@ public class WebhookCallbackResource {
/** Webhook endpoint that responds to callback with 300 Moved Permanently response */
@POST
@Path("/300")
@Path("/simulate/300")
public Response receiveEvent300(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
LOG.info("callback /300 received event");
addEventDetails("simulate-300", events);
return Response.status(Response.Status.MOVED_PERMANENTLY).build();
}
/** Webhook endpoint that responds to callback with 400 Bad request response */
@POST
@Path("/400")
@Path("/simulate/400")
public Response receiveEvent400(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
LOG.info("callback /400 received event");
addEventDetails("simulate-400", events);
return Response.status(Response.Status.BAD_REQUEST).build();
}
/** Webhook endpoint that responds to callback with 500 Internal server error response */
@POST
@Path("/500")
@Path("/simulate/500")
public Response receiveEvent500(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
LOG.info("callback /400 received event");
addEventDetails("simulate-500", events);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
/** Webhook endpoint that receives change events various combination of event types and entity filters */
/** Webhook endpoint that receives change events for various combination of event types and entity filters */
@POST
@Path("/{eventType}/{entityType}")
@Path("/filterBased/{eventType}/{entityType}")
public Response receiveEntityEvents(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@ -177,11 +116,55 @@ public class WebhookCallbackResource {
return Response.ok().build();
}
public void clearEntityCallbackCount() {
entityCallbackMap.clear();
}
public List<ChangeEvent> getEntityCallbackEvents(EventType eventType, String entity) {
return Optional.ofNullable(entityCallbackMap.get(eventType + ":" + entity)).orElse(Collections.emptyList());
}
public EventDetails getEventDetails(String endpoint) {
return eventMap.get(endpoint);
}
private void addEventDetails(String endpoint, ChangeEventList events) {
EventDetails details = eventMap.get(endpoint); // Default endpoint
if (details == null) {
details = new EventDetails();
details.setFirstEventTime(events.getData().get(0).getDateTime().getTime());
eventMap.put(endpoint, details);
}
details.getEvents().addAll(events.getData());
details.setLatestEventTime(events.getData().get(events.getData().size() - 1).getDateTime().getTime());
LOG.info("Event received {}, total count {}", endpoint, details.getEvents().size());
}
public void clearEvents() {
eventMap.clear();
entityCallbackMap.clear();
}
/** Class to keep track of all the events received by a webhook endpoint */
static class EventDetails {
long firstEventTime;
long latestEventTime;
ConcurrentLinkedQueue<ChangeEvent> events = new ConcurrentLinkedQueue<>();
public long getFirstEventTime() {
return firstEventTime;
}
public void setFirstEventTime(long firstEventTime) {
this.firstEventTime = firstEventTime;
}
public long getLatestEventTime() {
return latestEventTime;
}
public void setLatestEventTime(long latestEventTime) {
this.latestEventTime = latestEventTime;
}
public ConcurrentLinkedQueue<ChangeEvent> getEvents() {
return events;
}
}
}

View File

@ -30,10 +30,12 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import javax.ws.rs.core.Response;
import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.events.CreateWebhook;
import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookEntityInterface;
import org.openmetadata.catalog.resources.EntityResourceTest;
import org.openmetadata.catalog.resources.events.WebhookCallbackResource.EventDetails;
import org.openmetadata.catalog.resources.events.WebhookResource.WebhookList;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
@ -64,25 +66,26 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
}
@Test
public void post_webhookEnabledStateChange() throws URISyntaxException, IOException, InterruptedException {
public void post_webhookEnabledStateChange(TestInfo test)
throws URISyntaxException, IOException, InterruptedException {
//
// Create webhook in disabled state. It will not start webhook publisher
//
String webhookName = getEntityName(test);
LOG.info("creating webhook in disabled state");
webhookCallbackResource.resetCount();
String uri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter";
CreateWebhook create = createRequest("counter", "", "", null).withEnabled(false).withEndpoint(URI.create(uri));
String uri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/" + webhookName;
CreateWebhook create = createRequest(webhookName, "", "", null).withEnabled(false).withEndpoint(URI.create(uri));
Webhook webhook = createAndCheckEntity(create, adminAuthHeaders());
assertEquals(Status.NOT_STARTED, webhook.getStatus());
Webhook getWebhook = getEntity(webhook.getId(), adminAuthHeaders());
assertEquals(Status.NOT_STARTED, getWebhook.getStatus());
assertEquals(0, webhookCallbackResource.getCount());
EventDetails details = webhookCallbackResource.getEventDetails(webhookName);
assertNull(details);
//
// Now enable the webhook
//
LOG.info("Enabling webhook");
int counter = webhookCallbackResource.getCount();
ChangeDescription change = getChangeDescription(webhook.getVersion());
change.getFieldsUpdated().add(new FieldChange().withName("enabled").withOldValue(false).withNewValue(true));
change
@ -97,13 +100,9 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
assertEquals(Status.STARTED, getWebhook.getStatus());
// Ensure the call back notification has started
int iterations = 0;
while (webhookCallbackResource.getCount() <= counter && iterations < 100) {
Thread.sleep(10);
iterations++;
}
assertEquals(counter + 1, webhookCallbackResource.getCount());
long lastSuccessfulEventTime = webhookCallbackResource.getCounterLatestTime();
details = waitForFirstEvent(webhookName, 25, 100);
assertEquals(1, details.getEvents().size());
long lastSuccessfulEventTime = details.getLatestEventTime();
FailureDetails failureDetails = new FailureDetails().withLastSuccessfulAt(lastSuccessfulEventTime);
//
@ -127,21 +126,21 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
// Disabled webhook state also records last successful time when event was sent
getWebhook = getEntity(webhook.getId(), adminAuthHeaders());
assertEquals(Status.NOT_STARTED, getWebhook.getStatus());
assertEquals(webhookCallbackResource.getCounterStartTime(), getWebhook.getFailureDetails().getLastSuccessfulAt());
assertEquals(details.getFirstEventTime(), getWebhook.getFailureDetails().getLastSuccessfulAt());
// Ensure callback back notification is disabled with no new events
iterations = 0;
int iterations = 0;
while (iterations < 100) {
Thread.sleep(10);
iterations++;
assertEquals(counter + 1, webhookCallbackResource.getCount()); // Event counter remains the same
assertEquals(1, details.getEvents().size()); // Event counter remains the same
}
deleteEntity(webhook.getId(), adminAuthHeaders());
}
@Test
public void put_updateEndpointURL() throws URISyntaxException, IOException, InterruptedException {
public void put_updateEndpointURL(TestInfo test) throws URISyntaxException, IOException, InterruptedException {
CreateWebhook create =
createRequest("counter", "", "", null).withEnabled(true).withEndpoint(URI.create("http://invalidUnknowHost"));
Webhook webhook = createAndCheckEntity(create, adminAuthHeaders());
@ -157,10 +156,9 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
iteration++;
}
assertEquals(Status.FAILED, getWebhook.getStatus());
FailureDetails failureDetails = getWebhook.getFailureDetails();
// Now change the webhook URL to a valid URL and ensure callbacks resume
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter";
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter/" + test.getDisplayName();
create = create.withEndpoint(URI.create(baseUri));
ChangeDescription change = getChangeDescription(getWebhook.getVersion());
change
@ -235,13 +233,13 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
*/
public void startWebhookSubscription() throws IOException, URISyntaxException {
// Valid webhook callback
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
createWebhook("validWebhook", baseUri);
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/healthy";
createWebhook("healthy", baseUri);
}
/** Start webhook subscription for given entity and various event types */
public void startWebhookEntitySubscriptions(String entity) throws IOException, URISyntaxException {
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/filterBased";
// Create webhook with endpoint api/v1/test/webhook/entityCreated/<entity> to receive entityCreated events
String name = EventType.ENTITY_CREATED + ":" + entity;
@ -265,11 +263,15 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
*/
public void validateWebhookEvents() throws HttpResponseException, InterruptedException {
// Check the healthy callback server received all the change events
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = webhookCallbackResource.getEvents();
EventDetails details = webhookCallbackResource.getEventDetails("healthy");
assertNotNull(details);
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = details.getEvents();
assertNotNull(callbackEvents);
assertNotNull(callbackEvents.peek());
List<ChangeEvent> actualEvents =
getChangeEvents("*", "*", "*", callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData();
waitAndCheckForEvents(callbackEvents, actualEvents, 10, 100);
assertWebhookStatusSuccess("validWebhook");
assertWebhookStatusSuccess("healthy");
}
/** At the end of the test, ensure all events are delivered for the combination of entity and eventTypes */
@ -297,22 +299,23 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
// Create multiple webhooks each with different type of response to callback
Webhook w1 = createWebhook("slowServer", baseUri + "/slowServer"); // Callback response 1 second slower
Webhook w2 = createWebhook("callbackTimeout", baseUri + "/timeout"); // Callback response 12 seconds slower
Webhook w3 = createWebhook("callbackResponse300", baseUri + "/300"); // 3xx response
Webhook w4 = createWebhook("callbackResponse400", baseUri + "/400"); // 4xx response
Webhook w5 = createWebhook("callbackResponse500", baseUri + "/500"); // 5xx response
Webhook w1 = createWebhook("slowServer", baseUri + "/simulate/slowServer"); // Callback response 1 second slower
Webhook w2 = createWebhook("callbackTimeout", baseUri + "/simulate/timeout"); // Callback response 12 seconds slower
Webhook w3 = createWebhook("callbackResponse300", baseUri + "/simulate/300"); // 3xx response
Webhook w4 = createWebhook("callbackResponse400", baseUri + "/simulate/400"); // 4xx response
Webhook w5 = createWebhook("callbackResponse500", baseUri + "/simulate/500"); // 5xx response
Webhook w6 = createWebhook("invalidEndpoint", "http://invalidUnknownHost"); // Invalid URL
Thread.sleep(1000);
// Now check state of webhooks created
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = webhookCallbackResource.getEventsSlowServer();
waitForFirstEvent(callbackEvents, 100, 100);
EventDetails details = waitForFirstEvent("simulate-slowServer", 25, 100);
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = details.getEvents();
assertNotNull(callbackEvents.peek());
List<ChangeEvent> actualEvents =
getChangeEvents("*", "*", "*", callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData();
waitAndCheckForEvents(callbackEvents, actualEvents, 30, 100);
webhookCallbackResource.clearEventsSlowServer();
// Check all webhook status
assertWebhookStatusSuccess("slowServer");
@ -384,11 +387,15 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
assertEquals(expected.size(), received.size());
}
public void waitForFirstEvent(Collection c1, int iteration, long sleepMillis) throws InterruptedException {
public EventDetails waitForFirstEvent(String endpoint, int iteration, long sleepMillis) throws InterruptedException {
EventDetails details = webhookCallbackResource.getEventDetails(endpoint);
int i = 0;
while (c1.size() > 0 && i < iteration) {
while ((details == null || details.getEvents() == null || details.getEvents().size() <= 0) && i < iteration) {
details = webhookCallbackResource.getEventDetails(endpoint);
Thread.sleep(sleepMillis);
i++;
}
LOG.info("Returning for endpoint {} eventDetails {}", endpoint, details);
return details;
}
}