mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-01 11:09:14 +00:00
parent
9011b27bf6
commit
ac59a092ea
@ -115,7 +115,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
|
||||
public void addWebhookPublisher(Webhook webhook) {
|
||||
if (Boolean.FALSE.equals(webhook.getEnabled())) { // Only add webhook that is enabled for publishing events
|
||||
webhook.setStatus(Status.NOT_STARTED);
|
||||
webhook.setStatus(Status.DISABLED);
|
||||
return;
|
||||
}
|
||||
WebhookPublisher publisher = new WebhookPublisher(webhook);
|
||||
@ -138,7 +138,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
// Update the existing publisher
|
||||
Status status = previousPublisher.getWebhook().getStatus();
|
||||
previousPublisher.updateWebhook(webhook);
|
||||
if (status != Status.STARTED && status != Status.AWAITING_RETRY) {
|
||||
if (status != Status.ACTIVE && status != Status.AWAITING_RETRY) {
|
||||
// Restart the previously stopped publisher (in states notStarted, error, retryLimitReached)
|
||||
BatchEventProcessor<ChangeEventHolder> processor = EventPubSub.addEventHandler(previousPublisher);
|
||||
previousPublisher.setProcessor(processor);
|
||||
@ -357,8 +357,8 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
if (response.getStatus() >= 200 && response.getStatus() < 300) { // All 2xx responses
|
||||
batch.clear();
|
||||
webhook.getFailureDetails().setLastSuccessfulAt(changeEventHolder.get().getTimestamp());
|
||||
if (webhook.getStatus() != Status.STARTED) {
|
||||
setStatus(Status.STARTED, null, null, null, null);
|
||||
if (webhook.getStatus() != Status.ACTIVE) {
|
||||
setStatus(Status.ACTIVE, null, null, null, null);
|
||||
}
|
||||
// 3xx response/redirection is not allowed for callback. Set the webhook state as in error
|
||||
} else if (response.getStatus() >= 300 && response.getStatus() < 400) {
|
||||
@ -491,6 +491,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
recordChange("status", origWebhook.getStatus(), updatedWebhook.getStatus());
|
||||
recordChange("endPoint", origWebhook.getEndpoint(), updatedWebhook.getEndpoint());
|
||||
recordChange("batchSize", origWebhook.getBatchSize(), updatedWebhook.getBatchSize());
|
||||
recordChange("timeout", origWebhook.getTimeout(), updatedWebhook.getTimeout());
|
||||
if (fieldsChanged()) {
|
||||
// If updating the other fields, opportunistically use it to capture failure details
|
||||
WebhookPublisher publisher = WebhookRepository.this.getPublisher(origWebhook.getId());
|
||||
@ -499,7 +500,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
.withStatus(publisher.getWebhook().getStatus())
|
||||
.withFailureDetails(publisher.getWebhook().getFailureDetails());
|
||||
if (Boolean.FALSE.equals(updatedWebhook.getEnabled())) {
|
||||
updatedWebhook.setStatus(Status.NOT_STARTED);
|
||||
updatedWebhook.setStatus(Status.DISABLED);
|
||||
}
|
||||
}
|
||||
recordChange(
|
||||
|
||||
@ -252,7 +252,7 @@ public class WebhookResource {
|
||||
throws IOException, ParseException {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
Webhook webhook = getWebhook(securityContext, create);
|
||||
webhook.setStatus(Boolean.TRUE.equals(webhook.getEnabled()) ? Status.STARTED : Status.NOT_STARTED);
|
||||
webhook.setStatus(Boolean.TRUE.equals(webhook.getEnabled()) ? Status.ACTIVE : Status.DISABLED);
|
||||
webhook = dao.create(uriInfo, webhook);
|
||||
dao.addWebhookPublisher(webhook);
|
||||
return Response.created(webhook.getHref()).entity(webhook).build();
|
||||
@ -277,7 +277,7 @@ public class WebhookResource {
|
||||
// SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
// Table table = getTable(securityContext, create);
|
||||
Webhook webhook = getWebhook(securityContext, create);
|
||||
webhook.setStatus(Boolean.TRUE.equals(webhook.getEnabled()) ? Status.STARTED : Status.NOT_STARTED);
|
||||
webhook.setStatus(Boolean.TRUE.equals(webhook.getEnabled()) ? Status.ACTIVE : Status.DISABLED);
|
||||
PutResponse<Webhook> putResponse = dao.createOrUpdate(uriInfo, webhook);
|
||||
dao.updateWebhookPublisher(webhook);
|
||||
return putResponse.toResponse();
|
||||
|
||||
@ -64,14 +64,14 @@
|
||||
"type": "string"
|
||||
},
|
||||
"status": {
|
||||
"description": "Status is `notStarted`, when webhook was created with `enabled` set to false and it never started publishing events. Status is `started` when webhook is normally functioning and 200 OK response was received for callback notification. Status is `failed` on bad callback URL, connection failures, `1xx`, and `3xx` response was received for callback notification. Status is `awaitingRetry` when previous attempt at callback timed out or received `4xx`, `5xx` response. Status is `retryLimitReached` after all retries fail.",
|
||||
"description": "Status is `disabled`, when webhook was created with `enabled` set to false and it never started publishing events. Status is `active` when webhook is normally functioning and 200 OK response was received for callback notification. Status is `failed` on bad callback URL, connection failures, `1xx`, and `3xx` response was received for callback notification. Status is `awaitingRetry` when previous attempt at callback timed out or received `4xx`, `5xx` response. Status is `retryLimitReached` after all retries fail.",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"notStarted",
|
||||
"started",
|
||||
"disabled",
|
||||
"failed",
|
||||
"retryLimitReached",
|
||||
"awaitingRetry",
|
||||
"retryLimitReached"
|
||||
"active"
|
||||
]
|
||||
},
|
||||
"failureDetails": {
|
||||
|
||||
@ -78,9 +78,9 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
String uri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/" + webhookName;
|
||||
CreateWebhook create = createRequest(webhookName, "", "", null).withEnabled(false).withEndpoint(URI.create(uri));
|
||||
Webhook webhook = createAndCheckEntity(create, ADMIN_AUTH_HEADERS);
|
||||
assertEquals(Status.NOT_STARTED, webhook.getStatus());
|
||||
assertEquals(Status.DISABLED, webhook.getStatus());
|
||||
Webhook getWebhook = getEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
|
||||
assertEquals(Status.NOT_STARTED, getWebhook.getStatus());
|
||||
assertEquals(Status.DISABLED, getWebhook.getStatus());
|
||||
EventDetails details = webhookCallbackResource.getEventDetails(webhookName);
|
||||
assertNull(details);
|
||||
|
||||
@ -92,14 +92,14 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
change.getFieldsUpdated().add(new FieldChange().withName("enabled").withOldValue(false).withNewValue(true));
|
||||
change
|
||||
.getFieldsUpdated()
|
||||
.add(new FieldChange().withName("status").withOldValue(Status.NOT_STARTED).withNewValue(Status.STARTED));
|
||||
.add(new FieldChange().withName("status").withOldValue(Status.DISABLED).withNewValue(Status.ACTIVE));
|
||||
change.getFieldsUpdated().add(new FieldChange().withName("batchSize").withOldValue(10).withNewValue(50));
|
||||
create.withEnabled(true).withBatchSize(50);
|
||||
|
||||
webhook = updateAndCheckEntity(create, Response.Status.OK, ADMIN_AUTH_HEADERS, UpdateType.MINOR_UPDATE, change);
|
||||
assertEquals(Status.STARTED, webhook.getStatus());
|
||||
assertEquals(Status.ACTIVE, webhook.getStatus());
|
||||
getWebhook = getEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
|
||||
assertEquals(Status.STARTED, getWebhook.getStatus());
|
||||
assertEquals(Status.ACTIVE, getWebhook.getStatus());
|
||||
|
||||
// Ensure the call back notification has started
|
||||
details = waitForFirstEvent(webhookName, 25, 100);
|
||||
@ -119,15 +119,15 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
change.getFieldsUpdated().add(new FieldChange().withName("enabled").withOldValue(true).withNewValue(false));
|
||||
change
|
||||
.getFieldsUpdated()
|
||||
.add(new FieldChange().withName("status").withOldValue(Status.STARTED).withNewValue(Status.NOT_STARTED));
|
||||
.add(new FieldChange().withName("status").withOldValue(Status.ACTIVE).withNewValue(Status.DISABLED));
|
||||
|
||||
// Disabled webhook state is NOT_STARTED
|
||||
// Disabled webhook state is DISABLED
|
||||
getWebhook = updateAndCheckEntity(create, Response.Status.OK, ADMIN_AUTH_HEADERS, UpdateType.MINOR_UPDATE, change);
|
||||
assertEquals(Status.NOT_STARTED, getWebhook.getStatus());
|
||||
assertEquals(Status.DISABLED, getWebhook.getStatus());
|
||||
|
||||
// Disabled webhook state also records last successful time when event was sent
|
||||
getWebhook = getEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
|
||||
assertEquals(Status.NOT_STARTED, getWebhook.getStatus());
|
||||
assertEquals(Status.DISABLED, getWebhook.getStatus());
|
||||
assertEquals(details.getFirstEventTime(), getWebhook.getFailureDetails().getLastSuccessfulAt());
|
||||
|
||||
// Ensure callback back notification is disabled with no new events
|
||||
@ -172,7 +172,7 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
.withNewValue(create.getEndpoint()));
|
||||
change
|
||||
.getFieldsUpdated()
|
||||
.add(new FieldChange().withName("status").withOldValue(Status.FAILED).withNewValue(Status.STARTED));
|
||||
.add(new FieldChange().withName("status").withOldValue(Status.FAILED).withNewValue(Status.ACTIVE));
|
||||
webhook = updateAndCheckEntity(create, Response.Status.OK, ADMIN_AUTH_HEADERS, UpdateType.MINOR_UPDATE, change);
|
||||
|
||||
deleteEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
|
||||
@ -343,7 +343,7 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
|
||||
public void assertWebhookStatusSuccess(String name) throws HttpResponseException {
|
||||
Webhook webhook = getEntityByName(name, "", ADMIN_AUTH_HEADERS);
|
||||
assertEquals(Status.STARTED, webhook.getStatus());
|
||||
assertEquals(Status.ACTIVE, webhook.getStatus());
|
||||
assertNull(webhook.getFailureDetails());
|
||||
}
|
||||
|
||||
|
||||
@ -7,5 +7,5 @@ Provides metadata version information.
|
||||
|
||||
from incremental import Version
|
||||
|
||||
__version__ = Version("metadata", 0, 9, 0, dev=4)
|
||||
__version__ = Version("metadata", 0, 9, 0, dev=5)
|
||||
__all__ = ["__version__"]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user