mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-20 06:21:45 +00:00
parent
793f64f0df
commit
bdb99bfcd0
@ -98,6 +98,7 @@ public class EventPubSub {
|
|||||||
|
|
||||||
public static void removeProcessor(BatchEventProcessor<ChangeEventHolder> processor) {
|
public static void removeProcessor(BatchEventProcessor<ChangeEventHolder> processor) {
|
||||||
ringBuffer.removeGatingSequence(processor.getSequence());
|
ringBuffer.removeGatingSequence(processor.getSequence());
|
||||||
|
LOG.info("Processor removed for {}", processor);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {}
|
public void close() {}
|
||||||
|
@ -154,7 +154,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void deleteWebhookPublisher(UUID id) throws InterruptedException {
|
public void deleteWebhookPublisher(UUID id) throws InterruptedException {
|
||||||
WebhookPublisher publisher = webhookPublisherMap.get(id);
|
WebhookPublisher publisher = webhookPublisherMap.get(id);
|
||||||
if (publisher != null) {
|
if (publisher != null) {
|
||||||
publisher.getProcessor().halt();
|
publisher.getProcessor().halt();
|
||||||
@ -330,13 +330,13 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStart() {
|
public void onStart() {
|
||||||
LOG.info("Webhook-lifecycle-onStart {}", webhook.getName());
|
|
||||||
createClient();
|
createClient();
|
||||||
webhook.withFailureDetails(new FailureDetails());
|
webhook.withFailureDetails(new FailureDetails());
|
||||||
|
|
||||||
// TODO clean this up
|
// TODO clean this up
|
||||||
Map<String, String> authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org");
|
Map<String, String> authHeaders = SecurityUtil.authHeaders("admin@open-metadata.org");
|
||||||
target = SecurityUtil.addHeaders(client.target(webhook.getEndPoint()), authHeaders);
|
target = SecurityUtil.addHeaders(client.target(webhook.getEndpoint()), authHeaders);
|
||||||
|
LOG.info("Webhook-lifecycle-onStart {}", webhook.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -367,8 +367,8 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
|||||||
// 2xx response means call back is successful
|
// 2xx response means call back is successful
|
||||||
if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses
|
if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses
|
||||||
batch.clear();
|
batch.clear();
|
||||||
webhook.getFailureDetails().setLastSuccessfulAt(changeEventHolder.get().getDateTime().getTime());
|
|
||||||
if (webhook.getStatus() != Status.SUCCESS) {
|
if (webhook.getStatus() != Status.SUCCESS) {
|
||||||
|
webhook.getFailureDetails().setLastSuccessfulAt(changeEventHolder.get().getDateTime().getTime());
|
||||||
setStatus(Status.SUCCESS, null, null, null, null);
|
setStatus(Status.SUCCESS, null, null, null, null);
|
||||||
}
|
}
|
||||||
// 3xx response/redirection is not allowed for callback. Set the webhook state as in error
|
// 3xx response/redirection is not allowed for callback. Set the webhook state as in error
|
||||||
@ -383,7 +383,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
|||||||
} catch (ProcessingException ex) {
|
} catch (ProcessingException ex) {
|
||||||
Throwable cause = ex.getCause();
|
Throwable cause = ex.getCause();
|
||||||
if (cause.getClass() == UnknownHostException.class) {
|
if (cause.getClass() == UnknownHostException.class) {
|
||||||
LOG.warn("Invalid webhook {} endpoint {}", webhook.getName(), webhook.getEndPoint());
|
LOG.warn("Invalid webhook {} endpoint {}", webhook.getName(), webhook.getEndpoint());
|
||||||
setErrorStatus(attemptTime, null, "UnknownHostException");
|
setErrorStatus(attemptTime, null, "UnknownHostException");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -406,7 +406,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
|||||||
currentBackoffTime = BACKOFF_NORMAL;
|
currentBackoffTime = BACKOFF_NORMAL;
|
||||||
webhook.setTimeout(updatedWebhook.getTimeout());
|
webhook.setTimeout(updatedWebhook.getTimeout());
|
||||||
webhook.setBatchSize(updatedWebhook.getBatchSize());
|
webhook.setBatchSize(updatedWebhook.getBatchSize());
|
||||||
webhook.setEndPoint(updatedWebhook.getEndPoint());
|
webhook.setEndpoint(updatedWebhook.getEndpoint());
|
||||||
webhook.setEventFilters(updatedWebhook.getEventFilters());
|
webhook.setEventFilters(updatedWebhook.getEventFilters());
|
||||||
initFilter();
|
initFilter();
|
||||||
createClient();
|
createClient();
|
||||||
@ -437,6 +437,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
|||||||
|
|
||||||
private void setStatus(Status status, Long attemptTime, Integer statusCode, String reason, Date date)
|
private void setStatus(Status status, Long attemptTime, Integer statusCode, String reason, Date date)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
Webhook stored = dao.webhookDAO().findEntityById(webhook.getId());
|
||||||
webhook.setStatus(status);
|
webhook.setStatus(status);
|
||||||
webhook
|
webhook
|
||||||
.getFailureDetails()
|
.getFailureDetails()
|
||||||
@ -444,8 +445,8 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
|||||||
.withLastFailedStatusCode(statusCode)
|
.withLastFailedStatusCode(statusCode)
|
||||||
.withLastFailedReason(reason)
|
.withLastFailedReason(reason)
|
||||||
.withNextAttempt(date);
|
.withNextAttempt(date);
|
||||||
// TODO versioning
|
WebhookUpdater updater = new WebhookUpdater(stored, webhook, false);
|
||||||
storeEntity(webhook, true);
|
updater.update();
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void createClient() {
|
private synchronized void createClient() {
|
||||||
@ -496,6 +497,8 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
|||||||
public void entitySpecificUpdate() throws IOException {
|
public void entitySpecificUpdate() throws IOException {
|
||||||
recordChange("enabled", original.getEntity().getEnabled(), updated.getEntity().getEnabled());
|
recordChange("enabled", original.getEntity().getEnabled(), updated.getEntity().getEnabled());
|
||||||
recordChange("status", original.getEntity().getStatus(), updated.getEntity().getStatus());
|
recordChange("status", original.getEntity().getStatus(), updated.getEntity().getStatus());
|
||||||
|
recordChange("endPoint", original.getEntity().getEndpoint(), updated.getEntity().getEndpoint());
|
||||||
|
recordChange("batchSize", original.getEntity().getBatchSize(), updated.getEntity().getBatchSize());
|
||||||
recordChange(
|
recordChange(
|
||||||
"failureDetails",
|
"failureDetails",
|
||||||
original.getEntity().getFailureDetails(),
|
original.getEntity().getFailureDetails(),
|
||||||
|
@ -282,8 +282,9 @@ public class WebhookResource {
|
|||||||
public Response deleteWebhook(
|
public Response deleteWebhook(
|
||||||
@Context UriInfo uriInfo,
|
@Context UriInfo uriInfo,
|
||||||
@Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id)
|
@Parameter(description = "webhook Id", schema = @Schema(type = "string")) @PathParam("id") String id)
|
||||||
throws IOException, GeneralSecurityException, ParseException {
|
throws IOException, GeneralSecurityException, ParseException, InterruptedException {
|
||||||
dao.delete(id);
|
dao.delete(id);
|
||||||
|
dao.deleteWebhookPublisher(UUID.fromString(id));
|
||||||
return Response.ok().build();
|
return Response.ok().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,7 +293,7 @@ public class WebhookResource {
|
|||||||
.withDescription(create.getDescription())
|
.withDescription(create.getDescription())
|
||||||
.withName(create.getName())
|
.withName(create.getName())
|
||||||
.withId(UUID.randomUUID())
|
.withId(UUID.randomUUID())
|
||||||
.withEndPoint(create.getEndPoint())
|
.withEndpoint(create.getEndpoint())
|
||||||
.withEventFilters(create.getEventFilters())
|
.withEventFilters(create.getEventFilters())
|
||||||
.withBatchSize(create.getBatchSize())
|
.withBatchSize(create.getBatchSize())
|
||||||
.withTimeout(create.getTimeout())
|
.withTimeout(create.getTimeout())
|
||||||
|
@ -109,7 +109,9 @@ public final class EntityUtil {
|
|||||||
public static final BiPredicate<MlFeature, MlFeature> mlFeatureMatch = MlFeature::equals;
|
public static final BiPredicate<MlFeature, MlFeature> mlFeatureMatch = MlFeature::equals;
|
||||||
public static final BiPredicate<MlHyperParameter, MlHyperParameter> mlHyperParameterMatch = MlHyperParameter::equals;
|
public static final BiPredicate<MlHyperParameter, MlHyperParameter> mlHyperParameterMatch = MlHyperParameter::equals;
|
||||||
public static final BiPredicate<FailureDetails, FailureDetails> failureDetailsMatch =
|
public static final BiPredicate<FailureDetails, FailureDetails> failureDetailsMatch =
|
||||||
(failureDetails1, failureDetails2) -> failureDetails1.getLastFailedAt().equals(failureDetails2.getLastFailedAt());
|
(failureDetails1, failureDetails2) ->
|
||||||
|
failureDetails1.getLastFailedAt().equals(failureDetails2.getLastFailedAt()) &&
|
||||||
|
failureDetails1.getLastSuccessfulAt().equals(failureDetails2.getLastSuccessfulAt());
|
||||||
|
|
||||||
private EntityUtil() {}
|
private EntityUtil() {}
|
||||||
|
|
||||||
|
@ -15,13 +15,13 @@
|
|||||||
"description": "Description of the application",
|
"description": "Description of the application",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"endPoint": {
|
"endpoint": {
|
||||||
"description": "Endpoint to receive the webhook events over POST requests.",
|
"description": "Endpoint to receive the webhook events over POST requests.",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"format": "uri"
|
"format": "uri"
|
||||||
},
|
},
|
||||||
"eventFilters": {
|
"eventFilters": {
|
||||||
"description": "Endpoint to receive the webhook events over POST requests.",
|
"description": "Event filters to filter for desired events.",
|
||||||
"type": "array",
|
"type": "array",
|
||||||
"items": {
|
"items": {
|
||||||
"$ref": "../../type/changeEvent.json#/definitions/eventFilter"
|
"$ref": "../../type/changeEvent.json#/definitions/eventFilter"
|
||||||
|
@ -20,7 +20,7 @@
|
|||||||
"description": "Description of the application",
|
"description": "Description of the application",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"endPoint": {
|
"endpoint": {
|
||||||
"description": "Endpoint to receive the webhook events over POST requests.",
|
"description": "Endpoint to receive the webhook events over POST requests.",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"format": "uri"
|
"format": "uri"
|
||||||
|
@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
|
|||||||
public class WebhookCallbackResource {
|
public class WebhookCallbackResource {
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(WebhookCallbackResource.class);
|
public static final Logger LOG = LoggerFactory.getLogger(WebhookCallbackResource.class);
|
||||||
private final AtomicInteger counter = new AtomicInteger();
|
private final AtomicInteger counter = new AtomicInteger();
|
||||||
|
private volatile long counterStartTime;
|
||||||
private final ConcurrentLinkedQueue<ChangeEvent> changeEvents = new ConcurrentLinkedQueue<>();
|
private final ConcurrentLinkedQueue<ChangeEvent> changeEvents = new ConcurrentLinkedQueue<>();
|
||||||
private final ConcurrentLinkedQueue<ChangeEvent> changeEventsSlowServer = new ConcurrentLinkedQueue<>();
|
private final ConcurrentLinkedQueue<ChangeEvent> changeEventsSlowServer = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
@ -56,14 +57,17 @@ public class WebhookCallbackResource {
|
|||||||
@Path("/counter")
|
@Path("/counter")
|
||||||
public Response receiveEventCount(
|
public Response receiveEventCount(
|
||||||
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
|
@Context UriInfo uriInfo, @Context SecurityContext securityContext, EventResource.ChangeEventList events) {
|
||||||
|
if (counter.get() == 0) {
|
||||||
|
counterStartTime = events.getData().get(0).getDateTime().getTime();
|
||||||
|
}
|
||||||
counter.incrementAndGet();
|
counter.incrementAndGet();
|
||||||
LOG.info("callback /counter received event. Current count {}", counter.get());
|
LOG.info("callback /counter received event. Current count {}", counter.get());
|
||||||
return Response.ok().build();
|
return Response.ok().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getCount() {
|
public int getCount() { return counter.get(); }
|
||||||
return counter.get();
|
public void resetCount() { counter.set(0); }
|
||||||
}
|
public long getCountStartTime() { return counterStartTime; }
|
||||||
|
|
||||||
/** Webhook endpoint that immediately responds to callback. The events received are ignored */
|
/** Webhook endpoint that immediately responds to callback. The events received are ignored */
|
||||||
@POST
|
@POST
|
||||||
@ -150,7 +154,7 @@ public class WebhookCallbackResource {
|
|||||||
String key = eventType + ":" + entityType;
|
String key = eventType + ":" + entityType;
|
||||||
List<ChangeEvent> list = entityCallbackMap.get(key);
|
List<ChangeEvent> list = entityCallbackMap.get(key);
|
||||||
if (list == null) {
|
if (list == null) {
|
||||||
list = new ArrayList<ChangeEvent>();
|
list = new ArrayList<>();
|
||||||
entityCallbackMap.put(key, list);
|
entityCallbackMap.put(key, list);
|
||||||
} else {
|
} else {
|
||||||
list.addAll(events.getData());
|
list.addAll(events.getData());
|
||||||
|
@ -30,19 +30,24 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import org.apache.http.client.HttpResponseException;
|
import org.apache.http.client.HttpResponseException;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.TestInfo;
|
||||||
import org.openmetadata.catalog.Entity;
|
import org.openmetadata.catalog.Entity;
|
||||||
import org.openmetadata.catalog.api.events.CreateWebhook;
|
import org.openmetadata.catalog.api.events.CreateWebhook;
|
||||||
import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookEntityInterface;
|
import org.openmetadata.catalog.jdbi3.WebhookRepository.WebhookEntityInterface;
|
||||||
import org.openmetadata.catalog.resources.EntityResourceTest;
|
import org.openmetadata.catalog.resources.EntityResourceTest;
|
||||||
import org.openmetadata.catalog.resources.events.WebhookResource.WebhookList;
|
import org.openmetadata.catalog.resources.events.WebhookResource.WebhookList;
|
||||||
|
import org.openmetadata.catalog.type.ChangeDescription;
|
||||||
import org.openmetadata.catalog.type.ChangeEvent;
|
import org.openmetadata.catalog.type.ChangeEvent;
|
||||||
import org.openmetadata.catalog.type.EntityReference;
|
import org.openmetadata.catalog.type.EntityReference;
|
||||||
import org.openmetadata.catalog.type.EventFilter;
|
import org.openmetadata.catalog.type.EventFilter;
|
||||||
import org.openmetadata.catalog.type.EventType;
|
import org.openmetadata.catalog.type.EventType;
|
||||||
|
import org.openmetadata.catalog.type.FailureDetails;
|
||||||
|
import org.openmetadata.catalog.type.FieldChange;
|
||||||
import org.openmetadata.catalog.type.Webhook;
|
import org.openmetadata.catalog.type.Webhook;
|
||||||
import org.openmetadata.catalog.type.Webhook.Status;
|
import org.openmetadata.catalog.type.Webhook.Status;
|
||||||
import org.openmetadata.catalog.util.EntityInterface;
|
import org.openmetadata.catalog.util.EntityInterface;
|
||||||
import org.openmetadata.catalog.util.TestUtils;
|
import org.openmetadata.catalog.util.TestUtils;
|
||||||
|
import org.openmetadata.catalog.util.TestUtils.UpdateType;
|
||||||
|
|
||||||
public class WebhookResourceTest extends EntityResourceTest<Webhook> {
|
public class WebhookResourceTest extends EntityResourceTest<Webhook> {
|
||||||
public static List<EventFilter> ALL_EVENTS_FILTER = new ArrayList<>();
|
public static List<EventFilter> ALL_EVENTS_FILTER = new ArrayList<>();
|
||||||
@ -61,9 +66,10 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
|
|||||||
@Test
|
@Test
|
||||||
public void post_webhookEnabledStateChange() throws URISyntaxException, IOException, InterruptedException {
|
public void post_webhookEnabledStateChange() throws URISyntaxException, IOException, InterruptedException {
|
||||||
// Disabled webhook will not start webhook publisher
|
// Disabled webhook will not start webhook publisher
|
||||||
|
webhookCallbackResource.resetCount();
|
||||||
String uri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter";
|
String uri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter";
|
||||||
CreateWebhook create =
|
CreateWebhook create =
|
||||||
createRequest("disabledWebhook", "", "", null).withEnabled(false).withEndPoint(URI.create(uri));
|
createRequest("disabledWebhook", "", "", null).withEnabled(false).withEndpoint(URI.create(uri));
|
||||||
Webhook webhook = createAndCheckEntity(create, adminAuthHeaders());
|
Webhook webhook = createAndCheckEntity(create, adminAuthHeaders());
|
||||||
assertEquals(Status.NOT_STARTED, webhook.getStatus());
|
assertEquals(Status.NOT_STARTED, webhook.getStatus());
|
||||||
Webhook getWebhook = getEntity(webhook.getId(), adminAuthHeaders());
|
Webhook getWebhook = getEntity(webhook.getId(), adminAuthHeaders());
|
||||||
@ -72,9 +78,18 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
|
|||||||
|
|
||||||
// Now enable the webhook
|
// Now enable the webhook
|
||||||
int counter = webhookCallbackResource.getCount();
|
int counter = webhookCallbackResource.getCount();
|
||||||
create.withEnabled(true);
|
ChangeDescription change = getChangeDescription(webhook.getVersion());
|
||||||
getWebhook = updateEntity(create, Response.Status.OK, adminAuthHeaders());
|
change.getFieldsUpdated().add(new FieldChange().withName("enabled").withOldValue(false).withNewValue(true));
|
||||||
assertEquals(Status.SUCCESS, getWebhook.getStatus());
|
change
|
||||||
|
.getFieldsUpdated()
|
||||||
|
.add(new FieldChange().withName("status").withOldValue(Status.NOT_STARTED).withNewValue(Status.SUCCESS));
|
||||||
|
change
|
||||||
|
.getFieldsUpdated()
|
||||||
|
.add(new FieldChange().withName("batchSize").withOldValue(10).withNewValue(50));
|
||||||
|
create.withEnabled(true).withBatchSize(50);
|
||||||
|
|
||||||
|
webhook = updateAndCheckEntity(create, Response.Status.OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change);
|
||||||
|
assertEquals(Status.SUCCESS, webhook.getStatus());
|
||||||
getWebhook = getEntity(webhook.getId(), adminAuthHeaders());
|
getWebhook = getEntity(webhook.getId(), adminAuthHeaders());
|
||||||
assertEquals(Status.SUCCESS, getWebhook.getStatus());
|
assertEquals(Status.SUCCESS, getWebhook.getStatus());
|
||||||
|
|
||||||
@ -88,8 +103,15 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
|
|||||||
|
|
||||||
// Disable the webhook and ensure it is disabled
|
// Disable the webhook and ensure it is disabled
|
||||||
create.withEnabled(false);
|
create.withEnabled(false);
|
||||||
getWebhook = updateEntity(create, Response.Status.OK, adminAuthHeaders());
|
change = getChangeDescription(webhook.getVersion());
|
||||||
|
change.getFieldsUpdated().add(new FieldChange().withName("enabled").withOldValue(true).withNewValue(false));
|
||||||
|
change
|
||||||
|
.getFieldsUpdated()
|
||||||
|
.add(new FieldChange().withName("status").withOldValue(Status.SUCCESS).withNewValue(Status.NOT_STARTED));
|
||||||
|
|
||||||
|
getWebhook = updateAndCheckEntity(create, Response.Status.OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change);
|
||||||
assertEquals(Status.NOT_STARTED, getWebhook.getStatus());
|
assertEquals(Status.NOT_STARTED, getWebhook.getStatus());
|
||||||
|
|
||||||
getWebhook = getEntity(webhook.getId(), adminAuthHeaders());
|
getWebhook = getEntity(webhook.getId(), adminAuthHeaders());
|
||||||
assertEquals(Status.NOT_STARTED, getWebhook.getStatus());
|
assertEquals(Status.NOT_STARTED, getWebhook.getStatus());
|
||||||
|
|
||||||
@ -100,6 +122,48 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
|
|||||||
iterations++;
|
iterations++;
|
||||||
assertEquals(counter + 1, webhookCallbackResource.getCount()); // Event counter remains the same
|
assertEquals(counter + 1, webhookCallbackResource.getCount()); // Event counter remains the same
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deleteEntity(webhook.getId(), adminAuthHeaders());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void put_updateEndpointURL() throws URISyntaxException, IOException, InterruptedException {
|
||||||
|
CreateWebhook create =
|
||||||
|
createRequest("replaceURL", "", "", null).withEndpoint(URI.create("http://invalidUnknowHost"));
|
||||||
|
Webhook webhook = createAndCheckEntity(create, adminAuthHeaders());
|
||||||
|
|
||||||
|
// Wait for webhook to be marked as failed
|
||||||
|
int iteration = 0;
|
||||||
|
Webhook getWebhook = getEntity(webhook.getId(), adminAuthHeaders());
|
||||||
|
LOG.info("getWebhook {}", getWebhook);
|
||||||
|
while (getWebhook.getStatus() != Status.FAILED && iteration < 100) {
|
||||||
|
getWebhook = getEntity(webhook.getId(), adminAuthHeaders());
|
||||||
|
LOG.info("getWebhook {}", getWebhook);
|
||||||
|
Thread.sleep(100);
|
||||||
|
iteration++;
|
||||||
|
}
|
||||||
|
assertEquals(Status.FAILED, getWebhook.getStatus());
|
||||||
|
FailureDetails failureDetails = getWebhook.getFailureDetails();
|
||||||
|
|
||||||
|
// Now change the webhook URL to a valid URL and ensure callbacks resume
|
||||||
|
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter";
|
||||||
|
create = create.withEndpoint(URI.create(baseUri));
|
||||||
|
ChangeDescription change = getChangeDescription(getWebhook.getVersion());
|
||||||
|
change.getFieldsDeleted().add(new FieldChange().withName("failureDetails").withOldValue(failureDetails));
|
||||||
|
change
|
||||||
|
.getFieldsUpdated()
|
||||||
|
.add(
|
||||||
|
new FieldChange()
|
||||||
|
.withName("endPoint")
|
||||||
|
.withOldValue(webhook.getEndpoint())
|
||||||
|
.withNewValue(create.getEndpoint()));
|
||||||
|
change
|
||||||
|
.getFieldsUpdated()
|
||||||
|
.add(new FieldChange().withName("status").withOldValue(Status.FAILED).withNewValue(Status.SUCCESS));
|
||||||
|
webhook = updateAndCheckEntity(create, Response.Status.OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change);
|
||||||
|
assertEquals(Status.SUCCESS, webhook.getStatus());
|
||||||
|
|
||||||
|
deleteEntity(webhook.getId(), adminAuthHeaders());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -110,7 +174,7 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
|
|||||||
.withName(name)
|
.withName(name)
|
||||||
.withDescription(description)
|
.withDescription(description)
|
||||||
.withEventFilters(ALL_EVENTS_FILTER)
|
.withEventFilters(ALL_EVENTS_FILTER)
|
||||||
.withEndPoint(URI.create(uri))
|
.withEndpoint(URI.create(uri))
|
||||||
.withBatchSize(100);
|
.withBatchSize(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,7 +219,7 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
|
|||||||
// Valid webhook callback
|
// Valid webhook callback
|
||||||
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
|
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
|
||||||
CreateWebhook createWebhook =
|
CreateWebhook createWebhook =
|
||||||
createRequest("validWebhook", "validWebhook", "", null).withEndPoint(URI.create(baseUri));
|
createRequest("validWebhook", "validWebhook", "", null).withEndpoint(URI.create(baseUri));
|
||||||
createEntity(createWebhook, adminAuthHeaders());
|
createEntity(createWebhook, adminAuthHeaders());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,16 +278,15 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDifferentTypesOfWebhooks() throws IOException, InterruptedException, URISyntaxException {
|
public void testDifferentTypesOfWebhooks() throws IOException, InterruptedException, URISyntaxException {
|
||||||
Thread.sleep(1000);
|
|
||||||
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
|
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
|
||||||
|
|
||||||
// Create multiple webhooks each with different type of response to callback
|
// Create multiple webhooks each with different type of response to callback
|
||||||
createWebhook("slowServer", baseUri + "/slowServer"); // Callback response 1 second slower
|
Webhook w1 = createWebhook("slowServer", baseUri + "/slowServer"); // Callback response 1 second slower
|
||||||
createWebhook("callbackTimeout", baseUri + "/timeout"); // Callback response 12 seconds slower
|
Webhook w2 = createWebhook("callbackTimeout", baseUri + "/timeout"); // Callback response 12 seconds slower
|
||||||
createWebhook("callbackResponse300", baseUri + "/300"); // 3xx response
|
Webhook w3 = createWebhook("callbackResponse300", baseUri + "/300"); // 3xx response
|
||||||
createWebhook("callbackResponse400", baseUri + "/400"); // 4xx response
|
Webhook w4 = createWebhook("callbackResponse400", baseUri + "/400"); // 4xx response
|
||||||
createWebhook("callbackResponse500", baseUri + "/500"); // 5xx response
|
Webhook w5 = createWebhook("callbackResponse500", baseUri + "/500"); // 5xx response
|
||||||
createWebhook("invalidEndpoint", "http://invalidUnknownHost"); // Invalid URL
|
Webhook w6 = createWebhook("invalidEndpoint", "http://invalidUnknownHost"); // Invalid URL
|
||||||
|
|
||||||
// Now check state of webhooks created
|
// Now check state of webhooks created
|
||||||
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = webhookCallbackResource.getEventsSlowServer();
|
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = webhookCallbackResource.getEventsSlowServer();
|
||||||
@ -243,16 +306,25 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook> {
|
|||||||
assertWebhookStatus("callbackResponse400", Status.AWAITING_RETRY, 400, "Bad Request");
|
assertWebhookStatus("callbackResponse400", Status.AWAITING_RETRY, 400, "Bad Request");
|
||||||
assertWebhookStatus("callbackResponse500", Status.AWAITING_RETRY, 500, "Internal Server Error");
|
assertWebhookStatus("callbackResponse500", Status.AWAITING_RETRY, 500, "Internal Server Error");
|
||||||
assertWebhookStatus("invalidEndpoint", Status.FAILED, null, "UnknownHostException");
|
assertWebhookStatus("invalidEndpoint", Status.FAILED, null, "UnknownHostException");
|
||||||
|
|
||||||
|
// Delete all webhooks
|
||||||
|
deleteEntity(w1.getId(), adminAuthHeaders());
|
||||||
|
deleteEntity(w2.getId(), adminAuthHeaders());
|
||||||
|
deleteEntity(w3.getId(), adminAuthHeaders());
|
||||||
|
deleteEntity(w4.getId(), adminAuthHeaders());
|
||||||
|
deleteEntity(w5.getId(), adminAuthHeaders());
|
||||||
|
deleteEntity(w6.getId(), adminAuthHeaders());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createWebhook(String name, String uri) throws URISyntaxException, IOException {
|
public Webhook createWebhook(String name, String uri) throws URISyntaxException, IOException {
|
||||||
createWebhook(name, uri, ALL_EVENTS_FILTER);
|
return createWebhook(name, uri, ALL_EVENTS_FILTER);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createWebhook(String name, String uri, List<EventFilter> filters) throws URISyntaxException, IOException {
|
public Webhook createWebhook(String name, String uri, List<EventFilter> filters)
|
||||||
|
throws URISyntaxException, IOException {
|
||||||
CreateWebhook createWebhook =
|
CreateWebhook createWebhook =
|
||||||
createRequest(name, "", "", null).withEndPoint(URI.create(uri)).withEventFilters(filters);
|
createRequest(name, "", "", null).withEndpoint(URI.create(uri)).withEventFilters(filters);
|
||||||
createAndCheckEntity(createWebhook, adminAuthHeaders());
|
return createAndCheckEntity(createWebhook, adminAuthHeaders());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void assertWebhookStatusSuccess(String name) throws HttpResponseException {
|
public void assertWebhookStatusSuccess(String name) throws HttpResponseException {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user