mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-03 20:19:31 +00:00 
			
		
		
		
	ISSUE-1987: Replace thread.sleep with awaitility library (#8305)
* ISSUE-1987: Replace thread.sleep with awaitility library * ISSUE-1987: Replace thread.sleep with awaitility library * ISSUE-1987: increase polling period to 100ms Co-authored-by: ssundaresha2 <sandeep_sundaresha@intuit.com>
This commit is contained in:
		
							parent
							
								
									cf08637d5f
								
							
						
					
					
						commit
						e6f686e857
					
				@ -81,6 +81,7 @@ import java.util.Map.Entry;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Random;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.function.BiConsumer;
 | 
			
		||||
import java.util.function.Predicate;
 | 
			
		||||
import javax.json.JsonPatch;
 | 
			
		||||
@ -89,6 +90,7 @@ import javax.ws.rs.core.Response;
 | 
			
		||||
import javax.ws.rs.core.Response.Status;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.http.client.HttpResponseException;
 | 
			
		||||
import org.awaitility.Awaitility;
 | 
			
		||||
import org.junit.jupiter.api.AfterAll;
 | 
			
		||||
import org.junit.jupiter.api.BeforeAll;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
@ -1842,7 +1844,7 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
 | 
			
		||||
    ChangeEvent changeEvent = null;
 | 
			
		||||
 | 
			
		||||
    int iteration = 0;
 | 
			
		||||
    while (changeEvent == null && iteration < 25) {
 | 
			
		||||
    while (changeEvent == null && iteration < 10) {
 | 
			
		||||
      iteration++;
 | 
			
		||||
      // Sometimes change event is not returned on quickly querying with a millisecond
 | 
			
		||||
      // Try multiple times before giving up
 | 
			
		||||
@ -1855,11 +1857,10 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (changeEvents == null || changeEvents.getData().size() == 0) {
 | 
			
		||||
        try {
 | 
			
		||||
          Thread.sleep(iteration * 10L); // Sleep with backoff
 | 
			
		||||
        } catch (InterruptedException e) {
 | 
			
		||||
          e.printStackTrace();
 | 
			
		||||
        }
 | 
			
		||||
        ResultList<ChangeEvent> finalChangeEvents = changeEvents;
 | 
			
		||||
        Awaitility.await()
 | 
			
		||||
            .atLeast(iteration * 100L, TimeUnit.MILLISECONDS)
 | 
			
		||||
            .until(() -> finalChangeEvents != null && finalChangeEvents.getData().size() > 0);
 | 
			
		||||
        continue;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
@ -1916,11 +1917,10 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
 | 
			
		||||
      changeEvents = getChangeEvents(null, null, entityType, timestamp, authHeaders);
 | 
			
		||||
 | 
			
		||||
      if (changeEvents == null || changeEvents.getData().size() == 0) {
 | 
			
		||||
        try {
 | 
			
		||||
          Thread.sleep(iteration * 10L); // Sleep with backoff
 | 
			
		||||
        } catch (InterruptedException e) {
 | 
			
		||||
          e.printStackTrace();
 | 
			
		||||
        }
 | 
			
		||||
        ResultList<ChangeEvent> finalChangeEvents = changeEvents;
 | 
			
		||||
        Awaitility.await()
 | 
			
		||||
            .atMost(iteration * 10L, TimeUnit.MILLISECONDS)
 | 
			
		||||
            .until(() -> finalChangeEvents != null && finalChangeEvents.getData().size() > 0);
 | 
			
		||||
        continue;
 | 
			
		||||
      }
 | 
			
		||||
      for (ChangeEvent event : changeEvents.getData()) {
 | 
			
		||||
 | 
			
		||||
@ -4,10 +4,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 | 
			
		||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.time.Duration;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentLinkedQueue;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicBoolean;
 | 
			
		||||
import javax.ws.rs.Consumes;
 | 
			
		||||
import javax.ws.rs.HeaderParam;
 | 
			
		||||
import javax.ws.rs.POST;
 | 
			
		||||
@ -22,6 +24,7 @@ import javax.ws.rs.core.UriInfo;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.Setter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.awaitility.Awaitility;
 | 
			
		||||
import org.openmetadata.common.utils.CommonUtil;
 | 
			
		||||
import org.openmetadata.schema.type.ChangeEvent;
 | 
			
		||||
import org.openmetadata.schema.type.EventType;
 | 
			
		||||
@ -72,11 +75,7 @@ public class WebhookCallbackResource {
 | 
			
		||||
  public Response receiveEventWithTimeout(
 | 
			
		||||
      @Context UriInfo uriInfo, @Context SecurityContext securityContext, ChangeEventList events) {
 | 
			
		||||
    addEventDetails("simulate-timeout", events);
 | 
			
		||||
    try {
 | 
			
		||||
      Thread.sleep(15 * 1000);
 | 
			
		||||
    } catch (InterruptedException e) {
 | 
			
		||||
      e.printStackTrace();
 | 
			
		||||
    }
 | 
			
		||||
    Awaitility.await().pollDelay(Duration.ofSeconds(15L)).untilTrue(new AtomicBoolean(true));
 | 
			
		||||
    return Response.ok().build();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -25,16 +25,14 @@ import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS;
 | 
			
		||||
import com.fasterxml.jackson.core.type.TypeReference;
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.net.URI;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.time.Duration;
 | 
			
		||||
import java.util.*;
 | 
			
		||||
import java.util.concurrent.ConcurrentLinkedQueue;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicBoolean;
 | 
			
		||||
import javax.ws.rs.core.Response;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.http.client.HttpResponseException;
 | 
			
		||||
import org.awaitility.Awaitility;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.junit.jupiter.api.TestInfo;
 | 
			
		||||
import org.openmetadata.schema.api.events.CreateWebhook;
 | 
			
		||||
@ -141,7 +139,7 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
 | 
			
		||||
    // Ensure callback back notification is disabled with no new events
 | 
			
		||||
    int iterations = 0;
 | 
			
		||||
    while (iterations < 100) {
 | 
			
		||||
      Thread.sleep(10);
 | 
			
		||||
      Awaitility.await().atLeast(Duration.ofMillis(10L)).untilFalse(new AtomicBoolean(false));
 | 
			
		||||
      iterations++;
 | 
			
		||||
      assertEquals(1, details.getEvents().size()); // Event counter remains the same
 | 
			
		||||
    }
 | 
			
		||||
@ -158,14 +156,12 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
 | 
			
		||||
 | 
			
		||||
    // Wait for webhook to be marked as failed
 | 
			
		||||
    int iteration = 0;
 | 
			
		||||
    Webhook getWebhook = getEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
 | 
			
		||||
    LOG.info("getWebhook {}", getWebhook);
 | 
			
		||||
    while (getWebhook.getStatus() != Status.FAILED && iteration < 100) {
 | 
			
		||||
      getWebhook = getEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
 | 
			
		||||
      LOG.info("getWebhook {}", getWebhook);
 | 
			
		||||
      Thread.sleep(100);
 | 
			
		||||
    while (iteration < 100) {
 | 
			
		||||
      Awaitility.await().atLeast(Duration.ofMillis(100L)).untilFalse(hasWebHookFailed(webhook.getId()));
 | 
			
		||||
      iteration++;
 | 
			
		||||
    }
 | 
			
		||||
    Webhook getWebhook = getEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
 | 
			
		||||
    LOG.info("getWebhook {}", getWebhook);
 | 
			
		||||
    assertEquals(Status.FAILED, getWebhook.getStatus());
 | 
			
		||||
 | 
			
		||||
    // Now change the webhook URL to a valid URL and ensure callbacks resume
 | 
			
		||||
@ -178,6 +174,12 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
 | 
			
		||||
    deleteEntity(webhook.getId(), ADMIN_AUTH_HEADERS);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private AtomicBoolean hasWebHookFailed(UUID webhookId) throws HttpResponseException {
 | 
			
		||||
    Webhook getWebhook = getEntity(webhookId, ADMIN_AUTH_HEADERS);
 | 
			
		||||
    LOG.info("getWebhook {}", getWebhook);
 | 
			
		||||
    return new AtomicBoolean(getWebhook.getStatus() == Status.FAILED);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  void put_updateWebhookFilter(TestInfo test) throws IOException {
 | 
			
		||||
    String endpoint =
 | 
			
		||||
@ -362,8 +364,6 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
 | 
			
		||||
    Webhook w5 = createWebhook("callbackResponse500", baseUri + "/simulate/500"); // 5xx response
 | 
			
		||||
    Webhook w6 = createWebhook("invalidEndpoint", "http://invalidUnknownHost"); // Invalid URL
 | 
			
		||||
 | 
			
		||||
    Thread.sleep(10000);
 | 
			
		||||
 | 
			
		||||
    // Now check state of webhooks created
 | 
			
		||||
    EventDetails details = waitForFirstEvent("simulate-slowServer", 25, 100);
 | 
			
		||||
    ConcurrentLinkedQueue<ChangeEvent> callbackEvents = details.getEvents();
 | 
			
		||||
@ -419,12 +419,12 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
 | 
			
		||||
      Collection<ChangeEvent> received,
 | 
			
		||||
      int iteration,
 | 
			
		||||
      long sleepMillis)
 | 
			
		||||
      throws InterruptedException, HttpResponseException {
 | 
			
		||||
      throws HttpResponseException {
 | 
			
		||||
    int i = 0;
 | 
			
		||||
    List<ChangeEvent> expected =
 | 
			
		||||
        getChangeEvents(entityCreated, entityUpdated, entityDeleted, timestamp, ADMIN_AUTH_HEADERS).getData();
 | 
			
		||||
    while (expected.size() < received.size() && i < iteration) {
 | 
			
		||||
      Thread.sleep(sleepMillis);
 | 
			
		||||
      Awaitility.await().atLeast(Duration.ofMillis(sleepMillis)).untilFalse(new AtomicBoolean(false));
 | 
			
		||||
      i++;
 | 
			
		||||
    }
 | 
			
		||||
    // Refresh the expected events again by getting list of events to compare with webhook received events
 | 
			
		||||
@ -445,12 +445,16 @@ public class WebhookResourceTest extends EntityResourceTest<Webhook, CreateWebho
 | 
			
		||||
  public EventDetails waitForFirstEvent(String endpoint, int iteration, long sleepMillis) throws InterruptedException {
 | 
			
		||||
    EventDetails details = webhookCallbackResource.getEventDetails(endpoint);
 | 
			
		||||
    int i = 0;
 | 
			
		||||
    while ((details == null || details.getEvents() == null || details.getEvents().size() <= 0) && i < iteration) {
 | 
			
		||||
      details = webhookCallbackResource.getEventDetails(endpoint);
 | 
			
		||||
      Thread.sleep(sleepMillis);
 | 
			
		||||
    while (i < iteration) {
 | 
			
		||||
      Awaitility.await().atLeast(Duration.ofMillis(sleepMillis)).untilFalse(hasEventOccured(endpoint));
 | 
			
		||||
      i++;
 | 
			
		||||
    }
 | 
			
		||||
    LOG.info("Returning for endpoint {} eventDetails {}", endpoint, details);
 | 
			
		||||
    return details;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private AtomicBoolean hasEventOccured(String endpoint) {
 | 
			
		||||
    EventDetails details = webhookCallbackResource.getEventDetails(endpoint);
 | 
			
		||||
    return new AtomicBoolean(details != null && details.getEvents() != null && details.getEvents().size() <= 0);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -3,14 +3,17 @@ package org.openmetadata.service.util;
 | 
			
		||||
import static org.junit.jupiter.api.Assertions.assertFalse;
 | 
			
		||||
import static org.junit.jupiter.api.Assertions.assertTrue;
 | 
			
		||||
 | 
			
		||||
import java.time.Duration;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.awaitility.Awaitility;
 | 
			
		||||
import org.junit.Assert;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.openmetadata.service.security.auth.LoginAttemptCache;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class LoginAttemptCacheTest {
 | 
			
		||||
  @Test
 | 
			
		||||
  void testFailedLogin() throws InterruptedException {
 | 
			
		||||
  void testFailedLogin() {
 | 
			
		||||
    String testKey = "test";
 | 
			
		||||
    LoginAttemptCache cache = new LoginAttemptCache(3, 1);
 | 
			
		||||
 | 
			
		||||
@ -25,7 +28,7 @@ public class LoginAttemptCacheTest {
 | 
			
		||||
    assertTrue(cache.isLoginBlocked(testKey));
 | 
			
		||||
 | 
			
		||||
    // Check Eviction
 | 
			
		||||
    Thread.sleep(2000);
 | 
			
		||||
    Awaitility.await().pollDelay(Duration.ofSeconds(2L)).untilAsserted(() -> Assert.assertTrue(true));
 | 
			
		||||
    assertFalse(cache.isLoginBlocked(testKey));
 | 
			
		||||
 | 
			
		||||
    // Check Successful Login
 | 
			
		||||
@ -39,7 +42,7 @@ public class LoginAttemptCacheTest {
 | 
			
		||||
    assertFalse(cache.isLoginBlocked(testKey));
 | 
			
		||||
 | 
			
		||||
    // Check Eviction
 | 
			
		||||
    Thread.sleep(2000);
 | 
			
		||||
    Awaitility.await().pollDelay(Duration.ofSeconds(2L)).untilAsserted(() -> Assert.assertTrue(true));
 | 
			
		||||
    assertFalse(cache.isLoginBlocked(testKey));
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user