diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java index ed8315a6d7..efc5da3fcc 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java @@ -16,7 +16,9 @@ import io.ebean.Database; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import jakarta.annotation.Nonnull; +import jakarta.annotation.PostConstruct; import java.util.UUID; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.mock.mockito.MockBean; @@ -42,6 +44,13 @@ public class UpgradeCliApplicationTestConfiguration { @MockBean public SearchClientShim searchClientShim; + @PostConstruct + public void configureMocks() { + // Configure SearchClientShim mock to return a valid engine type + Mockito.when(searchClientShim.getEngineType()) + .thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + } + @Primary @Bean public MeterRegistry meterRegistry() { diff --git a/docs/deploy/environment-vars.md b/docs/deploy/environment-vars.md index 4cfc008ec2..75b6de3243 100644 --- a/docs/deploy/environment-vars.md +++ b/docs/deploy/environment-vars.md @@ -245,6 +245,7 @@ Reference Links: | `AWS_REGION` | `null` | AWS region | GMS, MAE Consumer, MCE Consumer, System Update | | `ELASTICSEARCH_IMPLEMENTATION` | `elasticsearch` | Implementation (elasticsearch or opensearch) | GMS, MAE Consumer, MCE Consumer, System Update | | `ELASTIC_ID_HASH_ALGO` | `MD5` | ID hash algorithm | GMS, MAE Consumer, MCE Consumer, System Update | +| `ELASTICSEARCH_DATA_NODE_COUNT` | `1` | Number of Elasticsearch data nodes | GMS, MAE Consumer, MCE Consumer, System Update | #### SSL Context Configuration @@ -288,27 +289,27 @@ Reference Links: #### Build Indices Configuration -| Environment Variable | Default | Description | Components | -| ---------------------------------------------------------- | ------- | ----------------------------------------------------------- | ------------- | -| `ELASTICSEARCH_BUILD_INDICES_ALLOW_DOC_COUNT_MISMATCH` | `false` | Allow document count mismatch when clone indices is enabled | System Update | -| `ELASTICSEARCH_BUILD_INDICES_CLONE_INDICES` | `true` | Clone indices | System Update | -| `ELASTICSEARCH_BUILD_INDICES_RETENTION_UNIT` | `DAYS` | Retention unit for indices | System Update | -| `ELASTICSEARCH_BUILD_INDICES_RETENTION_VALUE` | `60` | Retention value for indices | System Update | -| `ELASTICSEARCH_BUILD_INDICES_REINDEX_OPTIMIZATION_ENABLED` | `true` | Enable reindex optimization | System Update | -| `ELASTICSEARCH_NUM_SHARDS_PER_INDEX` | `1` | Number of shards per index | System Update | -| `ELASTICSEARCH_NUM_REPLICAS_PER_INDEX` | `1` | Number of replicas per index | System Update | -| `ELASTICSEARCH_INDEX_BUILDER_NUM_RETRIES` | `3` | Index builder number of retries | System Update | -| `ELASTICSEARCH_INDEX_BUILDER_REFRESH_INTERVAL_SECONDS` | `3` | Index builder refresh interval | System Update | -| `SEARCH_DOCUMENT_MAX_ARRAY_LENGTH` | `1000` | Maximum array length in search documents | System Update | -| `SEARCH_DOCUMENT_MAX_OBJECT_KEYS` | `1000` | Maximum object keys in search documents | System Update | -| `SEARCH_DOCUMENT_MAX_VALUE_LENGTH` | `4096` | Maximum value length in search documents | System Update | -| `ELASTICSEARCH_MAIN_TOKENIZER` | `null` | Main tokenizer | System Update | -| `ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX` | `false` | Enable mappings reindex | System Update | -| `ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX` | `false` | Enable settings reindex | System Update | -| `ELASTICSEARCH_INDEX_BUILDER_MAX_REINDEX_HOURS` | `0` | Maximum reindex hours (0 = no timeout) | System Update | -| `ELASTICSEARCH_INDEX_BUILDER_SETTINGS_OVERRIDES` | `null` | Index builder settings overrides | System Update | -| `ELASTICSEARCH_MIN_SEARCH_FILTER_LENGTH` | `3` | Minimum search filter length | System Update | -| `ELASTICSEARCH_INDEX_BUILDER_ENTITY_SETTINGS_OVERRIDES` | `null` | Entity settings overrides | System Update | +| Environment Variable | Default | Description | Components | +| ---------------------------------------------------------- | -------------------------------- | ----------------------------------------------------------- | ------------- | +| `ELASTICSEARCH_BUILD_INDICES_ALLOW_DOC_COUNT_MISMATCH` | `false` | Allow document count mismatch when clone indices is enabled | System Update | +| `ELASTICSEARCH_BUILD_INDICES_CLONE_INDICES` | `true` | Clone indices | System Update | +| `ELASTICSEARCH_BUILD_INDICES_RETENTION_UNIT` | `DAYS` | Retention unit for indices | System Update | +| `ELASTICSEARCH_BUILD_INDICES_RETENTION_VALUE` | `60` | Retention value for indices | System Update | +| `ELASTICSEARCH_BUILD_INDICES_REINDEX_OPTIMIZATION_ENABLED` | `true` | Enable reindex optimization | System Update | +| `ELASTICSEARCH_NUM_SHARDS_PER_INDEX` | `${elasticsearch.dataNodeCount}` | Number of shards per index, defaults to dataNodeCount | System Update | +| `ELASTICSEARCH_NUM_REPLICAS_PER_INDEX` | `1` | Number of replicas per index | System Update | +| `ELASTICSEARCH_INDEX_BUILDER_NUM_RETRIES` | `3` | Index builder number of retries | System Update | +| `ELASTICSEARCH_INDEX_BUILDER_REFRESH_INTERVAL_SECONDS` | `3` | Index builder refresh interval | System Update | +| `SEARCH_DOCUMENT_MAX_ARRAY_LENGTH` | `1000` | Maximum array length in search documents | System Update | +| `SEARCH_DOCUMENT_MAX_OBJECT_KEYS` | `1000` | Maximum object keys in search documents | System Update | +| `SEARCH_DOCUMENT_MAX_VALUE_LENGTH` | `4096` | Maximum value length in search documents | System Update | +| `ELASTICSEARCH_MAIN_TOKENIZER` | `null` | Main tokenizer | System Update | +| `ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX` | `false` | Enable mappings reindex | System Update | +| `ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX` | `false` | Enable settings reindex | System Update | +| `ELASTICSEARCH_INDEX_BUILDER_MAX_REINDEX_HOURS` | `0` | Maximum reindex hours (0 = no timeout) | System Update | +| `ELASTICSEARCH_INDEX_BUILDER_SETTINGS_OVERRIDES` | `null` | Index builder settings overrides | System Update | +| `ELASTICSEARCH_MIN_SEARCH_FILTER_LENGTH` | `3` | Minimum search filter length | System Update | +| `ELASTICSEARCH_INDEX_BUILDER_ENTITY_SETTINGS_OVERRIDES` | `null` | Entity settings overrides | System Update | #### Search Configuration @@ -333,21 +334,21 @@ Reference Links: #### Graph Search Configuration -| Environment Variable | Default | Description | Components | -| ----------------------------------------------------------- | ------- | ------------------------------------------------------------------------------- | ---------- | -| `ELASTICSEARCH_SEARCH_GRAPH_TIMEOUT_SECONDS` | `50` | Graph DAO timeout seconds | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_BATCH_SIZE` | `1000` | Graph DAO batch size | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH` | `false` | Allow path retraversal for all paths | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_BOOST_VIA_NODES` | `true` | Boost graph edges with via nodes | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_STATUS_ENABLED` | `false` | Enable soft delete tracking of URNs on edges | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_LINEAGE_MAX_HOPS` | `20` | Maximum hops to traverse lineage graph | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_IMPACT_MAX_HOPS` | `1000` | Maximum hops to traverse for impact analysis (impact.maxHops) | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_IMPACT_MAX_RELATIONS` | `40000` | Maximum number of relationships for impact analysis (impact.maxRelations) | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_IMPACT_SLICES` | `2` | Number of slices for parallel search operations (impact.slices) | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_IMPACT_KEEP_ALIVE` | `5m` | Point-in-Time keepAlive duration for impact analysis queries (impact.keepAlive) | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_IMPACT_MAX_THREADS` | `32` | Maximum parallel lineage graph queries | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_QUERY_OPTIMIZATION` | `true` | Reduce query nesting if possible | GMS | -| `ELASTICSEARCH_SEARCH_GRAPH_POINT_IN_TIME_CREATION_ENABLED` | `true` | Enable creation of point in time snapshots for graph queries | GMS | +| Environment Variable | Default | Description | Components | +| ----------------------------------------------------------- | -------------------------------- | ----------------------------------------------------------------------------------------------------- | ---------- | +| `ELASTICSEARCH_SEARCH_GRAPH_TIMEOUT_SECONDS` | `50` | Graph DAO timeout seconds | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_BATCH_SIZE` | `1000` | Graph DAO batch size | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH` | `false` | Allow path retraversal for all paths | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_BOOST_VIA_NODES` | `true` | Boost graph edges with via nodes | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_STATUS_ENABLED` | `false` | Enable soft delete tracking of URNs on edges | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_LINEAGE_MAX_HOPS` | `20` | Maximum hops to traverse lineage graph | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_IMPACT_MAX_HOPS` | `1000` | Maximum hops to traverse for impact analysis (impact.maxHops) | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_IMPACT_MAX_RELATIONS` | `40000` | Maximum number of relationships for impact analysis (impact.maxRelations) | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_IMPACT_SLICES` | `${elasticsearch.dataNodeCount}` | Number of slices for parallel search operations (impact.slices), defaults to dataNodeCount, minimum 2 | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_IMPACT_KEEP_ALIVE` | `5m` | Point-in-Time keepAlive duration for impact analysis queries (impact.keepAlive) | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_IMPACT_MAX_THREADS` | `32` | Maximum parallel lineage graph queries | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_QUERY_OPTIMIZATION` | `true` | Reduce query nesting if possible | GMS | +| `ELASTICSEARCH_SEARCH_GRAPH_POINT_IN_TIME_CREATION_ENABLED` | `true` | Enable creation of point in time snapshots for graph queries | GMS | ### Neo4j Configuration diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index 75060de0db..ce1597d82e 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -17,10 +17,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.NotImplementedException; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; +import org.springframework.beans.factory.DisposableBean; /** A search DAO for Elasticsearch backend. */ @Slf4j -public class ESGraphQueryDAO implements GraphQueryDAO { +public class ESGraphQueryDAO implements GraphQueryDAO, DisposableBean { private final GraphQueryBaseDAO delegate; @Getter private final GraphServiceConfiguration graphServiceConfig; @@ -97,4 +98,12 @@ public class ESGraphQueryDAO implements GraphQueryDAO { SearchResponse executeSearch(@Nonnull SearchRequest searchRequest) { return delegate.executeSearch(searchRequest); } + + @Override + public void destroy() throws Exception { + // Shutdown the delegate if it's a GraphQueryPITDAO + if (delegate instanceof GraphQueryPITDAO) { + ((GraphQueryPITDAO) delegate).shutdown(); + } + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryBaseDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryBaseDAO.java index 4ed6242b4c..665d444c6b 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryBaseDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryBaseDAO.java @@ -1490,7 +1490,7 @@ public abstract class GraphQueryBaseDAO implements GraphQueryDAO { Set entityUrns) { int defaultPageSize = graphServiceConfig.getLimit().getResults().getApiDefault(); - int slices = config.getSearch().getGraph().getImpact().getSlices(); + int slices = Math.max(2, config.getSearch().getGraph().getImpact().getSlices()); return searchWithSlices( opContext, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryPITDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryPITDAO.java index 499d3d0adf..f5a3b51927 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryPITDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryPITDAO.java @@ -19,6 +19,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -36,6 +40,8 @@ public class GraphQueryPITDAO extends GraphQueryBaseDAO { @Getter private final SearchClientShim client; + final ExecutorService pitExecutor; + public GraphQueryPITDAO( SearchClientShim client, GraphServiceConfiguration graphServiceConfig, @@ -43,6 +49,46 @@ public class GraphQueryPITDAO extends GraphQueryBaseDAO { MetricUtils metricUtils) { super(graphServiceConfig, config, metricUtils); this.client = client; + + // Create dedicated thread pool for PIT operations + int maxThreads = config.getSearch().getGraph().getMaxThreads(); + this.pitExecutor = + new ThreadPoolExecutor( + maxThreads, // core pool size + maxThreads, // maximum pool size + 60L, + TimeUnit.SECONDS, // keep alive time + new LinkedBlockingQueue<>(maxThreads), // bounded queue for backpressure + r -> { + Thread t = new Thread(r, "pit-worker-" + System.currentTimeMillis()); + t.setDaemon(true); + return t; + }, + new ThreadPoolExecutor.CallerRunsPolicy() // backpressure: caller runs when queue full + ); + + log.info("Initialized PIT thread pool with {} threads and bounded queue", maxThreads); + } + + /** Shutdown the PIT executor service gracefully. */ + public void shutdown() { + if (pitExecutor != null && !pitExecutor.isShutdown()) { + log.info("Shutting down PIT thread pool"); + pitExecutor.shutdown(); + try { + if (!pitExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + log.warn("PIT thread pool did not terminate gracefully, forcing shutdown"); + pitExecutor.shutdownNow(); + if (!pitExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + log.error("PIT thread pool did not terminate after forced shutdown"); + } + } + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for PIT thread pool shutdown", e); + pitExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } } /** @@ -73,6 +119,7 @@ public class GraphQueryPITDAO extends GraphQueryBaseDAO { for (int sliceId = 0; sliceId < slices; sliceId++) { final int currentSliceId = sliceId; + CompletableFuture> sliceFuture = CompletableFuture.supplyAsync( () -> { @@ -91,7 +138,8 @@ public class GraphQueryPITDAO extends GraphQueryBaseDAO { slices, remainingTime, entityUrns); - }); + }, + pitExecutor); // Use dedicated thread pool with CallerRunsPolicy for backpressure sliceFutures.add(sliceFuture); } @@ -135,6 +183,12 @@ public class GraphQueryPITDAO extends GraphQueryBaseDAO { opContext.getSearchContext().getIndexConvention().getIndexName(INDEX_NAME)); while (sliceRelationships.size() < maxRelations) { + // Check for thread interruption (from future.cancel(true)) + if (Thread.currentThread().isInterrupted()) { + log.warn("Slice {} was interrupted, cleaning up PIT and stopping", sliceId); + throw new RuntimeException("Slice " + sliceId + " was interrupted"); + } + // Check timeout before processing if (remainingTime <= 0) { log.warn("Slice {} timed out, stopping PIT search", sliceId); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAORelationshipGroupQueryTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAORelationshipGroupQueryTest.java index 86d40d5ddf..19a58b0e1c 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAORelationshipGroupQueryTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAORelationshipGroupQueryTest.java @@ -33,6 +33,10 @@ import org.apache.lucene.search.TotalHits; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.RequestOptions; @@ -57,6 +61,24 @@ public class ESGraphQueryDAORelationshipGroupQueryTest { mockClient = mock(SearchClientShim.class); when(mockClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + // Mock PIT operations + CreatePitResponse mockCreatePitResponse = mock(CreatePitResponse.class); + when(mockCreatePitResponse.getId()).thenReturn("test-pit-id"); + try { + when(mockClient.createPit(any(CreatePitRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockCreatePitResponse); + } catch (IOException e) { + // This should not happen in tests + } + + DeletePitResponse mockDeletePitResponse = mock(DeletePitResponse.class); + try { + when(mockClient.deletePit(any(DeletePitRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockDeletePitResponse); + } catch (IOException e) { + // This should not happen in tests + } + // Create configuration with timeout and batch settings GraphQueryConfiguration graphConfig = GraphQueryConfiguration.builder() @@ -64,6 +86,7 @@ public class ESGraphQueryDAORelationshipGroupQueryTest { .batchSize(25) .enableMultiPathSearch(true) .boostViaNodes(true) + .maxThreads(1) // Ensure valid thread count for GraphQueryPITDAO .build(); LimitConfig limitConfig = @@ -575,6 +598,7 @@ public class ESGraphQueryDAORelationshipGroupQueryTest { .batchSize(25) .enableMultiPathSearch(true) // Enable multiple paths .queryOptimization(true) + .maxThreads(1) // Ensure valid thread count for GraphQueryPITDAO .build(); ElasticSearchConfiguration testESConfig = @@ -604,6 +628,7 @@ public class ESGraphQueryDAORelationshipGroupQueryTest { .timeoutSeconds(10) .batchSize(25) .enableMultiPathSearch(false) // Disable multiple paths + .maxThreads(1) // Ensure valid thread count for GraphQueryPITDAO .build(); ElasticSearchConfiguration testSinglePathConfig = @@ -1079,7 +1104,13 @@ public class ESGraphQueryDAORelationshipGroupQueryTest { .batchSize(25) .enableMultiPathSearch(true) .pointInTimeCreationEnabled(true) - .impact(ImpactConfiguration.builder().maxRelations(1000).maxHops(10).build()) + .maxThreads(1) // Ensure valid thread count for GraphQueryPITDAO + .impact( + ImpactConfiguration.builder() + .maxRelations(1000) + .maxHops(10) + .keepAlive("5m") + .build()) .build(); ElasticSearchConfiguration testESConfig = @@ -1143,7 +1174,13 @@ public class ESGraphQueryDAORelationshipGroupQueryTest { .batchSize(25) .enableMultiPathSearch(true) .pointInTimeCreationEnabled(true) - .impact(ImpactConfiguration.builder().maxRelations(1000).maxHops(10).build()) + .maxThreads(1) // Ensure valid thread count for GraphQueryPITDAO + .impact( + ImpactConfiguration.builder() + .maxRelations(1000) + .maxHops(10) + .keepAlive("5m") + .build()) .build(); ElasticSearchConfiguration testESConfig = diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java index 8025fda311..4a82fd9590 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java @@ -6,6 +6,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; @@ -14,6 +15,8 @@ import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.metadata.config.graph.GraphServiceConfiguration; import com.linkedin.metadata.config.search.ElasticSearchConfiguration; +import com.linkedin.metadata.config.search.GraphQueryConfiguration; +import com.linkedin.metadata.config.search.SearchConfiguration; import com.linkedin.metadata.graph.GraphFilters; import com.linkedin.metadata.graph.LineageGraphFilters; import com.linkedin.metadata.query.filter.SortCriterion; @@ -58,6 +61,16 @@ public class ESGraphQueryDAOTest { mockSortCriteria = Arrays.asList(mock(SortCriterion.class)); mockSearchRequest = mock(SearchRequest.class); mockSearchResponse = mock(SearchResponse.class); + + // Configure nested mock objects for ElasticSearchConfiguration + SearchConfiguration mockSearchConfig = mock(SearchConfiguration.class); + GraphQueryConfiguration mockGraphQueryConfig = mock(GraphQueryConfiguration.class); + + when(mockElasticSearchConfig.getSearch()).thenReturn(mockSearchConfig); + when(mockSearchConfig.getGraph()).thenReturn(mockGraphQueryConfig); + + // Configure GraphQueryConfiguration with valid values for thread pool creation + when(mockGraphQueryConfig.getMaxThreads()).thenReturn(1); } @Test @@ -397,4 +410,79 @@ public class ESGraphQueryDAOTest { .getSearchResponse( mockOperationContext, mockGraphFilters, mockSortCriteria, "scroll123", "5m", null); } + + @Test + public void testDestroyWithGraphQueryPITDAO() throws Exception { + // Test destroy() method when delegate is GraphQueryPITDAO + SearchClientShim testClient = mock(SearchClientShim.class); + when(testClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + + // Create a real GraphQueryPITDAO as delegate + GraphQueryPITDAO pitDAO = + new GraphQueryPITDAO(testClient, mockGraphServiceConfig, mockElasticSearchConfig, null); + + // Create ESGraphQueryDAO with GraphQueryPITDAO as delegate + ESGraphQueryDAO dao = + new ESGraphQueryDAO(testClient, mockGraphServiceConfig, mockElasticSearchConfig, null); + + // Use reflection to set the delegate to our GraphQueryPITDAO + java.lang.reflect.Field delegateField = ESGraphQueryDAO.class.getDeclaredField("delegate"); + delegateField.setAccessible(true); + delegateField.set(dao, pitDAO); + + // Verify the delegate is a GraphQueryPITDAO + GraphQueryBaseDAO actualDelegate = (GraphQueryBaseDAO) delegateField.get(dao); + assertTrue(actualDelegate instanceof GraphQueryPITDAO); + + // Call destroy() + dao.destroy(); + + // Verify that the pitExecutor is shutdown + assertTrue(pitDAO.pitExecutor.isShutdown(), "PIT executor should be shutdown after destroy()"); + assertTrue( + pitDAO.pitExecutor.isTerminated(), "PIT executor should be terminated after destroy()"); + } + + @Test + public void testDestroyWithNonGraphQueryPITDAO() throws Exception { + // Test destroy() method when delegate is NOT GraphQueryPITDAO + SearchClientShim testClient = mock(SearchClientShim.class); + when(testClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.ELASTICSEARCH_7); + + // Create ESGraphQueryDAO (which will have GraphQueryElasticsearch7DAO as delegate) + ESGraphQueryDAO dao = + new ESGraphQueryDAO(testClient, mockGraphServiceConfig, mockElasticSearchConfig, null); + + // Verify the delegate is NOT a GraphQueryPITDAO + java.lang.reflect.Field delegateField = ESGraphQueryDAO.class.getDeclaredField("delegate"); + delegateField.setAccessible(true); + GraphQueryBaseDAO actualDelegate = (GraphQueryBaseDAO) delegateField.get(dao); + assertFalse(actualDelegate instanceof GraphQueryPITDAO); + + // Call destroy() - should not throw exception + dao.destroy(); + + // Test passes if no exception is thrown + } + + @Test + public void testDestroyWithNullDelegate() throws Exception { + // Test destroy() method when delegate is null + SearchClientShim testClient = mock(SearchClientShim.class); + when(testClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + + // Create ESGraphQueryDAO + ESGraphQueryDAO dao = + new ESGraphQueryDAO(testClient, mockGraphServiceConfig, mockElasticSearchConfig, null); + + // Use reflection to set the delegate to null + java.lang.reflect.Field delegateField = ESGraphQueryDAO.class.getDeclaredField("delegate"); + delegateField.setAccessible(true); + delegateField.set(dao, null); + + // Call destroy() - should not throw exception + dao.destroy(); + + // Test passes if no exception is thrown + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/GraphQueryPITDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/GraphQueryPITDAOTest.java index 0851ef0487..ba9cb41187 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/GraphQueryPITDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/GraphQueryPITDAOTest.java @@ -15,9 +15,11 @@ import static io.datahubproject.test.search.SearchTestUtils.TEST_GRAPH_SERVICE_C import static io.datahubproject.test.search.SearchTestUtils.TEST_OS_SEARCH_CONFIG; import static io.datahubproject.test.search.SearchTestUtils.TEST_OS_SEARCH_CONFIG_NO_PIT; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -56,6 +58,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.lucene.search.TotalHits; import org.mockito.ArgumentCaptor; import org.opensearch.action.search.ClearScrollRequest; @@ -72,6 +76,7 @@ import org.opensearch.search.SearchHits; import org.opensearch.search.builder.SearchSourceBuilder; import org.skyscreamer.jsonassert.JSONAssert; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; public class GraphQueryPITDAOTest { @@ -80,6 +85,39 @@ public class GraphQueryPITDAOTest { "elasticsearch/sample_filters/lineage_query_filters_limited.json"; private static final String TEST_QUERY_FILE_FULL = "elasticsearch/sample_filters/lineage_query_filters_full.json"; + + // Track created DAOs for cleanup + private final List createdDAOs = new ArrayList<>(); + + /** Create a GraphQueryPITDAO and track it for cleanup */ + private GraphQueryPITDAO createTrackedDAO(SearchClientShim client) { + return createTrackedDAO(client, TEST_GRAPH_SERVICE_CONFIG, TEST_OS_SEARCH_CONFIG); + } + + /** Create a GraphQueryPITDAO with custom configs and track it for cleanup */ + private GraphQueryPITDAO createTrackedDAO( + SearchClientShim client, + GraphServiceConfiguration graphConfig, + ElasticSearchConfiguration esConfig) { + GraphQueryPITDAO dao = new GraphQueryPITDAO(client, graphConfig, esConfig, null); + createdDAOs.add(dao); + return dao; + } + + @AfterMethod + public void cleanup() { + // Shutdown all created DAOs to prevent thread pool leaks + for (GraphQueryPITDAO dao : createdDAOs) { + try { + dao.shutdown(); + } catch (Exception e) { + // Log but don't fail the test + System.err.println("Failed to shutdown DAO: " + e.getMessage()); + } + } + createdDAOs.clear(); + } + private static final String TEST_QUERY_FILE_FULL_EMPTY_FILTERS = "elasticsearch/sample_filters/lineage_query_filters_full_empty_filters.json"; private static final String TEST_QUERY_FILE_FULL_MULTIPLE_FILTERS = @@ -1941,4 +1979,277 @@ public class GraphQueryPITDAOTest { Assert.assertFalse(hasMessageInChain(e, "Point-in-Time creation is required")); } } + + @Test + public void testSearchSingleSliceWithPitThreadInterruption() throws Exception { + // Test that thread interruption is properly handled in searchSingleSliceWithPit + SearchClientShim mockClient = mock(SearchClientShim.class); + when(mockClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + + GraphQueryPITDAO dao = createTrackedDAO(mockClient); + + // Mock PIT creation + CreatePitResponse mockPitResponse = mock(CreatePitResponse.class); + when(mockPitResponse.getId()).thenReturn("test_pit_id"); + when(mockClient.createPit(any(CreatePitRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockPitResponse); + + // Create a thread that will be interrupted + Thread testThread = + new Thread( + () -> { + try { + Urn sourceUrn = UrnUtils.getUrn("urn:li:dataset:test-urn"); + LineageGraphFilters filters = + LineageGraphFilters.forEntityType( + operationContext.getLineageRegistry(), + DATASET_ENTITY_NAME, + LineageDirection.DOWNSTREAM); + + // Start the search operation + dao.getImpactLineage(operationContext, sourceUrn, filters, 1); + } catch (Exception e) { + // Expected to throw exception due to interruption + } + }); + + // Start the thread and then interrupt it + testThread.start(); + + // Give the thread a moment to start, then interrupt it + Thread.sleep(100); + testThread.interrupt(); + + // Wait for the thread to complete + testThread.join(5000); + + // Verify that the thread completed (either successfully or with exception) + Assert.assertFalse(testThread.isAlive(), "Test thread should have completed"); + } + + @Test + public void testSearchSingleSliceWithPitThreadInterruptionException() throws Exception { + // Test that the specific RuntimeException is thrown when thread is interrupted + SearchClientShim mockClient = mock(SearchClientShim.class); + when(mockClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + + GraphQueryPITDAO dao = createTrackedDAO(mockClient); + + // Mock PIT creation + CreatePitResponse mockPitResponse = mock(CreatePitResponse.class); + when(mockPitResponse.getId()).thenReturn("test_pit_id"); + when(mockClient.createPit(any(CreatePitRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockPitResponse); + + // Create a mock search response that will cause the method to enter the loop + SearchHit[] hits = createFakeLineageHits(1, "urn:li:dataset:test-urn", "dest", "DownstreamOf"); + SearchResponse searchResponse = createFakeSearchResponse(hits, 1); + + // Mock search to return the response, then throw interruption exception + when(mockClient.search(any(SearchRequest.class), eq(RequestOptions.DEFAULT))) + .thenAnswer( + invocation -> { + // Simulate thread interruption by checking Thread.currentThread().isInterrupted() + if (Thread.currentThread().isInterrupted()) { + throw new RuntimeException("Slice 0 was interrupted"); + } + return searchResponse; + }); + + Urn sourceUrn = UrnUtils.getUrn("urn:li:dataset:test-urn"); + LineageGraphFilters filters = + LineageGraphFilters.forEntityType( + operationContext.getLineageRegistry(), + DATASET_ENTITY_NAME, + LineageDirection.DOWNSTREAM); + + // Create a thread that will be interrupted + final RuntimeException[] caughtException = new RuntimeException[1]; + Thread testThread = + new Thread( + () -> { + try { + dao.getImpactLineage(operationContext, sourceUrn, filters, 1); + } catch (RuntimeException e) { + caughtException[0] = e; + } + }); + + // Start the thread and then interrupt it + testThread.start(); + + // Give the thread a moment to start, then interrupt it + Thread.sleep(100); + testThread.interrupt(); + + // Wait for the thread to complete + testThread.join(5000); + + // Verify that the specific interruption exception was caught + Assert.assertNotNull( + caughtException[0], "Expected RuntimeException to be thrown due to interruption"); + Assert.assertTrue( + caughtException[0].getMessage().contains("Failed to execute slice-based search"), + "Expected slice-based search failure message, got: " + caughtException[0].getMessage()); + } + + @Test + public void testShutdown() throws Exception { + // Test that shutdown method properly terminates the thread pool + SearchClientShim mockClient = mock(SearchClientShim.class); + when(mockClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + + GraphQueryPITDAO dao = createTrackedDAO(mockClient); + + // Verify thread pool is running + Assert.assertFalse(dao.pitExecutor.isShutdown(), "Thread pool should be running"); + + // Call shutdown + dao.shutdown(); + + // Verify thread pool is shutdown + Assert.assertTrue(dao.pitExecutor.isShutdown(), "Thread pool should be shutdown"); + Assert.assertTrue(dao.pitExecutor.isTerminated(), "Thread pool should be terminated"); + } + + @Test + public void testShutdownWithForcedTermination() throws Exception { + // Test shutdown when graceful termination fails and forced shutdown is needed + SearchClientShim mockClient = mock(SearchClientShim.class); + when(mockClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + + GraphQueryPITDAO dao = createTrackedDAO(mockClient); + + // Create a custom ExecutorService that simulates graceful shutdown failure + ExecutorService mockExecutor = mock(ExecutorService.class); + when(mockExecutor.isShutdown()).thenReturn(false); + when(mockExecutor.awaitTermination(30, TimeUnit.SECONDS)) + .thenReturn(false); // Graceful shutdown fails + when(mockExecutor.awaitTermination(10, TimeUnit.SECONDS)) + .thenReturn(true); // Forced shutdown succeeds + + // Replace the executor with our mock + java.lang.reflect.Field executorField = GraphQueryPITDAO.class.getDeclaredField("pitExecutor"); + executorField.setAccessible(true); + executorField.set(dao, mockExecutor); + + // Call shutdown + dao.shutdown(); + + // Verify that shutdownNow was called + verify(mockExecutor).shutdown(); + verify(mockExecutor).shutdownNow(); + verify(mockExecutor).awaitTermination(30, TimeUnit.SECONDS); + verify(mockExecutor).awaitTermination(10, TimeUnit.SECONDS); + } + + @Test + public void testShutdownWithFailedForcedTermination() throws Exception { + // Test shutdown when both graceful and forced termination fail + SearchClientShim mockClient = mock(SearchClientShim.class); + when(mockClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + + GraphQueryPITDAO dao = createTrackedDAO(mockClient); + + // Create a custom ExecutorService that simulates both graceful and forced shutdown failure + ExecutorService mockExecutor = mock(ExecutorService.class); + when(mockExecutor.isShutdown()).thenReturn(false); + when(mockExecutor.awaitTermination(30, TimeUnit.SECONDS)) + .thenReturn(false); // Graceful shutdown fails + when(mockExecutor.awaitTermination(10, TimeUnit.SECONDS)) + .thenReturn(false); // Forced shutdown also fails + + // Replace the executor with our mock + java.lang.reflect.Field executorField = GraphQueryPITDAO.class.getDeclaredField("pitExecutor"); + executorField.setAccessible(true); + executorField.set(dao, mockExecutor); + + // Call shutdown + dao.shutdown(); + + // Verify that shutdownNow was called and both awaitTermination calls were made + verify(mockExecutor).shutdown(); + verify(mockExecutor).shutdownNow(); + verify(mockExecutor).awaitTermination(30, TimeUnit.SECONDS); + verify(mockExecutor).awaitTermination(10, TimeUnit.SECONDS); + } + + @Test + public void testShutdownWithInterruptedException() throws Exception { + // Test shutdown when interrupted during awaitTermination + SearchClientShim mockClient = mock(SearchClientShim.class); + when(mockClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + + GraphQueryPITDAO dao = createTrackedDAO(mockClient); + + // Create a custom ExecutorService that throws InterruptedException + ExecutorService mockExecutor = mock(ExecutorService.class); + when(mockExecutor.isShutdown()).thenReturn(false); + when(mockExecutor.awaitTermination(30, TimeUnit.SECONDS)) + .thenThrow(new InterruptedException("Test interruption")); + + // Replace the executor with our mock + java.lang.reflect.Field executorField = GraphQueryPITDAO.class.getDeclaredField("pitExecutor"); + executorField.setAccessible(true); + executorField.set(dao, mockExecutor); + + // Call shutdown + dao.shutdown(); + + // Verify that shutdownNow was called and thread was interrupted + verify(mockExecutor).shutdown(); + verify(mockExecutor).shutdownNow(); + verify(mockExecutor).awaitTermination(30, TimeUnit.SECONDS); + + // Verify that the current thread was interrupted + Assert.assertTrue(Thread.currentThread().isInterrupted(), "Thread should be interrupted"); + + // Clear the interrupt flag for other tests + Thread.interrupted(); + } + + @Test + public void testShutdownWhenAlreadyShutdown() throws Exception { + // Test shutdown when executor is already shutdown + SearchClientShim mockClient = mock(SearchClientShim.class); + when(mockClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + + GraphQueryPITDAO dao = createTrackedDAO(mockClient); + + // Create a custom ExecutorService that is already shutdown + ExecutorService mockExecutor = mock(ExecutorService.class); + when(mockExecutor.isShutdown()).thenReturn(true); + + // Replace the executor with our mock + java.lang.reflect.Field executorField = GraphQueryPITDAO.class.getDeclaredField("pitExecutor"); + executorField.setAccessible(true); + executorField.set(dao, mockExecutor); + + // Call shutdown + dao.shutdown(); + + // Verify that no shutdown methods were called since it's already shutdown + verify(mockExecutor, never()).shutdown(); + verify(mockExecutor, never()).shutdownNow(); + verify(mockExecutor, never()).awaitTermination(anyLong(), any(TimeUnit.class)); + } + + @Test + public void testShutdownWhenExecutorIsNull() throws Exception { + // Test shutdown when executor is null + SearchClientShim mockClient = mock(SearchClientShim.class); + when(mockClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2); + + GraphQueryPITDAO dao = createTrackedDAO(mockClient); + + // Set executor to null + java.lang.reflect.Field executorField = GraphQueryPITDAO.class.getDeclaredField("pitExecutor"); + executorField.setAccessible(true); + executorField.set(dao, null); + + // Call shutdown - should not throw exception + dao.shutdown(); + + // Test passes if no exception is thrown + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java b/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java index 73be1f977c..6a0b28d845 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java @@ -553,6 +553,9 @@ public class PropertiesCollectorConfigurationTest extends AbstractTestNGSpringCo "elasticsearch.bulkProcessor.numRetries", "elasticsearch.bulkProcessor.refreshPolicy", "elasticsearch.bulkProcessor.requestsLimit", + "elasticsearch.bulkProcessor.sizeLimit", + "elasticsearch.bulkProcessor.threadCount", + "elasticsearch.dataNodeCount", "elasticsearch.bulkProcessor.retryInterval", "elasticsearch.connectionRequestTimeout", "elasticsearch.host", diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index c81aebec8e..95d89ba627 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -340,6 +340,7 @@ elasticsearch: opensearchUseAwsIamAuth: ${OPENSEARCH_USE_AWS_IAM_AUTH:false} region: ${AWS_REGION:#{null}} idHashAlgo: ${ELASTIC_ID_HASH_ALGO:MD5} + dataNodeCount: ${ELASTICSEARCH_DATA_NODE_COUNT:1} # Multi-client shim configuration shim: # Enable the search client shim (false = use legacy RestHighLevelClient) @@ -376,7 +377,7 @@ elasticsearch: enableBatchDelete: ${ES_BULK_ENABLE_BATCH_DELETE:false} index: prefix: ${INDEX_PREFIX:} - numShards: ${ELASTICSEARCH_NUM_SHARDS_PER_INDEX:1} + numShards: ${ELASTICSEARCH_NUM_SHARDS_PER_INDEX:${elasticsearch.dataNodeCount}} numReplicas: ${ELASTICSEARCH_NUM_REPLICAS_PER_INDEX:1} numRetries: ${ELASTICSEARCH_INDEX_BUILDER_NUM_RETRIES:3} refreshIntervalSeconds: ${ELASTICSEARCH_INDEX_BUILDER_REFRESH_INTERVAL_SECONDS:3} # increase to 30 if expected indexing rates to be greater than 100/s @@ -435,9 +436,9 @@ elasticsearch: impact: maxHops: ${ELASTICSEARCH_SEARCH_GRAPH_IMPACT_MAX_HOPS:1000} # the maximum hops to traverse for impact analysis maxRelations: ${ELASTICSEARCH_SEARCH_GRAPH_IMPACT_MAX_RELATIONS:40000} # maximum number of relationships - slices: ${ELASTICSEARCH_SEARCH_GRAPH_IMPACT_SLICES:2} # number of slices for parallel search operations - keepAlive: ${ELASTICSEARCH_SEARCH_GRAPH_IMPACT_KEEP_ALIVE:5m} # Point-in-Time keepAlive duration for impact analysis queries - maxThreads: ${ELASTICSEARCH_SEARCH_GRAPH_IMPACT_MAX_THREADS:32} # maximum parallel lineage graph queries + slices: ${ELASTICSEARCH_SEARCH_GRAPH_IMPACT_SLICES:${elasticsearch.dataNodeCount}} # number of slices for parallel search operations + keepAlive: ${ELASTICSEARCH_SEARCH_GRAPH_IMPACT_KEEP_ALIVE:55s} # Point-in-Time keepAlive duration for impact analysis queries + maxThreads: ${ELASTICSEARCH_SEARCH_GRAPH_IMPACT_MAX_THREADS:16} # maximum parallel lineage graph queries queryOptimization: ${ELASTICSEARCH_SEARCH_GRAPH_QUERY_OPTIMIZATION:true} # reduce query nesting if possible # TODO: Kafka topic convention diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java index 31753b9fe9..305ef7b862 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java @@ -35,7 +35,8 @@ public class ElasticSearchGraphServiceFactory { final ConfigurationProvider configurationProvider, final EntityRegistry entityRegistry, @Value("${elasticsearch.idHashAlgo}") final String idHashAlgo, - MetricUtils metricUtils) { + MetricUtils metricUtils, + @Qualifier("esGraphQueryDAO") final ESGraphQueryDAO esGraphQueryDAO) { LineageRegistry lineageRegistry = new LineageRegistry(entityRegistry); return new ElasticSearchGraphService( lineageRegistry, @@ -46,12 +47,19 @@ public class ElasticSearchGraphServiceFactory { components.getBulkProcessor(), components.getConfig().getBulkProcessor().getNumRetries(), configurationProvider.getElasticSearch().getSearch().getGraph()), - new ESGraphQueryDAO( - components.getSearchClient(), - configurationProvider.getGraphService(), - configurationProvider.getElasticSearch(), - metricUtils), + esGraphQueryDAO, components.getIndexBuilder(), idHashAlgo); } + + @Bean(name = "esGraphQueryDAO") + @Nonnull + protected ESGraphQueryDAO createESGraphQueryDAO( + final ConfigurationProvider configurationProvider, MetricUtils metricUtils) { + return new ESGraphQueryDAO( + components.getSearchClient(), + configurationProvider.getGraphService(), + configurationProvider.getElasticSearch(), + metricUtils); + } }