diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPubSub.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPubSub.java index 39769618323..39ac9f0512f 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPubSub.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/EventPubSub.java @@ -98,6 +98,7 @@ public class EventPubSub { public static void removeProcessor(BatchEventProcessor processor) { ringBuffer.removeGatingSequence(processor.getSequence()); + LOG.info("Processor removed for {}", processor); } public void close() {} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java index aa10927ce9b..584774a387d 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/WebhookRepository.java @@ -154,7 +154,7 @@ public class WebhookRepository extends EntityRepository { } } - public static void deleteWebhookPublisher(UUID id) throws InterruptedException { + public void deleteWebhookPublisher(UUID id) throws InterruptedException { WebhookPublisher publisher = webhookPublisherMap.get(id); if (publisher != null) { publisher.getProcessor().halt(); @@ -330,13 +330,13 @@ public class WebhookRepository extends EntityRepository { @Override public void onStart() { - LOG.info("Webhook-lifecycle-onStart {}", webhook.getName()); createClient(); webhook.withFailureDetails(new FailureDetails()); // TODO clean this up Map authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org"); - target = SecurityUtil.addHeaders(client.target(webhook.getEndPoint()), authHeaders); + target = SecurityUtil.addHeaders(client.target(webhook.getEndpoint()), authHeaders); + LOG.info("Webhook-lifecycle-onStart {}", webhook.getName()); } @Override @@ -367,8 +367,8 @@ public class WebhookRepository extends EntityRepository { // 2xx response means call back is successful if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses batch.clear(); - webhook.getFailureDetails().setLastSuccessfulAt(changeEventHolder.get().getDateTime().getTime()); if (webhook.getStatus() != Status.SUCCESS) { + webhook.getFailureDetails().setLastSuccessfulAt(changeEventHolder.get().getDateTime().getTime()); setStatus(Status.SUCCESS, null, null, null, null); } // 3xx response/redirection is not allowed for callback. Set the webhook state as in error @@ -383,7 +383,7 @@ public class WebhookRepository extends EntityRepository { } catch (ProcessingException ex) { Throwable cause = ex.getCause(); if (cause.getClass() == UnknownHostException.class) { - LOG.warn("Invalid webhook {} endpoint {}", webhook.getName(), webhook.getEndPoint()); + LOG.warn("Invalid webhook {} endpoint {}", webhook.getName(), webhook.getEndpoint()); setErrorStatus(attemptTime, null, "UnknownHostException"); } } @@ -406,7 +406,7 @@ public class WebhookRepository extends EntityRepository { currentBackoffTime = BACKOFF_NORMAL; webhook.setTimeout(updatedWebhook.getTimeout()); webhook.setBatchSize(updatedWebhook.getBatchSize()); - webhook.setEndPoint(updatedWebhook.getEndPoint()); + webhook.setEndpoint(updatedWebhook.getEndpoint()); webhook.setEventFilters(updatedWebhook.getEventFilters()); initFilter(); createClient(); @@ -437,6 +437,7 @@ public class WebhookRepository extends EntityRepository { private void setStatus(Status status, Long attemptTime, Integer statusCode, String reason, Date date) throws IOException { + Webhook stored = dao.webhookDAO().findEntityById(webhook.getId()); webhook.setStatus(status); webhook .getFailureDetails() @@ -444,8 +445,8 @@ public class WebhookRepository extends EntityRepository { .withLastFailedStatusCode(statusCode) .withLastFailedReason(reason) .withNextAttempt(date); - // TODO versioning - storeEntity(webhook, true); + WebhookUpdater updater = new WebhookUpdater(stored, webhook, false); + updater.update(); } private synchronized void createClient() { @@ -496,6 +497,8 @@ public class WebhookRepository extends EntityRepository { public void entitySpecificUpdate() throws IOException { recordChange("enabled", original.getEntity().getEnabled(), updated.getEntity().getEnabled()); recordChange("status", original.getEntity().getStatus(), updated.getEntity().getStatus()); + recordChange("endPoint", original.getEntity().getEndpoint(), updated.getEntity().getEndpoint()); + recordChange("batchSize", original.getEntity().getBatchSize(), updated.getEntity().getBatchSize()); recordChange( "failureDetails", original.getEntity().getFailureDetails(), diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java index ae517af627f..7d7d2254a9c 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/events/WebhookResource.java @@ -282,8 +282,9 @@ public class WebhookResource { public Response deleteWebhook( @Context UriInfo uriInfo, @Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id) - throws IOException, GeneralSecurityException, ParseException { + throws IOException, GeneralSecurityException, ParseException, InterruptedException { dao.delete(id); + dao.deleteWebhookPublisher(UUID.fromString(id)); return Response.ok().build(); } @@ -292,7 +293,7 @@ public class WebhookResource { .withDescription(create.getDescription()) .withName(create.getName()) .withId(UUID.randomUUID()) - .withEndPoint(create.getEndPoint()) + .withEndpoint(create.getEndpoint()) .withEventFilters(create.getEventFilters()) .withBatchSize(create.getBatchSize()) .withTimeout(create.getTimeout()) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java index da85050c2cc..c7d0dbc769f 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java @@ -109,7 +109,9 @@ public final class EntityUtil { public static final BiPredicate mlFeatureMatch = MlFeature::equals; public static final BiPredicate mlHyperParameterMatch = MlHyperParameter::equals; public static final BiPredicate failureDetailsMatch = - (failureDetails1, failureDetails2) -> failureDetails1.getLastFailedAt().equals(failureDetails2.getLastFailedAt()); + (failureDetails1, failureDetails2) -> + failureDetails1.getLastFailedAt().equals(failureDetails2.getLastFailedAt()) && + failureDetails1.getLastSuccessfulAt().equals(failureDetails2.getLastSuccessfulAt()); private EntityUtil() {} diff --git a/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json b/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json index ca995084cfa..352d724d065 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/events/createWebhook.json @@ -15,13 +15,13 @@ "description": "Description of the application", "type": "string" }, - "endPoint": { + "endpoint": { "description": "Endpoint to receive the webhook events over POST requests.", "type": "string", "format": "uri" }, "eventFilters": { - "description": "Endpoint to receive the webhook events over POST requests.", + "description": "Event filters to filter for desired events.", "type": "array", "items": { "$ref": "../../type/changeEvent.json#/definitions/eventFilter" diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json b/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json index 09515f78afa..28a90acba31 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/events/webhook.json @@ -20,7 +20,7 @@ "description": "Description of the application", "type": "string" }, - "endPoint": { + "endpoint": { "description": "Endpoint to receive the webhook events over POST requests.", "type": "string", "format": "uri" diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java index d04ebd598cb..9d3ae21b864 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookCallbackResource.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; public class WebhookCallbackResource { public static final Logger LOG = LoggerFactory.getLogger(WebhookCallbackResource.class); private final AtomicInteger counter = new AtomicInteger(); + private volatile long counterStartTime; private final ConcurrentLinkedQueue changeEvents = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue changeEventsSlowServer = new ConcurrentLinkedQueue<>(); @@ -56,14 +57,17 @@ public class WebhookCallbackResource { @Path("/counter") public Response receiveEventCount( @Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) { + if (counter.get() == 0) { + counterStartTime = events.getData().get(0).getDateTime().getTime(); + } counter.incrementAndGet(); LOG.info("callback /counter received event. Current count {}", counter.get()); return Response.ok().build(); } - public int getCount() { - return counter.get(); - } + public int getCount() { return counter.get(); } + public void resetCount() { counter.set(0); } + public long getCountStartTime() { return counterStartTime; } /** Webhook endpoint that immediately responds to callback. The events received are ignored */ @POST @@ -150,7 +154,7 @@ public class WebhookCallbackResource { String key = eventType + ":" + entityType; List list = entityCallbackMap.get(key); if (list == null) { - list = new ArrayList(); + list = new ArrayList<>(); entityCallbackMap.put(key, list); } else { list.addAll(events.getData()); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java index 306283f7102..89f4df3c401 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/events/WebhookResourceTest.java @@ -30,19 +30,24 @@ import java.util.concurrent.ConcurrentLinkedQueue; import javax.ws.rs.core.Response; import org.apache.http.client.HttpResponseException; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.api.events.CreateWebhook; import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookEntityInterface; import org.openmetadata.catalog.resources.EntityResourceTest; import org.openmetadata.catalog.resources.events.WebhookResource.WebhookList; +import org.openmetadata.catalog.type.ChangeDescription; import org.openmetadata.catalog.type.ChangeEvent; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EventFilter; import org.openmetadata.catalog.type.EventType; +import org.openmetadata.catalog.type.FailureDetails; +import org.openmetadata.catalog.type.FieldChange; import org.openmetadata.catalog.type.Webhook; import org.openmetadata.catalog.type.Webhook.Status; import org.openmetadata.catalog.util.EntityInterface; import org.openmetadata.catalog.util.TestUtils; +import org.openmetadata.catalog.util.TestUtils.UpdateType; public class WebhookResourceTest extends EntityResourceTest { public static List ALL_EVENTS_FILTER = new ArrayList<>(); @@ -61,9 +66,10 @@ public class WebhookResourceTest extends EntityResourceTest { @Test public void post_webhookEnabledStateChange() throws URISyntaxException, IOException, InterruptedException { // Disabled webhook will not start webhook publisher + webhookCallbackResource.resetCount(); String uri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter"; CreateWebhook create = - createRequest("disabledWebhook", "", "", null).withEnabled(false).withEndPoint(URI.create(uri)); + createRequest("disabledWebhook", "", "", null).withEnabled(false).withEndpoint(URI.create(uri)); Webhook webhook = createAndCheckEntity(create, adminAuthHeaders()); assertEquals(Status.NOT_STARTED, webhook.getStatus()); Webhook getWebhook = getEntity(webhook.getId(), adminAuthHeaders()); @@ -72,9 +78,18 @@ public class WebhookResourceTest extends EntityResourceTest { // Now enable the webhook int counter = webhookCallbackResource.getCount(); - create.withEnabled(true); - getWebhook = updateEntity(create, Response.Status.OK, adminAuthHeaders()); - assertEquals(Status.SUCCESS, getWebhook.getStatus()); + ChangeDescription change = getChangeDescription(webhook.getVersion()); + change.getFieldsUpdated().add(new FieldChange().withName("enabled").withOldValue(false).withNewValue(true)); + change + .getFieldsUpdated() + .add(new FieldChange().withName("status").withOldValue(Status.NOT_STARTED).withNewValue(Status.SUCCESS)); + change + .getFieldsUpdated() + .add(new FieldChange().withName("batchSize").withOldValue(10).withNewValue(50)); + create.withEnabled(true).withBatchSize(50); + + webhook = updateAndCheckEntity(create, Response.Status.OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change); + assertEquals(Status.SUCCESS, webhook.getStatus()); getWebhook = getEntity(webhook.getId(), adminAuthHeaders()); assertEquals(Status.SUCCESS, getWebhook.getStatus()); @@ -88,8 +103,15 @@ public class WebhookResourceTest extends EntityResourceTest { // Disable the webhook and ensure it is disabled create.withEnabled(false); - getWebhook = updateEntity(create, Response.Status.OK, adminAuthHeaders()); + change = getChangeDescription(webhook.getVersion()); + change.getFieldsUpdated().add(new FieldChange().withName("enabled").withOldValue(true).withNewValue(false)); + change + .getFieldsUpdated() + .add(new FieldChange().withName("status").withOldValue(Status.SUCCESS).withNewValue(Status.NOT_STARTED)); + + getWebhook = updateAndCheckEntity(create, Response.Status.OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change); assertEquals(Status.NOT_STARTED, getWebhook.getStatus()); + getWebhook = getEntity(webhook.getId(), adminAuthHeaders()); assertEquals(Status.NOT_STARTED, getWebhook.getStatus()); @@ -100,6 +122,48 @@ public class WebhookResourceTest extends EntityResourceTest { iterations++; assertEquals(counter + 1, webhookCallbackResource.getCount()); // Event counter remains the same } + + deleteEntity(webhook.getId(), adminAuthHeaders()); + } + + @Test + public void put_updateEndpointURL() throws URISyntaxException, IOException, InterruptedException { + CreateWebhook create = + createRequest("replaceURL", "", "", null).withEndpoint(URI.create("http://invalidUnknowHost")); + Webhook webhook = createAndCheckEntity(create, adminAuthHeaders()); + + // Wait for webhook to be marked as failed + int iteration = 0; + Webhook getWebhook = getEntity(webhook.getId(), adminAuthHeaders()); + LOG.info("getWebhook {}", getWebhook); + while (getWebhook.getStatus() != Status.FAILED && iteration < 100) { + getWebhook = getEntity(webhook.getId(), adminAuthHeaders()); + LOG.info("getWebhook {}", getWebhook); + Thread.sleep(100); + iteration++; + } + assertEquals(Status.FAILED, getWebhook.getStatus()); + FailureDetails failureDetails = getWebhook.getFailureDetails(); + + // Now change the webhook URL to a valid URL and ensure callbacks resume + String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter"; + create = create.withEndpoint(URI.create(baseUri)); + ChangeDescription change = getChangeDescription(getWebhook.getVersion()); + change.getFieldsDeleted().add(new FieldChange().withName("failureDetails").withOldValue(failureDetails)); + change + .getFieldsUpdated() + .add( + new FieldChange() + .withName("endPoint") + .withOldValue(webhook.getEndpoint()) + .withNewValue(create.getEndpoint())); + change + .getFieldsUpdated() + .add(new FieldChange().withName("status").withOldValue(Status.FAILED).withNewValue(Status.SUCCESS)); + webhook = updateAndCheckEntity(create, Response.Status.OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change); + assertEquals(Status.SUCCESS, webhook.getStatus()); + + deleteEntity(webhook.getId(), adminAuthHeaders()); } @Override @@ -110,7 +174,7 @@ public class WebhookResourceTest extends EntityResourceTest { .withName(name) .withDescription(description) .withEventFilters(ALL_EVENTS_FILTER) - .withEndPoint(URI.create(uri)) + .withEndpoint(URI.create(uri)) .withBatchSize(100); } @@ -155,7 +219,7 @@ public class WebhookResourceTest extends EntityResourceTest { // Valid webhook callback String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook"; CreateWebhook createWebhook = - createRequest("validWebhook", "validWebhook", "", null).withEndPoint(URI.create(baseUri)); + createRequest("validWebhook", "validWebhook", "", null).withEndpoint(URI.create(baseUri)); createEntity(createWebhook, adminAuthHeaders()); } @@ -214,16 +278,15 @@ public class WebhookResourceTest extends EntityResourceTest { @Test public void testDifferentTypesOfWebhooks() throws IOException, InterruptedException, URISyntaxException { - Thread.sleep(1000); String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook"; // Create multiple webhooks each with different type of response to callback - createWebhook("slowServer", baseUri + "/slowServer"); // Callback response 1 second slower - createWebhook("callbackTimeout", baseUri + "/timeout"); // Callback response 12 seconds slower - createWebhook("callbackResponse300", baseUri + "/300"); // 3xx response - createWebhook("callbackResponse400", baseUri + "/400"); // 4xx response - createWebhook("callbackResponse500", baseUri + "/500"); // 5xx response - createWebhook("invalidEndpoint", "http://invalidUnknownHost"); // Invalid URL + Webhook w1 = createWebhook("slowServer", baseUri + "/slowServer"); // Callback response 1 second slower + Webhook w2 = createWebhook("callbackTimeout", baseUri + "/timeout"); // Callback response 12 seconds slower + Webhook w3 = createWebhook("callbackResponse300", baseUri + "/300"); // 3xx response + Webhook w4 = createWebhook("callbackResponse400", baseUri + "/400"); // 4xx response + Webhook w5 = createWebhook("callbackResponse500", baseUri + "/500"); // 5xx response + Webhook w6 = createWebhook("invalidEndpoint", "http://invalidUnknownHost"); // Invalid URL // Now check state of webhooks created ConcurrentLinkedQueue callbackEvents = webhookCallbackResource.getEventsSlowServer(); @@ -243,16 +306,25 @@ public class WebhookResourceTest extends EntityResourceTest { assertWebhookStatus("callbackResponse400", Status.AWAITING_RETRY, 400, "Bad Request"); assertWebhookStatus("callbackResponse500", Status.AWAITING_RETRY, 500, "Internal Server Error"); assertWebhookStatus("invalidEndpoint", Status.FAILED, null, "UnknownHostException"); + + // Delete all webhooks + deleteEntity(w1.getId(), adminAuthHeaders()); + deleteEntity(w2.getId(), adminAuthHeaders()); + deleteEntity(w3.getId(), adminAuthHeaders()); + deleteEntity(w4.getId(), adminAuthHeaders()); + deleteEntity(w5.getId(), adminAuthHeaders()); + deleteEntity(w6.getId(), adminAuthHeaders()); } - public void createWebhook(String name, String uri) throws URISyntaxException, IOException { - createWebhook(name, uri, ALL_EVENTS_FILTER); + public Webhook createWebhook(String name, String uri) throws URISyntaxException, IOException { + return createWebhook(name, uri, ALL_EVENTS_FILTER); } - public void createWebhook(String name, String uri, List filters) throws URISyntaxException, IOException { + public Webhook createWebhook(String name, String uri, List filters) + throws URISyntaxException, IOException { CreateWebhook createWebhook = - createRequest(name, "", "", null).withEndPoint(URI.create(uri)).withEventFilters(filters); - createAndCheckEntity(createWebhook, adminAuthHeaders()); + createRequest(name, "", "", null).withEndpoint(URI.create(uri)).withEventFilters(filters); + return createAndCheckEntity(createWebhook, adminAuthHeaders()); } public void assertWebhookStatusSuccess(String name) throws HttpResponseException {