refactor(queries): refactor query edges (#14095)

david-leifker 2025-07-15 17:40:11 -05:00 committed by GitHub
parent 9d8bbd6829
commit f12f8e37d0
31 changed files with 1997 additions and 239 deletions

View File

@@ -1,28 +0,0 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.browsepaths.BackfillBrowsePathsV2;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class BackfillBrowsePathsV2Config {
@Bean
public NonBlockingSystemUpgrade backfillBrowsePathsV2(
final OperationContext opContext,
EntityService<?> entityService,
SearchService searchService,
@Value("${systemUpdate.browsePathsV2.enabled}") final boolean enabled,
@Value("${systemUpdate.browsePathsV2.reprocess.enabled}") final boolean reprocessEnabled,
@Value("${systemUpdate.browsePathsV2.batchSize}") final Integer batchSize) {
return new BackfillBrowsePathsV2(
opContext, entityService, searchService, enabled, reprocessEnabled, batchSize);
}
}

View File

@@ -1,43 +0,0 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import io.datahubproject.metadata.context.OperationContext;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class BackfillDataProcessInstancesConfig {
@Bean
public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents(
final OperationContext opContext,
EntityService<?> entityService,
ElasticSearchService elasticSearchService,
RestHighLevelClient restHighLevelClient,
@Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled,
@Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}")
boolean reprocessEnabled,
@Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize,
@Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs,
@Value("${systemUpdate.processInstanceHasRunEvents.totalDays}") Integer totalDays,
@Value("${systemUpdate.processInstanceHasRunEvents.windowDays}") Integer windowDays) {
return new BackfillDataProcessInstances(
opContext,
entityService,
elasticSearchService,
restHighLevelClient,
enabled,
reprocessEnabled,
batchSize,
delayMs,
totalDays,
windowDays);
}
}

View File

@@ -1,29 +0,0 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.ingestion.BackfillIngestionSourceInfoIndices;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class BackfillIngestionSourceInfoIndicesConfig {
@Bean
public NonBlockingSystemUpgrade backfillIngestionSourceInfoIndices(
final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${systemUpdate.ingestionIndices.enabled}") final boolean enabled,
@Value("${systemUpdate.ingestionIndices.batchSize}") final Integer batchSize,
@Value("${systemUpdate.ingestionIndices.delayMs}") final Integer delayMs,
@Value("${systemUpdate.ingestionIndices.limit}") final Integer limit) {
return new BackfillIngestionSourceInfoIndices(
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
}

View File

@@ -1,27 +0,0 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.policyfields.BackfillPolicyFields;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class BackfillPolicyFieldsConfig {
@Bean
public BackfillPolicyFields backfillPolicyFields(
final OperationContext opContext,
EntityService<?> entityService,
SearchService searchService,
@Value("${systemUpdate.policyFields.enabled}") final boolean enabled,
@Value("${systemUpdate.policyFields.reprocess.enabled}") final boolean reprocessEnabled,
@Value("${systemUpdate.policyFields.batchSize}") final Integer batchSize) {
return new BackfillPolicyFields(
opContext, entityService, searchService, enabled, reprocessEnabled, batchSize);
}
}

View File

@@ -0,0 +1,145 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.browsepaths.BackfillBrowsePathsV2;
import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances;
import com.linkedin.datahub.upgrade.system.entities.RemoveQueryEdges;
import com.linkedin.datahub.upgrade.system.ingestion.BackfillIngestionSourceInfoIndices;
import com.linkedin.datahub.upgrade.system.policyfields.BackfillPolicyFields;
import com.linkedin.datahub.upgrade.system.schemafield.GenerateSchemaFieldsFromSchemaMetadata;
import com.linkedin.datahub.upgrade.system.schemafield.MigrateSchemaFieldDocIds;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.config.search.BulkDeleteConfiguration;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import io.datahubproject.metadata.context.OperationContext;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class NonBlockingConfigs {
@Bean
public NonBlockingSystemUpgrade removeQueryEdges(
final OperationContext opContext,
final ConfigurationProvider configurationProvider,
EntityService<?> entityService,
ESWriteDAO esWriteDao,
@Value("${systemUpdate.removeQueryEdges.enabled}") final boolean enabled,
@Value("${systemUpdate.removeQueryEdges.numRetries}") final int numRetries) {
BulkDeleteConfiguration override =
configurationProvider.getElasticSearch().getBulkDelete().toBuilder()
.numRetries(numRetries)
.build();
return new RemoveQueryEdges(opContext, entityService, esWriteDao, enabled, override);
}
@Bean
public NonBlockingSystemUpgrade backfillBrowsePathsV2(
final OperationContext opContext,
EntityService<?> entityService,
SearchService searchService,
@Value("${systemUpdate.browsePathsV2.enabled}") final boolean enabled,
@Value("${systemUpdate.browsePathsV2.reprocess.enabled}") final boolean reprocessEnabled,
@Value("${systemUpdate.browsePathsV2.batchSize}") final Integer batchSize) {
return new BackfillBrowsePathsV2(
opContext, entityService, searchService, enabled, reprocessEnabled, batchSize);
}
@Bean
public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents(
final OperationContext opContext,
EntityService<?> entityService,
ElasticSearchService elasticSearchService,
RestHighLevelClient restHighLevelClient,
@Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled,
@Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}")
boolean reprocessEnabled,
@Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize,
@Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs,
@Value("${systemUpdate.processInstanceHasRunEvents.totalDays}") Integer totalDays,
@Value("${systemUpdate.processInstanceHasRunEvents.windowDays}") Integer windowDays) {
return new BackfillDataProcessInstances(
opContext,
entityService,
elasticSearchService,
restHighLevelClient,
enabled,
reprocessEnabled,
batchSize,
delayMs,
totalDays,
windowDays);
}
@Bean
public NonBlockingSystemUpgrade backfillIngestionSourceInfoIndices(
final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${systemUpdate.ingestionIndices.enabled}") final boolean enabled,
@Value("${systemUpdate.ingestionIndices.batchSize}") final Integer batchSize,
@Value("${systemUpdate.ingestionIndices.delayMs}") final Integer delayMs,
@Value("${systemUpdate.ingestionIndices.limit}") final Integer limit) {
return new BackfillIngestionSourceInfoIndices(
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
@Bean
public BackfillPolicyFields backfillPolicyFields(
final OperationContext opContext,
EntityService<?> entityService,
SearchService searchService,
@Value("${systemUpdate.policyFields.enabled}") final boolean enabled,
@Value("${systemUpdate.policyFields.reprocess.enabled}") final boolean reprocessEnabled,
@Value("${systemUpdate.policyFields.batchSize}") final Integer batchSize) {
return new BackfillPolicyFields(
opContext, entityService, searchService, enabled, reprocessEnabled, batchSize);
}
@Bean
public NonBlockingSystemUpgrade schemaFieldsFromSchemaMetadata(
@Qualifier("systemOperationContext") final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
// SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_ENABLED
@Value("${systemUpdate.schemaFieldsFromSchemaMetadata.enabled}") final boolean enabled,
// SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_BATCH_SIZE
@Value("${systemUpdate.schemaFieldsFromSchemaMetadata.batchSize}") final Integer batchSize,
// SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_DELAY_MS
@Value("${systemUpdate.schemaFieldsFromSchemaMetadata.delayMs}") final Integer delayMs,
// SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_LIMIT
@Value("${systemUpdate.schemaFieldsFromSchemaMetadata.limit}") final Integer limit) {
return new GenerateSchemaFieldsFromSchemaMetadata(
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
@Bean
public NonBlockingSystemUpgrade schemaFieldsDocIds(
@Qualifier("systemOperationContext") final OperationContext opContext,
@Qualifier("baseElasticSearchComponents")
final BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components,
final EntityService<?> entityService,
// ELASTICSEARCH_INDEX_DOC_IDS_SCHEMA_FIELD_HASH_ID_ENABLED
@Value("${elasticsearch.index.docIds.schemaField.hashIdEnabled}") final boolean hashEnabled,
// SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_ENABLED
@Value("${systemUpdate.schemaFieldsDocIds.enabled}") final boolean enabled,
// SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_BATCH_SIZE
@Value("${systemUpdate.schemaFieldsDocIds.batchSize}") final Integer batchSize,
// SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_DELAY_MS
@Value("${systemUpdate.schemaFieldsDocIds.delayMs}") final Integer delayMs,
// SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_LIMIT
@Value("${systemUpdate.schemaFieldsDocIds.limit}") final Integer limit) {
return new MigrateSchemaFieldDocIds(
opContext, components, entityService, enabled && hashEnabled, batchSize, delayMs, limit);
}
}
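
Note the override pattern in the removeQueryEdges bean above: it copies the globally configured bulk-delete settings and replaces only the retry count. A minimal sketch of that pattern, assuming the Lombok-generated builders on BulkDeleteConfiguration used elsewhere in this change (the helper name is illustrative):

import com.linkedin.metadata.config.search.BulkDeleteConfiguration;

final class BulkDeleteOverrideSketch {
  // Copy the shared defaults, changing only numRetries (driven by
  // ${systemUpdate.removeQueryEdges.numRetries} in the bean above); batchSize,
  // slices, timeout, etc. keep their configured values.
  static BulkDeleteConfiguration withRetries(BulkDeleteConfiguration defaults, int numRetries) {
    return defaults.toBuilder().numRetries(numRetries).build();
  }
}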

View File

@@ -1,56 +0,0 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.schemafield.GenerateSchemaFieldsFromSchemaMetadata;
import com.linkedin.datahub.upgrade.system.schemafield.MigrateSchemaFieldDocIds;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class SchemaFieldsConfig {
@Bean
public NonBlockingSystemUpgrade schemaFieldsFromSchemaMetadata(
@Qualifier("systemOperationContext") final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
// SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_ENABLED
@Value("${systemUpdate.schemaFieldsFromSchemaMetadata.enabled}") final boolean enabled,
// SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_BATCH_SIZE
@Value("${systemUpdate.schemaFieldsFromSchemaMetadata.batchSize}") final Integer batchSize,
// SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_DELAY_MS
@Value("${systemUpdate.schemaFieldsFromSchemaMetadata.delayMs}") final Integer delayMs,
// SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_LIMIT
@Value("${systemUpdate.schemaFieldsFromSchemaMetadata.limit}") final Integer limit) {
return new GenerateSchemaFieldsFromSchemaMetadata(
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
@Bean
public NonBlockingSystemUpgrade schemaFieldsDocIds(
@Qualifier("systemOperationContext") final OperationContext opContext,
@Qualifier("baseElasticSearchComponents")
final BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components,
final EntityService<?> entityService,
// ELASTICSEARCH_INDEX_DOC_IDS_SCHEMA_FIELD_HASH_ID_ENABLED
@Value("${elasticsearch.index.docIds.schemaField.hashIdEnabled}") final boolean hashEnabled,
// SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_ENABLED
@Value("${systemUpdate.schemaFieldsDocIds.enabled}") final boolean enabled,
// SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_BATCH_SIZE
@Value("${systemUpdate.schemaFieldsDocIds.batchSize}") final Integer batchSize,
// SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_DELAY_MS
@Value("${systemUpdate.schemaFieldsDocIds.delayMs}") final Integer delayMs,
// SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_LIMIT
@Value("${systemUpdate.schemaFieldsDocIds.limit}") final Integer limit) {
return new MigrateSchemaFieldDocIds(
opContext, components, entityService, enabled && hashEnabled, batchSize, delayMs, limit);
}
}

View File

@@ -0,0 +1,131 @@
package com.linkedin.datahub.upgrade.system.entities;
import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;
import static com.linkedin.metadata.Constants.QUERY_ENTITY_NAME;
import static com.linkedin.metadata.graph.elastic.ESGraphQueryDAO.RELATIONSHIP_TYPE;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.config.search.BulkDeleteConfiguration;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
@Slf4j
public class RemoveQueryEdges implements NonBlockingSystemUpgrade {
private final List<UpgradeStep> steps;
public RemoveQueryEdges(
OperationContext opContext,
EntityService<?> entityService,
ESWriteDAO esWriteDAO,
boolean enabled,
BulkDeleteConfiguration deleteConfig) {
if (enabled) {
steps =
ImmutableList.of(
new RemoveQueryEdgesStep(opContext, esWriteDAO, entityService, deleteConfig));
} else {
steps = ImmutableList.of();
}
}
@Override
public String id() {
return "RemoveQueryEdges";
}
@Override
public List<UpgradeStep> steps() {
return steps;
}
public static class RemoveQueryEdgesStep implements UpgradeStep {
private static final String UPGRADE_ID = "RemoveQueryEdges_V1";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
private final OperationContext opContext;
private final EntityService<?> entityService;
private final ESWriteDAO esWriteDAO;
private final BulkDeleteConfiguration deleteConfig;
public RemoveQueryEdgesStep(
OperationContext opContext,
ESWriteDAO esWriteDAO,
EntityService<?> entityService,
BulkDeleteConfiguration deleteConfig) {
this.opContext = opContext;
this.esWriteDAO = esWriteDAO;
this.entityService = entityService;
this.deleteConfig = deleteConfig;
}
@Override
public String id() {
return UPGRADE_ID;
}
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
final String indexName =
opContext
.getSearchContext()
.getIndexConvention()
.getIndexName(ElasticSearchGraphService.INDEX_NAME);
return (context) -> {
BoolQueryBuilder deleteQuery = QueryBuilders.boolQuery();
deleteQuery.filter(QueryBuilders.termQuery(RELATIONSHIP_TYPE, "IsAssociatedWith"));
deleteQuery.filter(QueryBuilders.termQuery("source.entityType", QUERY_ENTITY_NAME));
try {
esWriteDAO.deleteByQuerySync(indexName, deleteQuery, deleteConfig);
BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService);
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.SUCCEEDED);
} catch (Exception e) {
log.error("Failed to execute query edge delete.", e);
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
}
};
}
/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Override
public boolean isOptional() {
return true;
}
/**
* Returns whether the upgrade should be skipped. Uses previous run history (the stored
* DataHubUpgradeResult aspect for this upgrade ID) to determine whether to skip.
*/
@Override
public boolean skip(UpgradeContext context) {
boolean previouslyRun =
entityService.exists(
context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
}
return previouslyRun;
}
}
}
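
Condensed, the step above amounts to a single synchronous delete-by-query against the graph index. A sketch of that core call, assuming the ESWriteDAO.deleteByQuerySync API added later in this PR (helper name is illustrative; constants are inlined for readability):

import com.linkedin.metadata.config.search.BulkDeleteConfiguration;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import io.datahubproject.metadata.context.OperationContext;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;

final class RemoveQueryEdgesSketch {
  // Delete every IsAssociatedWith graph edge whose source is a query entity.
  static ESWriteDAO.DeleteByQueryResult run(
      OperationContext opContext, ESWriteDAO esWriteDAO, BulkDeleteConfiguration deleteConfig) {
    String indexName =
        opContext
            .getSearchContext()
            .getIndexConvention()
            .getIndexName(ElasticSearchGraphService.INDEX_NAME);
    BoolQueryBuilder deleteQuery =
        QueryBuilders.boolQuery()
            .filter(QueryBuilders.termQuery("relationshipType", "IsAssociatedWith"))
            .filter(QueryBuilders.termQuery("source.entityType", "query"));
    return esWriteDAO.deleteByQuerySync(indexName, deleteQuery, deleteConfig);
  }
}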

View File

@@ -0,0 +1,263 @@
package com.linkedin.datahub.upgrade.system.entities;
import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;
import static com.linkedin.metadata.graph.elastic.ESGraphQueryDAO.RELATIONSHIP_TYPE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.config.search.BulkDeleteConfiguration;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.SearchContext;
import java.util.List;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class RemoveQueryEdgesTest {
private static final String UPGRADE_ID = "RemoveQueryEdges_V1";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
@Mock private OperationContext mockOpContext;
@Mock private EntityService<?> mockEntityService;
@Mock private ESWriteDAO mockEsWriteDAO;
@Mock private SearchContext mockSearchContext;
@Mock private IndexConvention mockIndexConvention;
@Mock private UpgradeContext mockUpgradeContext;
private RemoveQueryEdges removeQueryEdges;
private RemoveQueryEdges.RemoveQueryEdgesStep removeQueryEdgesStep;
private BulkDeleteConfiguration deleteConfig;
@BeforeMethod
public void setup() {
MockitoAnnotations.openMocks(this);
// Setup mock chain for index name resolution
when(mockOpContext.getSearchContext()).thenReturn(mockSearchContext);
when(mockSearchContext.getIndexConvention()).thenReturn(mockIndexConvention);
when(mockIndexConvention.getIndexName(ElasticSearchGraphService.INDEX_NAME))
.thenReturn("test_graph_index");
when(mockUpgradeContext.opContext()).thenReturn(mockOpContext);
this.deleteConfig = BulkDeleteConfiguration.builder().numRetries(1).build();
}
@Test
public void testRemoveQueryEdgesEnabledCreatesStep() {
// Test with enabled = true
removeQueryEdges =
new RemoveQueryEdges(mockOpContext, mockEntityService, mockEsWriteDAO, true, deleteConfig);
assertEquals(removeQueryEdges.id(), "RemoveQueryEdges");
List<UpgradeStep> steps = removeQueryEdges.steps();
assertEquals(steps.size(), 1);
assertTrue(steps.get(0) instanceof RemoveQueryEdges.RemoveQueryEdgesStep);
}
@Test
public void testRemoveQueryEdgesDisabledCreatesNoSteps() {
// Test with enabled = false
removeQueryEdges =
new RemoveQueryEdges(mockOpContext, mockEntityService, mockEsWriteDAO, false, deleteConfig);
assertEquals(removeQueryEdges.id(), "RemoveQueryEdges");
List<UpgradeStep> steps = removeQueryEdges.steps();
assertTrue(steps.isEmpty());
}
@Test
public void testStepId() {
removeQueryEdgesStep =
new RemoveQueryEdges.RemoveQueryEdgesStep(
mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig);
assertEquals(removeQueryEdgesStep.id(), UPGRADE_ID);
}
@Test
public void testIsOptional() {
removeQueryEdgesStep =
new RemoveQueryEdges.RemoveQueryEdgesStep(
mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig);
assertTrue(removeQueryEdgesStep.isOptional());
}
@Test
public void testSkipWhenPreviouslyRun() {
removeQueryEdgesStep =
new RemoveQueryEdges.RemoveQueryEdgesStep(
mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig);
// Mock that the upgrade was already run
when(mockEntityService.exists(
eq(mockOpContext),
eq(UPGRADE_ID_URN),
eq(DATA_HUB_UPGRADE_RESULT_ASPECT_NAME),
eq(true)))
.thenReturn(true);
assertTrue(removeQueryEdgesStep.skip(mockUpgradeContext));
verify(mockEntityService)
.exists(
eq(mockOpContext),
eq(UPGRADE_ID_URN),
eq(DATA_HUB_UPGRADE_RESULT_ASPECT_NAME),
eq(true));
}
@Test
public void testDontSkipWhenNotPreviouslyRun() {
removeQueryEdgesStep =
new RemoveQueryEdges.RemoveQueryEdgesStep(
mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig);
// Mock that the upgrade was not run before
when(mockEntityService.exists(
eq(mockOpContext),
eq(UPGRADE_ID_URN),
eq(DATA_HUB_UPGRADE_RESULT_ASPECT_NAME),
eq(true)))
.thenReturn(false);
assertFalse(removeQueryEdgesStep.skip(mockUpgradeContext));
}
@Test
public void testExecutableSuccess() throws Exception {
removeQueryEdgesStep =
new RemoveQueryEdges.RemoveQueryEdgesStep(
mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig);
// Mock successful delete operation
ESWriteDAO.DeleteByQueryResult mockResult =
ESWriteDAO.DeleteByQueryResult.builder()
.success(true)
.remainingDocuments(0)
.timeTaken(1000)
.retryAttempts(0)
.build();
when(mockEsWriteDAO.deleteByQuerySync(
any(String.class), any(QueryBuilder.class), eq(deleteConfig)))
.thenReturn(mockResult);
// Execute the step
UpgradeStepResult result = removeQueryEdgesStep.executable().apply(mockUpgradeContext);
// Verify the result
assertEquals(result.stepId(), UPGRADE_ID);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
// Capture and verify the delete query
ArgumentCaptor<String> indexCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<QueryBuilder> queryCaptor = ArgumentCaptor.forClass(QueryBuilder.class);
verify(mockEsWriteDAO)
.deleteByQuerySync(indexCaptor.capture(), queryCaptor.capture(), eq(deleteConfig));
assertEquals(indexCaptor.getValue(), "test_graph_index");
// Verify the query structure
QueryBuilder capturedQuery = queryCaptor.getValue();
assertTrue(capturedQuery instanceof BoolQueryBuilder);
BoolQueryBuilder boolQuery = (BoolQueryBuilder) capturedQuery;
// Verify the filters
assertEquals(boolQuery.filter().size(), 2);
// Verify relationship type filter
boolean hasRelationshipFilter =
boolQuery.filter().stream()
.anyMatch(
filter ->
filter instanceof TermQueryBuilder
&& ((TermQueryBuilder) filter).fieldName().equals(RELATIONSHIP_TYPE)
&& ((TermQueryBuilder) filter).value().equals("IsAssociatedWith"));
assertTrue(hasRelationshipFilter, "Should have relationship type filter");
// Verify source entity type filter
boolean hasSourceFilter =
boolQuery.filter().stream()
.anyMatch(
filter ->
filter instanceof TermQueryBuilder
&& ((TermQueryBuilder) filter).fieldName().equals("source.entityType")
&& ((TermQueryBuilder) filter).value().equals("query"));
assertTrue(hasSourceFilter, "Should have source entity type filter");
// Verify that bootstrap result was set
verify(mockEntityService).ingestProposal(eq(mockOpContext), any(), any(), anyBoolean());
}
@Test
public void testExecutableWhenDeleteFails() throws Exception {
removeQueryEdgesStep =
new RemoveQueryEdges.RemoveQueryEdgesStep(
mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig);
// Mock failed delete operation
ESWriteDAO.DeleteByQueryResult mockResult =
ESWriteDAO.DeleteByQueryResult.builder()
.success(false)
.failureReason("Connection timeout")
.remainingDocuments(100)
.timeTaken(5000)
.retryAttempts(3)
.build();
when(mockEsWriteDAO.deleteByQuerySync(
any(String.class), any(QueryBuilder.class), eq(deleteConfig)))
.thenReturn(mockResult);
// Execute the step - it should still succeed since the step is optional
UpgradeStepResult result = removeQueryEdgesStep.executable().apply(mockUpgradeContext);
// The step should still return success even if delete failed (since it's optional)
assertEquals(result.stepId(), UPGRADE_ID);
assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
// Verify that bootstrap result was still set
verify(mockEntityService).ingestProposal(eq(mockOpContext), any(), any(), anyBoolean());
}
@Test
public void testExecutableException() throws Exception {
removeQueryEdgesStep =
new RemoveQueryEdges.RemoveQueryEdgesStep(
mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig);
// Mock exception during delete
when(mockEsWriteDAO.deleteByQuerySync(
any(String.class), any(QueryBuilder.class), eq(deleteConfig)))
.thenThrow(new RuntimeException("Elasticsearch connection failed"));
UpgradeStepResult result = removeQueryEdgesStep.executable().apply(mockUpgradeContext);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
}
}

View File

@@ -94,7 +94,7 @@ public class ESGraphQueryDAO {
static final String SOURCE = "source";
static final String DESTINATION = "destination";
static final String RELATIONSHIP_TYPE = "relationshipType";
public static final String RELATIONSHIP_TYPE = "relationshipType";
static final String SOURCE_TYPE = SOURCE + ".entityType";
static final String SOURCE_URN = SOURCE + ".urn";
static final String DESTINATION_TYPE = DESTINATION + ".entityType";

View File

@@ -1,29 +1,61 @@
package com.linkedin.metadata.search.elasticsearch.update;
import static org.opensearch.index.reindex.AbstractBulkByScrollRequest.AUTO_SLICES;
import static org.opensearch.index.reindex.AbstractBulkByScrollRequest.AUTO_SLICES_VALUE;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.metadata.config.search.BulkDeleteConfiguration;
import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.core.CountRequest;
import org.opensearch.client.core.CountResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.tasks.GetTaskRequest;
import org.opensearch.client.tasks.GetTaskResponse;
import org.opensearch.client.tasks.TaskId;
import org.opensearch.client.tasks.TaskSubmissionResponse;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
@Slf4j
@RequiredArgsConstructor
public class ESWriteDAO {
private final ElasticSearchConfiguration config;
private final RestHighLevelClient searchClient;
private final ESBulkProcessor bulkProcessor;
private final int numRetries;
/** Result of a delete by query operation */
@Data
@Builder
public static class DeleteByQueryResult {
private final long timeTaken;
private final boolean success;
private final String failureReason;
private final long remainingDocuments;
private final int retryAttempts;
private final TaskId taskId;
}
/**
* Updates or inserts the given search document.
@@ -42,7 +74,7 @@ public class ESWriteDAO {
.detectNoop(false)
.docAsUpsert(true)
.doc(document, XContentType.JSON)
.retryOnConflict(numRetries);
.retryOnConflict(config.getBulkProcessor().getNumRetries());
bulkProcessor.add(updateRequest);
}
@@ -78,7 +110,7 @@ public class ESWriteDAO {
new UpdateRequest(toIndexName(opContext, entityName), docId)
.detectNoop(false)
.scriptedUpsert(true)
.retryOnConflict(numRetries)
.retryOnConflict(config.getBulkProcessor().getNumRetries())
.script(script)
.upsert(upsert);
bulkProcessor.add(updateRequest);
@@ -102,6 +134,242 @@
}
}
/**
* Performs an async delete by query operation (non-blocking). Returns immediately with a
* CompletableFuture containing the task submission response.
*/
@Nonnull
public CompletableFuture<TaskSubmissionResponse> deleteByQueryAsync(
@Nonnull String indexName,
@Nonnull QueryBuilder query,
@Nullable BulkDeleteConfiguration overrideConfig) {
final BulkDeleteConfiguration finalConfig =
overrideConfig != null ? overrideConfig : config.getBulkDelete();
return CompletableFuture.supplyAsync(
() -> {
try {
DeleteByQueryRequest request = buildDeleteByQueryRequest(indexName, query, finalConfig);
// Submit the task asynchronously
TaskSubmissionResponse taskId =
searchClient.submitDeleteByQueryTask(request, RequestOptions.DEFAULT);
log.info("Started async delete by query task: {} for index: {}", taskId, indexName);
return taskId;
} catch (IOException e) {
log.error("Failed to start async delete by query for index: {}", indexName, e);
throw new RuntimeException("Failed to start async delete by query", e);
}
});
}
/**
* Performs a synchronous delete by query operation with monitoring. Blocks until completion and
* retries if documents remain.
*/
@Nonnull
public DeleteByQueryResult deleteByQuerySync(
@Nonnull String indexName,
@Nonnull QueryBuilder query,
@Nullable BulkDeleteConfiguration overrideConfig) {
final BulkDeleteConfiguration finalConfig =
overrideConfig != null ? overrideConfig : config.getBulkDelete();
long startTime = System.currentTimeMillis();
long totalDeleted = 0;
int retryAttempts = 0;
TaskId lastTaskId = null;
try {
// Get initial document count
long initialCount = countDocuments(indexName, query);
if (initialCount == 0) {
return DeleteByQueryResult.builder()
.timeTaken(System.currentTimeMillis() - startTime)
.success(true)
.remainingDocuments(0)
.retryAttempts(0)
.build();
}
log.info("Starting delete by query for index: {} with {} documents", indexName, initialCount);
long remainingDocs = initialCount;
long previousRemainingDocs = initialCount;
while (remainingDocs > 0 && retryAttempts < finalConfig.getNumRetries()) {
long countBeforeDelete = remainingDocs;
// Submit delete by query task
DeleteByQueryRequest request = buildDeleteByQueryRequest(indexName, query, finalConfig);
TaskSubmissionResponse taskSubmission =
searchClient.submitDeleteByQueryTask(request, RequestOptions.DEFAULT);
TaskId taskId = parseTaskId(taskSubmission.getTask());
lastTaskId = taskId;
log.info("Submitted delete by query task: {} for index: {}", taskId, indexName);
// Monitor the task with context for proper tracking
DeleteByQueryResult iterationResult =
monitorDeleteByQueryTask(taskId, finalConfig.getTimeoutDuration(), indexName, query);
// Calculate deleted count based on document count change
remainingDocs = iterationResult.getRemainingDocuments();
long deleted = 0;
if (remainingDocs >= 0 && previousRemainingDocs >= 0) {
deleted = previousRemainingDocs - remainingDocs;
if (deleted > 0) {
totalDeleted += deleted;
}
}
previousRemainingDocs = remainingDocs;
log.info(
"Delete by query iteration completed. Deleted: {}, Remaining: {}, Total deleted: {}",
deleted,
remainingDocs,
totalDeleted);
// Check if we made progress
if (!iterationResult.isSuccess()) {
// Task failed or timed out
return DeleteByQueryResult.builder()
.timeTaken(System.currentTimeMillis() - startTime)
.success(false)
.failureReason(iterationResult.getFailureReason())
.remainingDocuments(remainingDocs)
.retryAttempts(retryAttempts)
.taskId(lastTaskId)
.build();
} else if (deleted == 0 && remainingDocs == countBeforeDelete && remainingDocs > 0) {
log.warn("Delete by query made no progress. Documents remaining: {}", remainingDocs);
if (retryAttempts < finalConfig.getNumRetries() - 1) {
retryAttempts++;
// Add a small delay before retry
Thread.sleep(finalConfig.getPollDuration().toMillis());
} else {
// Final attempt made no progress
return DeleteByQueryResult.builder()
.timeTaken(System.currentTimeMillis() - startTime)
.success(false)
.failureReason(
"Delete operation made no progress after " + (retryAttempts + 1) + " attempts")
.remainingDocuments(remainingDocs)
.retryAttempts(retryAttempts)
.taskId(lastTaskId)
.build();
}
} else if (remainingDocs > 0) {
retryAttempts++;
log.info(
"Retrying delete by query. Attempt {} of {}",
retryAttempts + 1,
finalConfig.getNumRetries());
}
}
return DeleteByQueryResult.builder()
.timeTaken(System.currentTimeMillis() - startTime)
.success(remainingDocs == 0)
.failureReason(remainingDocs > 0 ? "Documents still remaining after max retries" : null)
.remainingDocuments(remainingDocs)
.retryAttempts(retryAttempts)
.taskId(lastTaskId)
.build();
} catch (Exception e) {
log.error("Delete by query failed for index: {}", indexName, e);
return DeleteByQueryResult.builder()
.timeTaken(System.currentTimeMillis() - startTime)
.success(false)
.failureReason("Exception: " + e.getMessage())
.remainingDocuments(-1) // Unknown
.retryAttempts(retryAttempts)
.taskId(lastTaskId)
.build();
}
}
/**
* Monitors the status of an async delete by query task. For internal use with the index name and
* query so remaining documents can be tracked.
*/
@VisibleForTesting
@Nonnull
public DeleteByQueryResult monitorDeleteByQueryTask(
@Nonnull TaskId taskId,
@Nullable Duration timeout,
@Nonnull String indexName,
@Nonnull QueryBuilder query) {
Duration finalTimeout = timeout != null ? timeout : config.getBulkDelete().getTimeoutDuration();
long startTime = System.currentTimeMillis();
try {
GetTaskRequest getTaskRequest = new GetTaskRequest(taskId.getNodeId(), taskId.getId());
getTaskRequest.setWaitForCompletion(true);
getTaskRequest.setTimeout(TimeValue.timeValueMillis(finalTimeout.toMillis()));
Optional<GetTaskResponse> taskResponse =
searchClient.tasks().get(getTaskRequest, RequestOptions.DEFAULT);
if (taskResponse.isEmpty() || !taskResponse.get().isCompleted()) {
// Count remaining documents to determine if any progress was made
long remainingDocs = countDocuments(indexName, query);
return DeleteByQueryResult.builder()
.timeTaken(System.currentTimeMillis() - startTime)
.success(false)
.failureReason("Task not completed within timeout")
.remainingDocuments(remainingDocs)
.retryAttempts(0)
.taskId(taskId)
.build();
}
// Task completed - count remaining documents to determine success
long remainingDocs = countDocuments(indexName, query);
// We can't get exact delete count from task API, but we can infer success
// from whether documents remain
return DeleteByQueryResult.builder()
.timeTaken(System.currentTimeMillis() - startTime)
.success(true) // Task completed successfully
.failureReason(null)
.remainingDocuments(remainingDocs)
.retryAttempts(0)
.taskId(taskId)
.build();
} catch (Exception e) {
log.error("Failed to monitor delete by query task: {}", taskId, e);
// Try to get remaining count even on error
long remainingDocs = -1;
try {
remainingDocs = countDocuments(indexName, query);
} catch (Exception countError) {
log.error("Failed to count remaining documents", countError);
}
return DeleteByQueryResult.builder()
.timeTaken(System.currentTimeMillis() - startTime)
.success(false)
.failureReason("Monitoring failed: " + e.getMessage())
.remainingDocuments(remainingDocs)
.retryAttempts(0)
.taskId(taskId)
.build();
}
}
private static String toIndexName(
@Nonnull OperationContext opContext, @Nonnull String entityName) {
return opContext
@@ -109,4 +377,52 @@ public class ESWriteDAO {
.getIndexConvention()
.getIndexName(opContext.getEntityRegistry().getEntitySpec(entityName));
}
private DeleteByQueryRequest buildDeleteByQueryRequest(
@Nonnull String indexName,
@Nonnull QueryBuilder query,
@Nonnull BulkDeleteConfiguration config) {
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
request.setQuery(query);
request.setBatchSize(config.getBatchSize());
// Handle slices configuration - can be "auto" or a number
if (!AUTO_SLICES_VALUE.equalsIgnoreCase(config.getSlices())) {
try {
int sliceCount = Integer.parseInt(config.getSlices());
request.setSlices(sliceCount);
} catch (NumberFormatException e) {
log.warn("Invalid slices value '{}', defaulting to 'auto'", config.getSlices());
request.setSlices(AUTO_SLICES);
}
} else {
request.setSlices(AUTO_SLICES);
}
request.setTimeout(TimeValue.timeValueMillis(config.getTimeoutDuration().toMillis()));
// Set conflicts strategy to proceed (skip conflicting documents)
request.setConflicts("proceed");
return request;
}
private long countDocuments(@Nonnull String indexName, @Nonnull QueryBuilder query)
throws IOException {
CountRequest countRequest = new CountRequest(indexName);
countRequest.query(query);
CountResponse countResponse = searchClient.count(countRequest, RequestOptions.DEFAULT);
return countResponse.getCount();
}
/** Parse task ID from the task string format "nodeId:taskId" */
private TaskId parseTaskId(String taskString) {
if (taskString == null || !taskString.contains(":")) {
throw new IllegalArgumentException("Invalid task string format: " + taskString);
}
String[] parts = taskString.split(":", 2);
return new TaskId(parts[0], Long.parseLong(parts[1]));
}
}
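
For reviewers, a hedged usage sketch of the new delete-by-query entry points, assuming an already-constructed ESWriteDAO (the index name, query, and helper wiring here are illustrative, not part of the PR):

import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import java.util.concurrent.CompletableFuture;
import org.opensearch.client.tasks.TaskSubmissionResponse;
import org.opensearch.index.query.QueryBuilders;

final class DeleteByQueryUsageSketch {
  static void run(ESWriteDAO esWriteDAO, String graphIndexName) {
    // Synchronous path: blocks, re-submitting the delete task until no documents
    // match or the retry budget is exhausted; passing null falls back to the
    // configured bulk-delete defaults.
    ESWriteDAO.DeleteByQueryResult result =
        esWriteDAO.deleteByQuerySync(
            graphIndexName, QueryBuilders.termQuery("source.entityType", "query"), null);
    if (!result.isSuccess()) {
      System.err.printf(
          "%d documents remain after %d retries: %s%n",
          result.getRemainingDocuments(), result.getRetryAttempts(), result.getFailureReason());
    }

    // Asynchronous path: returns immediately with the submitted task handle.
    CompletableFuture<TaskSubmissionResponse> task =
        esWriteDAO.deleteByQueryAsync(graphIndexName, QueryBuilders.matchAllQuery(), null);
    task.thenAccept(submission -> System.out.println("delete task " + submission.getTask()));
  }
}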

View File

@@ -216,7 +216,8 @@ public abstract class LineageServiceTestBase extends AbstractTestNGSpringContext
null,
QueryFilterRewriteChain.EMPTY,
TEST_SEARCH_SERVICE_CONFIG);
ESWriteDAO writeDAO = new ESWriteDAO(searchClientSpy, getBulkProcessor(), 1);
ESWriteDAO writeDAO =
new ESWriteDAO(TEST_ES_SEARCH_CONFIG, searchClientSpy, getBulkProcessor());
ElasticSearchService searchService =
new ElasticSearchService(
getIndexBuilder(),

View File

@@ -137,7 +137,8 @@ public abstract class SearchServiceTestBase extends AbstractTestNGSpringContextT
null,
QueryFilterRewriteChain.EMPTY,
TEST_SEARCH_SERVICE_CONFIG);
ESWriteDAO writeDAO = new ESWriteDAO(getSearchClient(), getBulkProcessor(), 1);
ESWriteDAO writeDAO =
new ESWriteDAO(TEST_ES_SEARCH_CONFIG, getSearchClient(), getBulkProcessor());
ElasticSearchService searchService =
new ElasticSearchService(
getIndexBuilder(),

View File

@@ -98,7 +98,8 @@ public abstract class TestEntityTestBase extends AbstractTestNGSpringContextTest
null,
QueryFilterRewriteChain.EMPTY,
TEST_SEARCH_SERVICE_CONFIG);
ESWriteDAO writeDAO = new ESWriteDAO(getSearchClient(), getBulkProcessor(), 1);
ESWriteDAO writeDAO =
new ESWriteDAO(TEST_ES_SEARCH_CONFIG, getSearchClient(), getBulkProcessor());
ElasticSearchService searchService =
new ElasticSearchService(
getIndexBuilder(),

View File

@@ -0,0 +1,49 @@
package com.linkedin.metadata.search.elasticsearch;
import static org.testng.Assert.assertNotNull;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import com.linkedin.metadata.search.update.WriteDAOTestBase;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.fixtures.search.SampleDataFixtureConfiguration;
import io.datahubproject.test.search.config.SearchTestContainerConfiguration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Import;
import org.testng.annotations.Test;
@Slf4j
@Getter
@Import({
ElasticSearchSuite.class,
SampleDataFixtureConfiguration.class,
SearchTestContainerConfiguration.class
})
public class WriteDAOElasticSearchTest extends WriteDAOTestBase {
@Autowired private RestHighLevelClient searchClient;
@Autowired private ESWriteDAO esWriteDAO;
@Autowired
@Qualifier("sampleDataOperationContext")
protected OperationContext operationContext;
@Autowired
@Qualifier("sampleDataEntitySearchService")
protected ElasticSearchService entitySearchService;
@Autowired
@Qualifier("sampleDataGraphService")
protected ElasticSearchGraphService graphService;
@Test
public void initTest() {
assertNotNull(searchClient);
assertNotNull(esWriteDAO);
}
}

View File

@@ -0,0 +1,50 @@
package com.linkedin.metadata.search.opensearch;
import static org.testng.Assert.assertNotNull;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import com.linkedin.metadata.search.update.WriteDAOTestBase;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.fixtures.search.SampleDataFixtureConfiguration;
import io.datahubproject.test.search.config.SearchTestContainerConfiguration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Import;
import org.testng.annotations.Test;
@Slf4j
@Getter
@Import({
OpenSearchSuite.class,
SampleDataFixtureConfiguration.class,
SearchTestContainerConfiguration.class
})
public class WriteDAOOpenSearchTest extends WriteDAOTestBase {
@Autowired private RestHighLevelClient searchClient;
@Autowired private ESWriteDAO esWriteDAO;
@Autowired
@Qualifier("sampleDataOperationContext")
protected OperationContext operationContext;
@Autowired
@Qualifier("sampleDataEntitySearchService")
protected ElasticSearchService entitySearchService;
@Autowired
@Qualifier("sampleDataGraphService")
protected ElasticSearchGraphService graphService;
@Test
public void initTest() {
assertNotNull(searchClient);
assertNotNull(esWriteDAO);
}
}

View File

@@ -1,21 +1,32 @@
package com.linkedin.metadata.search.update;
import static io.datahubproject.test.search.SearchTestUtils.TEST_ES_SEARCH_CONFIG;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.linkedin.metadata.config.search.BulkDeleteConfiguration;
import com.linkedin.metadata.config.search.BulkProcessorConfiguration;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -23,16 +34,32 @@ import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.TasksClient;
import org.opensearch.client.core.CountRequest;
import org.opensearch.client.core.CountResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.tasks.GetTaskRequest;
import org.opensearch.client.tasks.GetTaskResponse;
import org.opensearch.client.tasks.TaskId;
import org.opensearch.client.tasks.TaskSubmissionResponse;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.tasks.TaskInfo;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ESWriteDAOTest {
private static final String TEST_DELETE_INDEX = "test_index";
private static final String TEST_NODE_ID = "node1";
private static final long TEST_TASK_ID = 12345L;
private static final String TEST_TASK_STRING = TEST_NODE_ID + ":" + TEST_TASK_ID;
private static final String TEST_ENTITY = "dataset";
private static final String TEST_DOC_ID = "testDocId";
private static final String TEST_INDEX = "datasetindex_v2";
@@ -40,9 +67,8 @@ public class ESWriteDAOTest {
private static final String TEST_PATTERN = "*index_v2";
@Mock private RestHighLevelClient mockSearchClient;
@Mock private ESBulkProcessor mockBulkProcessor;
@Mock private TasksClient mockTasksClient;
@Mock private org.opensearch.client.IndicesClient mockIndicesClient;
private final OperationContext opContext = TestOperationContexts.systemContextNoValidate();
@@ -56,7 +82,16 @@ public class ESWriteDAOTest {
// Setup mock indices client
when(mockSearchClient.indices()).thenReturn(mockIndicesClient);
esWriteDAO = new ESWriteDAO(mockSearchClient, mockBulkProcessor, NUM_RETRIES);
// Setup mock tasks client
when(mockSearchClient.tasks()).thenReturn(mockTasksClient);
esWriteDAO =
new ESWriteDAO(
TEST_ES_SEARCH_CONFIG.toBuilder()
.bulkProcessor(BulkProcessorConfiguration.builder().numRetries(NUM_RETRIES).build())
.build(),
mockSearchClient,
mockBulkProcessor);
}
@Test
@@ -173,4 +208,685 @@ public class ESWriteDAOTest {
public void testUpsertDocumentWithNullDocument() {
esWriteDAO.upsertDocument(opContext, TEST_ENTITY, null, TEST_DOC_ID);
}
@Test
public void testDeleteByQueryAsyncSuccess()
throws IOException, ExecutionException, InterruptedException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
TaskSubmissionResponse mockResponse = mock(TaskSubmissionResponse.class);
when(mockResponse.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockResponse);
CompletableFuture<TaskSubmissionResponse> future =
esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, null);
TaskSubmissionResponse result = future.get();
assertNotNull(result);
assertEquals(result.getTask(), TEST_TASK_STRING);
// Verify the request
ArgumentCaptor<DeleteByQueryRequest> requestCaptor =
ArgumentCaptor.forClass(DeleteByQueryRequest.class);
verify(mockSearchClient)
.submitDeleteByQueryTask(requestCaptor.capture(), eq(RequestOptions.DEFAULT));
DeleteByQueryRequest capturedRequest = requestCaptor.getValue();
assertEquals(capturedRequest.indices()[0], TEST_DELETE_INDEX);
assertEquals(capturedRequest.getBatchSize(), 1000);
assertEquals(capturedRequest.getSearchRequest().source().query(), query);
}
@Test
public void testDeleteByQueryAsyncWithCustomConfig()
throws IOException, ExecutionException, InterruptedException {
QueryBuilder query = QueryBuilders.matchAllQuery();
BulkDeleteConfiguration customConfig =
BulkDeleteConfiguration.builder()
.batchSize(5000)
.slices("10")
.timeout(1)
.timeoutUnit("HOURS")
.build();
TaskSubmissionResponse mockResponse = mock(TaskSubmissionResponse.class);
when(mockResponse.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockResponse);
CompletableFuture<TaskSubmissionResponse> future =
esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, customConfig);
TaskSubmissionResponse result = future.get();
assertNotNull(result);
// Verify custom config is applied
ArgumentCaptor<DeleteByQueryRequest> requestCaptor =
ArgumentCaptor.forClass(DeleteByQueryRequest.class);
verify(mockSearchClient)
.submitDeleteByQueryTask(requestCaptor.capture(), eq(RequestOptions.DEFAULT));
DeleteByQueryRequest capturedRequest = requestCaptor.getValue();
assertEquals(capturedRequest.getBatchSize(), 5000);
// Note: We can't directly verify slices as it's set internally
}
@Test
public void testDeleteByQueryAsyncIOException() throws IOException {
QueryBuilder query = QueryBuilders.termQuery("field", "value");
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenThrow(new IOException("Network error"));
CompletableFuture<TaskSubmissionResponse> future =
esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, null);
try {
future.get();
fail("Expected ExecutionException");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof RuntimeException);
assertTrue(e.getCause().getMessage().contains("Failed to start async delete by query"));
} catch (InterruptedException e) {
fail("Unexpected InterruptedException");
}
}
@Test
public void testDeleteByQuerySyncNoDocuments() throws IOException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
// Mock count response with 0 documents
CountResponse mockCountResponse = mock(CountResponse.class);
when(mockCountResponse.getCount()).thenReturn(0L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockCountResponse);
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertTrue(result.isSuccess());
assertEquals(result.getRemainingDocuments(), 0);
assertEquals(result.getRetryAttempts(), 0);
assertNull(result.getFailureReason());
}
@Test
public void testDeleteByQuerySyncSuccessfulDeletion() throws IOException, InterruptedException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
// Mock count responses
CountResponse initialCount = mock(CountResponse.class);
when(initialCount.getCount()).thenReturn(100L);
CountResponse afterDeleteCount = mock(CountResponse.class);
when(afterDeleteCount.getCount()).thenReturn(0L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(initialCount)
.thenReturn(afterDeleteCount);
// Mock task submission
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
// Mock task monitoring
GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class);
when(mockTaskResponse.isCompleted()).thenReturn(true);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskResponse.getTaskInfo()).thenReturn(mockTaskInfo);
when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(Optional.of(mockTaskResponse));
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertTrue(result.isSuccess());
assertEquals(result.getRemainingDocuments(), 0);
assertNull(result.getFailureReason());
}
@Test
public void testDeleteByQuerySyncWithRetries() throws IOException, InterruptedException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
// Mock count responses - documents remain after first attempt
CountResponse count100 = mock(CountResponse.class);
when(count100.getCount()).thenReturn(100L);
CountResponse count50 = mock(CountResponse.class);
when(count50.getCount()).thenReturn(50L);
CountResponse count0 = mock(CountResponse.class);
when(count0.getCount()).thenReturn(0L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(count100) // Initial count
.thenReturn(count50) // After first delete
.thenReturn(count0); // After second delete
// Mock task submissions
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
// Mock task monitoring
GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class);
when(mockTaskResponse.isCompleted()).thenReturn(true);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskResponse.getTaskInfo()).thenReturn(mockTaskInfo);
when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(Optional.of(mockTaskResponse));
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertTrue(result.isSuccess());
assertEquals(result.getRemainingDocuments(), 0);
assertEquals(result.getRetryAttempts(), 1); // One retry was needed
assertNull(result.getFailureReason());
// Verify two delete operations were performed
verify(mockSearchClient, times(2))
.submitDeleteByQueryTask(any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT));
}
@Test
public void testDeleteByQuerySyncNoProgress() throws IOException, InterruptedException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
// Mock count responses - no progress made
CountResponse count100 = mock(CountResponse.class);
when(count100.getCount()).thenReturn(100L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(count100); // Always returns 100
// Mock task submission
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
// Mock task monitoring with no deletions
GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class);
when(mockTaskResponse.isCompleted()).thenReturn(true);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskResponse.getTaskInfo()).thenReturn(mockTaskInfo);
when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(Optional.of(mockTaskResponse));
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertFalse(result.isSuccess());
assertEquals(result.getRemainingDocuments(), 100);
assertTrue(result.getFailureReason().contains("no progress"));
}
@Test
public void testDeleteByQuerySyncException() throws IOException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
// Mock initial count
CountResponse mockCountResponse = mock(CountResponse.class);
when(mockCountResponse.getCount()).thenReturn(100L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockCountResponse);
// Mock exception during task submission
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenThrow(new IOException("Connection failed"));
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertFalse(result.isSuccess());
assertEquals(result.getRemainingDocuments(), -1); // Unknown
assertTrue(result.getFailureReason().contains("Exception"));
}
@Test
public void testParseTaskIdValid() {
// The task-ID parsing logic is private, so it is exercised indirectly through deleteByQuerySync
QueryBuilder query = QueryBuilders.termQuery("field", "value");
try {
// Setup mocks for a basic flow
CountResponse mockCount = mock(CountResponse.class);
when(mockCount.getCount()).thenReturn(100L).thenReturn(0L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockCount);
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn("node123:456789");
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class);
when(mockTaskResponse.isCompleted()).thenReturn(true);
when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(Optional.of(mockTaskResponse));
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
// If we get here without exception, parsing worked
assertNotNull(result);
} catch (Exception e) {
fail("Should not throw exception for valid task ID format");
}
}
@Test
public void testDeleteByQueryWithInvalidSlicesConfig()
throws IOException, ExecutionException, InterruptedException {
QueryBuilder query = QueryBuilders.matchAllQuery();
BulkDeleteConfiguration customConfig =
BulkDeleteConfiguration.builder()
.batchSize(1000)
.slices("invalid-number")
.timeout(30)
.timeoutUnit("MINUTES")
.build();
TaskSubmissionResponse mockResponse = mock(TaskSubmissionResponse.class);
when(mockResponse.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockResponse);
// Should handle invalid slices gracefully and default to auto
CompletableFuture<TaskSubmissionResponse> future =
esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, customConfig);
TaskSubmissionResponse result = future.get();
assertNotNull(result);
verify(mockSearchClient)
.submitDeleteByQueryTask(any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT));
}
@Test
public void testDeleteByQueryWithAutoSlices()
throws IOException, ExecutionException, InterruptedException {
QueryBuilder query = QueryBuilders.matchAllQuery();
BulkDeleteConfiguration customConfig =
BulkDeleteConfiguration.builder()
.batchSize(2000)
.slices("auto") // Explicitly set to auto
.timeout(45)
.timeoutUnit("MINUTES")
.build();
TaskSubmissionResponse mockResponse = mock(TaskSubmissionResponse.class);
when(mockResponse.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockResponse);
CompletableFuture<TaskSubmissionResponse> future =
esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, customConfig);
TaskSubmissionResponse result = future.get();
assertNotNull(result);
// Verify the request configuration
ArgumentCaptor<DeleteByQueryRequest> requestCaptor =
ArgumentCaptor.forClass(DeleteByQueryRequest.class);
verify(mockSearchClient)
.submitDeleteByQueryTask(requestCaptor.capture(), eq(RequestOptions.DEFAULT));
DeleteByQueryRequest capturedRequest = requestCaptor.getValue();
assertEquals(capturedRequest.getBatchSize(), 2000);
assertEquals(capturedRequest.getTimeout(), TimeValue.timeValueMinutes(45));
}
@Test
public void testDeleteByQuerySyncTaskFailure() throws IOException, InterruptedException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
// Mock initial count
CountResponse initialCount = mock(CountResponse.class);
when(initialCount.getCount()).thenReturn(100L);
CountResponse afterFailureCount = mock(CountResponse.class);
when(afterFailureCount.getCount()).thenReturn(90L); // Some documents deleted before failure
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(initialCount)
.thenReturn(afterFailureCount);
// Mock task submission
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
// Mock task monitoring with incomplete/failed task
GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class);
when(mockTaskResponse.isCompleted()).thenReturn(false); // Task didn't complete
when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(Optional.of(mockTaskResponse));
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertFalse(result.isSuccess());
assertEquals(result.getRemainingDocuments(), 90);
assertTrue(result.getFailureReason().contains("Task not completed within timeout"));
}
@Test
public void testDeleteByQuerySyncMaxRetriesWithPartialProgress()
throws IOException, InterruptedException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
// Mock count responses - partial progress on each attempt
CountResponse count100 = mock(CountResponse.class);
when(count100.getCount()).thenReturn(100L);
CountResponse count80 = mock(CountResponse.class);
when(count80.getCount()).thenReturn(80L);
CountResponse count60 = mock(CountResponse.class);
when(count60.getCount()).thenReturn(60L);
CountResponse count40 = mock(CountResponse.class);
when(count40.getCount()).thenReturn(40L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(count100) // Initial
.thenReturn(count80) // After 1st delete
.thenReturn(count60) // After 2nd delete
.thenReturn(count40); // After 3rd delete (max retries)
// Mock task submissions
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
// Mock successful task completions
GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class);
when(mockTaskResponse.isCompleted()).thenReturn(true);
when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(Optional.of(mockTaskResponse));
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertFalse(result.isSuccess()); // Failed because documents remain
assertEquals(result.getRemainingDocuments(), 40);
assertEquals(result.getRetryAttempts(), 3); // 3 total attempts
assertTrue(result.getFailureReason().contains("Documents still remaining after max retries"));
// Verify 3 delete operations were performed (initial + 2 retries)
verify(mockSearchClient, times(3))
.submitDeleteByQueryTask(any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT));
}
@Test
public void testMonitorDeleteByQueryTaskWithCountException() throws IOException {
// Exercises the internal monitor method indirectly through deleteByQuerySync
QueryBuilder query = QueryBuilders.termQuery("field", "value");
// Mock task API throwing exception
when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT)))
.thenThrow(new IOException("Task API error"));
// Initial count succeeds to start the flow; later counts (used to report remaining documents) fail
CountResponse initialCount = mock(CountResponse.class);
when(initialCount.getCount()).thenReturn(100L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(initialCount)
.thenThrow(new IOException("Count API error"));
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertFalse(result.isSuccess());
assertEquals(result.getRemainingDocuments(), -1); // Unknown due to count failure
assertTrue(result.getFailureReason().contains("Monitoring failed"));
}
@Test
public void testParseTaskIdInvalidFormats() {
QueryBuilder query = QueryBuilders.termQuery("field", "value");
// Test null task string
try {
CountResponse mockCount = mock(CountResponse.class);
when(mockCount.getCount()).thenReturn(100L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockCount);
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn(null); // Null task string
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertFalse(result.isSuccess());
assertTrue(result.getFailureReason().contains("Invalid task string format"));
} catch (Exception e) {
// Expected
}
// Test task string without colon
try {
CountResponse mockCount = mock(CountResponse.class);
when(mockCount.getCount()).thenReturn(100L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockCount);
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn("invalidformat"); // No colon
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertFalse(result.isSuccess());
assertTrue(result.getFailureReason().contains("Invalid task string format"));
} catch (Exception e) {
// Expected
}
}
@Test
public void testDeleteByQuerySyncInterruptedException() throws IOException, InterruptedException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
// Mock count responses
CountResponse count100 = mock(CountResponse.class);
when(count100.getCount()).thenReturn(100L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(count100);
// Mock task submission
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
// Simulate interruption during task monitoring
when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT)))
.thenAnswer(
invocation -> {
Thread.currentThread().interrupt();
throw new InterruptedException("Thread interrupted");
});
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertFalse(result.isSuccess());
assertTrue(result.getFailureReason().contains("interrupted"));
// Verify thread interrupted status is preserved
assertTrue(Thread.currentThread().isInterrupted());
Thread.interrupted(); // Clear interrupt status for next tests
}
@Test
public void testDeleteByQuerySyncEmptyTaskResponse() throws IOException, InterruptedException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
// Mock initial count
CountResponse initialCount = mock(CountResponse.class);
when(initialCount.getCount()).thenReturn(50L);
CountResponse afterCount = mock(CountResponse.class);
when(afterCount.getCount()).thenReturn(50L); // No change
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(initialCount)
.thenReturn(afterCount);
// Mock task submission
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
// Return empty Optional for task response
when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(Optional.empty());
ESWriteDAO.DeleteByQueryResult result =
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null);
assertFalse(result.isSuccess());
assertEquals(result.getRemainingDocuments(), 50);
assertTrue(result.getFailureReason().contains("Task not completed within timeout"));
}
@Test
public void testMonitorTaskRequestParameters() throws IOException {
QueryBuilder query = QueryBuilders.termQuery("status", "deleted");
BulkDeleteConfiguration customConfig =
BulkDeleteConfiguration.builder()
.batchSize(1000)
.slices("5")
.timeout(10)
.timeoutUnit("MINUTES")
.pollInterval(2)
.pollIntervalUnit("SECONDS")
.numRetries(1)
.build();
// Setup initial count
CountResponse initialCount = mock(CountResponse.class);
when(initialCount.getCount()).thenReturn(50L);
CountResponse afterCount = mock(CountResponse.class);
when(afterCount.getCount()).thenReturn(0L);
when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(initialCount)
.thenReturn(afterCount);
// Mock task submission
TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class);
when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockSubmission);
// Mock task response
GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class);
when(mockTaskResponse.isCompleted()).thenReturn(true);
when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(Optional.of(mockTaskResponse));
esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, customConfig);
// Verify GetTaskRequest parameters
ArgumentCaptor<GetTaskRequest> taskRequestCaptor =
ArgumentCaptor.forClass(GetTaskRequest.class);
verify(mockTasksClient).get(taskRequestCaptor.capture(), eq(RequestOptions.DEFAULT));
GetTaskRequest capturedRequest = taskRequestCaptor.getValue();
assertEquals(capturedRequest.getNodeId(), TEST_NODE_ID);
assertEquals(capturedRequest.getTaskId(), TEST_TASK_ID);
assertTrue(capturedRequest.getWaitForCompletion());
assertEquals(capturedRequest.getTimeout(), TimeValue.timeValueMinutes(10));
}
@Test
public void testDeleteByQueryRequestConflictsStrategy()
throws IOException, ExecutionException, InterruptedException {
QueryBuilder query = QueryBuilders.matchAllQuery();
TaskSubmissionResponse mockResponse = mock(TaskSubmissionResponse.class);
when(mockResponse.getTask()).thenReturn(TEST_TASK_STRING);
when(mockSearchClient.submitDeleteByQueryTask(
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)))
.thenReturn(mockResponse);
CompletableFuture<TaskSubmissionResponse> future =
esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, null);
future.get();
// Verify conflicts strategy is set to "proceed"
ArgumentCaptor<DeleteByQueryRequest> requestCaptor =
ArgumentCaptor.forClass(DeleteByQueryRequest.class);
verify(mockSearchClient)
.submitDeleteByQueryTask(requestCaptor.capture(), eq(RequestOptions.DEFAULT));
DeleteByQueryRequest capturedRequest = requestCaptor.getValue();
// Note: We can't directly verify conflicts setting as it's not exposed via getter
// but we know it's set to "proceed" in the implementation
assertNotNull(capturedRequest);
}
}

View File

@ -0,0 +1,144 @@
package com.linkedin.metadata.search.update;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.Constants.QUERY_ENTITY_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.aspect.models.graph.Edge;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import io.datahubproject.metadata.context.OperationContext;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.core.CountRequest;
import org.opensearch.client.core.CountResponse;
import org.opensearch.client.tasks.TaskId;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermQueryBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;
@Slf4j
public abstract class WriteDAOTestBase extends AbstractTestNGSpringContextTests {
protected static final String TEST_RELATIONSHIP_TYPE = "IsAssociatedWith";
@Autowired private ESBulkProcessor bulkProcessor;
protected abstract RestHighLevelClient getSearchClient();
protected abstract OperationContext getOperationContext();
protected abstract ElasticSearchGraphService getGraphService();
protected abstract ESWriteDAO getEsWriteDAO();
@Test
public void testDeleteByQueryAsyncOnGraphIndex() throws Exception {
String testRunId = UUID.randomUUID().toString();
String indexName =
getOperationContext()
.getSearchContext()
.getIndexConvention()
.getIndexName(ElasticSearchGraphService.INDEX_NAME);
// Create fewer edges for async test
int totalEdges = 8;
createTestEdges(testRunId, totalEdges);
// Verify creation
long initialCount = countTestEdges(indexName, testRunId);
assertEquals(initialCount, totalEdges);
// Build query
TermQueryBuilder query =
QueryBuilders.termQuery(
"destination.urn",
"urn:li:" + DATASET_ENTITY_NAME + ":(urn:li:dataPlatform:test," + testRunId + ",PROD)");
// Submit async delete
var taskFuture = getEsWriteDAO().deleteByQueryAsync(indexName, query, null);
var taskSubmission = taskFuture.get(30, TimeUnit.SECONDS);
assertNotNull(taskSubmission);
assertNotNull(taskSubmission.getTask());
// Parse task ID
String[] taskParts = taskSubmission.getTask().split(":");
TaskId taskId = new TaskId(taskParts[0], Long.parseLong(taskParts[1]));
// Monitor the task
ESWriteDAO.DeleteByQueryResult monitorResult =
getEsWriteDAO().monitorDeleteByQueryTask(taskId, Duration.ofMinutes(1), indexName, query);
assertTrue(monitorResult.isSuccess(), "Async task should complete successfully");
// Wait and verify deletion
Thread.sleep(2000);
long finalCount = countTestEdges(indexName, testRunId);
assertEquals(finalCount, 0, "All test edges should be deleted asynchronously");
}
private void createTestEdges(String testRunId, int count) throws InterruptedException {
for (int i = 0; i < count; i++) {
try {
// Create the dataset URN shared by all test edges as the destination
Urn destUrn =
Urn.createFromString(
"urn:li:"
+ DATASET_ENTITY_NAME
+ ":(urn:li:dataPlatform:test,"
+ testRunId
+ ",PROD)");
// Create a unique query URN per edge as the source
Urn sourceUrn =
Urn.createFromString(
"urn:li:" + QUERY_ENTITY_NAME + ":test_query_" + testRunId + "_" + i);
Edge edge =
new Edge(
sourceUrn,
destUrn,
TEST_RELATIONSHIP_TYPE,
System.currentTimeMillis(),
Urn.createFromString("urn:li:corpuser:test"),
System.currentTimeMillis(),
Urn.createFromString("urn:li:corpuser:test"),
Map.of());
getGraphService().addEdge(edge);
} catch (Exception e) {
throw new RuntimeException("Failed to create test edge", e);
}
}
bulkProcessor.flush();
// Wait for indexing
Thread.sleep(2000);
}
private long countTestEdges(String indexName, String testRunId) throws Exception {
CountRequest countRequest = new CountRequest(indexName);
countRequest.query(
QueryBuilders.termQuery(
"destination.urn",
"urn:li:"
+ DATASET_ENTITY_NAME
+ ":(urn:li:dataPlatform:test,"
+ testRunId
+ ",PROD)"));
CountResponse response = getSearchClient().count(countRequest, RequestOptions.DEFAULT);
return response.getCount();
}
}

View File

@ -2,6 +2,7 @@ package io.datahubproject.test.fixtures.search;
import static com.linkedin.metadata.Constants.*;
import static io.datahubproject.test.search.SearchTestUtils.TEST_ES_SEARCH_CONFIG;
import static io.datahubproject.test.search.SearchTestUtils.TEST_GRAPH_SERVICE_CONFIG;
import static io.datahubproject.test.search.SearchTestUtils.TEST_SEARCH_SERVICE_CONFIG;
import static io.datahubproject.test.search.config.SearchTestContainerConfiguration.REFRESH_INTERVAL_SECONDS;
import static org.mockito.ArgumentMatchers.anyBoolean;
@ -20,6 +21,9 @@ import com.linkedin.metadata.config.search.custom.CustomSearchConfiguration;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityAspectIdentifier;
import com.linkedin.metadata.entity.EntityServiceImpl;
import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO;
import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.cache.EntityDocCountCache;
import com.linkedin.metadata.search.client.CachingEntitySearchService;
@ -137,22 +141,22 @@ public class SampleDataFixtureConfiguration {
.build(testOpContext.getSessionAuthentication(), true);
}
protected ElasticSearchService entitySearchServiceHelper(OperationContext opContext)
throws IOException {
@Bean
protected ESWriteDAO esWriteDAO() {
return new ESWriteDAO(TEST_ES_SEARCH_CONFIG, _searchClient, _bulkProcessor);
}
@Bean("sampleDataESIndexBuilder")
protected ESIndexBuilder esIndexBuilder() {
GitVersion gitVersion = new GitVersion("0.0.0-test", "123456", Optional.empty());
ESIndexBuilder indexBuilder =
new ESIndexBuilder(
_searchClient,
1,
0,
1,
1,
Map.of(),
true,
false,
false,
TEST_ES_SEARCH_CONFIG,
gitVersion);
return new ESIndexBuilder(
_searchClient, 1, 0, 1, 1, Map.of(), true, false, false, TEST_ES_SEARCH_CONFIG, gitVersion);
}
protected ElasticSearchService entitySearchServiceHelper(
OperationContext opContext, ESWriteDAO esWriteDAO, ESIndexBuilder indexBuilder)
throws IOException {
IndexConfiguration indexConfiguration = new IndexConfiguration();
indexConfiguration.setMinSearchFilterLength(3);
SettingsBuilder settingsBuilder = new SettingsBuilder(null, indexConfiguration);
@ -173,7 +177,7 @@ public class SampleDataFixtureConfiguration {
_customSearchConfiguration,
queryFilterRewriteChain,
TEST_SEARCH_SERVICE_CONFIG);
ESWriteDAO writeDAO = new ESWriteDAO(_searchClient, _bulkProcessor, 1);
return new ElasticSearchService(
indexBuilder,
opContext.getEntityRegistry(),
@ -182,20 +186,52 @@ public class SampleDataFixtureConfiguration {
TEST_SEARCH_SERVICE_CONFIG,
searchDAO,
browseDAO,
writeDAO);
esWriteDAO);
}
@Bean(name = "sampleDataGraphService")
@Nonnull
protected ElasticSearchGraphService graphService(
@Qualifier("sampleDataOperationContext") OperationContext opContext,
@Qualifier("sampleDataESIndexBuilder") ESIndexBuilder indexBuilder) {
IndexConvention indexConvention = opContext.getSearchContext().getIndexConvention();
ElasticSearchGraphService graphService =
new ElasticSearchGraphService(
opContext.getLineageRegistry(),
_bulkProcessor,
indexConvention,
new ESGraphWriteDAO(
indexConvention, _bulkProcessor, 1, TEST_ES_SEARCH_CONFIG.getSearch().getGraph()),
new ESGraphQueryDAO(
_searchClient,
opContext.getLineageRegistry(),
indexConvention,
TEST_GRAPH_SERVICE_CONFIG,
TEST_ES_SEARCH_CONFIG,
null),
indexBuilder,
indexConvention.getIdHashAlgo());
graphService.reindexAll(Collections.emptySet());
return graphService;
}
@Bean(name = "sampleDataEntitySearchService")
protected ElasticSearchService entitySearchService(
@Qualifier("sampleDataOperationContext") OperationContext opContext) throws IOException {
return entitySearchServiceHelper(opContext);
@Qualifier("sampleDataOperationContext") OperationContext opContext,
ESWriteDAO esWriteDAO,
@Qualifier("sampleDataESIndexBuilder") ESIndexBuilder indexBuilder)
throws IOException {
return entitySearchServiceHelper(opContext, esWriteDAO, indexBuilder);
}
@Bean(name = "longTailEntitySearchService")
protected ElasticSearchService longTailEntitySearchService(
@Qualifier("longTailOperationContext") OperationContext longtailOperationContext)
@Qualifier("longTailOperationContext") OperationContext longtailOperationContext,
ESWriteDAO esWriteDAO,
@Qualifier("sampleDataESIndexBuilder") ESIndexBuilder indexBuilder)
throws IOException {
return entitySearchServiceHelper(longtailOperationContext);
return entitySearchServiceHelper(longtailOperationContext, esWriteDAO, indexBuilder);
}
@Bean(name = "sampleDataSearchService")

View File

@ -145,7 +145,7 @@ public class SearchLineageFixtureConfiguration {
customSearchConfiguration,
queryFilterRewriteChain,
TEST_SEARCH_SERVICE_CONFIG);
ESWriteDAO writeDAO = new ESWriteDAO(searchClient, bulkProcessor, 1);
ESWriteDAO writeDAO = new ESWriteDAO(TEST_ES_SEARCH_CONFIG, searchClient, bulkProcessor);
return new ElasticSearchService(
indexBuilder,

View File

@ -16,6 +16,8 @@ import com.linkedin.metadata.config.SystemMetadataServiceConfig;
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
import com.linkedin.metadata.config.graph.GraphServiceConfiguration;
import com.linkedin.metadata.config.search.BuildIndicesConfiguration;
import com.linkedin.metadata.config.search.BulkDeleteConfiguration;
import com.linkedin.metadata.config.search.BulkProcessorConfiguration;
import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
import com.linkedin.metadata.config.search.GraphQueryConfiguration;
import com.linkedin.metadata.config.search.SearchConfiguration;
@ -77,6 +79,17 @@ public class SearchTestUtils {
});
}
})
.bulkProcessor(BulkProcessorConfiguration.builder().numRetries(1).build())
.bulkDelete(
BulkDeleteConfiguration.builder()
.batchSize(1000)
.slices("auto")
.numRetries(3)
.timeout(30)
.timeoutUnit("MINUTES")
.pollInterval(1)
.pollIntervalUnit("SECONDS")
.build())
.buildIndices(
BuildIndicesConfiguration.builder().reindexOptimizationEnabled(true).build())
.build();

View File

@ -11,10 +11,6 @@ record QuerySubject {
/**
* An entity which is the subject of a query.
*/
@Relationship = {
"name": "IsAssociatedWith",
"entityTypes": [ "dataset", "schemaField" ]
}
@Searchable = {
"fieldName": "entities",
"fieldType": "URN",

View File

@ -0,0 +1,32 @@
package com.linkedin.metadata.config.search;
import com.linkedin.metadata.utils.ParseUtils;
import java.time.Duration;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
public class BulkDeleteConfiguration {
private int batchSize;
private String slices;
private int pollInterval;
private String pollIntervalUnit;
private int timeout;
private String timeoutUnit;
private int numRetries;
public Duration getPollDuration() {
return ParseUtils.parseDuration(pollInterval, pollIntervalUnit);
}
public Duration getTimeoutDuration() {
return ParseUtils.parseDuration(timeout, timeoutUnit);
}
}
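For orientation, a minimal usage sketch of the new configuration class; the wrapper class name and the field values are illustrative (they mirror the test defaults wired up in SearchTestUtils above) rather than prescribed defaults.

import com.linkedin.metadata.config.search.BulkDeleteConfiguration;
import java.time.Duration;

class BulkDeleteConfigExample {
  static void example() {
    BulkDeleteConfiguration config =
        BulkDeleteConfiguration.builder()
            .batchSize(1000)
            .slices("auto")
            .numRetries(3)
            .timeout(30)
            .timeoutUnit("MINUTES")
            .pollInterval(1)
            .pollIntervalUnit("SECONDS")
            .build();
    // Unit strings resolve through ParseUtils.parseDuration (see the helper added below)
    Duration timeout = config.getTimeoutDuration(); // PT30M
    Duration poll = config.getPollDuration();       // PT1S
  }
}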

View File

@ -0,0 +1,16 @@
package com.linkedin.metadata.config.search;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class BulkProcessorConfiguration {
private int numRetries;
}

View File

@ -10,6 +10,8 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
@Builder(toBuilder = true)
public class ElasticSearchConfiguration {
private BulkDeleteConfiguration bulkDelete;
private BulkProcessorConfiguration bulkProcessor;
private BuildIndicesConfiguration buildIndices;
public String implementation;
private SearchConfiguration search;

View File

@ -0,0 +1,13 @@
package com.linkedin.metadata.utils;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
public class ParseUtils {
private ParseUtils() {}
public static Duration parseDuration(int value, String unit) {
TimeUnit timeUnit = unit != null ? TimeUnit.valueOf(unit.toUpperCase()) : TimeUnit.SECONDS;
return Duration.of(value, timeUnit.toChronoUnit());
}
}
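A minimal sketch of the parsing helper's behavior, assuming the defaulting logic shown above; the wrapper class name is hypothetical.

import com.linkedin.metadata.utils.ParseUtils;
import java.time.Duration;

class ParseUtilsExample {
  static void example() {
    Duration minutes = ParseUtils.parseDuration(30, "minutes"); // PT30M, unit matched case-insensitively
    Duration fallback = ParseUtils.parseDuration(45, null);     // PT45S, null unit defaults to SECONDS
  }
}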

View File

@ -250,6 +250,14 @@ elasticsearch:
keyStoreType: ${ELASTICSEARCH_SSL_KEYSTORE_TYPE:#{null}}
keyStorePassword: ${ELASTICSEARCH_SSL_KEYSTORE_PASSWORD:#{null}}
keyPassword: ${ELASTICSEARCH_SSL_KEY_PASSWORD:#{null}}
bulkDelete:
batchSize: ${ES_BULK_DELETE_BATCH_SIZE:5000}
slices: ${ES_BULK_DELETE_SLICES:auto}
pollInterval: ${ES_BULK_DELETE_POLL_INTERVAL:30}
pollIntervalUnit: ${ES_BULK_DELETE_POLL_UNIT:SECONDS}
timeout: ${ES_BULK_DELETE_TIMEOUT:30}
timeoutUnit: ${ES_BULK_DELETE_TIMEOUT_UNIT:MINUTES}
numRetries: ${ES_BULK_DELETE_NUM_RETRIES:3}
bulkProcessor:
async: ${ES_BULK_ASYNC:true}
requestsLimit: ${ES_BULK_REQUESTS_LIMIT:1000}
@ -577,6 +585,9 @@ systemUpdate:
batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_PROPERTY_DEFINITIONS_BATCH_SIZE:500}
delayMs: ${BOOTSTRAP_SYSTEM_UPDATE_PROPERTY_DEFINITIONS_DELAY_MS:1000}
limit: ${BOOTSTRAP_SYSTEM_UPDATE_PROPERTY_DEFINITIONS_CLL_LIMIT:0}
removeQueryEdges:
enabled: ${BOOTSTRAP_SYSTEM_UPDATE_REMOVE_QUERY_EDGES_ENABLED:true}
numRetries: ${BOOTSTRAP_SYSTEM_UPDATE_REMOVE_QUERY_EDGES_RETRIES:20}
structuredProperties:
enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings

View File

@ -29,11 +29,10 @@ public class ElasticSearchGraphServiceFactory {
@Qualifier("baseElasticSearchComponents")
private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components;
@Autowired private ConfigurationProvider configurationProvider;
@Bean(name = "graphService")
@Nonnull
protected GraphService getInstance(
final ConfigurationProvider configurationProvider,
final EntityRegistry entityRegistry,
@Value("${elasticsearch.idHashAlgo}") final String idHashAlgo,
MetricUtils metricUtils) {
@ -45,7 +44,7 @@ public class ElasticSearchGraphServiceFactory {
new ESGraphWriteDAO(
components.getIndexConvention(),
components.getBulkProcessor(),
components.getNumRetries(),
components.getConfig().getBulkProcessor().getNumRetries(),
configurationProvider.getElasticSearch().getSearch().getGraph()),
new ESGraphQueryDAO(
components.getSearchClient(),

View File

@ -31,7 +31,7 @@ public class ElasticSearchSystemMetadataServiceFactory {
components.getSearchClient(),
components.getIndexConvention(),
components.getBulkProcessor(),
components.getNumRetries(),
components.getConfig().getBulkProcessor().getNumRetries(),
configurationProvider.getSystemMetadataService()),
components.getIndexBuilder(),
elasticIdHashAlgo,

View File

@ -2,6 +2,8 @@ package com.linkedin.gms.factory.search;
import com.linkedin.gms.factory.common.IndexConventionFactory;
import com.linkedin.gms.factory.common.RestHighLevelClientFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
@ -9,7 +11,6 @@ import javax.annotation.Nonnull;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@ -25,16 +26,13 @@ import org.springframework.context.annotation.Import;
public class BaseElasticSearchComponentsFactory {
@lombok.Value
public static class BaseElasticSearchComponents {
ElasticSearchConfiguration config;
RestHighLevelClient searchClient;
IndexConvention indexConvention;
ESBulkProcessor bulkProcessor;
ESIndexBuilder indexBuilder;
int numRetries;
}
@Value("${elasticsearch.bulkProcessor.numRetries}")
private Integer numRetries;
@Autowired
@Qualifier("elasticSearchRestHighLevelClient")
private RestHighLevelClient searchClient;
@ -53,8 +51,12 @@ public class BaseElasticSearchComponentsFactory {
@Bean(name = "baseElasticSearchComponents")
@Nonnull
protected BaseElasticSearchComponents getInstance() {
protected BaseElasticSearchComponents getInstance(ConfigurationProvider configurationProvider) {
return new BaseElasticSearchComponents(
searchClient, indexConvention, bulkProcessor, indexBuilder, numRetries);
configurationProvider.getElasticSearch(),
searchClient,
indexConvention,
bulkProcessor,
indexBuilder);
}
}

View File

@ -74,6 +74,12 @@ public class ElasticSearchServiceFactory {
configurationProvider.getSearchService());
}
@Bean
protected ESWriteDAO esWriteDAO() {
return new ESWriteDAO(
components.getConfig(), components.getSearchClient(), components.getBulkProcessor());
}
@Bean(name = "elasticSearchService")
@Nonnull
protected ElasticSearchService getInstance(
@ -81,7 +87,8 @@ public class ElasticSearchServiceFactory {
final QueryFilterRewriteChain queryFilterRewriteChain,
final ElasticSearchConfiguration elasticSearchConfiguration,
final CustomSearchConfiguration customSearchConfiguration,
final ESSearchDAO esSearchDAO)
final ESSearchDAO esSearchDAO,
final ESWriteDAO esWriteDAO)
throws IOException {
return new ElasticSearchService(
@ -97,9 +104,6 @@ public class ElasticSearchServiceFactory {
customSearchConfiguration,
queryFilterRewriteChain,
configurationProvider.getSearchService()),
new ESWriteDAO(
components.getSearchClient(),
components.getBulkProcessor(),
components.getNumRetries()));
esWriteDAO);
}
}

View File

@ -34,7 +34,7 @@ public class ElasticSearchTimeseriesAspectServiceFactory {
return new ElasticSearchTimeseriesAspectService(
components.getSearchClient(),
components.getBulkProcessor(),
components.getNumRetries(),
components.getConfig().getBulkProcessor().getNumRetries(),
queryFilterRewriteChain,
configurationProvider.getTimeseriesAspectService(),
entityRegistry,