From f12f8e37d037edf2a87fe7e0eea94badbd265c4e Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 15 Jul 2025 17:40:11 -0500 Subject: [PATCH] refactor(queries): refactor query edges (#14095) --- .../config/BackfillBrowsePathsV2Config.java | 28 - .../BackfillDataProcessInstancesConfig.java | 43 -- ...kfillIngestionSourceInfoIndicesConfig.java | 29 - .../config/BackfillPolicyFieldsConfig.java | 27 - .../upgrade/config/NonBlockingConfigs.java | 145 ++++ .../upgrade/config/SchemaFieldsConfig.java | 56 -- .../system/entities/RemoveQueryEdges.java | 131 ++++ .../system/entities/RemoveQueryEdgesTest.java | 263 +++++++ .../graph/elastic/ESGraphQueryDAO.java | 2 +- .../elasticsearch/update/ESWriteDAO.java | 324 +++++++- .../search/LineageServiceTestBase.java | 3 +- .../search/SearchServiceTestBase.java | 3 +- .../metadata/search/TestEntityTestBase.java | 3 +- .../WriteDAOElasticSearchTest.java | 49 ++ .../opensearch/WriteDAOOpenSearchTest.java | 50 ++ .../search/update/ESWriteDAOTest.java | 726 +++++++++++++++++- .../search/update/WriteDAOTestBase.java | 144 ++++ .../SampleDataFixtureConfiguration.java | 78 +- .../SearchLineageFixtureConfiguration.java | 2 +- .../test/search/SearchTestUtils.java | 13 + .../com/linkedin/query/QuerySubject.pdl | 4 - .../search/BulkDeleteConfiguration.java | 32 + .../search/BulkProcessorConfiguration.java | 16 + .../search/ElasticSearchConfiguration.java | 2 + .../linkedin/metadata/utils/ParseUtils.java | 13 + .../src/main/resources/application.yaml | 11 + .../ElasticSearchGraphServiceFactory.java | 5 +- ...ticSearchSystemMetadataServiceFactory.java | 2 +- .../BaseElasticSearchComponentsFactory.java | 16 +- .../search/ElasticSearchServiceFactory.java | 14 +- ...cSearchTimeseriesAspectServiceFactory.java | 2 +- 31 files changed, 1997 insertions(+), 239 deletions(-) delete mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java delete mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java delete mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillIngestionSourceInfoIndicesConfig.java delete mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillPolicyFieldsConfig.java create mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NonBlockingConfigs.java delete mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SchemaFieldsConfig.java create mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entities/RemoveQueryEdges.java create mode 100644 datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/entities/RemoveQueryEdgesTest.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/WriteDAOElasticSearchTest.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/search/opensearch/WriteDAOOpenSearchTest.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/search/update/WriteDAOTestBase.java create mode 100644 metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BulkDeleteConfiguration.java create mode 100644 metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BulkProcessorConfiguration.java create mode 100644 metadata-service/configuration/src/main/java/com/linkedin/metadata/utils/ParseUtils.java diff --git 
a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java deleted file mode 100644 index a33722d776..0000000000 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.linkedin.datahub.upgrade.config; - -import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; -import com.linkedin.datahub.upgrade.system.browsepaths.BackfillBrowsePathsV2; -import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.search.SearchService; -import io.datahubproject.metadata.context.OperationContext; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.Configuration; - -@Configuration -@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class) -public class BackfillBrowsePathsV2Config { - - @Bean - public NonBlockingSystemUpgrade backfillBrowsePathsV2( - final OperationContext opContext, - EntityService entityService, - SearchService searchService, - @Value("${systemUpdate.browsePathsV2.enabled}") final boolean enabled, - @Value("${systemUpdate.browsePathsV2.reprocess.enabled}") final boolean reprocessEnabled, - @Value("${systemUpdate.browsePathsV2.batchSize}") final Integer batchSize) { - return new BackfillBrowsePathsV2( - opContext, entityService, searchService, enabled, reprocessEnabled, batchSize); - } -} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java deleted file mode 100644 index bc55ad3876..0000000000 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.linkedin.datahub.upgrade.config; - -import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; -import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances; -import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; -import io.datahubproject.metadata.context.OperationContext; -import org.opensearch.client.RestHighLevelClient; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.Configuration; - -@Configuration -@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class) -public class BackfillDataProcessInstancesConfig { - - @Bean - public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents( - final OperationContext opContext, - EntityService entityService, - ElasticSearchService elasticSearchService, - RestHighLevelClient restHighLevelClient, - @Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled, - @Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}") - boolean reprocessEnabled, - @Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize, - @Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs, - 
@Value("${systemUpdate.processInstanceHasRunEvents.totalDays}") Integer totalDays, - @Value("${systemUpdate.processInstanceHasRunEvents.windowDays}") Integer windowDays) { - return new BackfillDataProcessInstances( - opContext, - entityService, - elasticSearchService, - restHighLevelClient, - enabled, - reprocessEnabled, - batchSize, - delayMs, - totalDays, - windowDays); - } -} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillIngestionSourceInfoIndicesConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillIngestionSourceInfoIndicesConfig.java deleted file mode 100644 index f525c4e358..0000000000 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillIngestionSourceInfoIndicesConfig.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.linkedin.datahub.upgrade.config; - -import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; -import com.linkedin.datahub.upgrade.system.ingestion.BackfillIngestionSourceInfoIndices; -import com.linkedin.metadata.entity.AspectDao; -import com.linkedin.metadata.entity.EntityService; -import io.datahubproject.metadata.context.OperationContext; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.Configuration; - -@Configuration -@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class) -public class BackfillIngestionSourceInfoIndicesConfig { - - @Bean - public NonBlockingSystemUpgrade backfillIngestionSourceInfoIndices( - final OperationContext opContext, - final EntityService entityService, - final AspectDao aspectDao, - @Value("${systemUpdate.ingestionIndices.enabled}") final boolean enabled, - @Value("${systemUpdate.ingestionIndices.batchSize}") final Integer batchSize, - @Value("${systemUpdate.ingestionIndices.delayMs}") final Integer delayMs, - @Value("${systemUpdate.ingestionIndices.limit}") final Integer limit) { - return new BackfillIngestionSourceInfoIndices( - opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit); - } -} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillPolicyFieldsConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillPolicyFieldsConfig.java deleted file mode 100644 index 7226ec267d..0000000000 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillPolicyFieldsConfig.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.linkedin.datahub.upgrade.config; - -import com.linkedin.datahub.upgrade.system.policyfields.BackfillPolicyFields; -import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.search.SearchService; -import io.datahubproject.metadata.context.OperationContext; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.Configuration; - -@Configuration -@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class) -public class BackfillPolicyFieldsConfig { - - @Bean - public BackfillPolicyFields backfillPolicyFields( - final OperationContext opContext, - EntityService entityService, - SearchService searchService, - @Value("${systemUpdate.policyFields.enabled}") final boolean enabled, - @Value("${systemUpdate.policyFields.reprocess.enabled}") 
final boolean reprocessEnabled, - @Value("${systemUpdate.policyFields.batchSize}") final Integer batchSize) { - return new BackfillPolicyFields( - opContext, entityService, searchService, enabled, reprocessEnabled, batchSize); - } -} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NonBlockingConfigs.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NonBlockingConfigs.java new file mode 100644 index 0000000000..a91ce13583 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NonBlockingConfigs.java @@ -0,0 +1,145 @@ +package com.linkedin.datahub.upgrade.config; + +import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.datahub.upgrade.system.browsepaths.BackfillBrowsePathsV2; +import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances; +import com.linkedin.datahub.upgrade.system.entities.RemoveQueryEdges; +import com.linkedin.datahub.upgrade.system.ingestion.BackfillIngestionSourceInfoIndices; +import com.linkedin.datahub.upgrade.system.policyfields.BackfillPolicyFields; +import com.linkedin.datahub.upgrade.system.schemafield.GenerateSchemaFieldsFromSchemaMetadata; +import com.linkedin.datahub.upgrade.system.schemafield.MigrateSchemaFieldDocIds; +import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory; +import com.linkedin.metadata.config.search.BulkDeleteConfiguration; +import com.linkedin.metadata.entity.AspectDao; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.search.SearchService; +import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; +import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; +import io.datahubproject.metadata.context.OperationContext; +import org.opensearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class) +public class NonBlockingConfigs { + + @Bean + public NonBlockingSystemUpgrade removeQueryEdges( + final OperationContext opContext, + final ConfigurationProvider configurationProvider, + EntityService entityService, + ESWriteDAO esWriteDao, + @Value("${systemUpdate.removeQueryEdges.enabled}") final boolean enabled, + @Value("${systemUpdate.removeQueryEdges.numRetries}") final int numRetries) { + BulkDeleteConfiguration override = + configurationProvider.getElasticSearch().getBulkDelete().toBuilder() + .numRetries(numRetries) + .build(); + return new RemoveQueryEdges(opContext, entityService, esWriteDao, enabled, override); + } + + @Bean + public NonBlockingSystemUpgrade backfillBrowsePathsV2( + final OperationContext opContext, + EntityService entityService, + SearchService searchService, + @Value("${systemUpdate.browsePathsV2.enabled}") final boolean enabled, + @Value("${systemUpdate.browsePathsV2.reprocess.enabled}") final boolean reprocessEnabled, + @Value("${systemUpdate.browsePathsV2.batchSize}") final Integer batchSize) { + return new BackfillBrowsePathsV2( + opContext, entityService, searchService, enabled, reprocessEnabled, batchSize); + } + + @Bean + public NonBlockingSystemUpgrade 
backfillProcessInstancesHasRunEvents( + final OperationContext opContext, + EntityService entityService, + ElasticSearchService elasticSearchService, + RestHighLevelClient restHighLevelClient, + @Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled, + @Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}") + boolean reprocessEnabled, + @Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize, + @Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs, + @Value("${systemUpdate.processInstanceHasRunEvents.totalDays}") Integer totalDays, + @Value("${systemUpdate.processInstanceHasRunEvents.windowDays}") Integer windowDays) { + return new BackfillDataProcessInstances( + opContext, + entityService, + elasticSearchService, + restHighLevelClient, + enabled, + reprocessEnabled, + batchSize, + delayMs, + totalDays, + windowDays); + } + + @Bean + public NonBlockingSystemUpgrade backfillIngestionSourceInfoIndices( + final OperationContext opContext, + final EntityService entityService, + final AspectDao aspectDao, + @Value("${systemUpdate.ingestionIndices.enabled}") final boolean enabled, + @Value("${systemUpdate.ingestionIndices.batchSize}") final Integer batchSize, + @Value("${systemUpdate.ingestionIndices.delayMs}") final Integer delayMs, + @Value("${systemUpdate.ingestionIndices.limit}") final Integer limit) { + return new BackfillIngestionSourceInfoIndices( + opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit); + } + + @Bean + public BackfillPolicyFields backfillPolicyFields( + final OperationContext opContext, + EntityService entityService, + SearchService searchService, + @Value("${systemUpdate.policyFields.enabled}") final boolean enabled, + @Value("${systemUpdate.policyFields.reprocess.enabled}") final boolean reprocessEnabled, + @Value("${systemUpdate.policyFields.batchSize}") final Integer batchSize) { + return new BackfillPolicyFields( + opContext, entityService, searchService, enabled, reprocessEnabled, batchSize); + } + + @Bean + public NonBlockingSystemUpgrade schemaFieldsFromSchemaMetadata( + @Qualifier("systemOperationContext") final OperationContext opContext, + final EntityService entityService, + final AspectDao aspectDao, + // SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_ENABLED + @Value("${systemUpdate.schemaFieldsFromSchemaMetadata.enabled}") final boolean enabled, + // SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_BATCH_SIZE + @Value("${systemUpdate.schemaFieldsFromSchemaMetadata.batchSize}") final Integer batchSize, + // SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_DELAY_MS + @Value("${systemUpdate.schemaFieldsFromSchemaMetadata.delayMs}") final Integer delayMs, + // SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_LIMIT + @Value("${systemUpdate.schemaFieldsFromSchemaMetadata.limit}") final Integer limit) { + return new GenerateSchemaFieldsFromSchemaMetadata( + opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit); + } + + @Bean + public NonBlockingSystemUpgrade schemaFieldsDocIds( + @Qualifier("systemOperationContext") final OperationContext opContext, + @Qualifier("baseElasticSearchComponents") + final BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components, + final EntityService entityService, + // ELASTICSEARCH_INDEX_DOC_IDS_SCHEMA_FIELD_HASH_ID_ENABLED + @Value("${elasticsearch.index.docIds.schemaField.hashIdEnabled}") final boolean hashEnabled, + // SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_ENABLED + 
@Value("${systemUpdate.schemaFieldsDocIds.enabled}") final boolean enabled, + // SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_BATCH_SIZE + @Value("${systemUpdate.schemaFieldsDocIds.batchSize}") final Integer batchSize, + // SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_DELAY_MS + @Value("${systemUpdate.schemaFieldsDocIds.delayMs}") final Integer delayMs, + // SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_LIMIT + @Value("${systemUpdate.schemaFieldsDocIds.limit}") final Integer limit) { + return new MigrateSchemaFieldDocIds( + opContext, components, entityService, enabled && hashEnabled, batchSize, delayMs, limit); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SchemaFieldsConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SchemaFieldsConfig.java deleted file mode 100644 index 5630379c56..0000000000 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SchemaFieldsConfig.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.linkedin.datahub.upgrade.config; - -import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; -import com.linkedin.datahub.upgrade.system.schemafield.GenerateSchemaFieldsFromSchemaMetadata; -import com.linkedin.datahub.upgrade.system.schemafield.MigrateSchemaFieldDocIds; -import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory; -import com.linkedin.metadata.entity.AspectDao; -import com.linkedin.metadata.entity.EntityService; -import io.datahubproject.metadata.context.OperationContext; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.Configuration; - -@Configuration -@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class) -public class SchemaFieldsConfig { - - @Bean - public NonBlockingSystemUpgrade schemaFieldsFromSchemaMetadata( - @Qualifier("systemOperationContext") final OperationContext opContext, - final EntityService entityService, - final AspectDao aspectDao, - // SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_ENABLED - @Value("${systemUpdate.schemaFieldsFromSchemaMetadata.enabled}") final boolean enabled, - // SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_BATCH_SIZE - @Value("${systemUpdate.schemaFieldsFromSchemaMetadata.batchSize}") final Integer batchSize, - // SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_DELAY_MS - @Value("${systemUpdate.schemaFieldsFromSchemaMetadata.delayMs}") final Integer delayMs, - // SYSTEM_UPDATE_SCHEMA_FIELDS_FROM_SCHEMA_METADATA_LIMIT - @Value("${systemUpdate.schemaFieldsFromSchemaMetadata.limit}") final Integer limit) { - return new GenerateSchemaFieldsFromSchemaMetadata( - opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit); - } - - @Bean - public NonBlockingSystemUpgrade schemaFieldsDocIds( - @Qualifier("systemOperationContext") final OperationContext opContext, - @Qualifier("baseElasticSearchComponents") - final BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components, - final EntityService entityService, - // ELASTICSEARCH_INDEX_DOC_IDS_SCHEMA_FIELD_HASH_ID_ENABLED - @Value("${elasticsearch.index.docIds.schemaField.hashIdEnabled}") final boolean hashEnabled, - // SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_ENABLED - @Value("${systemUpdate.schemaFieldsDocIds.enabled}") final boolean enabled, - // SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_BATCH_SIZE - 
@Value("${systemUpdate.schemaFieldsDocIds.batchSize}") final Integer batchSize, - // SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_DELAY_MS - @Value("${systemUpdate.schemaFieldsDocIds.delayMs}") final Integer delayMs, - // SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_LIMIT - @Value("${systemUpdate.schemaFieldsDocIds.limit}") final Integer limit) { - return new MigrateSchemaFieldDocIds( - opContext, components, entityService, enabled && hashEnabled, batchSize, delayMs, limit); - } -} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entities/RemoveQueryEdges.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entities/RemoveQueryEdges.java new file mode 100644 index 0000000000..42644cdd6a --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entities/RemoveQueryEdges.java @@ -0,0 +1,131 @@ +package com.linkedin.datahub.upgrade.system.entities; + +import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME; +import static com.linkedin.metadata.Constants.QUERY_ENTITY_NAME; +import static com.linkedin.metadata.graph.elastic.ESGraphQueryDAO.RELATIONSHIP_TYPE; + +import com.google.common.collect.ImmutableList; +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.metadata.boot.BootstrapStep; +import com.linkedin.metadata.config.search.BulkDeleteConfiguration; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; +import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; +import com.linkedin.upgrade.DataHubUpgradeState; +import io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilders; + +@Slf4j +public class RemoveQueryEdges implements NonBlockingSystemUpgrade { + private final List steps; + + public RemoveQueryEdges( + OperationContext opContext, + EntityService entityService, + ESWriteDAO esWriteDAO, + boolean enabled, + BulkDeleteConfiguration deleteConfig) { + if (enabled) { + steps = + ImmutableList.of( + new RemoveQueryEdgesStep(opContext, esWriteDAO, entityService, deleteConfig)); + } else { + steps = ImmutableList.of(); + } + } + + @Override + public String id() { + return "RemoveQueryEdges"; + } + + @Override + public List steps() { + return steps; + } + + public static class RemoveQueryEdgesStep implements UpgradeStep { + private static final String UPGRADE_ID = "RemoveQueryEdges_V1"; + private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID); + + private final OperationContext opContext; + private final EntityService entityService; + private final ESWriteDAO esWriteDAO; + private final BulkDeleteConfiguration deleteConfig; + + public RemoveQueryEdgesStep( + OperationContext opContext, + ESWriteDAO esWriteDAO, + EntityService entityService, + BulkDeleteConfiguration deleteConfig) { + this.opContext = opContext; + this.esWriteDAO = esWriteDAO; + this.entityService = entityService; + this.deleteConfig = deleteConfig; + } + + @Override + public String id() { + return UPGRADE_ID; + } + + @Override + public Function 
executable() { + final String indexName = + opContext + .getSearchContext() + .getIndexConvention() + .getIndexName(ElasticSearchGraphService.INDEX_NAME); + + return (context) -> { + BoolQueryBuilder deleteQuery = QueryBuilders.boolQuery(); + deleteQuery.filter(QueryBuilders.termQuery(RELATIONSHIP_TYPE, "IsAssociatedWith")); + deleteQuery.filter(QueryBuilders.termQuery("source.entityType", QUERY_ENTITY_NAME)); + + try { + esWriteDAO.deleteByQuerySync(indexName, deleteQuery, deleteConfig); + BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService); + return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.SUCCEEDED); + } catch (Exception e) { + log.error("Failed to execute query edge delete.", e); + return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED); + } + }; + } + + /** + * Returns whether the upgrade should proceed if the step fails after exceeding the maximum + * retries. + */ + @Override + public boolean isOptional() { + return true; + } + + /** + * Returns whether the upgrade should be skipped. Uses previous run history to determine + * whether to skip. + */ + @Override + public boolean skip(UpgradeContext context) { + + boolean previouslyRun = + entityService.exists( + context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true); + if (previouslyRun) { + log.info("{} was already run. Skipping.", id()); + } + return previouslyRun; + } + } +} diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/entities/RemoveQueryEdgesTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/entities/RemoveQueryEdgesTest.java new file mode 100644 index 0000000000..4bb1382baa --- /dev/null +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/entities/RemoveQueryEdgesTest.java @@ -0,0 +1,263 @@ +package com.linkedin.datahub.upgrade.system.entities; + +import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME; +import static com.linkedin.metadata.graph.elastic.ESGraphQueryDAO.RELATIONSHIP_TYPE; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.metadata.boot.BootstrapStep; +import com.linkedin.metadata.config.search.BulkDeleteConfiguration; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; +import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import com.linkedin.upgrade.DataHubUpgradeState; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.metadata.context.SearchContext; +import java.util.List; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import
org.opensearch.index.query.TermQueryBuilder; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class RemoveQueryEdgesTest { + + private static final String UPGRADE_ID = "RemoveQueryEdges_V1"; + private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID); + + @Mock private OperationContext mockOpContext; + @Mock private EntityService mockEntityService; + @Mock private ESWriteDAO mockEsWriteDAO; + @Mock private SearchContext mockSearchContext; + @Mock private IndexConvention mockIndexConvention; + @Mock private UpgradeContext mockUpgradeContext; + + private RemoveQueryEdges removeQueryEdges; + private RemoveQueryEdges.RemoveQueryEdgesStep removeQueryEdgesStep; + private BulkDeleteConfiguration deleteConfig; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + + // Setup mock chain for index name resolution + when(mockOpContext.getSearchContext()).thenReturn(mockSearchContext); + when(mockSearchContext.getIndexConvention()).thenReturn(mockIndexConvention); + when(mockIndexConvention.getIndexName(ElasticSearchGraphService.INDEX_NAME)) + .thenReturn("test_graph_index"); + + when(mockUpgradeContext.opContext()).thenReturn(mockOpContext); + + this.deleteConfig = BulkDeleteConfiguration.builder().numRetries(1).build(); + } + + @Test + public void testRemoveQueryEdgesEnabledCreatesStep() { + // Test with enabled = true + removeQueryEdges = + new RemoveQueryEdges(mockOpContext, mockEntityService, mockEsWriteDAO, true, deleteConfig); + + assertEquals(removeQueryEdges.id(), "RemoveQueryEdges"); + List steps = removeQueryEdges.steps(); + assertEquals(steps.size(), 1); + assertTrue(steps.get(0) instanceof RemoveQueryEdges.RemoveQueryEdgesStep); + } + + @Test + public void testRemoveQueryEdgesDisabledCreatesNoSteps() { + // Test with enabled = false + removeQueryEdges = + new RemoveQueryEdges(mockOpContext, mockEntityService, mockEsWriteDAO, false, deleteConfig); + + assertEquals(removeQueryEdges.id(), "RemoveQueryEdges"); + List steps = removeQueryEdges.steps(); + assertTrue(steps.isEmpty()); + } + + @Test + public void testStepId() { + removeQueryEdgesStep = + new RemoveQueryEdges.RemoveQueryEdgesStep( + mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig); + + assertEquals(removeQueryEdgesStep.id(), UPGRADE_ID); + } + + @Test + public void testIsOptional() { + removeQueryEdgesStep = + new RemoveQueryEdges.RemoveQueryEdgesStep( + mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig); + + assertTrue(removeQueryEdgesStep.isOptional()); + } + + @Test + public void testSkipWhenPreviouslyRun() { + removeQueryEdgesStep = + new RemoveQueryEdges.RemoveQueryEdgesStep( + mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig); + + // Mock that the upgrade was already run + when(mockEntityService.exists( + eq(mockOpContext), + eq(UPGRADE_ID_URN), + eq(DATA_HUB_UPGRADE_RESULT_ASPECT_NAME), + eq(true))) + .thenReturn(true); + + assertTrue(removeQueryEdgesStep.skip(mockUpgradeContext)); + + verify(mockEntityService) + .exists( + eq(mockOpContext), + eq(UPGRADE_ID_URN), + eq(DATA_HUB_UPGRADE_RESULT_ASPECT_NAME), + eq(true)); + } + + @Test + public void testDontSkipWhenNotPreviouslyRun() { + removeQueryEdgesStep = + new RemoveQueryEdges.RemoveQueryEdgesStep( + mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig); + + // Mock that the upgrade was not run before + when(mockEntityService.exists( + eq(mockOpContext), + eq(UPGRADE_ID_URN), + eq(DATA_HUB_UPGRADE_RESULT_ASPECT_NAME), + 
eq(true))) + .thenReturn(false); + + assertFalse(removeQueryEdgesStep.skip(mockUpgradeContext)); + } + + @Test + public void testExecutableSuccess() throws Exception { + removeQueryEdgesStep = + new RemoveQueryEdges.RemoveQueryEdgesStep( + mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig); + + // Mock successful delete operation + ESWriteDAO.DeleteByQueryResult mockResult = + ESWriteDAO.DeleteByQueryResult.builder() + .success(true) + .remainingDocuments(0) + .timeTaken(1000) + .retryAttempts(0) + .build(); + + when(mockEsWriteDAO.deleteByQuerySync( + any(String.class), any(QueryBuilder.class), eq(deleteConfig))) + .thenReturn(mockResult); + + // Execute the step + UpgradeStepResult result = removeQueryEdgesStep.executable().apply(mockUpgradeContext); + + // Verify the result + assertEquals(result.stepId(), UPGRADE_ID); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + + // Capture and verify the delete query + ArgumentCaptor indexCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor queryCaptor = ArgumentCaptor.forClass(QueryBuilder.class); + + verify(mockEsWriteDAO) + .deleteByQuerySync(indexCaptor.capture(), queryCaptor.capture(), eq(deleteConfig)); + + assertEquals(indexCaptor.getValue(), "test_graph_index"); + + // Verify the query structure + QueryBuilder capturedQuery = queryCaptor.getValue(); + assertTrue(capturedQuery instanceof BoolQueryBuilder); + + BoolQueryBuilder boolQuery = (BoolQueryBuilder) capturedQuery; + + // Verify the filters + assertEquals(boolQuery.filter().size(), 2); + + // Verify relationship type filter + boolean hasRelationshipFilter = + boolQuery.filter().stream() + .anyMatch( + filter -> + filter instanceof TermQueryBuilder + && ((TermQueryBuilder) filter).fieldName().equals(RELATIONSHIP_TYPE) + && ((TermQueryBuilder) filter).value().equals("IsAssociatedWith")); + assertTrue(hasRelationshipFilter, "Should have relationship type filter"); + + // Verify source entity type filter + boolean hasSourceFilter = + boolQuery.filter().stream() + .anyMatch( + filter -> + filter instanceof TermQueryBuilder + && ((TermQueryBuilder) filter).fieldName().equals("source.entityType") + && ((TermQueryBuilder) filter).value().equals("query")); + assertTrue(hasSourceFilter, "Should have source entity type filter"); + + // Verify that bootstrap result was set + verify(mockEntityService).ingestProposal(eq(mockOpContext), any(), any(), anyBoolean()); + } + + @Test + public void testExecutableWhenDeleteFails() throws Exception { + removeQueryEdgesStep = + new RemoveQueryEdges.RemoveQueryEdgesStep( + mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig); + + // Mock failed delete operation + ESWriteDAO.DeleteByQueryResult mockResult = + ESWriteDAO.DeleteByQueryResult.builder() + .success(false) + .failureReason("Connection timeout") + .remainingDocuments(100) + .timeTaken(5000) + .retryAttempts(3) + .build(); + + when(mockEsWriteDAO.deleteByQuerySync( + any(String.class), any(QueryBuilder.class), eq(deleteConfig))) + .thenReturn(mockResult); + + // Execute the step - it should still succeed, since the step only fails on exceptions + UpgradeStepResult result = removeQueryEdgesStep.executable().apply(mockUpgradeContext); + + // The step should still return SUCCEEDED even if the delete reported failure + assertEquals(result.stepId(), UPGRADE_ID); + assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED); + + // Verify that bootstrap result was still set + verify(mockEntityService).ingestProposal(eq(mockOpContext), any(), any(), anyBoolean());
+ } + + @Test + public void testExecutableException() throws Exception { + removeQueryEdgesStep = + new RemoveQueryEdges.RemoveQueryEdgesStep( + mockOpContext, mockEsWriteDAO, mockEntityService, deleteConfig); + + // Mock exception during delete + when(mockEsWriteDAO.deleteByQuerySync( + any(String.class), any(QueryBuilder.class), eq(deleteConfig))) + .thenThrow(new RuntimeException("Elasticsearch connection failed")); + + UpgradeStepResult result = removeQueryEdgesStep.executable().apply(mockUpgradeContext); + assertEquals(result.result(), DataHubUpgradeState.FAILED); + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index 88a7e21171..a229c3e2ae 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -94,7 +94,7 @@ public class ESGraphQueryDAO { static final String SOURCE = "source"; static final String DESTINATION = "destination"; - static final String RELATIONSHIP_TYPE = "relationshipType"; + public static final String RELATIONSHIP_TYPE = "relationshipType"; static final String SOURCE_TYPE = SOURCE + ".entityType"; static final String SOURCE_URN = SOURCE + ".urn"; static final String DESTINATION_TYPE = DESTINATION + ".entityType"; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index 74c2b0edf7..59c0ea9dc3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -1,29 +1,61 @@ package com.linkedin.metadata.search.elasticsearch.update; +import static org.opensearch.index.reindex.AbstractBulkByScrollRequest.AUTO_SLICES; +import static org.opensearch.index.reindex.AbstractBulkByScrollRequest.AUTO_SLICES_VALUE; + +import com.google.common.annotations.VisibleForTesting; +import com.linkedin.metadata.config.search.BulkDeleteConfiguration; +import com.linkedin.metadata.config.search.ElasticSearchConfiguration; import io.datahubproject.metadata.context.OperationContext; import java.io.IOException; +import java.time.Duration; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.core.CountRequest; +import org.opensearch.client.core.CountResponse; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; +import org.opensearch.client.tasks.GetTaskRequest; +import org.opensearch.client.tasks.GetTaskResponse; +import org.opensearch.client.tasks.TaskId; +import org.opensearch.client.tasks.TaskSubmissionResponse; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; +import 
org.opensearch.index.reindex.DeleteByQueryRequest; import org.opensearch.script.Script; import org.opensearch.script.ScriptType; @Slf4j @RequiredArgsConstructor public class ESWriteDAO { - + private final ElasticSearchConfiguration config; private final RestHighLevelClient searchClient; private final ESBulkProcessor bulkProcessor; - private final int numRetries; + + /** Result of a delete by query operation */ + @Data + @Builder + public static class DeleteByQueryResult { + private final long timeTaken; + private final boolean success; + private final String failureReason; + private final long remainingDocuments; + private final int retryAttempts; + private final TaskId taskId; + } /** * Updates or inserts the given search document. @@ -42,7 +74,7 @@ public class ESWriteDAO { .detectNoop(false) .docAsUpsert(true) .doc(document, XContentType.JSON) - .retryOnConflict(numRetries); + .retryOnConflict(config.getBulkProcessor().getNumRetries()); bulkProcessor.add(updateRequest); } @@ -78,7 +110,7 @@ public class ESWriteDAO { new UpdateRequest(toIndexName(opContext, entityName), docId) .detectNoop(false) .scriptedUpsert(true) - .retryOnConflict(numRetries) + .retryOnConflict(config.getBulkProcessor().getNumRetries()) .script(script) .upsert(upsert); bulkProcessor.add(updateRequest); @@ -102,6 +134,242 @@ public class ESWriteDAO { } } + /** + * Performs an async delete by query operation (non-blocking). Returns immediately with a + * CompletableFuture containing the task ID. + */ + @Nonnull + public CompletableFuture deleteByQueryAsync( + @Nonnull String indexName, + @Nonnull QueryBuilder query, + @Nullable BulkDeleteConfiguration overrideConfig) { + + final BulkDeleteConfiguration finalConfig = + overrideConfig != null ? overrideConfig : config.getBulkDelete(); + + return CompletableFuture.supplyAsync( + () -> { + try { + DeleteByQueryRequest request = buildDeleteByQueryRequest(indexName, query, finalConfig); + + // Submit the task asynchronously + TaskSubmissionResponse taskId = + searchClient.submitDeleteByQueryTask(request, RequestOptions.DEFAULT); + + log.info("Started async delete by query task: {} for index: {}", taskId, indexName); + + return taskId; + } catch (IOException e) { + log.error("Failed to start async delete by query for index: {}", indexName, e); + throw new RuntimeException("Failed to start async delete by query", e); + } + }); + } + + /** + * Performs a synchronous delete by query operation with monitoring. Blocks until completion + * and retries if documents remain. + */ + @Nonnull + public DeleteByQueryResult deleteByQuerySync( + @Nonnull String indexName, + @Nonnull QueryBuilder query, + @Nullable BulkDeleteConfiguration overrideConfig) { + + final BulkDeleteConfiguration finalConfig = + overrideConfig != null ?
overrideConfig : config.getBulkDelete(); + + long startTime = System.currentTimeMillis(); + long totalDeleted = 0; + int retryAttempts = 0; + TaskId lastTaskId = null; + + try { + // Get initial document count + long initialCount = countDocuments(indexName, query); + if (initialCount == 0) { + return DeleteByQueryResult.builder() + .timeTaken(System.currentTimeMillis() - startTime) + .success(true) + .remainingDocuments(0) + .retryAttempts(0) + .build(); + } + + log.info("Starting delete by query for index: {} with {} documents", indexName, initialCount); + + long remainingDocs = initialCount; + long previousRemainingDocs = initialCount; + + while (remainingDocs > 0 && retryAttempts < finalConfig.getNumRetries()) { + long countBeforeDelete = remainingDocs; + + // Submit delete by query task + DeleteByQueryRequest request = buildDeleteByQueryRequest(indexName, query, finalConfig); + + TaskSubmissionResponse taskSubmission = + searchClient.submitDeleteByQueryTask(request, RequestOptions.DEFAULT); + TaskId taskId = parseTaskId(taskSubmission.getTask()); + lastTaskId = taskId; + + log.info("Submitted delete by query task: {} for index: {}", taskId, indexName); + + // Monitor the task with context for proper tracking + DeleteByQueryResult iterationResult = + monitorDeleteByQueryTask(taskId, finalConfig.getTimeoutDuration(), indexName, query); + + // Calculate deleted count based on document count change + remainingDocs = iterationResult.getRemainingDocuments(); + long deleted = 0; + + if (remainingDocs >= 0 && previousRemainingDocs >= 0) { + deleted = previousRemainingDocs - remainingDocs; + if (deleted > 0) { + totalDeleted += deleted; + } + } + + previousRemainingDocs = remainingDocs; + + log.info( + "Delete by query iteration completed. Deleted: {}, Remaining: {}, Total deleted: {}", + deleted, + remainingDocs, + totalDeleted); + + // Check if we made progress + if (!iterationResult.isSuccess()) { + // Task failed or timed out + return DeleteByQueryResult.builder() + .timeTaken(System.currentTimeMillis() - startTime) + .success(false) + .failureReason(iterationResult.getFailureReason()) + .remainingDocuments(remainingDocs) + .retryAttempts(retryAttempts) + .taskId(lastTaskId) + .build(); + } else if (deleted == 0 && remainingDocs == countBeforeDelete && remainingDocs > 0) { + log.warn("Delete by query made no progress. Documents remaining: {}", remainingDocs); + if (retryAttempts < finalConfig.getNumRetries() - 1) { + retryAttempts++; + // Add a small delay before retry + Thread.sleep(finalConfig.getPollDuration().toMillis()); + } else { + // Final attempt made no progress + return DeleteByQueryResult.builder() + .timeTaken(System.currentTimeMillis() - startTime) + .success(false) + .failureReason( + "Delete operation made no progress after " + (retryAttempts + 1) + " attempts") + .remainingDocuments(remainingDocs) + .retryAttempts(retryAttempts) + .taskId(lastTaskId) + .build(); + } + } else if (remainingDocs > 0) { + retryAttempts++; + log.info( + "Retrying delete by query. Attempt {} of {}", + retryAttempts + 1, + finalConfig.getNumRetries()); + } + } + + return DeleteByQueryResult.builder() + .timeTaken(System.currentTimeMillis() - startTime) + .success(remainingDocs == 0) + .failureReason(remainingDocs > 0 ? 
"Documents still remaining after max retries" : null) + .remainingDocuments(remainingDocs) + .retryAttempts(retryAttempts) + .taskId(lastTaskId) + .build(); + + } catch (Exception e) { + log.error("Delete by query failed for index: {}", indexName, e); + return DeleteByQueryResult.builder() + .timeTaken(System.currentTimeMillis() - startTime) + .success(false) + .failureReason("Exception: " + e.getMessage()) + .remainingDocuments(-1) // Unknown + .retryAttempts(retryAttempts) + .taskId(lastTaskId) + .build(); + } + } + + /** + * Monitor the status of an async delete by query task For internal use with context to properly + * track deletions + */ + @VisibleForTesting + @Nonnull + public DeleteByQueryResult monitorDeleteByQueryTask( + @Nonnull TaskId taskId, + @Nullable Duration timeout, + @Nonnull String indexName, + @Nonnull QueryBuilder query) { + + Duration finalTimeout = timeout != null ? timeout : config.getBulkDelete().getTimeoutDuration(); + long startTime = System.currentTimeMillis(); + + try { + GetTaskRequest getTaskRequest = new GetTaskRequest(taskId.getNodeId(), taskId.getId()); + getTaskRequest.setWaitForCompletion(true); + getTaskRequest.setTimeout(TimeValue.timeValueMillis(finalTimeout.toMillis())); + + Optional taskResponse = + searchClient.tasks().get(getTaskRequest, RequestOptions.DEFAULT); + + if (taskResponse.isEmpty() || !taskResponse.get().isCompleted()) { + // Count remaining documents to determine if any progress was made + long remainingDocs = countDocuments(indexName, query); + + return DeleteByQueryResult.builder() + .timeTaken(System.currentTimeMillis() - startTime) + .success(false) + .failureReason("Task not completed within timeout") + .remainingDocuments(remainingDocs) + .retryAttempts(0) + .taskId(taskId) + .build(); + } + + // Task completed - count remaining documents to determine success + long remainingDocs = countDocuments(indexName, query); + + // We can't get exact delete count from task API, but we can infer success + // from whether documents remain + return DeleteByQueryResult.builder() + .timeTaken(System.currentTimeMillis() - startTime) + .success(true) // Task completed successfully + .failureReason(null) + .remainingDocuments(remainingDocs) + .retryAttempts(0) + .taskId(taskId) + .build(); + + } catch (Exception e) { + log.error("Failed to monitor delete by query task: {}", taskId, e); + + // Try to get remaining count even on error + long remainingDocs = -1; + try { + remainingDocs = countDocuments(indexName, query); + } catch (Exception countError) { + log.error("Failed to count remaining documents", countError); + } + + return DeleteByQueryResult.builder() + .timeTaken(System.currentTimeMillis() - startTime) + .success(false) + .failureReason("Monitoring failed: " + e.getMessage()) + .remainingDocuments(remainingDocs) + .retryAttempts(0) + .taskId(taskId) + .build(); + } + } + private static String toIndexName( @Nonnull OperationContext opContext, @Nonnull String entityName) { return opContext @@ -109,4 +377,52 @@ public class ESWriteDAO { .getIndexConvention() .getIndexName(opContext.getEntityRegistry().getEntitySpec(entityName)); } + + private DeleteByQueryRequest buildDeleteByQueryRequest( + @Nonnull String indexName, + @Nonnull QueryBuilder query, + @Nonnull BulkDeleteConfiguration config) { + + DeleteByQueryRequest request = new DeleteByQueryRequest(indexName); + request.setQuery(query); + request.setBatchSize(config.getBatchSize()); + + // Handle slices configuration - can be "auto" or a number + if 
(!AUTO_SLICES_VALUE.equalsIgnoreCase(config.getSlices())) { + try { + int sliceCount = Integer.parseInt(config.getSlices()); + request.setSlices(sliceCount); + } catch (NumberFormatException e) { + log.warn("Invalid slices value '{}', defaulting to 'auto'", config.getSlices()); + request.setSlices(AUTO_SLICES); + } + } else { + request.setSlices(AUTO_SLICES); + } + + request.setTimeout(TimeValue.timeValueMillis(config.getTimeoutDuration().toMillis())); + + // Set conflicts strategy to proceed (skip conflicting documents) + request.setConflicts("proceed"); + + return request; + } + + private long countDocuments(@Nonnull String indexName, @Nonnull QueryBuilder query) + throws IOException { + + CountRequest countRequest = new CountRequest(indexName); + countRequest.query(query); + CountResponse countResponse = searchClient.count(countRequest, RequestOptions.DEFAULT); + return countResponse.getCount(); + } + + /** Parse task ID from the task string format "nodeId:taskId" */ + private TaskId parseTaskId(String taskString) { + if (taskString == null || !taskString.contains(":")) { + throw new IllegalArgumentException("Invalid task string format: " + taskString); + } + String[] parts = taskString.split(":", 2); + return new TaskId(parts[0], Long.parseLong(parts[1])); + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java index 193e3f0b0d..21f9463fb6 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java @@ -216,7 +216,8 @@ public abstract class LineageServiceTestBase extends AbstractTestNGSpringContext null, QueryFilterRewriteChain.EMPTY, TEST_SEARCH_SERVICE_CONFIG); - ESWriteDAO writeDAO = new ESWriteDAO(searchClientSpy, getBulkProcessor(), 1); + ESWriteDAO writeDAO = + new ESWriteDAO(TEST_ES_SEARCH_CONFIG, searchClientSpy, getBulkProcessor()); ElasticSearchService searchService = new ElasticSearchService( getIndexBuilder(), diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java index 5ec9b37edf..018797b013 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java @@ -137,7 +137,8 @@ public abstract class SearchServiceTestBase extends AbstractTestNGSpringContextT null, QueryFilterRewriteChain.EMPTY, TEST_SEARCH_SERVICE_CONFIG); - ESWriteDAO writeDAO = new ESWriteDAO(getSearchClient(), getBulkProcessor(), 1); + ESWriteDAO writeDAO = + new ESWriteDAO(TEST_ES_SEARCH_CONFIG, getSearchClient(), getBulkProcessor()); ElasticSearchService searchService = new ElasticSearchService( getIndexBuilder(), diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java index 4bb7857f56..b6d9432eb6 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java @@ -98,7 +98,8 @@ public abstract class TestEntityTestBase extends AbstractTestNGSpringContextTest null, QueryFilterRewriteChain.EMPTY, TEST_SEARCH_SERVICE_CONFIG); - ESWriteDAO writeDAO = new ESWriteDAO(getSearchClient(), 
getBulkProcessor(), 1); + ESWriteDAO writeDAO = + new ESWriteDAO(TEST_ES_SEARCH_CONFIG, getSearchClient(), getBulkProcessor()); ElasticSearchService searchService = new ElasticSearchService( getIndexBuilder(), diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/WriteDAOElasticSearchTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/WriteDAOElasticSearchTest.java new file mode 100644 index 0000000000..e2bdb46573 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/WriteDAOElasticSearchTest.java @@ -0,0 +1,49 @@ +package com.linkedin.metadata.search.elasticsearch; + +import static org.testng.Assert.assertNotNull; + +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; +import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; +import com.linkedin.metadata.search.update.WriteDAOTestBase; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.fixtures.search.SampleDataFixtureConfiguration; +import io.datahubproject.test.search.config.SearchTestContainerConfiguration; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Import; +import org.testng.annotations.Test; + +@Slf4j +@Getter +@Import({ + ElasticSearchSuite.class, + SampleDataFixtureConfiguration.class, + SearchTestContainerConfiguration.class +}) +public class WriteDAOElasticSearchTest extends WriteDAOTestBase { + + @Autowired private RestHighLevelClient searchClient; + + @Autowired private ESWriteDAO esWriteDAO; + + @Autowired + @Qualifier("sampleDataOperationContext") + protected OperationContext operationContext; + + @Autowired + @Qualifier("sampleDataEntitySearchService") + protected ElasticSearchService entitySearchService; + + @Autowired + @Qualifier("sampleDataGraphService") + protected ElasticSearchGraphService graphService; + + @Test + public void initTest() { + assertNotNull(searchClient); + assertNotNull(esWriteDAO); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/opensearch/WriteDAOOpenSearchTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/opensearch/WriteDAOOpenSearchTest.java new file mode 100644 index 0000000000..f6515655f4 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/opensearch/WriteDAOOpenSearchTest.java @@ -0,0 +1,50 @@ +package com.linkedin.metadata.search.opensearch; + +import static org.testng.Assert.assertNotNull; + +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; +import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; +import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; +import com.linkedin.metadata.search.update.WriteDAOTestBase; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.fixtures.search.SampleDataFixtureConfiguration; +import io.datahubproject.test.search.config.SearchTestContainerConfiguration; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Import; +import org.testng.annotations.Test; + +@Slf4j +@Getter +@Import({ + 
OpenSearchSuite.class, + SampleDataFixtureConfiguration.class, + SearchTestContainerConfiguration.class +}) +public class WriteDAOOpenSearchTest extends WriteDAOTestBase { + + @Autowired private RestHighLevelClient searchClient; + + @Autowired private ESWriteDAO esWriteDAO; + + @Autowired + @Qualifier("sampleDataOperationContext") + protected OperationContext operationContext; + + @Autowired + @Qualifier("sampleDataEntitySearchService") + protected ElasticSearchService entitySearchService; + + @Autowired + @Qualifier("sampleDataGraphService") + protected ElasticSearchGraphService graphService; + + @Test + public void initTest() { + assertNotNull(searchClient); + assertNotNull(esWriteDAO); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java index a579b98e32..678d3631ac 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java @@ -1,21 +1,32 @@ package com.linkedin.metadata.search.update; +import static io.datahubproject.test.search.SearchTestUtils.TEST_ES_SEARCH_CONFIG; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.AssertJUnit.assertFalse; -import static org.testng.AssertJUnit.assertTrue; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import com.linkedin.metadata.config.search.BulkDeleteConfiguration; +import com.linkedin.metadata.config.search.BulkProcessorConfiguration; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; import java.io.IOException; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -23,16 +34,32 @@ import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.TasksClient; +import org.opensearch.client.core.CountRequest; +import org.opensearch.client.core.CountResponse; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; +import org.opensearch.client.tasks.GetTaskRequest; +import org.opensearch.client.tasks.GetTaskResponse; +import org.opensearch.client.tasks.TaskId; +import org.opensearch.client.tasks.TaskSubmissionResponse; +import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.reindex.DeleteByQueryRequest; import org.opensearch.script.Script; import 
org.opensearch.script.ScriptType; +import org.opensearch.tasks.TaskInfo; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class ESWriteDAOTest { + private static final String TEST_DELETE_INDEX = "test_index"; + private static final String TEST_NODE_ID = "node1"; + private static final long TEST_TASK_ID = 12345L; + private static final String TEST_TASK_STRING = TEST_NODE_ID + ":" + TEST_TASK_ID; + private static final String TEST_ENTITY = "dataset"; private static final String TEST_DOC_ID = "testDocId"; private static final String TEST_INDEX = "datasetindex_v2"; @@ -40,9 +67,8 @@ public class ESWriteDAOTest { private static final String TEST_PATTERN = "*index_v2"; @Mock private RestHighLevelClient mockSearchClient; - @Mock private ESBulkProcessor mockBulkProcessor; - + @Mock private TasksClient mockTasksClient; @Mock private org.opensearch.client.IndicesClient mockIndicesClient; private final OperationContext opContext = TestOperationContexts.systemContextNoValidate(); @@ -56,7 +82,16 @@ public class ESWriteDAOTest { // Setup mock indices client when(mockSearchClient.indices()).thenReturn(mockIndicesClient); - esWriteDAO = new ESWriteDAO(mockSearchClient, mockBulkProcessor, NUM_RETRIES); + // Setup mock tasks client + when(mockSearchClient.tasks()).thenReturn(mockTasksClient); + + esWriteDAO = + new ESWriteDAO( + TEST_ES_SEARCH_CONFIG.toBuilder() + .bulkProcessor(BulkProcessorConfiguration.builder().numRetries(NUM_RETRIES).build()) + .build(), + mockSearchClient, + mockBulkProcessor); } @Test @@ -173,4 +208,685 @@ public class ESWriteDAOTest { public void testUpsertDocumentWithNullDocument() { esWriteDAO.upsertDocument(opContext, TEST_ENTITY, null, TEST_DOC_ID); } + + @Test + public void testDeleteByQueryAsyncSuccess() + throws IOException, ExecutionException, InterruptedException { + QueryBuilder query = QueryBuilders.termQuery("status", "deleted"); + TaskSubmissionResponse mockResponse = mock(TaskSubmissionResponse.class); + when(mockResponse.getTask()).thenReturn(TEST_TASK_STRING); + + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockResponse); + + CompletableFuture future = + esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, null); + TaskSubmissionResponse result = future.get(); + + assertNotNull(result); + assertEquals(result.getTask(), TEST_TASK_STRING); + + // Verify the request + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(DeleteByQueryRequest.class); + verify(mockSearchClient) + .submitDeleteByQueryTask(requestCaptor.capture(), eq(RequestOptions.DEFAULT)); + + DeleteByQueryRequest capturedRequest = requestCaptor.getValue(); + assertEquals(capturedRequest.indices()[0], TEST_DELETE_INDEX); + assertEquals(capturedRequest.getBatchSize(), 1000); + assertEquals(capturedRequest.getSearchRequest().source().query(), query); + } + + @Test + public void testDeleteByQueryAsyncWithCustomConfig() + throws IOException, ExecutionException, InterruptedException { + QueryBuilder query = QueryBuilders.matchAllQuery(); + BulkDeleteConfiguration customConfig = + BulkDeleteConfiguration.builder() + .batchSize(5000) + .slices("10") + .timeout(1) + .timeoutUnit("HOURS") + .build(); + + TaskSubmissionResponse mockResponse = mock(TaskSubmissionResponse.class); + when(mockResponse.getTask()).thenReturn(TEST_TASK_STRING); + + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockResponse); + + 
CompletableFuture future = + esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, customConfig); + TaskSubmissionResponse result = future.get(); + + assertNotNull(result); + + // Verify custom config is applied + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(DeleteByQueryRequest.class); + verify(mockSearchClient) + .submitDeleteByQueryTask(requestCaptor.capture(), eq(RequestOptions.DEFAULT)); + + DeleteByQueryRequest capturedRequest = requestCaptor.getValue(); + assertEquals(capturedRequest.getBatchSize(), 5000); + // Note: We can't directly verify slices as it's set internally + } + + @Test + public void testDeleteByQueryAsyncIOException() throws IOException { + QueryBuilder query = QueryBuilders.termQuery("field", "value"); + + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenThrow(new IOException("Network error")); + + CompletableFuture future = + esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, null); + + try { + future.get(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof RuntimeException); + assertTrue(e.getCause().getMessage().contains("Failed to start async delete by query")); + } catch (InterruptedException e) { + fail("Unexpected InterruptedException"); + } + } + + @Test + public void testDeleteByQuerySyncNoDocuments() throws IOException { + QueryBuilder query = QueryBuilders.termQuery("status", "deleted"); + + // Mock count response with 0 documents + CountResponse mockCountResponse = mock(CountResponse.class); + when(mockCountResponse.getCount()).thenReturn(0L); + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockCountResponse); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertTrue(result.isSuccess()); + assertEquals(result.getRemainingDocuments(), 0); + assertEquals(result.getRetryAttempts(), 0); + assertNull(result.getFailureReason()); + } + + @Test + public void testDeleteByQuerySyncSuccessfulDeletion() throws IOException, InterruptedException { + QueryBuilder query = QueryBuilders.termQuery("status", "deleted"); + + // Mock count responses + CountResponse initialCount = mock(CountResponse.class); + when(initialCount.getCount()).thenReturn(100L); + + CountResponse afterDeleteCount = mock(CountResponse.class); + when(afterDeleteCount.getCount()).thenReturn(0L); + + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(initialCount) + .thenReturn(afterDeleteCount); + + // Mock task submission + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING); + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + // Mock task monitoring + GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class); + when(mockTaskResponse.isCompleted()).thenReturn(true); + TaskInfo mockTaskInfo = mock(TaskInfo.class); + when(mockTaskResponse.getTaskInfo()).thenReturn(mockTaskInfo); + + when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(Optional.of(mockTaskResponse)); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertTrue(result.isSuccess()); + assertEquals(result.getRemainingDocuments(), 0); + 
assertNull(result.getFailureReason()); + } + + @Test + public void testDeleteByQuerySyncWithRetries() throws IOException, InterruptedException { + QueryBuilder query = QueryBuilders.termQuery("status", "deleted"); + + // Mock count responses - documents remain after first attempt + CountResponse count100 = mock(CountResponse.class); + when(count100.getCount()).thenReturn(100L); + + CountResponse count50 = mock(CountResponse.class); + when(count50.getCount()).thenReturn(50L); + + CountResponse count0 = mock(CountResponse.class); + when(count0.getCount()).thenReturn(0L); + + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(count100) // Initial count + .thenReturn(count50) // After first delete + .thenReturn(count0); // After second delete + + // Mock task submissions + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING); + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + // Mock task monitoring + GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class); + when(mockTaskResponse.isCompleted()).thenReturn(true); + TaskInfo mockTaskInfo = mock(TaskInfo.class); + when(mockTaskResponse.getTaskInfo()).thenReturn(mockTaskInfo); + + when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(Optional.of(mockTaskResponse)); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertTrue(result.isSuccess()); + assertEquals(result.getRemainingDocuments(), 0); + assertEquals(result.getRetryAttempts(), 1); // One retry was needed + assertNull(result.getFailureReason()); + + // Verify two delete operations were performed + verify(mockSearchClient, times(2)) + .submitDeleteByQueryTask(any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)); + } + + @Test + public void testDeleteByQuerySyncNoProgress() throws IOException, InterruptedException { + QueryBuilder query = QueryBuilders.termQuery("status", "deleted"); + + // Mock count responses - no progress made + CountResponse count100 = mock(CountResponse.class); + when(count100.getCount()).thenReturn(100L); + + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(count100); // Always returns 100 + + // Mock task submission + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING); + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + // Mock task monitoring with no deletions + GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class); + when(mockTaskResponse.isCompleted()).thenReturn(true); + TaskInfo mockTaskInfo = mock(TaskInfo.class); + when(mockTaskResponse.getTaskInfo()).thenReturn(mockTaskInfo); + + when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(Optional.of(mockTaskResponse)); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertFalse(result.isSuccess()); + assertEquals(result.getRemainingDocuments(), 100); + assertTrue(result.getFailureReason().contains("no progress")); + } + + @Test + public void testDeleteByQuerySyncException() throws IOException { + QueryBuilder query = 
QueryBuilders.termQuery("status", "deleted"); + + // Mock initial count + CountResponse mockCountResponse = mock(CountResponse.class); + when(mockCountResponse.getCount()).thenReturn(100L); + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockCountResponse); + + // Mock exception during task submission + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenThrow(new IOException("Connection failed")); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertFalse(result.isSuccess()); + assertEquals(result.getRemainingDocuments(), -1); // Unknown + assertTrue(result.getFailureReason().contains("Exception")); + } + + @Test + public void testParseTaskIdValid() { + // This would be a private method test, but we can test it indirectly through deleteByQuerySync + QueryBuilder query = QueryBuilders.termQuery("field", "value"); + + try { + // Setup mocks for a basic flow + CountResponse mockCount = mock(CountResponse.class); + when(mockCount.getCount()).thenReturn(100L).thenReturn(0L); + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockCount); + + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn("node123:456789"); + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class); + when(mockTaskResponse.isCompleted()).thenReturn(true); + when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(Optional.of(mockTaskResponse)); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + // If we get here without exception, parsing worked + assertNotNull(result); + } catch (Exception e) { + fail("Should not throw exception for valid task ID format"); + } + } + + @Test + public void testDeleteByQueryWithInvalidSlicesConfig() + throws IOException, ExecutionException, InterruptedException { + QueryBuilder query = QueryBuilders.matchAllQuery(); + BulkDeleteConfiguration customConfig = + BulkDeleteConfiguration.builder() + .batchSize(1000) + .slices("invalid-number") + .timeout(30) + .timeoutUnit("MINUTES") + .build(); + + TaskSubmissionResponse mockResponse = mock(TaskSubmissionResponse.class); + when(mockResponse.getTask()).thenReturn(TEST_TASK_STRING); + + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockResponse); + + // Should handle invalid slices gracefully and default to auto + CompletableFuture future = + esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, customConfig); + TaskSubmissionResponse result = future.get(); + + assertNotNull(result); + verify(mockSearchClient) + .submitDeleteByQueryTask(any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)); + } + + @Test + public void testDeleteByQueryWithAutoSlices() + throws IOException, ExecutionException, InterruptedException { + QueryBuilder query = QueryBuilders.matchAllQuery(); + BulkDeleteConfiguration customConfig = + BulkDeleteConfiguration.builder() + .batchSize(2000) + .slices("auto") // Explicitly set to auto + .timeout(45) + .timeoutUnit("MINUTES") + .build(); + + TaskSubmissionResponse mockResponse = 
mock(TaskSubmissionResponse.class); + when(mockResponse.getTask()).thenReturn(TEST_TASK_STRING); + + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockResponse); + + CompletableFuture future = + esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, customConfig); + TaskSubmissionResponse result = future.get(); + + assertNotNull(result); + + // Verify the request configuration + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(DeleteByQueryRequest.class); + verify(mockSearchClient) + .submitDeleteByQueryTask(requestCaptor.capture(), eq(RequestOptions.DEFAULT)); + + DeleteByQueryRequest capturedRequest = requestCaptor.getValue(); + assertEquals(capturedRequest.getBatchSize(), 2000); + assertEquals(capturedRequest.getTimeout(), TimeValue.timeValueMinutes(45)); + } + + @Test + public void testDeleteByQuerySyncTaskFailure() throws IOException, InterruptedException { + QueryBuilder query = QueryBuilders.termQuery("status", "deleted"); + + // Mock initial count + CountResponse initialCount = mock(CountResponse.class); + when(initialCount.getCount()).thenReturn(100L); + + CountResponse afterFailureCount = mock(CountResponse.class); + when(afterFailureCount.getCount()).thenReturn(90L); // Some documents deleted before failure + + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(initialCount) + .thenReturn(afterFailureCount); + + // Mock task submission + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING); + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + // Mock task monitoring with incomplete/failed task + GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class); + when(mockTaskResponse.isCompleted()).thenReturn(false); // Task didn't complete + + when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(Optional.of(mockTaskResponse)); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertFalse(result.isSuccess()); + assertEquals(result.getRemainingDocuments(), 90); + assertTrue(result.getFailureReason().contains("Task not completed within timeout")); + } + + @Test + public void testDeleteByQuerySyncMaxRetriesWithPartialProgress() + throws IOException, InterruptedException { + QueryBuilder query = QueryBuilders.termQuery("status", "deleted"); + + // Mock count responses - partial progress on each attempt + CountResponse count100 = mock(CountResponse.class); + when(count100.getCount()).thenReturn(100L); + + CountResponse count80 = mock(CountResponse.class); + when(count80.getCount()).thenReturn(80L); + + CountResponse count60 = mock(CountResponse.class); + when(count60.getCount()).thenReturn(60L); + + CountResponse count40 = mock(CountResponse.class); + when(count40.getCount()).thenReturn(40L); + + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(count100) // Initial + .thenReturn(count80) // After 1st delete + .thenReturn(count60) // After 2nd delete + .thenReturn(count40); // After 3rd delete (max retries) + + // Mock task submissions + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING); + when(mockSearchClient.submitDeleteByQueryTask( + 
any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + // Mock successful task completions + GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class); + when(mockTaskResponse.isCompleted()).thenReturn(true); + + when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(Optional.of(mockTaskResponse)); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertFalse(result.isSuccess()); // Failed because documents remain + assertEquals(result.getRemainingDocuments(), 40); + assertEquals(result.getRetryAttempts(), 3); // 3 total attempts + assertTrue(result.getFailureReason().contains("Documents still remaining after max retries")); + + // Verify 3 delete operations were performed (initial + 2 retries) + verify(mockSearchClient, times(3)) + .submitDeleteByQueryTask(any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT)); + } + + @Test + public void testMonitorDeleteByQueryTaskWithCountException() throws IOException { + TaskId taskId = new TaskId(TEST_NODE_ID, TEST_TASK_ID); + + // Mock task API throwing exception + when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT))) + .thenThrow(new IOException("Task API error")); + + // Also make count fail when trying to get remaining documents + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenThrow(new IOException("Count API error")); + + // Note: This tests the internal monitor method indirectly through deleteByQuerySync + QueryBuilder query = QueryBuilders.termQuery("field", "value"); + + // Setup initial count to trigger the flow + CountResponse initialCount = mock(CountResponse.class); + when(initialCount.getCount()).thenReturn(100L); + + // First count succeeds, subsequent counts fail + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(initialCount) + .thenThrow(new IOException("Count API error")); + + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING); + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertFalse(result.isSuccess()); + assertEquals(result.getRemainingDocuments(), -1); // Unknown due to count failure + assertTrue(result.getFailureReason().contains("Monitoring failed")); + } + + @Test + public void testParseTaskIdInvalidFormats() { + QueryBuilder query = QueryBuilders.termQuery("field", "value"); + + // Test null task string + try { + CountResponse mockCount = mock(CountResponse.class); + when(mockCount.getCount()).thenReturn(100L); + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockCount); + + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn(null); // Null task string + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertFalse(result.isSuccess()); + assertTrue(result.getFailureReason().contains("Invalid task string format")); + } catch (Exception e) { + // 
Expected + } + + // Test task string without colon + try { + CountResponse mockCount = mock(CountResponse.class); + when(mockCount.getCount()).thenReturn(100L); + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockCount); + + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn("invalidformat"); // No colon + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertFalse(result.isSuccess()); + assertTrue(result.getFailureReason().contains("Invalid task string format")); + } catch (Exception e) { + // Expected + } + } + + @Test + public void testDeleteByQuerySyncInterruptedException() throws IOException, InterruptedException { + QueryBuilder query = QueryBuilders.termQuery("status", "deleted"); + + // Mock count responses + CountResponse count100 = mock(CountResponse.class); + when(count100.getCount()).thenReturn(100L); + + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(count100); + + // Mock task submission + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING); + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + // Mock task monitoring that takes time + GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class); + when(mockTaskResponse.isCompleted()).thenReturn(true); + + // Simulate interruption during task monitoring + when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT))) + .thenAnswer( + invocation -> { + Thread.currentThread().interrupt(); + throw new InterruptedException("Thread interrupted"); + }); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + assertFalse(result.isSuccess()); + assertTrue(result.getFailureReason().contains("interrupted")); + + // Verify thread interrupted status is preserved + assertTrue(Thread.currentThread().isInterrupted()); + Thread.interrupted(); // Clear interrupt status for next tests + } + + @Test + public void testDeleteByQuerySyncEmptyTaskResponse() throws IOException, InterruptedException { + QueryBuilder query = QueryBuilders.termQuery("status", "deleted"); + + // Mock initial count + CountResponse initialCount = mock(CountResponse.class); + when(initialCount.getCount()).thenReturn(50L); + + CountResponse afterCount = mock(CountResponse.class); + when(afterCount.getCount()).thenReturn(50L); // No change + + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(initialCount) + .thenReturn(afterCount); + + // Mock task submission + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING); + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + // Return empty Optional for task response + when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(Optional.empty()); + + ESWriteDAO.DeleteByQueryResult result = + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, null); + + 
assertFalse(result.isSuccess()); + assertEquals(result.getRemainingDocuments(), 50); + assertTrue(result.getFailureReason().contains("Task not completed within timeout")); + } + + @Test + public void testMonitorTaskRequestParameters() throws IOException { + QueryBuilder query = QueryBuilders.termQuery("status", "deleted"); + Duration customTimeout = Duration.ofMinutes(10); + + BulkDeleteConfiguration customConfig = + BulkDeleteConfiguration.builder() + .batchSize(1000) + .slices("5") + .timeout(10) + .timeoutUnit("MINUTES") + .pollInterval(2) + .pollIntervalUnit("SECONDS") + .numRetries(1) + .build(); + + // Setup initial count + CountResponse initialCount = mock(CountResponse.class); + when(initialCount.getCount()).thenReturn(50L); + CountResponse afterCount = mock(CountResponse.class); + when(afterCount.getCount()).thenReturn(0L); + + when(mockSearchClient.count(any(CountRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(initialCount) + .thenReturn(afterCount); + + // Mock task submission + TaskSubmissionResponse mockSubmission = mock(TaskSubmissionResponse.class); + when(mockSubmission.getTask()).thenReturn(TEST_TASK_STRING); + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockSubmission); + + // Mock task response + GetTaskResponse mockTaskResponse = mock(GetTaskResponse.class); + when(mockTaskResponse.isCompleted()).thenReturn(true); + when(mockTasksClient.get(any(GetTaskRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(Optional.of(mockTaskResponse)); + + esWriteDAO.deleteByQuerySync(TEST_DELETE_INDEX, query, customConfig); + + // Verify GetTaskRequest parameters + ArgumentCaptor taskRequestCaptor = + ArgumentCaptor.forClass(GetTaskRequest.class); + verify(mockTasksClient).get(taskRequestCaptor.capture(), eq(RequestOptions.DEFAULT)); + + GetTaskRequest capturedRequest = taskRequestCaptor.getValue(); + assertEquals(capturedRequest.getNodeId(), TEST_NODE_ID); + assertEquals(capturedRequest.getTaskId(), TEST_TASK_ID); + assertTrue(capturedRequest.getWaitForCompletion()); + assertEquals(capturedRequest.getTimeout(), TimeValue.timeValueMinutes(10)); + } + + @Test + public void testDeleteByQueryRequestConflictsStrategy() + throws IOException, ExecutionException, InterruptedException { + QueryBuilder query = QueryBuilders.matchAllQuery(); + + TaskSubmissionResponse mockResponse = mock(TaskSubmissionResponse.class); + when(mockResponse.getTask()).thenReturn(TEST_TASK_STRING); + + when(mockSearchClient.submitDeleteByQueryTask( + any(DeleteByQueryRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockResponse); + + CompletableFuture future = + esWriteDAO.deleteByQueryAsync(TEST_DELETE_INDEX, query, null); + future.get(); + + // Verify conflicts strategy is set to "proceed" + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(DeleteByQueryRequest.class); + verify(mockSearchClient) + .submitDeleteByQueryTask(requestCaptor.capture(), eq(RequestOptions.DEFAULT)); + + DeleteByQueryRequest capturedRequest = requestCaptor.getValue(); + // Note: We can't directly verify conflicts setting as it's not exposed via getter + // but we know it's set to "proceed" in the implementation + assertNotNull(capturedRequest); + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/update/WriteDAOTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/update/WriteDAOTestBase.java new file mode 100644 index 0000000000..9f6c1142e5 --- /dev/null +++ 
b/metadata-io/src/test/java/com/linkedin/metadata/search/update/WriteDAOTestBase.java @@ -0,0 +1,144 @@ +package com.linkedin.metadata.search.update; + +import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static com.linkedin.metadata.Constants.QUERY_ENTITY_NAME; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.aspect.models.graph.Edge; +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; +import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; +import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; +import io.datahubproject.metadata.context.OperationContext; +import java.time.Duration; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.core.CountRequest; +import org.opensearch.client.core.CountResponse; +import org.opensearch.client.tasks.TaskId; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.TermQueryBuilder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +@Slf4j +public abstract class WriteDAOTestBase extends AbstractTestNGSpringContextTests { + protected static final String TEST_RELATIONSHIP_TYPE = "IsAssociatedWith"; + + @Autowired private ESBulkProcessor bulkProcessor; + + protected abstract RestHighLevelClient getSearchClient(); + + protected abstract OperationContext getOperationContext(); + + protected abstract ElasticSearchGraphService getGraphService(); + + protected abstract ESWriteDAO getEsWriteDAO(); + + @Test + public void testDeleteByQueryAsyncOnGraphIndex() throws Exception { + String testRunId = UUID.randomUUID().toString(); + String indexName = + getOperationContext() + .getSearchContext() + .getIndexConvention() + .getIndexName(ElasticSearchGraphService.INDEX_NAME); + + // Create fewer edges for async test + int totalEdges = 8; + createTestEdges(testRunId, totalEdges); + + // Verify creation + long initialCount = countTestEdges(indexName, testRunId); + assertEquals(initialCount, totalEdges); + + // Build query + TermQueryBuilder query = + QueryBuilders.termQuery( + "destination.urn", + "urn:li:" + DATASET_ENTITY_NAME + ":(urn:li:dataPlatform:test," + testRunId + ",PROD)"); + + // Submit async delete + var taskFuture = getEsWriteDAO().deleteByQueryAsync(indexName, query, null); + var taskSubmission = taskFuture.get(30, TimeUnit.SECONDS); + + assertNotNull(taskSubmission); + assertNotNull(taskSubmission.getTask()); + + // Parse task ID + String[] taskParts = taskSubmission.getTask().split(":"); + TaskId taskId = new TaskId(taskParts[0], Long.parseLong(taskParts[1])); + + // Monitor the task + ESWriteDAO.DeleteByQueryResult monitorResult = + getEsWriteDAO().monitorDeleteByQueryTask(taskId, Duration.ofMinutes(1), indexName, query); + + assertTrue(monitorResult.isSuccess(), "Async task should complete successfully"); + + // Wait and verify deletion + Thread.sleep(2000); + long finalCount = countTestEdges(indexName, testRunId); + assertEquals(finalCount, 0, "All test edges should be deleted asynchronously"); + } + + private void createTestEdges(String testRunId, int count) throws 
InterruptedException { + for (int i = 0; i < count; i++) { + try { + // Create unique dataset URN for source + Urn destUrn = + Urn.createFromString( + "urn:li:" + + DATASET_ENTITY_NAME + + ":(urn:li:dataPlatform:test," + + testRunId + + ",PROD)"); + + // Create query URN for destination + Urn sourceUrn = + Urn.createFromString( + "urn:li:" + QUERY_ENTITY_NAME + ":test_query_" + testRunId + "_" + i); + + Edge edge = + new Edge( + sourceUrn, + destUrn, + TEST_RELATIONSHIP_TYPE, + System.currentTimeMillis(), + Urn.createFromString("urn:li:corpuser:test"), + System.currentTimeMillis(), + Urn.createFromString("urn:li:corpuser:test"), + Map.of()); + + getGraphService().addEdge(edge); + } catch (Exception e) { + throw new RuntimeException("Failed to create test edge", e); + } + } + bulkProcessor.flush(); + + // Wait for indexing + Thread.sleep(2000); + } + + private long countTestEdges(String indexName, String testRunId) throws Exception { + CountRequest countRequest = new CountRequest(indexName); + countRequest.query( + QueryBuilders.termQuery( + "destination.urn", + "urn:li:" + + DATASET_ENTITY_NAME + + ":(urn:li:dataPlatform:test," + + testRunId + + ",PROD)")); + + CountResponse response = getSearchClient().count(countRequest, RequestOptions.DEFAULT); + return response.getCount(); + } +} diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java index e55b817370..810302141a 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java @@ -2,6 +2,7 @@ package io.datahubproject.test.fixtures.search; import static com.linkedin.metadata.Constants.*; import static io.datahubproject.test.search.SearchTestUtils.TEST_ES_SEARCH_CONFIG; +import static io.datahubproject.test.search.SearchTestUtils.TEST_GRAPH_SERVICE_CONFIG; import static io.datahubproject.test.search.SearchTestUtils.TEST_SEARCH_SERVICE_CONFIG; import static io.datahubproject.test.search.config.SearchTestContainerConfiguration.REFRESH_INTERVAL_SECONDS; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -20,6 +21,9 @@ import com.linkedin.metadata.config.search.custom.CustomSearchConfiguration; import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityAspectIdentifier; import com.linkedin.metadata.entity.EntityServiceImpl; +import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO; +import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO; +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; import com.linkedin.metadata.search.SearchService; import com.linkedin.metadata.search.cache.EntityDocCountCache; import com.linkedin.metadata.search.client.CachingEntitySearchService; @@ -137,22 +141,22 @@ public class SampleDataFixtureConfiguration { .build(testOpContext.getSessionAuthentication(), true); } - protected ElasticSearchService entitySearchServiceHelper(OperationContext opContext) - throws IOException { + @Bean + protected ESWriteDAO esWriteDAO() { + return new ESWriteDAO(TEST_ES_SEARCH_CONFIG, _searchClient, _bulkProcessor); + } + + @Bean("sampleDataESIndexBuilder") + protected ESIndexBuilder esIndexBuilder() { GitVersion gitVersion = new GitVersion("0.0.0-test", "123456", Optional.empty()); - ESIndexBuilder indexBuilder = - new ESIndexBuilder( - _searchClient, - 1, - 
0, - 1, - 1, - Map.of(), - true, - false, - false, - TEST_ES_SEARCH_CONFIG, - gitVersion); + return new ESIndexBuilder( + _searchClient, 1, 0, 1, 1, Map.of(), true, false, false, TEST_ES_SEARCH_CONFIG, gitVersion); + } + + protected ElasticSearchService entitySearchServiceHelper( + OperationContext opContext, ESWriteDAO esWriteDAO, ESIndexBuilder indexBuilder) + throws IOException { + IndexConfiguration indexConfiguration = new IndexConfiguration(); indexConfiguration.setMinSearchFilterLength(3); SettingsBuilder settingsBuilder = new SettingsBuilder(null, indexConfiguration); @@ -173,7 +177,7 @@ public class SampleDataFixtureConfiguration { _customSearchConfiguration, queryFilterRewriteChain, TEST_SEARCH_SERVICE_CONFIG); - ESWriteDAO writeDAO = new ESWriteDAO(_searchClient, _bulkProcessor, 1); + return new ElasticSearchService( indexBuilder, opContext.getEntityRegistry(), @@ -182,20 +186,52 @@ public class SampleDataFixtureConfiguration { TEST_SEARCH_SERVICE_CONFIG, searchDAO, browseDAO, - writeDAO); + esWriteDAO); + } + + @Bean(name = "sampleDataGraphService") + @Nonnull + protected ElasticSearchGraphService graphService( + @Qualifier("sampleDataOperationContext") OperationContext opContext, + @Qualifier("sampleDataESIndexBuilder") ESIndexBuilder indexBuilder) { + + IndexConvention indexConvention = opContext.getSearchContext().getIndexConvention(); + ElasticSearchGraphService graphService = + new ElasticSearchGraphService( + opContext.getLineageRegistry(), + _bulkProcessor, + indexConvention, + new ESGraphWriteDAO( + indexConvention, _bulkProcessor, 1, TEST_ES_SEARCH_CONFIG.getSearch().getGraph()), + new ESGraphQueryDAO( + _searchClient, + opContext.getLineageRegistry(), + indexConvention, + TEST_GRAPH_SERVICE_CONFIG, + TEST_ES_SEARCH_CONFIG, + null), + indexBuilder, + indexConvention.getIdHashAlgo()); + graphService.reindexAll(Collections.emptySet()); + return graphService; } @Bean(name = "sampleDataEntitySearchService") protected ElasticSearchService entitySearchService( - @Qualifier("sampleDataOperationContext") OperationContext opContext) throws IOException { - return entitySearchServiceHelper(opContext); + @Qualifier("sampleDataOperationContext") OperationContext opContext, + ESWriteDAO esWriteDAO, + @Qualifier("sampleDataESIndexBuilder") ESIndexBuilder indexBuilder) + throws IOException { + return entitySearchServiceHelper(opContext, esWriteDAO, indexBuilder); } @Bean(name = "longTailEntitySearchService") protected ElasticSearchService longTailEntitySearchService( - @Qualifier("longTailOperationContext") OperationContext longtailOperationContext) + @Qualifier("longTailOperationContext") OperationContext longtailOperationContext, + ESWriteDAO esWriteDAO, + @Qualifier("sampleDataESIndexBuilder") ESIndexBuilder indexBuilder) throws IOException { - return entitySearchServiceHelper(longtailOperationContext); + return entitySearchServiceHelper(longtailOperationContext, esWriteDAO, indexBuilder); } @Bean(name = "sampleDataSearchService") diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java index 9424f3f579..2232d8eac9 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java @@ -145,7 +145,7 @@ public class SearchLineageFixtureConfiguration { 
customSearchConfiguration, queryFilterRewriteChain, TEST_SEARCH_SERVICE_CONFIG); - ESWriteDAO writeDAO = new ESWriteDAO(searchClient, bulkProcessor, 1); + ESWriteDAO writeDAO = new ESWriteDAO(TEST_ES_SEARCH_CONFIG, searchClient, bulkProcessor); return new ElasticSearchService( indexBuilder, diff --git a/metadata-io/src/testFixtures/java/io/datahubproject/test/search/SearchTestUtils.java b/metadata-io/src/testFixtures/java/io/datahubproject/test/search/SearchTestUtils.java index f7f198b777..a38fa2c0c1 100644 --- a/metadata-io/src/testFixtures/java/io/datahubproject/test/search/SearchTestUtils.java +++ b/metadata-io/src/testFixtures/java/io/datahubproject/test/search/SearchTestUtils.java @@ -16,6 +16,8 @@ import com.linkedin.metadata.config.SystemMetadataServiceConfig; import com.linkedin.metadata.config.TimeseriesAspectServiceConfig; import com.linkedin.metadata.config.graph.GraphServiceConfiguration; import com.linkedin.metadata.config.search.BuildIndicesConfiguration; +import com.linkedin.metadata.config.search.BulkDeleteConfiguration; +import com.linkedin.metadata.config.search.BulkProcessorConfiguration; import com.linkedin.metadata.config.search.ElasticSearchConfiguration; import com.linkedin.metadata.config.search.GraphQueryConfiguration; import com.linkedin.metadata.config.search.SearchConfiguration; @@ -77,6 +79,17 @@ public class SearchTestUtils { }); } }) + .bulkProcessor(BulkProcessorConfiguration.builder().numRetries(1).build()) + .bulkDelete( + BulkDeleteConfiguration.builder() + .batchSize(1000) + .slices("auto") + .numRetries(3) + .timeout(30) + .timeoutUnit("MINUTES") + .pollInterval(1) + .pollIntervalUnit("SECONDS") + .build()) .buildIndices( BuildIndicesConfiguration.builder().reindexOptimizationEnabled(true).build()) .build(); diff --git a/metadata-models/src/main/pegasus/com/linkedin/query/QuerySubject.pdl b/metadata-models/src/main/pegasus/com/linkedin/query/QuerySubject.pdl index c65e9191dd..0d2fd86a79 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/query/QuerySubject.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/query/QuerySubject.pdl @@ -11,10 +11,6 @@ record QuerySubject { /** * An entity which is the subject of a query. 
*/ - @Relationship = { - "name": "IsAssociatedWith", - "entityTypes": [ "dataset", "schemaField" ] - } @Searchable = { "fieldName": "entities", "fieldType": "URN", diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BulkDeleteConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BulkDeleteConfiguration.java new file mode 100644 index 0000000000..c723d34aeb --- /dev/null +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BulkDeleteConfiguration.java @@ -0,0 +1,32 @@ +package com.linkedin.metadata.config.search; + +import com.linkedin.metadata.utils.ParseUtils; +import java.time.Duration; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Data +@Builder(toBuilder = true) +@AllArgsConstructor +@NoArgsConstructor +public class BulkDeleteConfiguration { + private int batchSize; + private String slices; + private int pollInterval; + private String pollIntervalUnit; + private int timeout; + private String timeoutUnit; + private int numRetries; + + public Duration getPollDuration() { + return ParseUtils.parseDuration(pollInterval, pollIntervalUnit); + } + + public Duration getTimeoutDuration() { + return ParseUtils.parseDuration(timeout, timeoutUnit); + } +} diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BulkProcessorConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BulkProcessorConfiguration.java new file mode 100644 index 0000000000..49d7b34b28 --- /dev/null +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/BulkProcessorConfiguration.java @@ -0,0 +1,16 @@ +package com.linkedin.metadata.config.search; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class BulkProcessorConfiguration { + private int numRetries; +} diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/ElasticSearchConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/ElasticSearchConfiguration.java index 6cbb313f0e..05b69c0704 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/ElasticSearchConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/ElasticSearchConfiguration.java @@ -10,6 +10,8 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @Builder(toBuilder = true) public class ElasticSearchConfiguration { + private BulkDeleteConfiguration bulkDelete; + private BulkProcessorConfiguration bulkProcessor; private BuildIndicesConfiguration buildIndices; public String implementation; private SearchConfiguration search; diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/utils/ParseUtils.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/utils/ParseUtils.java new file mode 100644 index 0000000000..b76e86d15d --- /dev/null +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/utils/ParseUtils.java @@ -0,0 +1,13 @@ +package com.linkedin.metadata.utils; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +public class ParseUtils { 
+ private ParseUtils() {} + + public static Duration parseDuration(int value, String unit) { + TimeUnit timeUnit = unit != null ? TimeUnit.valueOf(unit.toUpperCase()) : TimeUnit.SECONDS; + return Duration.of(value, timeUnit.toChronoUnit()); + } +} diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 17698adcb5..904bc814bf 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -250,6 +250,14 @@ elasticsearch: keyStoreType: ${ELASTICSEARCH_SSL_KEYSTORE_TYPE:#{null}} keyStorePassword: ${ELASTICSEARCH_SSL_KEYSTORE_PASSWORD:#{null}} keyPassword: ${ELASTICSEARCH_SSL_KEY_PASSWORD:#{null}} + bulkDelete: + batchSize: ${ES_BULK_DELETE_BATCH_SIZE:5000} + slices: ${ES_BULK_DELETE_SLICES:auto} + pollInterval: ${ES_BULK_DELETE_POLL_INTERVAL:30} + pollIntervalUnit: ${ES_BULK_DELETE_POLL_UNIT:SECONDS} + timeout: ${ES_BULK_DELETE_TIMEOUT:30} + timeoutUnit: ${ES_BULK_DELETE_TIMEOUT_UNIT:MINUTES} + numRetries: ${ES_BULK_DELETE_NUM_RETRIES:3} bulkProcessor: async: ${ES_BULK_ASYNC:true} requestsLimit: ${ES_BULK_REQUESTS_LIMIT:1000} @@ -577,6 +585,9 @@ systemUpdate: batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_PROPERTY_DEFINITIONS_BATCH_SIZE:500} delayMs: ${BOOTSTRAP_SYSTEM_UPDATE_PROPERTY_DEFINITIONS_DELAY_MS:1000} limit: ${BOOTSTRAP_SYSTEM_UPDATE_PROPERTY_DEFINITIONS_CLL_LIMIT:0} + removeQueryEdges: + enabled: ${BOOTSTRAP_SYSTEM_UPDATE_REMOVE_QUERY_EDGES_ENABLED:true} + numRetries: ${BOOTSTRAP_SYSTEM_UPDATE_REMOVE_QUERY_EDGES_RETRIES:20} structuredProperties: enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java index db508c8950..3022c28fda 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java @@ -29,11 +29,10 @@ public class ElasticSearchGraphServiceFactory { @Qualifier("baseElasticSearchComponents") private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components; - @Autowired private ConfigurationProvider configurationProvider; - @Bean(name = "graphService") @Nonnull protected GraphService getInstance( + final ConfigurationProvider configurationProvider, final EntityRegistry entityRegistry, @Value("${elasticsearch.idHashAlgo}") final String idHashAlgo, MetricUtils metricUtils) { @@ -45,7 +44,7 @@ public class ElasticSearchGraphServiceFactory { new ESGraphWriteDAO( components.getIndexConvention(), components.getBulkProcessor(), - components.getNumRetries(), + components.getConfig().getBulkProcessor().getNumRetries(), configurationProvider.getElasticSearch().getSearch().getGraph()), new ESGraphQueryDAO( components.getSearchClient(), diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchSystemMetadataServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchSystemMetadataServiceFactory.java index a32d933d69..6ae58dcb12 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchSystemMetadataServiceFactory.java +++ 
b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchSystemMetadataServiceFactory.java @@ -31,7 +31,7 @@ public class ElasticSearchSystemMetadataServiceFactory { components.getSearchClient(), components.getIndexConvention(), components.getBulkProcessor(), - components.getNumRetries(), + components.getConfig().getBulkProcessor().getNumRetries(), configurationProvider.getSystemMetadataService()), components.getIndexBuilder(), elasticIdHashAlgo, diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java index 3fb69d12c8..892a62e569 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java @@ -2,6 +2,8 @@ package com.linkedin.gms.factory.search; import com.linkedin.gms.factory.common.IndexConventionFactory; import com.linkedin.gms.factory.common.RestHighLevelClientFactory; +import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.metadata.config.search.ElasticSearchConfiguration; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; @@ -9,7 +11,6 @@ import javax.annotation.Nonnull; import org.opensearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -25,16 +26,13 @@ import org.springframework.context.annotation.Import; public class BaseElasticSearchComponentsFactory { @lombok.Value public static class BaseElasticSearchComponents { + ElasticSearchConfiguration config; RestHighLevelClient searchClient; IndexConvention indexConvention; ESBulkProcessor bulkProcessor; ESIndexBuilder indexBuilder; - int numRetries; } - @Value("${elasticsearch.bulkProcessor.numRetries}") - private Integer numRetries; - @Autowired @Qualifier("elasticSearchRestHighLevelClient") private RestHighLevelClient searchClient; @@ -53,8 +51,12 @@ public class BaseElasticSearchComponentsFactory { @Bean(name = "baseElasticSearchComponents") @Nonnull - protected BaseElasticSearchComponents getInstance() { + protected BaseElasticSearchComponents getInstance(ConfigurationProvider configurationProvider) { return new BaseElasticSearchComponents( - searchClient, indexConvention, bulkProcessor, indexBuilder, numRetries); + configurationProvider.getElasticSearch(), + searchClient, + indexConvention, + bulkProcessor, + indexBuilder); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java index 42de89b485..dc3d33cb2b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java @@ -74,6 
+74,12 @@ public class ElasticSearchServiceFactory { configurationProvider.getSearchService()); } + @Bean + protected ESWriteDAO esWriteDAO() { + return new ESWriteDAO( + components.getConfig(), components.getSearchClient(), components.getBulkProcessor()); + } + @Bean(name = "elasticSearchService") @Nonnull protected ElasticSearchService getInstance( @@ -81,7 +87,8 @@ public class ElasticSearchServiceFactory { final QueryFilterRewriteChain queryFilterRewriteChain, final ElasticSearchConfiguration elasticSearchConfiguration, final CustomSearchConfiguration customSearchConfiguration, - final ESSearchDAO esSearchDAO) + final ESSearchDAO esSearchDAO, + final ESWriteDAO esWriteDAO) throws IOException { return new ElasticSearchService( @@ -97,9 +104,6 @@ public class ElasticSearchServiceFactory { customSearchConfiguration, queryFilterRewriteChain, configurationProvider.getSearchService()), - new ESWriteDAO( - components.getSearchClient(), - components.getBulkProcessor(), - components.getNumRetries())); + esWriteDAO); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java index 1492df7a3b..dcebafd7e1 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java @@ -34,7 +34,7 @@ public class ElasticSearchTimeseriesAspectServiceFactory { return new ElasticSearchTimeseriesAspectService( components.getSearchClient(), components.getBulkProcessor(), - components.getNumRetries(), + components.getConfig().getBulkProcessor().getNumRetries(), queryFilterRewriteChain, configurationProvider.getTimeseriesAspectService(), entityRegistry,
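Illustrative usage sketch (Java, not part of the patch): the pieces introduced above — the new ESWriteDAO constructor taking ElasticSearchConfiguration, BulkDeleteConfiguration, and deleteByQuerySync — compose roughly as follows. The RemoveQueryEdges upgrade step added by this PR is not reproduced in this excerpt, so the class name, the caller-supplied wiring, and the "relationshipType" field used in the term query are assumptions for illustration only; exception handling is simplified.

// Hypothetical sketch, not the RemoveQueryEdges implementation from this PR.
import com.linkedin.metadata.config.search.BulkDeleteConfiguration;
import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;

public class QueryEdgeCleanupSketch {

  /**
   * Synchronously deletes legacy "IsAssociatedWith" edges from the graph index,
   * retrying until no matching documents remain or the retry budget is exhausted.
   */
  public static void removeQueryEdges(
      ElasticSearchConfiguration esConfig,
      RestHighLevelClient searchClient,
      ESBulkProcessor bulkProcessor,
      String graphIndexName) {

    // ESWriteDAO now takes the full ElasticSearchConfiguration instead of a bare retry count.
    ESWriteDAO writeDAO = new ESWriteDAO(esConfig, searchClient, bulkProcessor);

    // Match edge documents by relationship type; the field name is an assumption here.
    QueryBuilder query = QueryBuilders.termQuery("relationshipType", "IsAssociatedWith");

    // Optional override of the elasticsearch.bulkDelete defaults from application.yaml.
    BulkDeleteConfiguration override =
        BulkDeleteConfiguration.builder()
            .batchSize(5000)
            .slices("auto")
            .timeout(30)
            .timeoutUnit("MINUTES")
            .pollInterval(30)
            .pollIntervalUnit("SECONDS")
            .numRetries(3)
            .build();

    // Blocks until the delete-by-query task completes, re-counting matches between attempts.
    ESWriteDAO.DeleteByQueryResult result =
        writeDAO.deleteByQuerySync(graphIndexName, query, override);

    if (!result.isSuccess()) {
      throw new IllegalStateException(
          "Query edge cleanup left " + result.getRemainingDocuments()
              + " documents after " + result.getRetryAttempts()
              + " retries: " + result.getFailureReason());
    }
  }
}

Passing null instead of an override makes deleteByQuerySync fall back to the elasticsearch.bulkDelete defaults from application.yaml; the non-blocking variant, deleteByQueryAsync, returns a CompletableFuture of TaskSubmissionResponse, and the submitted task can then be tracked with monitorDeleteByQueryTask, as the WriteDAOTestBase test above demonstrates.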