mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-03 12:08:31 +00:00
This commit is contained in:
parent
e6f686e857
commit
f668a58668
@ -106,10 +106,12 @@ public class WebhookPublisher extends AbstractEventPublisher {
|
||||
updateFilter(webhook.getEventFilters());
|
||||
}
|
||||
|
||||
private void setErrorStatus(Long attemptTime, Integer statusCode, String reason) throws IOException {
|
||||
private void setErrorStatus(Long attemptTime, Integer statusCode, String reason)
|
||||
throws IOException, InterruptedException {
|
||||
if (!attemptTime.equals(webhook.getFailureDetails().getLastFailedAt())) {
|
||||
setStatus(Webhook.Status.FAILED, attemptTime, statusCode, reason, null);
|
||||
}
|
||||
webhookRepository.deleteWebhookPublisher(webhook.getId());
|
||||
throw new RuntimeException(reason);
|
||||
}
|
||||
|
||||
@ -165,7 +167,8 @@ public class WebhookPublisher extends AbstractEventPublisher {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publish(EventResource.ChangeEventList list) throws EventPublisherException, IOException {
|
||||
public void publish(EventResource.ChangeEventList list)
|
||||
throws EventPublisherException, IOException, InterruptedException {
|
||||
long attemptTime = System.currentTimeMillis();
|
||||
try {
|
||||
String json = JsonUtils.pojoToJson(list);
|
||||
@ -189,11 +192,11 @@ public class WebhookPublisher extends AbstractEventPublisher {
|
||||
if (webhook.getStatus() != Webhook.Status.ACTIVE) {
|
||||
setStatus(Webhook.Status.ACTIVE, null, null, null, null);
|
||||
}
|
||||
// 3xx response/redirection is not allowed for callback. Set the webhook state as in error
|
||||
} else if (response.getStatus() >= 300 && response.getStatus() < 400) {
|
||||
// 3xx response/redirection is not allowed for callback. Set the webhook state as in error
|
||||
setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
|
||||
// 4xx, 5xx response retry delivering events after timeout
|
||||
} else if (response.getStatus() >= 300 && response.getStatus() < 600) {
|
||||
// 4xx, 5xx response retry delivering events after timeout
|
||||
setNextBackOff();
|
||||
setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
|
||||
Thread.sleep(currentBackoffTime);
|
||||
|
||||
@ -58,10 +58,6 @@ public final class CatalogExceptionMessage {
|
||||
return entityNotFound(entityType, id.toString());
|
||||
}
|
||||
|
||||
public static String entitiesNotFound(String entityType) {
|
||||
return String.format("%s instances not found", entityType);
|
||||
}
|
||||
|
||||
public static String readOnlyAttribute(String entityType, String attribute) {
|
||||
return String.format("%s attribute %s can't be modified", entityType, attribute);
|
||||
}
|
||||
@ -135,10 +131,6 @@ public final class CatalogExceptionMessage {
|
||||
return String.format("%s is not empty", entityType);
|
||||
}
|
||||
|
||||
public static String invalidEntity(String entity) {
|
||||
return String.format("Invalid entity %s", entity);
|
||||
}
|
||||
|
||||
public static String unknownCustomField(String fieldName) {
|
||||
return String.format("Unknown custom field %s", fieldName);
|
||||
}
|
||||
|
||||
@ -98,7 +98,7 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
BatchEventProcessor<ChangeEventHolder> processor = EventPubSub.addEventHandler(publisher);
|
||||
publisher.setProcessor(processor);
|
||||
webhookPublisherMap.put(webhook.getId(), publisher);
|
||||
LOG.info("Webhook subscription started for {}", webhook.getName());
|
||||
LOG.info("Webhook publisher subscription started for {}", webhook.getName());
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@ -128,14 +128,13 @@ public class WebhookRepository extends EntityRepository<Webhook> {
|
||||
}
|
||||
|
||||
public void deleteWebhookPublisher(UUID id) throws InterruptedException {
|
||||
WebhookPublisher publisher = webhookPublisherMap.get(id);
|
||||
WebhookPublisher publisher = webhookPublisherMap.remove(id);
|
||||
if (publisher != null) {
|
||||
publisher.getProcessor().halt();
|
||||
publisher.awaitShutdown();
|
||||
EventPubSub.removeProcessor(publisher.getProcessor());
|
||||
LOG.info("Webhook publisher deleted for {}", publisher.getWebhook().getName());
|
||||
}
|
||||
webhookPublisherMap.remove(id);
|
||||
}
|
||||
|
||||
public class WebhookUpdater extends EntityUpdater {
|
||||
|
||||
@ -37,7 +37,6 @@ import org.testcontainers.containers.JdbcDatabaseContainer;
|
||||
@Slf4j
|
||||
public abstract class OpenMetadataApplicationTest {
|
||||
protected static final String CONFIG_PATH = ResourceHelpers.resourceFilePath("openmetadata-secure-test.yaml");
|
||||
private static JdbcDatabaseContainer<?> SQL_CONTAINER;
|
||||
public static DropwizardAppExtension<OpenMetadataApplicationConfig> APP;
|
||||
protected static final WebhookCallbackResource webhookCallbackResource = new WebhookCallbackResource();
|
||||
public static final String FERNET_KEY_1 = "ihZpp5gmmDvVsgoOG6OVivKWwC9vd5JQ";
|
||||
@ -54,19 +53,19 @@ public abstract class OpenMetadataApplicationTest {
|
||||
final String jdbcContainerImage = System.getProperty("jdbcContainerImage");
|
||||
LOG.info("Using test container class {} and image {}", jdbcContainerClassName, jdbcContainerImage);
|
||||
|
||||
SQL_CONTAINER =
|
||||
JdbcDatabaseContainer<?> sqlContainer =
|
||||
(JdbcDatabaseContainer<?>)
|
||||
Class.forName(jdbcContainerClassName).getConstructor(String.class).newInstance(jdbcContainerImage);
|
||||
SQL_CONTAINER.withReuse(true);
|
||||
SQL_CONTAINER.withStartupTimeoutSeconds(240);
|
||||
SQL_CONTAINER.withConnectTimeoutSeconds(240);
|
||||
SQL_CONTAINER.start();
|
||||
sqlContainer.withReuse(true);
|
||||
sqlContainer.withStartupTimeoutSeconds(240);
|
||||
sqlContainer.withConnectTimeoutSeconds(240);
|
||||
sqlContainer.start();
|
||||
|
||||
final String migrationScripsLocation =
|
||||
ResourceHelpers.resourceFilePath("db/sql/" + SQL_CONTAINER.getDriverClassName());
|
||||
ResourceHelpers.resourceFilePath("db/sql/" + sqlContainer.getDriverClassName());
|
||||
Flyway flyway =
|
||||
Flyway.configure()
|
||||
.dataSource(SQL_CONTAINER.getJdbcUrl(), SQL_CONTAINER.getUsername(), SQL_CONTAINER.getPassword())
|
||||
.dataSource(sqlContainer.getJdbcUrl(), sqlContainer.getUsername(), sqlContainer.getPassword())
|
||||
.table("DATABASE_CHANGE_LOG")
|
||||
.locations("filesystem:" + migrationScripsLocation)
|
||||
.sqlMigrationPrefix("v")
|
||||
@ -79,10 +78,10 @@ public abstract class OpenMetadataApplicationTest {
|
||||
OpenMetadataApplication.class,
|
||||
CONFIG_PATH,
|
||||
// Database overrides
|
||||
ConfigOverride.config("database.driverClass", SQL_CONTAINER.getDriverClassName()),
|
||||
ConfigOverride.config("database.url", SQL_CONTAINER.getJdbcUrl()),
|
||||
ConfigOverride.config("database.user", SQL_CONTAINER.getUsername()),
|
||||
ConfigOverride.config("database.password", SQL_CONTAINER.getPassword()),
|
||||
ConfigOverride.config("database.driverClass", sqlContainer.getDriverClassName()),
|
||||
ConfigOverride.config("database.url", sqlContainer.getJdbcUrl()),
|
||||
ConfigOverride.config("database.user", sqlContainer.getUsername()),
|
||||
ConfigOverride.config("database.password", sqlContainer.getPassword()),
|
||||
// Migration overrides
|
||||
ConfigOverride.config("migrationConfiguration.path", migrationScripsLocation));
|
||||
|
||||
|
||||
@ -71,6 +71,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
@ -81,13 +82,13 @@ import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Predicate;
|
||||
import javax.json.JsonPatch;
|
||||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.http.client.HttpResponseException;
|
||||
import org.awaitility.Awaitility;
|
||||
@ -1832,6 +1833,30 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
|
||||
validateChangeEvents(entityInterface, timestamp, expectedEventType, expectedChangeDescription, authHeaders, false);
|
||||
}
|
||||
|
||||
public static class EventHolder {
|
||||
@Getter ChangeEvent expectedEvent;
|
||||
|
||||
public boolean hasExpectedEvent(ResultList<ChangeEvent> changeEvents, long timestamp) {
|
||||
for (ChangeEvent event : listOrEmpty(changeEvents.getData())) {
|
||||
if (event.getTimestamp() == timestamp) {
|
||||
expectedEvent = event;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return expectedEvent != null;
|
||||
}
|
||||
|
||||
public boolean hasDeletedEvent(ResultList<ChangeEvent> changeEvents, UUID id) {
|
||||
for (ChangeEvent event : changeEvents.getData()) {
|
||||
if (event.getEntityId().equals(id)) {
|
||||
expectedEvent = event;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return expectedEvent != null;
|
||||
}
|
||||
}
|
||||
|
||||
private void validateChangeEvents(
|
||||
T entity,
|
||||
long timestamp,
|
||||
@ -1840,38 +1865,19 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
|
||||
Map<String, String> authHeaders,
|
||||
boolean withEventFilter)
|
||||
throws IOException {
|
||||
ResultList<ChangeEvent> changeEvents;
|
||||
ChangeEvent changeEvent = null;
|
||||
|
||||
int iteration = 0;
|
||||
while (changeEvent == null && iteration < 10) {
|
||||
iteration++;
|
||||
// Sometimes change event is not returned on quickly querying with a millisecond
|
||||
// Try multiple times before giving up
|
||||
if (withEventFilter) {
|
||||
// Get change event with an event filter for specific entity type
|
||||
changeEvents = getChangeEvents(entityType, entityType, null, timestamp, authHeaders);
|
||||
} else {
|
||||
// Get change event with no event filter for entity types
|
||||
changeEvents = getChangeEvents("*", "*", null, timestamp, authHeaders);
|
||||
}
|
||||
|
||||
if (changeEvents == null || changeEvents.getData().size() == 0) {
|
||||
ResultList<ChangeEvent> finalChangeEvents = changeEvents;
|
||||
Awaitility.await()
|
||||
.atLeast(iteration * 100L, TimeUnit.MILLISECONDS)
|
||||
.until(() -> finalChangeEvents != null && finalChangeEvents.getData().size() > 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
for (ChangeEvent event : changeEvents.getData()) {
|
||||
if (event.getTimestamp() == timestamp) {
|
||||
changeEvent = event;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Get change event with an event filter for specific entity type if withEventFilter is True. Else get all entities.
|
||||
String createdFilter = withEventFilter ? entityType : "*";
|
||||
String updatedFilter = withEventFilter ? entityType : "*";
|
||||
EventHolder eventHolder = new EventHolder();
|
||||
|
||||
Awaitility.await("Wait for expected change event at timestamp " + timestamp)
|
||||
.pollInterval(Duration.ofMillis(100L))
|
||||
.atMost(Duration.ofMillis(10 * 100L)) // 10 iterations
|
||||
.until(
|
||||
() ->
|
||||
eventHolder.hasExpectedEvent(
|
||||
getChangeEvents(createdFilter, updatedFilter, null, timestamp, authHeaders), timestamp));
|
||||
ChangeEvent changeEvent = eventHolder.getExpectedEvent();
|
||||
assertNotNull(
|
||||
changeEvent,
|
||||
"Expected change event "
|
||||
@ -1905,31 +1911,15 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
|
||||
}
|
||||
|
||||
private void validateDeletedEvent(
|
||||
UUID id, long timestamp, EventType expectedEventType, Double expectedVersion, Map<String, String> authHeaders)
|
||||
throws IOException {
|
||||
UUID id, long timestamp, EventType expectedEventType, Double expectedVersion, Map<String, String> authHeaders) {
|
||||
String updatedBy = SecurityUtil.getPrincipalName(authHeaders);
|
||||
ResultList<ChangeEvent> changeEvents;
|
||||
ChangeEvent changeEvent = null;
|
||||
EventHolder eventHolder = new EventHolder();
|
||||
|
||||
int iteration = 0;
|
||||
while (changeEvent == null && iteration < 25) {
|
||||
iteration++;
|
||||
changeEvents = getChangeEvents(null, null, entityType, timestamp, authHeaders);
|
||||
|
||||
if (changeEvents == null || changeEvents.getData().size() == 0) {
|
||||
ResultList<ChangeEvent> finalChangeEvents = changeEvents;
|
||||
Awaitility.await()
|
||||
.atMost(iteration * 10L, TimeUnit.MILLISECONDS)
|
||||
.until(() -> finalChangeEvents != null && finalChangeEvents.getData().size() > 0);
|
||||
continue;
|
||||
}
|
||||
for (ChangeEvent event : changeEvents.getData()) {
|
||||
if (event.getEntityId().equals(id)) {
|
||||
changeEvent = event;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Awaitility.await("Wait for expected deleted event at timestamp " + timestamp)
|
||||
.pollInterval(Duration.ofMillis(100L))
|
||||
.atMost(Duration.ofMillis(10 * 100L)) // 10 iterations
|
||||
.until(() -> eventHolder.hasDeletedEvent(getChangeEvents(null, null, entityType, timestamp, authHeaders), id));
|
||||
ChangeEvent changeEvent = eventHolder.getExpectedEvent();
|
||||
|
||||
assertNotNull(changeEvent, "Deleted event after " + timestamp + " was not found for entity " + id);
|
||||
assertEquals(expectedEventType, changeEvent.getEventType());
|
||||
|
||||
@ -75,7 +75,7 @@ public class WebhookCallbackResource {
|
||||
public Response receiveEventWithTimeout(
|
||||
@Context UriInfo uriInfo, @Context SecurityContext securityContext, ChangeEventList events) {
|
||||
addEventDetails("simulate-timeout", events);
|
||||
Awaitility.await().pollDelay(Duration.ofSeconds(15L)).untilTrue(new AtomicBoolean(true));
|
||||
Awaitility.await().pollDelay(Duration.ofSeconds(100L)).untilTrue(new AtomicBoolean(true));
|
||||
return Response.ok().build();
|
||||
}
|
||||
|
||||
|
||||
@ -80,7 +80,7 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
}
|
||||
|
||||
@Test
|
||||
void post_webhookEnabledStateChange(TestInfo test) throws IOException, InterruptedException {
|
||||
void post_webhookEnabledStateChange(TestInfo test) throws IOException {
|
||||
//
|
||||
// Create webhook in disabled state. It will not start webhook publisher
|
||||
//
|
||||
@ -112,7 +112,7 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
assertEquals(Status.ACTIVE, getWebhook.getStatus());
|
||||
|
||||
// Ensure the call back notification has started
|
||||
details = waitForFirstEvent(webhookName, 25, 100);
|
||||
details = waitForFirstEvent(webhookName, 25);
|
||||
assertEquals(1, details.getEvents().size());
|
||||
long lastSuccessfulEventTime = details.getLatestEventTime();
|
||||
FailureDetails failureDetails = new FailureDetails().withLastSuccessfulAt(lastSuccessfulEventTime);
|
||||
@ -138,8 +138,8 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
|
||||
// Ensure callback back notification is disabled with no new events
|
||||
int iterations = 0;
|
||||
while (iterations < 100) {
|
||||
Awaitility.await().atLeast(Duration.ofMillis(10L)).untilFalse(new AtomicBoolean(false));
|
||||
while (iterations < 10) {
|
||||
Awaitility.await().atLeast(Duration.ofMillis(100L)).untilTrue(new AtomicBoolean(true));
|
||||
iterations++;
|
||||
assertEquals(1, details.getEvents().size()); // Event counter remains the same
|
||||
}
|
||||
@ -147,36 +147,40 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
deleteEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
// TODO: Fix - This test is currently broken
|
||||
// @Test
|
||||
void put_updateEndpointURL(TestInfo test) throws IOException, InterruptedException {
|
||||
@Test
|
||||
void put_updateEndpointURL(TestInfo test) throws IOException {
|
||||
// Create webhook with invalid URL
|
||||
CreateWebhook create =
|
||||
createRequest("counter", "", "", null).withEnabled(true).withEndpoint(URI.create("http://invalidUnknowHost"));
|
||||
Webhook webhook = createAndCheckEntity(create, ADMIN_AUTH_HEADERS);
|
||||
|
||||
// Wait for webhook to be marked as failed
|
||||
int iteration = 0;
|
||||
while (iteration < 100) {
|
||||
Awaitility.await().atLeast(Duration.ofMillis(100L)).untilFalse(hasWebHookFailed(webhook.getId()));
|
||||
iteration++;
|
||||
}
|
||||
Awaitility.await()
|
||||
.pollInterval(Duration.ofMillis(100L))
|
||||
.atMost(Duration.ofMillis(100 * 100L))
|
||||
.untilTrue(hasWebHookFailed(webhook.getId()));
|
||||
Webhook getWebhook = getEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
|
||||
LOG.info("getWebhook {}", getWebhook);
|
||||
assertEquals(Status.FAILED, getWebhook.getStatus());
|
||||
|
||||
// Get webhook again to reflect the version change (when marked as failed)
|
||||
getWebhook = getEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
|
||||
FailureDetails failureDetails = getWebhook.getFailureDetails();
|
||||
|
||||
// Now change the webhook URL to a valid URL and ensure callbacks resume
|
||||
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/counter/" + test.getDisplayName();
|
||||
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/" + test.getDisplayName();
|
||||
create = create.withEndpoint(URI.create(baseUri));
|
||||
ChangeDescription change = getChangeDescription(getWebhook.getVersion());
|
||||
fieldUpdated(change, "endPoint", webhook.getEndpoint(), create.getEndpoint());
|
||||
fieldUpdated(change, "status", Status.FAILED, Status.ACTIVE);
|
||||
fieldDeleted(change, "failureDetails", JsonUtils.pojoToJson(failureDetails));
|
||||
|
||||
webhook = updateAndCheckEntity(create, Response.Status.OK, ADMIN_AUTH_HEADERS, UpdateType.MINOR_UPDATE, change);
|
||||
deleteEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
private AtomicBoolean hasWebHookFailed(UUID webhookId) throws HttpResponseException {
|
||||
Webhook getWebhook = getEntity(webhookId, ADMIN_AUTH_HEADERS);
|
||||
LOG.info("getWebhook {}", getWebhook);
|
||||
LOG.info("webhook status {}", getWebhook.getStatus());
|
||||
return new AtomicBoolean(getWebhook.getStatus() == Status.FAILED);
|
||||
}
|
||||
|
||||
@ -266,19 +270,24 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
if (expected == actual) {
|
||||
return;
|
||||
}
|
||||
if (fieldName.equals("eventFilters")) {
|
||||
List<EventFilter> expectedFilters = (List<EventFilter>) expected;
|
||||
List<EventFilter> actualFilters =
|
||||
JsonUtils.readValue(actual.toString(), new TypeReference<ArrayList<EventFilter>>() {});
|
||||
assertEquals(expectedFilters, actualFilters);
|
||||
} else if (fieldName.equals("endPoint")) {
|
||||
URI expectedEndpoint = (URI) expected;
|
||||
URI actualEndpoint = URI.create(actual.toString());
|
||||
assertEquals(expectedEndpoint, actualEndpoint);
|
||||
} else if (fieldName.equals("status")) {
|
||||
assertEquals(expected, Status.fromValue(actual.toString()));
|
||||
} else {
|
||||
assertCommonFieldChange(fieldName, expected, actual);
|
||||
switch (fieldName) {
|
||||
case "eventFilters":
|
||||
List<EventFilter> expectedFilters = (List<EventFilter>) expected;
|
||||
List<EventFilter> actualFilters =
|
||||
JsonUtils.readValue(actual.toString(), new TypeReference<ArrayList<EventFilter>>() {});
|
||||
assertEquals(expectedFilters, actualFilters);
|
||||
break;
|
||||
case "endPoint":
|
||||
URI expectedEndpoint = (URI) expected;
|
||||
URI actualEndpoint = URI.create(actual.toString());
|
||||
assertEquals(expectedEndpoint, actualEndpoint);
|
||||
break;
|
||||
case "status":
|
||||
assertEquals(expected, Status.fromValue(actual.toString()));
|
||||
break;
|
||||
default:
|
||||
assertCommonFieldChange(fieldName, expected, actual);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -322,38 +331,38 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
* At the end of the test, ensure all events are delivered over web subscription comparing it with number of events
|
||||
* stored in the system.
|
||||
*/
|
||||
public void validateWebhookEvents() throws HttpResponseException, InterruptedException {
|
||||
public void validateWebhookEvents() throws HttpResponseException {
|
||||
// Check the healthy callback server received all the change events
|
||||
EventDetails details = webhookCallbackResource.getEventDetails("healthy");
|
||||
assertNotNull(details);
|
||||
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = details.getEvents();
|
||||
assertNotNull(callbackEvents);
|
||||
assertNotNull(callbackEvents.peek());
|
||||
waitAndCheckForEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), callbackEvents, 15, 250);
|
||||
waitAndCheckForEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), callbackEvents, 40);
|
||||
assertWebhookStatusSuccess("healthy");
|
||||
}
|
||||
|
||||
/** At the end of the test, ensure all events are delivered for the combination of entity and eventTypes */
|
||||
public void validateWebhookEntityEvents(String entity) throws HttpResponseException, InterruptedException {
|
||||
public void validateWebhookEntityEvents(String entity) throws HttpResponseException {
|
||||
// Check the healthy callback server received all the change events
|
||||
// For the entity all the webhooks registered for created events have the right number of events
|
||||
List<ChangeEvent> callbackEvents =
|
||||
webhookCallbackResource.getEntityCallbackEvents(EventType.ENTITY_CREATED, entity);
|
||||
assertTrue(callbackEvents.size() > 1);
|
||||
long timestamp = callbackEvents.get(0).getTimestamp();
|
||||
waitAndCheckForEvents(entity, null, null, timestamp, callbackEvents, 30, 100);
|
||||
waitAndCheckForEvents(entity, null, null, timestamp, callbackEvents, 30);
|
||||
|
||||
// For the entity all the webhooks registered for updated events have the right number of events
|
||||
callbackEvents = webhookCallbackResource.getEntityCallbackEvents(EventType.ENTITY_UPDATED, entity);
|
||||
// Use previous date if no update events
|
||||
timestamp = callbackEvents.size() > 0 ? callbackEvents.get(0).getTimestamp() : timestamp;
|
||||
waitAndCheckForEvents(null, entity, null, timestamp, callbackEvents, 30, 100);
|
||||
waitAndCheckForEvents(null, entity, null, timestamp, callbackEvents, 30);
|
||||
|
||||
// TODO add delete event support
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDifferentTypesOfWebhooks() throws IOException, InterruptedException {
|
||||
void testDifferentTypesOfWebhooks() throws IOException {
|
||||
String baseUri = "http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook";
|
||||
|
||||
// Create multiple webhooks each with different type of response to callback
|
||||
@ -365,11 +374,11 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
Webhook w6 = createWebhook("invalidEndpoint", "http://invalidUnknownHost"); // Invalid URL
|
||||
|
||||
// Now check state of webhooks created
|
||||
EventDetails details = waitForFirstEvent("simulate-slowServer", 25, 100);
|
||||
EventDetails details = waitForFirstEvent("simulate-slowServer", 25);
|
||||
ConcurrentLinkedQueue<ChangeEvent> callbackEvents = details.getEvents();
|
||||
assertNotNull(callbackEvents.peek());
|
||||
|
||||
waitAndCheckForEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), callbackEvents, 30, 100);
|
||||
waitAndCheckForEvents("*", "*", "*", callbackEvents.peek().getTimestamp(), callbackEvents, 30);
|
||||
|
||||
// Check all webhook status
|
||||
assertWebhookStatusSuccess("slowServer");
|
||||
@ -411,49 +420,49 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
|
||||
assertEquals(failedReason, webhook.getFailureDetails().getLastFailedReason());
|
||||
}
|
||||
|
||||
private static AtomicBoolean receivedAllEvents(List<ChangeEvent> expected, Collection<ChangeEvent> callbackEvents) {
|
||||
LOG.info("expected size {} callback events size {}", expected.size(), callbackEvents.size());
|
||||
return new AtomicBoolean(expected.size() == callbackEvents.size());
|
||||
}
|
||||
|
||||
public void waitAndCheckForEvents(
|
||||
String entityCreated,
|
||||
String entityUpdated,
|
||||
String entityDeleted,
|
||||
long timestamp,
|
||||
Collection<ChangeEvent> received,
|
||||
int iteration,
|
||||
long sleepMillis)
|
||||
Collection<ChangeEvent> callbackEvents,
|
||||
int iteration)
|
||||
throws HttpResponseException {
|
||||
int i = 0;
|
||||
List<ChangeEvent> expected =
|
||||
getChangeEvents(entityCreated, entityUpdated, entityDeleted, timestamp, ADMIN_AUTH_HEADERS).getData();
|
||||
while (expected.size() < received.size() && i < iteration) {
|
||||
Awaitility.await().atLeast(Duration.ofMillis(sleepMillis)).untilFalse(new AtomicBoolean(false));
|
||||
i++;
|
||||
}
|
||||
// Refresh the expected events again by getting list of events to compare with webhook received events
|
||||
expected = getChangeEvents(entityCreated, entityUpdated, entityDeleted, timestamp, ADMIN_AUTH_HEADERS).getData();
|
||||
if (expected.size() != received.size()) {
|
||||
Awaitility.await()
|
||||
.pollInterval(Duration.ofMillis(100L))
|
||||
.atMost(Duration.ofMillis(iteration * 100L))
|
||||
.untilTrue(receivedAllEvents(expected, callbackEvents));
|
||||
if (expected.size() != callbackEvents.size()) { // Failed to receive all the events
|
||||
expected.forEach(
|
||||
c1 ->
|
||||
LOG.info(
|
||||
"expected {}:{}:{}:{}", c1.getTimestamp(), c1.getEventType(), c1.getEntityType(), c1.getEntityId()));
|
||||
received.forEach(
|
||||
callbackEvents.forEach(
|
||||
c1 ->
|
||||
LOG.info(
|
||||
"received {}:{}:{}:{}", c1.getTimestamp(), c1.getEventType(), c1.getEntityType(), c1.getEntityId()));
|
||||
}
|
||||
assertEquals(expected.size(), received.size());
|
||||
assertEquals(expected.size(), callbackEvents.size());
|
||||
}
|
||||
|
||||
public EventDetails waitForFirstEvent(String endpoint, int iteration, long sleepMillis) throws InterruptedException {
|
||||
public EventDetails waitForFirstEvent(String endpoint, int iteration) {
|
||||
Awaitility.await()
|
||||
.pollInterval(Duration.ofMillis(100L))
|
||||
.atMost(Duration.ofMillis(iteration * 100L))
|
||||
.untilFalse(hasEventOccurred(endpoint));
|
||||
EventDetails details = webhookCallbackResource.getEventDetails(endpoint);
|
||||
int i = 0;
|
||||
while (i < iteration) {
|
||||
Awaitility.await().atLeast(Duration.ofMillis(sleepMillis)).untilFalse(hasEventOccured(endpoint));
|
||||
i++;
|
||||
}
|
||||
LOG.info("Returning for endpoint {} eventDetails {}", endpoint, details);
|
||||
return details;
|
||||
}
|
||||
|
||||
private AtomicBoolean hasEventOccured(String endpoint) {
|
||||
private AtomicBoolean hasEventOccurred(String endpoint) {
|
||||
EventDetails details = webhookCallbackResource.getEventDetails(endpoint);
|
||||
return new AtomicBoolean(details != null && details.getEvents() != null && details.getEvents().size() <= 0);
|
||||
}
|
||||
|
||||
@ -14,7 +14,6 @@ import java.util.Date;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInfo;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.openmetadata.schema.api.security.jwt.JWTTokenConfiguration;
|
||||
import org.openmetadata.schema.entity.teams.User;
|
||||
@ -31,7 +30,7 @@ class JWTTokenGeneratorTest {
|
||||
protected JWTTokenGenerator jwtTokenGenerator;
|
||||
|
||||
@BeforeAll
|
||||
public void setup(TestInfo test) {
|
||||
public void setup() {
|
||||
jwtTokenConfiguration = new JWTTokenConfiguration();
|
||||
jwtTokenConfiguration.setJwtissuer("open-metadata.org");
|
||||
jwtTokenConfiguration.setRsaprivateKeyFilePath(rsaPrivateKeyPath);
|
||||
|
||||
@ -6,7 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import java.time.Duration;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.junit.Assert;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.openmetadata.service.security.auth.LoginAttemptCache;
|
||||
|
||||
@ -28,7 +27,7 @@ public class LoginAttemptCacheTest {
|
||||
assertTrue(cache.isLoginBlocked(testKey));
|
||||
|
||||
// Check Eviction
|
||||
Awaitility.await().pollDelay(Duration.ofSeconds(2L)).untilAsserted(() -> Assert.assertTrue(true));
|
||||
Awaitility.await().pollDelay(Duration.ofSeconds(2L)).untilAsserted(() -> assertTrue(true));
|
||||
assertFalse(cache.isLoginBlocked(testKey));
|
||||
|
||||
// Check Successful Login
|
||||
@ -42,7 +41,7 @@ public class LoginAttemptCacheTest {
|
||||
assertFalse(cache.isLoginBlocked(testKey));
|
||||
|
||||
// Check Eviction
|
||||
Awaitility.await().pollDelay(Duration.ofSeconds(2L)).untilAsserted(() -> Assert.assertTrue(true));
|
||||
Awaitility.await().pollDelay(Duration.ofSeconds(2L)).untilAsserted(() -> assertTrue(true));
|
||||
assertFalse(cache.isLoginBlocked(testKey));
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user