diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java index 4fc302ee8db..055218c2042 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/EntityResourceTest.java @@ -168,10 +168,9 @@ public abstract class EntityResourceTest extends CatalogApplicationTest { @BeforeAll public void setup(TestInfo test) throws URISyntaxException, IOException { webhookCallbackResource.clearEvents(); - webhookCallbackResource.clearEntityCallbackCount(); WebhookResourceTest webhookResourceTest = new WebhookResourceTest(); webhookResourceTest.startWebhookSubscription(); - new WebhookResourceTest().startWebhookEntitySubscriptions(entityName); + webhookResourceTest.startWebhookEntitySubscriptions(entityName); UserResourceTest userResourceTest = new UserResourceTest(); USER1 = UserResourceTest.createUser(userResourceTest.create(test), authHeaders("test@open-metadata.org")); @@ -1147,8 +1146,6 @@ public abstract class EntityResourceTest extends CatalogApplicationTest { throws IOException { expectedList.sort(EntityUtil.compareFieldChange); actualList.sort(EntityUtil.compareFieldChange); - System.out.println("XXX " + expectedList); - System.out.println("XXX " + actualList); assertEquals(expectedList.size(), actualList.size()); for (int i = 0; i < expectedList.size(); i++) { diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java index 829b1bd6421..940df518a8e 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java @@ -1,13 +1,11 @@ package org.openmetadata.catalog.resources.events; -import io.swagger.annotations.Api; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -18,109 +16,50 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; import javax.ws.rs.core.UriInfo; +import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; import org.openmetadata.catalog.type.ChangeEvent; import org.openmetadata.catalog.type.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** REST resource used for webhook callback tests. */ @Path("v1/test/webhook") -@Api(value = "Topic data asset collection", tags = "Topic data asset collection") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) public class WebhookCallbackResource { public static final Logger LOG = LoggerFactory.getLogger(WebhookCallbackResource.class); - private final AtomicInteger counter = new AtomicInteger(); - private volatile long counterStartTime; - private volatile long counterLatestTime; - private final ConcurrentLinkedQueue changeEvents = new ConcurrentLinkedQueue<>(); - private final ConcurrentLinkedQueue changeEventsSlowServer = new ConcurrentLinkedQueue<>(); - + private final ConcurrentHashMap eventMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap> entityCallbackMap = new ConcurrentHashMap<>(); - /** Webhook endpoint that immediately responds to callback. The events received are collected in a queue */ + /** + * Webhook endpoint that immediately responds to callback. The events received are collected in a queue per testName + */ @POST - public Response receiveEvent( - @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { - changeEvents.addAll(events.getData()); - return Response.ok().build(); - } - - public ConcurrentLinkedQueue getEvents() { - return changeEvents; - } - - public void clearEvents() { - changeEvents.clear(); - } - - /** Webhook endpoint that immediately responds to callback. This only counts the number of events received */ - @POST - @Path("/counter") + @Path("/{testName}") public Response receiveEventCount( - @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { - if (counter.get() == 0) { - counterStartTime = events.getData().get(0).getDateTime().getTime(); - } - counterLatestTime = events.getData().get(0).getDateTime().getTime(); - counter.incrementAndGet(); - LOG.info("callback /counter received event. Current count {}", counter.get()); + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @PathParam("testName") String testName, + EventResource.ChangeEventList events) { + addEventDetails(testName, events); return Response.ok().build(); } - public int getCount() { - return counter.get(); - } - - public void resetCount() { - counter.set(0); - } - - public long getCounterStartTime() { - return counterStartTime; - } - - public long getCounterLatestTime() { - return counterLatestTime; - } - - /** Webhook endpoint that immediately responds to callback. The events received are ignored */ + /** Webhook endpoint that responds to callback with 1 seconds delay. The events received are collected in a queue */ @POST - @Path("/ignore") - public Response receiveEventIgnore( - @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { - LOG.info("callback /ignore received event. Current count {}", counter.get()); - return Response.ok().build(); - } - - /** Webhook endpoint that responds to callback with 1 second delay. The events received are collected in a queue */ - @POST - @Path("/slowServer") + @Path("/simulate/slowServer") public Response receiveEventWithDelay( @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { - changeEventsSlowServer.addAll(events.getData()); - LOG.info("callback /slowServer received event. Current count {}", changeEventsSlowServer.size()); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + addEventDetails("simulate-slowServer", events); return Response.ok().build(); } - public ConcurrentLinkedQueue getEventsSlowServer() { - return changeEventsSlowServer; - } - - public void clearEventsSlowServer() { - changeEventsSlowServer.clear(); - } - /** Webhook endpoint that responds to callback with 15 seconds delay. The events received are collected in a queue */ @POST - @Path("/timeout") + @Path("/simulate/timeout") public Response receiveEventWithTimeout( @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { - LOG.info("callback /timeout received event"); + addEventDetails("simulate-timeout", events); try { Thread.sleep(15 * 1000); } catch (InterruptedException e) { @@ -131,34 +70,34 @@ public class WebhookCallbackResource { /** Webhook endpoint that responds to callback with 300 Moved Permanently response */ @POST - @Path("/300") + @Path("/simulate/300") public Response receiveEvent300( @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { - LOG.info("callback /300 received event"); + addEventDetails("simulate-300", events); return Response.status(Response.Status.MOVED_PERMANENTLY).build(); } /** Webhook endpoint that responds to callback with 400 Bad request response */ @POST - @Path("/400") + @Path("/simulate/400") public Response receiveEvent400( @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { - LOG.info("callback /400 received event"); + addEventDetails("simulate-400", events); return Response.status(Response.Status.BAD_REQUEST).build(); } /** Webhook endpoint that responds to callback with 500 Internal server error response */ @POST - @Path("/500") + @Path("/simulate/500") public Response receiveEvent500( @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { - LOG.info("callback /400 received event"); + addEventDetails("simulate-500", events); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } - /** Webhook endpoint that receives change events various combination of event types and entity filters */ + /** Webhook endpoint that receives change events for various combination of event types and entity filters */ @POST - @Path("/{eventType}/{entityType}") + @Path("/filterBased/{eventType}/{entityType}") public Response receiveEntityEvents( @Context UriInfo uriInfo, @Context SecurityContext securityContext, @@ -177,11 +116,55 @@ public class WebhookCallbackResource { return Response.ok().build(); } - public void clearEntityCallbackCount() { - entityCallbackMap.clear(); - } - public List getEntityCallbackEvents(EventType eventType, String entity) { return Optional.ofNullable(entityCallbackMap.get(eventType + ":" + entity)).orElse(Collections.emptyList()); } + + public EventDetails getEventDetails(String endpoint) { + return eventMap.get(endpoint); + } + + private void addEventDetails(String endpoint, ChangeEventList events) { + EventDetails details = eventMap.get(endpoint); // Default endpoint + if (details == null) { + details = new EventDetails(); + details.setFirstEventTime(events.getData().get(0).getDateTime().getTime()); + eventMap.put(endpoint, details); + } + details.getEvents().addAll(events.getData()); + details.setLatestEventTime(events.getData().get(events.getData().size() - 1).getDateTime().getTime()); + LOG.info("Event received {}, total count {}", endpoint, details.getEvents().size()); + } + + public void clearEvents() { + eventMap.clear(); + entityCallbackMap.clear(); + } + + /** Class to keep track of all the events received by a webhook endpoint */ + static class EventDetails { + long firstEventTime; + long latestEventTime; + ConcurrentLinkedQueue events = new ConcurrentLinkedQueue<>(); + + public long getFirstEventTime() { + return firstEventTime; + } + + public void setFirstEventTime(long firstEventTime) { + this.firstEventTime = firstEventTime; + } + + public long getLatestEventTime() { + return latestEventTime; + } + + public void setLatestEventTime(long latestEventTime) { + this.latestEventTime = latestEventTime; + } + + public ConcurrentLinkedQueue getEvents() { + return events; + } + } } diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java index 31e13ab4047..253f1f8a1ed 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java @@ -30,10 +30,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; import javax.ws.rs.core.Response; import org.apache.http.client.HttpResponseException; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.api.events.CreateWebhook; import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookEntityInterface; import org.openmetadata.catalog.resources.EntityResourceTest; +import org.openmetadata.catalog.resources.events.WebhookCallbackResource.EventDetails; import org.openmetadata.catalog.resources.events.WebhookResource.WebhookList; import org.openmetadata.catalog.type.ChangeDescription; import org.openmetadata.catalog.type.ChangeEvent; @@ -64,25 +66,26 @@ public class WebhookResourceTest extends EntityResourceTest { } @Test - public void post_webhookEnabledStateChange() throws URISyntaxException, IOException, InterruptedException { + public void post_webhookEnabledStateChange(TestInfo test) + throws URISyntaxException, IOException, InterruptedException { // // Create webhook in disabled state. It will not start webhook publisher // + String webhookName = getEntityName(test); LOG.info("creating webhook in disabled state"); - webhookCallbackResource.resetCount(); - String uri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter"; - CreateWebhook create = createRequest("counter", "", "", null).withEnabled(false).withEndpoint(URI.create(uri)); + String uri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/" + webhookName; + CreateWebhook create = createRequest(webhookName, "", "", null).withEnabled(false).withEndpoint(URI.create(uri)); Webhook webhook = createAndCheckEntity(create, adminAuthHeaders()); assertEquals(Status.NOT_STARTED, webhook.getStatus()); Webhook getWebhook = getEntity(webhook.getId(), adminAuthHeaders()); assertEquals(Status.NOT_STARTED, getWebhook.getStatus()); - assertEquals(0, webhookCallbackResource.getCount()); + EventDetails details = webhookCallbackResource.getEventDetails(webhookName); + assertNull(details); // // Now enable the webhook // LOG.info("Enabling webhook"); - int counter = webhookCallbackResource.getCount(); ChangeDescription change = getChangeDescription(webhook.getVersion()); change.getFieldsUpdated().add(new FieldChange().withName("enabled").withOldValue(false).withNewValue(true)); change @@ -97,13 +100,9 @@ public class WebhookResourceTest extends EntityResourceTest { assertEquals(Status.STARTED, getWebhook.getStatus()); // Ensure the call back notification has started - int iterations = 0; - while (webhookCallbackResource.getCount() <= counter && iterations < 100) { - Thread.sleep(10); - iterations++; - } - assertEquals(counter + 1, webhookCallbackResource.getCount()); - long lastSuccessfulEventTime = webhookCallbackResource.getCounterLatestTime(); + details = waitForFirstEvent(webhookName, 25, 100); + assertEquals(1, details.getEvents().size()); + long lastSuccessfulEventTime = details.getLatestEventTime(); FailureDetails failureDetails = new FailureDetails().withLastSuccessfulAt(lastSuccessfulEventTime); // @@ -127,21 +126,21 @@ public class WebhookResourceTest extends EntityResourceTest { // Disabled webhook state also records last successful time when event was sent getWebhook = getEntity(webhook.getId(), adminAuthHeaders()); assertEquals(Status.NOT_STARTED, getWebhook.getStatus()); - assertEquals(webhookCallbackResource.getCounterStartTime(), getWebhook.getFailureDetails().getLastSuccessfulAt()); + assertEquals(details.getFirstEventTime(), getWebhook.getFailureDetails().getLastSuccessfulAt()); // Ensure callback back notification is disabled with no new events - iterations = 0; + int iterations = 0; while (iterations < 100) { Thread.sleep(10); iterations++; - assertEquals(counter + 1, webhookCallbackResource.getCount()); // Event counter remains the same + assertEquals(1, details.getEvents().size()); // Event counter remains the same } deleteEntity(webhook.getId(), adminAuthHeaders()); } @Test - public void put_updateEndpointURL() throws URISyntaxException, IOException, InterruptedException { + public void put_updateEndpointURL(TestInfo test) throws URISyntaxException, IOException, InterruptedException { CreateWebhook create = createRequest("counter", "", "", null).withEnabled(true).withEndpoint(URI.create("http://invalidUnknowHost")); Webhook webhook = createAndCheckEntity(create, adminAuthHeaders()); @@ -157,10 +156,9 @@ public class WebhookResourceTest extends EntityResourceTest { iteration++; } assertEquals(Status.FAILED, getWebhook.getStatus()); - FailureDetails failureDetails = getWebhook.getFailureDetails(); // Now change the webhook URL to a valid URL and ensure callbacks resume - String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter"; + String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter/" + test.getDisplayName(); create = create.withEndpoint(URI.create(baseUri)); ChangeDescription change = getChangeDescription(getWebhook.getVersion()); change @@ -235,13 +233,13 @@ public class WebhookResourceTest extends EntityResourceTest { */ public void startWebhookSubscription() throws IOException, URISyntaxException { // Valid webhook callback - String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook"; - createWebhook("validWebhook", baseUri); + String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/healthy"; + createWebhook("healthy", baseUri); } /** Start webhook subscription for given entity and various event types */ public void startWebhookEntitySubscriptions(String entity) throws IOException, URISyntaxException { - String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook"; + String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/filterBased"; // Create webhook with endpoint api/v1/test/webhook/entityCreated/ to receive entityCreated events String name = EventType.ENTITY_CREATED + ":" + entity; @@ -265,11 +263,15 @@ public class WebhookResourceTest extends EntityResourceTest { */ public void validateWebhookEvents() throws HttpResponseException, InterruptedException { // Check the healthy callback server received all the change events - ConcurrentLinkedQueue callbackEvents = webhookCallbackResource.getEvents(); + EventDetails details = webhookCallbackResource.getEventDetails("healthy"); + assertNotNull(details); + ConcurrentLinkedQueue callbackEvents = details.getEvents(); + assertNotNull(callbackEvents); + assertNotNull(callbackEvents.peek()); List actualEvents = getChangeEvents("*", "*", "*", callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData(); waitAndCheckForEvents(callbackEvents, actualEvents, 10, 100); - assertWebhookStatusSuccess("validWebhook"); + assertWebhookStatusSuccess("healthy"); } /** At the end of the test, ensure all events are delivered for the combination of entity and eventTypes */ @@ -297,22 +299,23 @@ public class WebhookResourceTest extends EntityResourceTest { String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook"; // Create multiple webhooks each with different type of response to callback - Webhook w1 = createWebhook("slowServer", baseUri + "/slowServer"); // Callback response 1 second slower - Webhook w2 = createWebhook("callbackTimeout", baseUri + "/timeout"); // Callback response 12 seconds slower - Webhook w3 = createWebhook("callbackResponse300", baseUri + "/300"); // 3xx response - Webhook w4 = createWebhook("callbackResponse400", baseUri + "/400"); // 4xx response - Webhook w5 = createWebhook("callbackResponse500", baseUri + "/500"); // 5xx response + Webhook w1 = createWebhook("slowServer", baseUri + "/simulate/slowServer"); // Callback response 1 second slower + Webhook w2 = createWebhook("callbackTimeout", baseUri + "/simulate/timeout"); // Callback response 12 seconds slower + Webhook w3 = createWebhook("callbackResponse300", baseUri + "/simulate/300"); // 3xx response + Webhook w4 = createWebhook("callbackResponse400", baseUri + "/simulate/400"); // 4xx response + Webhook w5 = createWebhook("callbackResponse500", baseUri + "/simulate/500"); // 5xx response Webhook w6 = createWebhook("invalidEndpoint", "http://invalidUnknownHost"); // Invalid URL + Thread.sleep(1000); + // Now check state of webhooks created - ConcurrentLinkedQueue callbackEvents = webhookCallbackResource.getEventsSlowServer(); - waitForFirstEvent(callbackEvents, 100, 100); + EventDetails details = waitForFirstEvent("simulate-slowServer", 25, 100); + ConcurrentLinkedQueue callbackEvents = details.getEvents(); assertNotNull(callbackEvents.peek()); List actualEvents = getChangeEvents("*", "*", "*", callbackEvents.peek().getDateTime(), adminAuthHeaders()).getData(); waitAndCheckForEvents(callbackEvents, actualEvents, 30, 100); - webhookCallbackResource.clearEventsSlowServer(); // Check all webhook status assertWebhookStatusSuccess("slowServer"); @@ -384,11 +387,15 @@ public class WebhookResourceTest extends EntityResourceTest { assertEquals(expected.size(), received.size()); } - public void waitForFirstEvent(Collection c1, int iteration, long sleepMillis) throws InterruptedException { + public EventDetails waitForFirstEvent(String endpoint, int iteration, long sleepMillis) throws InterruptedException { + EventDetails details = webhookCallbackResource.getEventDetails(endpoint); int i = 0; - while (c1.size() > 0 && i < iteration) { + while ((details == null || details.getEvents() == null || details.getEvents().size() <= 0) && i < iteration) { + details = webhookCallbackResource.getEventDetails(endpoint); Thread.sleep(sleepMillis); i++; } + LOG.info("Returning for endpoint {} eventDetails {}", endpoint, details); + return details; } }