Move Recreate Out of executors (#19501)

* Move Recreate Out of executors

* Add Entities option in Operations

* Fix Reindexing for null list for referred columns
This commit is contained in:
Mohit Yadav 2025-01-24 10:38:45 +05:30 committed by GitHub
parent ed55fb3137
commit 907eb7cb07
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 82 additions and 70 deletions

View File

@ -116,6 +116,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
jobData.setRecreateIndex(false);
}
reCreateIndexes(jobData.getEntities());
performReindex(jobExecutionContext);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
@ -243,7 +244,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
jobExecutor.submit(
() -> {
try {
reCreateIndexes(entityType);
int totalEntityRecords = getTotalEntityRecords(entityType);
Source<?> source = createSource(entityType);
int loadPerThread = calculateNumberOfThreads(totalEntityRecords);
@ -434,20 +434,22 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
}
private void reCreateIndexes(String entityType) throws SearchIndexException {
if (Boolean.FALSE.equals(jobData.getRecreateIndex())) {
LOG.debug("RecreateIndex is false. Skipping index recreation for '{}'.", entityType);
return;
}
private void reCreateIndexes(Set<String> entities) throws SearchIndexException {
for (String entityType : entities) {
if (Boolean.FALSE.equals(jobData.getRecreateIndex())) {
LOG.debug("RecreateIndex is false. Skipping index recreation for '{}'.", entityType);
return;
}
try {
IndexMapping indexType = searchRepository.getIndexMapping(entityType);
searchRepository.deleteIndex(indexType);
searchRepository.createIndex(indexType);
LOG.info("Recreated index for entityType '{}'.", entityType);
} catch (Exception e) {
LOG.error("Failed to recreate index for entityType '{}'.", entityType, e);
throw new SearchIndexException(e);
try {
IndexMapping indexType = searchRepository.getIndexMapping(entityType);
searchRepository.deleteIndex(indexType);
searchRepository.createIndex(indexType);
LOG.info("Recreated index for entityType '{}'.", entityType);
} catch (Exception e) {
LOG.error("Failed to recreate index for entityType '{}'.", entityType, e);
throw new SearchIndexException(e);
}
}
}

View File

@ -1,5 +1,6 @@
package org.openmetadata.service.search.indexes;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
@ -182,59 +183,57 @@ public interface SearchIndex {
Table relatedEntity,
List<Map<String, Object>> constraints,
Boolean updateForeignTableIndex) {
if (!nullOrEmpty(entity.getTableConstraints())) {
for (TableConstraint tableConstraint : entity.getTableConstraints()) {
if (!tableConstraint
.getConstraintType()
.value()
.equalsIgnoreCase(TableConstraint.ConstraintType.FOREIGN_KEY.value())) {
continue;
}
int columnIndex = 0;
for (String referredColumn : tableConstraint.getReferredColumns()) {
String relatedEntityFQN = getParentFQN(referredColumn);
String destinationIndexName = null;
try {
if (updateForeignTableIndex) {
relatedEntity = getEntityByName(Entity.TABLE, relatedEntityFQN, "*", ALL);
IndexMapping destinationIndexMapping =
Entity.getSearchRepository()
.getIndexMapping(relatedEntity.getEntityReference().getType());
destinationIndexName =
destinationIndexMapping.getIndexName(
Entity.getSearchRepository().getClusterAlias());
}
Map<String, Object> relationshipsMap = buildRelationshipsMap(entity, relatedEntity);
int relatedEntityIndex =
checkRelatedEntity(
entity.getFullyQualifiedName(),
relatedEntity.getFullyQualifiedName(),
constraints);
if (relatedEntityIndex >= 0) {
updateExistingConstraint(
entity,
tableConstraint,
constraints.get(relatedEntityIndex),
destinationIndexName,
relatedEntity,
referredColumn,
columnIndex,
updateForeignTableIndex);
} else {
addNewConstraint(
entity,
tableConstraint,
constraints,
relationshipsMap,
destinationIndexName,
relatedEntity,
referredColumn,
columnIndex,
updateForeignTableIndex);
}
columnIndex++;
} catch (EntityNotFoundException ex) {
for (TableConstraint tableConstraint : listOrEmpty(entity.getTableConstraints())) {
if (!tableConstraint
.getConstraintType()
.value()
.equalsIgnoreCase(TableConstraint.ConstraintType.FOREIGN_KEY.value())) {
continue;
}
int columnIndex = 0;
for (String referredColumn : listOrEmpty(tableConstraint.getReferredColumns())) {
String relatedEntityFQN = getParentFQN(referredColumn);
String destinationIndexName = null;
try {
if (updateForeignTableIndex) {
relatedEntity = getEntityByName(Entity.TABLE, relatedEntityFQN, "*", ALL);
IndexMapping destinationIndexMapping =
Entity.getSearchRepository()
.getIndexMapping(relatedEntity.getEntityReference().getType());
destinationIndexName =
destinationIndexMapping.getIndexName(
Entity.getSearchRepository().getClusterAlias());
}
Map<String, Object> relationshipsMap = buildRelationshipsMap(entity, relatedEntity);
int relatedEntityIndex =
checkRelatedEntity(
entity.getFullyQualifiedName(),
relatedEntity.getFullyQualifiedName(),
constraints);
if (relatedEntityIndex >= 0) {
updateExistingConstraint(
entity,
tableConstraint,
constraints.get(relatedEntityIndex),
destinationIndexName,
relatedEntity,
referredColumn,
columnIndex,
updateForeignTableIndex);
} else {
addNewConstraint(
entity,
tableConstraint,
constraints,
relationshipsMap,
destinationIndexName,
relatedEntity,
referredColumn,
columnIndex,
updateForeignTableIndex);
}
columnIndex++;
} catch (EntityNotFoundException ex) {
}
}
}

View File

@ -424,10 +424,17 @@ public class OpenMetadataOperations implements Callable<Integer> {
names = {"--retries"},
defaultValue = "3",
description = "Maximum number of retries for failed search requests.")
int retries) {
int retries,
@Option(
names = {"--entities"},
defaultValue = "'all'",
description =
"Entities to reindex. Passing --entities='table,dashboard' will reindex table and dashboard entities. Passing nothing will reindex everything.")
String entityStr) {
try {
LOG.info(
"Running Reindexing with Batch Size: {}, Payload Size: {}, Recreate-Index: {}, Producer threads: {}, Consumer threads: {}, Queue Size: {}, Back-off: {}, Max Back-off: {}, Max Requests: {}, Retries: {}",
"Running Reindexing with Entities:{} , Batch Size: {}, Payload Size: {}, Recreate-Index: {}, Producer threads: {}, Consumer threads: {}, Queue Size: {}, Back-off: {}, Max Back-off: {}, Max Requests: {}, Retries: {}",
entityStr,
batchSize,
payloadSize,
recreateIndexes,
@ -445,8 +452,11 @@ public class OpenMetadataOperations implements Callable<Integer> {
ApplicationHandler.initialize(config);
AppScheduler.initialize(config, collectionDAO, searchRepository);
String appName = "SearchIndexingApplication";
Set<String> entities =
new HashSet<>(Arrays.asList(entityStr.substring(1, entityStr.length() - 1).split(",")));
return executeSearchReindexApp(
appName,
entities,
batchSize,
payloadSize,
recreateIndexes,
@ -465,6 +475,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
private int executeSearchReindexApp(
String appName,
Set<String> entities,
int batchSize,
long payloadSize,
boolean recreateIndexes,
@ -485,6 +496,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
EventPublisherJob updatedJob = JsonUtils.deepCopy(storedJob, EventPublisherJob.class);
updatedJob
.withEntities(entities)
.withBatchSize(batchSize)
.withPayLoadSize(payloadSize)
.withRecreateIndex(recreateIndexes)
@ -494,8 +506,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
.withInitialBackoff(backOff)
.withMaxBackoff(maxBackOff)
.withMaxConcurrentRequests(maxRequests)
.withMaxRetries(retries)
.withEntities(Set.of("all"));
.withMaxRetries(retries);
// Update the search index app with the new configurations
App updatedSearchIndexApp = JsonUtils.deepCopy(originalSearchIndexApp, App.class);