From 66c862fea9ee9c06d6095b100a371e09dda9177f Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 13 Sep 2021 23:37:29 +0200 Subject: [PATCH] test(metadata-io): Improve speed of ElasticSearch tests (#3160) --- .../metadata/ElasticSearchTestUtils.java | 84 +++++++++++++++++++ .../graph/ElasticSearchGraphServiceTest.java | 19 +++-- .../ElasticSearchServiceTest.java | 21 +++-- ...lasticSearchSystemMetadataServiceTest.java | 31 +++---- ...sticSearchTimeseriesAspectServiceTest.java | 6 +- 5 files changed, 127 insertions(+), 34 deletions(-) create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java diff --git a/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java new file mode 100644 index 0000000000..18779204f3 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java @@ -0,0 +1,84 @@ +package com.linkedin.metadata; + +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; +import org.testng.TestException; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +public class ElasticSearchTestUtils { + + private ElasticSearchTestUtils() { + } + + public static void syncAfterWrite(RestHighLevelClient client) throws Exception { 
+ syncAfterWrite(client, "test-sync-flag"); + } + + public static void syncAfterWrite(RestHighLevelClient searchClient, String indexName) throws Exception { + // we add some more data (a sync flag) and wait for it to appear + // we pick a random flag so that this can be used concurrently + String syncFlag = UUID.randomUUID().toString(); + + // add the flag and wait for it to appear, preferably to the index modified outside + addSyncFlag(searchClient, syncFlag, indexName); + waitForSyncFlag(searchClient, syncFlag, indexName, true); + + // flush changes for all indices in ES to disk + FlushResponse fResponse = searchClient.indices().flush(new FlushRequest(), RequestOptions.DEFAULT); + if (fResponse.getFailedShards() > 0) { + throw new RuntimeException("Failed to flush " + fResponse.getFailedShards() + " of " + fResponse.getTotalShards() + " shards"); + } + + // wait for all indices to be refreshed + RefreshResponse rResponse = searchClient.indices().refresh(new RefreshRequest(), RequestOptions.DEFAULT); + if (rResponse.getFailedShards() > 0) { + throw new RuntimeException("Failed to refresh " + rResponse.getFailedShards() + " of " + rResponse.getTotalShards() + " shards"); + } + + // remove the flag again and wait for it to disappear + removeSyncFlag(searchClient, syncFlag, indexName); + waitForSyncFlag(searchClient, syncFlag, indexName, false); + } + + private static void addSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) throws IOException { + String document = "{ }"; + final IndexRequest indexRequest = new IndexRequest(indexName).id(docId).source(document, XContentType.JSON); + final UpdateRequest updateRequest = new UpdateRequest(indexName, docId).doc(document, XContentType.JSON) + .detectNoop(false) + .upsert(indexRequest); + searchClient.update(updateRequest, RequestOptions.DEFAULT); + } + + private static void removeSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) throws IOException { + final DeleteRequest 
deleteRequest = new DeleteRequest(indexName).id(docId); + searchClient.delete(deleteRequest, RequestOptions.DEFAULT); + } + + private static void waitForSyncFlag(RestHighLevelClient searchClient, String docId, String indexName, boolean toExist) + throws IOException, InterruptedException { + GetRequest request = new GetRequest(indexName).id(docId); + long timeout = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS); + while (System.currentTimeMillis() < timeout) { + GetResponse response = searchClient.get(request, RequestOptions.DEFAULT); + if (response.isExists() == toExist) { + return; + } + TimeUnit.MILLISECONDS.sleep(50); + } + throw new TestException("Waiting for sync timed out"); + } + +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java index 1d8e1e29ee..8d3a8125aa 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java @@ -1,6 +1,6 @@ package com.linkedin.metadata.graph; -import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.ElasticSearchTestUtils; import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO; import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO; import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; @@ -17,28 +17,28 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeTest; import javax.annotation.Nonnull; -import java.net.URISyntaxException; -import java.util.concurrent.TimeUnit; + +import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.INDEX_NAME; public class ElasticSearchGraphServiceTest extends GraphServiceTestBase { private ElasticsearchContainer _elasticsearchContainer; private RestHighLevelClient _searchClient; - private IndexConvention 
_indexConvention; + private final IndexConvention _indexConvention = new IndexConventionImpl(null); + private final String _indexName = _indexConvention.getIndexName(INDEX_NAME); private ElasticSearchGraphService _client; private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:7.9.3"; private static final int HTTP_PORT = 9200; @BeforeMethod - public void wipe() throws URISyntaxException { - _client.removeNode(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)")); - _client.removeNode(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)")); + public void wipe() throws Exception { + _client.clear(); + syncAfterWrite(); } @BeforeTest public void setup() { - _indexConvention = new IndexConventionImpl(null); _elasticsearchContainer = new ElasticsearchContainer(IMAGE_NAME); _elasticsearchContainer.start(); _searchClient = buildRestClient(); @@ -78,6 +78,7 @@ public class ElasticSearchGraphServiceTest extends GraphServiceTestBase { @Override protected void syncAfterWrite() throws Exception { - TimeUnit.SECONDS.sleep(5); + ElasticSearchTestUtils.syncAfterWrite(_searchClient, _indexName); } + } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchServiceTest.java index 74eff01ef1..d5a70c0659 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchServiceTest.java @@ -17,7 +17,6 @@ import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; import java.util.Collections; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import 
org.apache.http.HttpHost; import org.apache.http.impl.nio.reactor.IOReactorConfig; @@ -26,11 +25,13 @@ import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite; public class ElasticSearchServiceTest { @@ -54,6 +55,13 @@ public class ElasticSearchServiceTest { _elasticsearchContainer.start(); _searchClient = buildRestClient(); _elasticSearchService = buildService(); + _elasticSearchService.configure(); + } + + @BeforeMethod + public void wipe() throws Exception { + _elasticSearchService.clear(); + syncAfterWrite(_searchClient); } @Nonnull @@ -84,9 +92,7 @@ public class ElasticSearchServiceTest { } @Test - public void testElasticSearchService() throws InterruptedException { - _elasticSearchService.configure(); - + public void testElasticSearchService() throws Exception { SearchResult searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10); assertEquals(searchResult.getNumEntities().intValue(), 0); BrowseResult browseResult = _elasticSearchService.browse(ENTITY_NAME, "", null, 0, 10); @@ -100,7 +106,8 @@ public class ElasticSearchServiceTest { document.set("textFieldOverride", JsonNodeFactory.instance.textNode("textFieldOverride")); document.set("browsePaths", JsonNodeFactory.instance.textNode("/a/b/c")); _elasticSearchService.upsertDocument(ENTITY_NAME, document.toString(), urn.toString()); - TimeUnit.SECONDS.sleep(5); + syncAfterWrite(_searchClient); + searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10); assertEquals(searchResult.getNumEntities().intValue(), 1); assertEquals(searchResult.getEntities().get(0), urn); @@ 
-119,7 +126,7 @@ public class ElasticSearchServiceTest { document2.set("textFieldOverride", JsonNodeFactory.instance.textNode("textFieldOverride2")); document2.set("browsePaths", JsonNodeFactory.instance.textNode("/b/c")); _elasticSearchService.upsertDocument(ENTITY_NAME, document2.toString(), urn2.toString()); - TimeUnit.SECONDS.sleep(5); + syncAfterWrite(_searchClient); searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10); assertEquals(searchResult.getNumEntities().intValue(), 1); @@ -135,7 +142,7 @@ public class ElasticSearchServiceTest { _elasticSearchService.deleteDocument(ENTITY_NAME, urn.toString()); _elasticSearchService.deleteDocument(ENTITY_NAME, urn2.toString()); - TimeUnit.SECONDS.sleep(5); + syncAfterWrite(_searchClient); searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10); assertEquals(searchResult.getNumEntities().intValue(), 0); browseResult = _elasticSearchService.browse(ENTITY_NAME, "", null, 0, 10); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataServiceTest.java index 1d9163e5d6..9b0154d4d7 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataServiceTest.java @@ -5,9 +5,7 @@ import com.linkedin.metadata.run.IngestionRunSummary; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; import com.linkedin.mxe.SystemMetadata; -import java.net.URISyntaxException; import java.util.List; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.apache.http.HttpHost; import org.apache.http.impl.nio.reactor.IOReactorConfig; @@ -20,26 +18,23 @@ import 
org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; +import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite; +import static com.linkedin.metadata.systemmetadata.ElasticSearchSystemMetadataService.INDEX_NAME; import static org.testng.Assert.*; public class ElasticSearchSystemMetadataServiceTest { private ElasticsearchContainer _elasticsearchContainer; private RestHighLevelClient _searchClient; - private IndexConvention _indexConvention; + private final IndexConvention _indexConvention = new IndexConventionImpl(null); + private final String _indexName = _indexConvention.getIndexName(INDEX_NAME); private ElasticSearchSystemMetadataService _client; private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:7.9.3"; private static final int HTTP_PORT = 9200; - @BeforeMethod - public void wipe() throws URISyntaxException { - _client.clear(); - } - @BeforeTest public void setup() { - _indexConvention = new IndexConventionImpl(null); _elasticsearchContainer = new ElasticsearchContainer(IMAGE_NAME); _elasticsearchContainer.start(); _searchClient = buildRestClient(); @@ -47,6 +42,12 @@ public class ElasticSearchSystemMetadataServiceTest { _client.configure(); } + @BeforeMethod + public void wipe() throws Exception { + _client.clear(); + syncAfterWrite(_searchClient, _indexName); + } + @Nonnull private RestHighLevelClient buildRestClient() { final RestClientBuilder builder = @@ -88,7 +89,7 @@ public class ElasticSearchSystemMetadataServiceTest { _client.insert(metadata2, "urn:li:chart:2", "chartKey"); _client.insert(metadata2, "urn:li:chart:2", "Ownership"); - TimeUnit.SECONDS.sleep(5); + syncAfterWrite(_searchClient, _indexName); List runs = _client.listRuns(0, 20); @@ -117,7 +118,7 @@ public class ElasticSearchSystemMetadataServiceTest { _client.insert(metadata2, "urn:li:chart:2", "chartKey"); _client.insert(metadata2, "urn:li:chart:2", "Ownership"); - 
TimeUnit.SECONDS.sleep(5); + syncAfterWrite(_searchClient, _indexName); List runs = _client.listRuns(0, 20); @@ -146,7 +147,7 @@ public class ElasticSearchSystemMetadataServiceTest { _client.insert(metadata2, "urn:li:chart:2", "chartKey"); _client.insert(metadata2, "urn:li:chart:2", "Ownership"); - TimeUnit.SECONDS.sleep(5); + syncAfterWrite(_searchClient, _indexName); List rows = _client.findByRunId("abc-456"); @@ -174,11 +175,11 @@ public class ElasticSearchSystemMetadataServiceTest { _client.insert(metadata2, "urn:li:chart:2", "chartKey"); _client.insert(metadata2, "urn:li:chart:2", "Ownership"); - TimeUnit.SECONDS.sleep(5); + syncAfterWrite(_searchClient, _indexName); _client.deleteUrn("urn:li:chart:1"); - TimeUnit.SECONDS.sleep(5); + syncAfterWrite(_searchClient, _indexName); List rows = _client.findByRunId("abc-456"); @@ -190,7 +191,7 @@ public class ElasticSearchSystemMetadataServiceTest { public void testInsertNullData() throws Exception { _client.insert(null, "urn:li:chart:1", "chartKey"); - TimeUnit.SECONDS.sleep(5); + syncAfterWrite(_searchClient, _indexName); List runs = _client.listRuns(0, 20); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java index 5e6a1a653d..d06b4a6f65 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java @@ -18,7 +18,6 @@ import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; import java.util.Calendar; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -35,6 +34,7 @@ import org.testng.annotations.Test; 
import static org.testng.Assert.*; +import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite; public class ElasticSearchTimeseriesAspectServiceTest { @@ -122,7 +122,7 @@ public class ElasticSearchTimeseriesAspectServiceTest { } @Test(groups = "upsert") - public void testUpsertProfiles() throws InterruptedException { + public void testUpsertProfiles() throws Exception { // Create the testEntity profiles that we would like to use for testing. _startTime = Calendar.getInstance().getTimeInMillis(); @@ -146,7 +146,7 @@ public class ElasticSearchTimeseriesAspectServiceTest { } }); - TimeUnit.SECONDS.sleep(5); + syncAfterWrite(_searchClient); } @Test(groups = "query", dependsOnGroups = "upsert")