- Fix Search Indexing issues in Ops (#16194)

- Updated reindex batch size and recreate-index handling
- Added conditional initialization of resources for operations mode
Mohit Yadav 2024-05-09 17:56:38 +05:30 committed by GitHub
parent 615ef9d5e9
commit f606212484
14 changed files with 104 additions and 65 deletions

View File

@@ -282,3 +282,13 @@ SET json = JSON_INSERT(
);
UPDATE table_entity SET json = JSON_REMOVE(json, '$.testSuite');
-- Clean up QRTZ tables
delete from QRTZ_SIMPLE_TRIGGERS;
delete from QRTZ_CRON_TRIGGERS;
delete from QRTZ_TRIGGERS;
delete from QRTZ_LOCKS;
delete from QRTZ_SCHEDULER_STATE;
delete from QRTZ_JOB_DETAILS;
delete from QRTZ_FIRED_TRIGGERS;

View File

@@ -279,3 +279,11 @@ WHERE jsonb_exists(json::jsonb, 'viewDefinition') = true;
UPDATE table_entity SET json = json - 'testSuite';
-- Clean up QRTZ table
delete from QRTZ_SIMPLE_TRIGGERS;
delete from QRTZ_CRON_TRIGGERS;
delete from QRTZ_TRIGGERS;
delete from QRTZ_LOCKS;
delete from QRTZ_SCHEDULER_STATE;
delete from QRTZ_JOB_DETAILS;
delete from QRTZ_FIRED_TRIGGERS;
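
These are the standard Quartz JDBC job-store tables (triggers, locks, scheduler state, job details, fired triggers), so the deletes reset all persisted scheduler state, presumably so stale job and trigger rows do not linger across the upgrade. A minimal sketch of how a Quartz scheduler is pointed at tables with this prefix using standard Quartz properties; the instance name, data-source name, JDBC URL and credentials below are placeholders, not taken from this commit:

import java.util.Properties;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzJdbcStoreSketch {
  public static void main(String[] args) throws SchedulerException {
    Properties props = new Properties();
    props.setProperty("org.quartz.scheduler.instanceName", "OpsScheduler");
    props.setProperty("org.quartz.threadPool.threadCount", "3");
    // Persist jobs, triggers and locks in the database instead of in memory.
    props.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
    props.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
    // Persisted rows live in tables named with this prefix: QRTZ_TRIGGERS, QRTZ_LOCKS, QRTZ_JOB_DETAILS, ...
    props.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
    // "appDS" and its settings are placeholders; the real data source is application-specific.
    props.setProperty("org.quartz.jobStore.dataSource", "appDS");
    props.setProperty("org.quartz.dataSource.appDS.driver", "com.mysql.cj.jdbc.Driver");
    props.setProperty("org.quartz.dataSource.appDS.URL", "jdbc:mysql://localhost:3306/openmetadata_db");
    props.setProperty("org.quartz.dataSource.appDS.user", "user");
    props.setProperty("org.quartz.dataSource.appDS.password", "password");

    StdSchedulerFactory factory = new StdSchedulerFactory(props);
    // getScheduler() opens the data source and reads/creates rows in the QRTZ_* tables;
    // it only succeeds against a database that actually has the Quartz schema.
    Scheduler scheduler = factory.getScheduler();
    scheduler.start();
    scheduler.shutdown();
  }
}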

View File

@@ -177,7 +177,7 @@ public class AppScheduler {
JobBuilder.newJob(clz)
.withIdentity(jobIdentity, APPS_JOB_GROUP)
.usingJobData(dataMap)
.requestRecovery(true);
.requestRecovery(false);
return jobBuilder.build();
}
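
For context, requestRecovery controls whether Quartz re-fires a job that was mid-execution when a scheduler instance died; with false, an interrupted run is dropped rather than replayed on recovery, which pairs with the QRTZ table cleanup above. A minimal sketch of building and scheduling a non-recoverable job with the standard Quartz API; the job class and identities are illustrative, not from this commit:

import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class NonRecoverableJobSketch {
  // Trivial job used only to illustrate the builder call.
  public static class NoOpJob implements Job {
    @Override
    public void execute(JobExecutionContext context) {
      System.out.println("running " + context.getJobDetail().getKey());
    }
  }

  public static void main(String[] args) throws SchedulerException, InterruptedException {
    JobDetail job =
        JobBuilder.newJob(NoOpJob.class)
            .withIdentity("exampleJob", "exampleGroup")
            // Do not re-fire this job after a scheduler crash/recovery.
            .requestRecovery(false)
            .build();
    Trigger trigger = TriggerBuilder.newTrigger().startNow().build();

    Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); // in-memory RAMJobStore by default
    scheduler.scheduleJob(job, trigger);
    scheduler.start();
    Thread.sleep(1000); // give the job a chance to run
    scheduler.shutdown(true);
  }
}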

View File

@@ -196,12 +196,11 @@ public class MigrationWorkflow {
updateMigrationStepInDB(process, context);
} finally {
allRows.add(row);
printToAsciiTable(columns, allRows, "Status Unavailable");
LOG.info(
"[MigrationWorkFlow] Migration Run finished for Version: {}", process.getVersion());
}
}
printToAsciiTable(columns, allRows, "Status Unavailable");
} catch (Exception e) {
// Any Exception catch the error
LOG.error("Encountered Exception in MigrationWorkflow", e);

View File

@@ -25,4 +25,6 @@ public @interface Collection {
/** Order of initialization of resource starting from 0. Only order from 0 to 9 (inclusive) are allowed */
int order() default 9;
boolean requiredForOps() default false;
}

View File

@@ -180,16 +180,19 @@ public final class CollectionRegistry {
Jdbi jdbi,
OpenMetadataApplicationConfig config,
Authorizer authorizer,
AuthenticatorHandler authenticatorHandler) {
AuthenticatorHandler authenticatorHandler,
boolean isOperations) {
// Build list of ResourceDescriptors
for (Map.Entry<String, CollectionDetails> e : collectionMap.entrySet()) {
CollectionDetails details = e.getValue();
String resourceClass = details.resourceClass;
try {
Object resource =
createResource(jdbi, resourceClass, config, authorizer, authenticatorHandler);
} catch (Exception ex) {
LOG.warn("Failed to create resource for class {} {}", resourceClass, ex);
if (!isOperations || (isOperations && details.requiredForOps)) {
String resourceClass = details.resourceClass;
try {
Object resource =
createResource(jdbi, resourceClass, config, authorizer, authenticatorHandler);
} catch (Exception ex) {
LOG.warn("Failed to create resource for class {} {}", resourceClass, ex);
}
}
}
}
@@ -197,6 +200,7 @@ public final class CollectionRegistry {
/** Get collection details based on annotations in Resource classes */
private static CollectionDetails getCollection(Class<?> cl) {
int order = 0;
boolean requiredForOps = false;
CollectionInfo collectionInfo = new CollectionInfo();
for (Annotation a : cl.getAnnotations()) {
if (a instanceof Path path) {
@@ -209,11 +213,12 @@ public final class CollectionRegistry {
// Use @Collection annotation to get initialization information for the class
collectionInfo.withName(collection.name());
order = collection.order();
requiredForOps = collection.requiredForOps();
}
}
CollectionDescriptor cd = new CollectionDescriptor();
cd.setCollection(collectionInfo);
return new CollectionDetails(cd, cl.getCanonicalName(), order);
return new CollectionDetails(cd, cl.getCanonicalName(), order, requiredForOps);
}
/** Compile a list of REST collections based on Resource classes marked with {@code Collection} annotation */
@@ -295,11 +300,14 @@ public final class CollectionRegistry {
@Getter @Setter private Object resource;
private final CollectionDescriptor cd;
private final int order;
private final boolean requiredForOps;
CollectionDetails(CollectionDescriptor cd, String resourceClass, int order) {
CollectionDetails(
CollectionDescriptor cd, String resourceClass, int order, boolean requiredForOps) {
this.cd = cd;
this.resourceClass = resourceClass;
this.order = order;
this.requiredForOps = requiredForOps;
}
}
}
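
The effect of the new flag: when the registry runs in operations mode, only collections whose @Collection annotation sets requiredForOps = true are instantiated, so the ops CLI avoids spinning up every REST resource. A stand-alone sketch of the same annotation-driven gating pattern, with generic names rather than the actual OpenMetadata registry classes; the filter uses !isOperations || requiredForOps, the simplified form of the check added above:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ConditionalInitSketch {
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface Collection {
    String name();
    int order() default 9;
    boolean requiredForOps() default false;
  }

  @Collection(name = "policies", order = 0, requiredForOps = true)
  static class PoliciesResource {}

  @Collection(name = "reports", order = 5)
  static class ReportsResource {}

  // Instantiate only the classes needed for the current mode, in annotation order.
  static List<Object> initResources(boolean isOperations, Class<?>... candidates) {
    return Stream.of(candidates)
        .filter(c -> c.isAnnotationPresent(Collection.class))
        .filter(c -> !isOperations || c.getAnnotation(Collection.class).requiredForOps())
        .sorted(Comparator.comparingInt(c -> c.getAnnotation(Collection.class).order()))
        .map(
            c -> {
              try {
                return c.getDeclaredConstructor().newInstance();
              } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to create " + c.getName(), e);
              }
            })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Ops mode: only PoliciesResource qualifies.
    initResources(true, PoliciesResource.class, ReportsResource.class)
        .forEach(r -> System.out.println("ops mode created: " + r.getClass().getSimpleName()));
    // Full server mode: both resources are created.
    initResources(false, PoliciesResource.class, ReportsResource.class)
        .forEach(r -> System.out.println("full mode created: " + r.getClass().getSimpleName()));
  }
}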

View File

@@ -83,7 +83,7 @@ import org.openmetadata.service.util.UserUtil;
+ "It performs this task as a special user in the system.")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "bots", order = 4) // initialize after user resource
@Collection(name = "bots", order = 4, requiredForOps = true) // initialize after user resource
public class BotResource extends EntityResource<Bot, BotRepository> {
public static final String COLLECTION_PATH = "/v1/bots/";

View File

@@ -79,7 +79,7 @@ import org.openmetadata.service.util.ResultList;
"A `Policy` defines control that needs to be applied across different Data Entities.")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "policies", order = 0)
@Collection(name = "policies", order = 0, requiredForOps = true)
public class PolicyResource extends EntityResource<Policy, PolicyRepository> {
public static final String COLLECTION_PATH = "v1/policies/";
public static final String FIELDS = "owner,location,teams,roles";

View File

@@ -74,7 +74,10 @@ import org.openmetadata.service.util.ResultList;
+ "team can be assigned one or multiple roles that provide privileges to a user and members of a team to perform the job function.")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "roles", order = 1) // Load roles after PolicyResource are loaded at Order 0
@Collection(
name = "roles",
order = 1,
requiredForOps = true) // Load roles after PolicyResource are loaded at Order 0
@Slf4j
public class RoleResource extends EntityResource<Role, RoleRepository> {
public static final String COLLECTION_PATH = "/v1/roles/";

View File

@@ -83,7 +83,10 @@ import org.openmetadata.service.util.ResultList;
+ " more data assets. Hierarchical teams are supported `Organization` -> `BusinessUnit` -> `Division` -> `Department`.")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "teams", order = 2) // Load after roles, and policy resources
@Collection(
name = "teams",
order = 2,
requiredForOps = true) // Load after roles, and policy resources
public class TeamResource extends EntityResource<Team, TeamRepository> {
public static final String COLLECTION_PATH = "/v1/teams/";
static final String FIELDS =

View File

@@ -157,7 +157,8 @@ import org.openmetadata.service.util.TokenUtil;
@Consumes(MediaType.APPLICATION_JSON)
@Collection(
name = "users",
order = 3) // Initialize user resource before bot resource (at default order 9)
order = 3,
requiredForOps = true) // Initialize user resource before bot resource (at default order 9)
public class UserResource extends EntityResource<User, UserRepository> {
public static final String COLLECTION_PATH = "v1/users/";
public static final String USER_PROTECTED_FIELDS = "authenticationMechanism";

View File

@@ -307,7 +307,7 @@ public class ElasticSearchClient implements SearchClient {
"{} Deleted {}",
indexMapping.getIndexName(clusterAlias),
deleteIndexResponse.isAcknowledged());
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Failed to delete Elastic Search indexes due to", e);
}
}

View File

@@ -301,7 +301,7 @@ public class OpenSearchClient implements SearchClient {
"{} Deleted {}",
indexMapping.getIndexName(clusterAlias),
deleteIndexResponse.isAcknowledged());
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Failed to delete Open Search indexes due to", e);
}
}
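
Both clients widen the catch from IOException to Exception, presumably because the Elasticsearch and OpenSearch Java clients also report API-level failures (for example, deleting an index that does not exist) through unchecked status exceptions, which a catch of IOException alone would let escape and abort the whole recreate pass. A small sketch of the same best-effort, log-and-continue pattern in generic code, not the client API:

import java.util.List;
import java.util.function.Consumer;

public class BestEffortDeleteSketch {
  // Apply a potentially failing action to every item, logging failures instead of aborting.
  static <T> void forEachBestEffort(List<T> items, Consumer<T> action) {
    for (T item : items) {
      try {
        action.accept(item);
      } catch (Exception e) { // a failure on one item must not stop the others
        System.err.println("Failed on " + item + ": " + e);
      }
    }
  }

  public static void main(String[] args) {
    forEachBestEffort(
        List.of("table_search_index", "missing_index"),
        name -> {
          if (name.startsWith("missing")) {
            throw new IllegalStateException("index not found: " + name); // stand-in for a client error
          }
          System.out.println("deleted " + name);
        });
  }
}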

View File

@@ -22,14 +22,13 @@ import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import javax.json.JsonPatch;
import javax.validation.Validator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -42,9 +41,6 @@ import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppSchedule;
import org.openmetadata.schema.entity.app.ScheduleTimeline;
import org.openmetadata.schema.entity.app.ScheduledExecutionContext;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.system.EventPublisherJob;
@@ -258,44 +254,46 @@ public class OpenMetadataOperations implements Callable<Integer> {
CollectionRegistry.initialize();
ApplicationHandler.initialize(config);
// load seed data so that repositories are initialized
CollectionRegistry.getInstance().loadSeedData(jdbi, config, null, null);
CollectionRegistry.getInstance().loadSeedData(jdbi, config, null, null, true);
ApplicationHandler.initialize(config);
// creates the default search index application
AppScheduler.initialize(config, collectionDAO, searchRepository);
String appName = "SearchIndexingApplication";
App searchIndexApp = getSearchIndexingApp(appName, batchSize, recreateIndexes);
AppScheduler.getInstance().triggerOnDemandApplication(searchIndexApp);
return waitAndReturnReindexingAppStatus(searchIndexApp);
return executeSearchReindexApp(appName, batchSize, recreateIndexes);
} catch (Exception e) {
LOG.error("Failed to reindex due to ", e);
return 1;
}
}
private App getSearchIndexingApp(String appName, int batchSize, boolean recreateIndexes) {
try {
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
App searchApp =
appRepository.getByName(null, "SearchIndexingApplication", appRepository.getFields("id"));
return searchApp;
} catch (EntityNotFoundException ex) {
return new App()
.withId(UUID.randomUUID())
.withName(appName)
.withFullyQualifiedName(appName)
.withClassName("org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp")
.withAppSchedule(new AppSchedule().withScheduleTimeline(ScheduleTimeline.DAILY))
.withAppConfiguration(
new EventPublisherJob()
.withEntities(new HashSet<>(List.of("all")))
.withRecreateIndex(recreateIndexes)
.withBatchSize(batchSize)
.withSearchIndexMappingLanguage(
config.getElasticSearchConfiguration().getSearchIndexMappingLanguage()))
.withRuntime(new ScheduledExecutionContext().withEnabled(true));
}
private int executeSearchReindexApp(String appName, int batchSize, boolean recreateIndexes) {
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
App originalSearchIndexApp =
appRepository.getByName(null, appName, appRepository.getFields("id"));
EventPublisherJob storedJob =
JsonUtils.convertValue(
originalSearchIndexApp.getAppConfiguration(), EventPublisherJob.class);
// Update the search index app with the new batch size and recreate index flag
App updatedSearchIndexApp = JsonUtils.deepCopy(originalSearchIndexApp, App.class);
updatedSearchIndexApp.withAppConfiguration(
storedJob.withRecreateIndex(recreateIndexes).withBatchSize(batchSize));
JsonPatch patch = JsonUtils.getJsonPatch(originalSearchIndexApp, updatedSearchIndexApp);
appRepository.patch(null, originalSearchIndexApp.getId(), "admin", patch);
// Trigger Application
AppScheduler.getInstance().triggerOnDemandApplication(updatedSearchIndexApp);
int result = waitAndReturnReindexingAppStatus(updatedSearchIndexApp);
// Repatch with original
JsonPatch repatch = JsonUtils.getJsonPatch(updatedSearchIndexApp, originalSearchIndexApp);
appRepository.patch(null, originalSearchIndexApp.getId(), "admin", repatch);
return result;
}
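
The new flow patches the stored SearchIndexingApplication configuration with the CLI-supplied batch size and recreate flag, triggers the run, then applies the reverse patch so the persisted app configuration is left exactly as it was. A minimal sketch of that diff/patch round trip using the standard javax.json (JSON-P) API rather than the OpenMetadata JsonUtils helpers; the payload is an arbitrary example, not the real app configuration schema:

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonPatch;

public class PatchRoundTripSketch {
  public static void main(String[] args) {
    JsonObject stored =
        Json.createObjectBuilder().add("batchSize", 100).add("recreateIndex", false).build();
    JsonObject updated =
        Json.createObjectBuilder().add("batchSize", 500).add("recreateIndex", true).build();

    // Forward patch: stored -> updated (applied before triggering the run).
    JsonPatch forward = Json.createDiff(stored, updated);
    JsonObject patched = forward.apply(stored);
    System.out.println("patched:  " + patched);

    // Reverse patch: updated -> stored (restores the original configuration afterwards).
    JsonPatch reverse = Json.createDiff(updated, stored);
    System.out.println("restored: " + reverse.apply(patched));
  }
}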
@SneakyThrows
@@ -306,23 +304,19 @@ public class OpenMetadataOperations implements Callable<Integer> {
AppRepository appRepository =
(AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
appRunRecord = appRepository.getLatestAppRuns(searchIndexApp.getId());
if (appRunRecord != null) {
if (isRunCompleted(appRunRecord)) {
List<String> columns =
new ArrayList<>(
List.of("status", "startTime", "endTime", "executionTime", "success", "failure"));
List<List<String>> rows = new ArrayList<>();
rows.add(
Arrays.asList(
appRunRecord.getStatus().value(),
appRunRecord.getStartTime().toString(),
appRunRecord.getEndTime().toString(),
appRunRecord.getExecutionTime().toString(),
nullOrEmpty(appRunRecord.getSuccessContext())
? "Unavailable"
: JsonUtils.pojoToJson(appRunRecord.getSuccessContext()),
nullOrEmpty(appRunRecord.getFailureContext())
? "Unavailable"
: JsonUtils.pojoToJson(appRunRecord.getFailureContext())));
getValueOrUnavailable(appRunRecord.getStatus().value()),
getValueOrUnavailable(appRunRecord.getStartTime()),
getValueOrUnavailable(appRunRecord.getEndTime()),
getValueOrUnavailable(appRunRecord.getExecutionTime()),
getValueOrUnavailable(appRunRecord.getSuccessContext()),
getValueOrUnavailable(appRunRecord.getFailureContext())));
printToAsciiTable(columns, rows, "Failed to run Search Reindexing");
}
} catch (UnhandledServerException e) {
@@ -331,9 +325,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
appRunRecord = null;
Thread.sleep(10000);
}
} while (appRunRecord == null
&& (appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS)
|| appRunRecord.getStatus().equals(AppRunRecord.Status.FAILED)));
} while (!isRunCompleted(appRunRecord));
if (appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS)
|| appRunRecord.getStatus().equals(AppRunRecord.Status.COMPLETED)) {
@@ -344,6 +336,19 @@ public class OpenMetadataOperations implements Callable<Integer> {
return 1;
}
public String getValueOrUnavailable(Object obj) {
return nullOrEmpty(obj) ? "Unavailable" : JsonUtils.pojoToJson(obj);
}
boolean isRunCompleted(AppRunRecord appRunRecord) {
if (appRunRecord == null) {
return false;
}
return appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS)
|| appRunRecord.getStatus().equals(AppRunRecord.Status.FAILED);
}
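
isRunCompleted replaces the removed loop condition shown above, which could not behave as intended: with a non-null record the && short-circuits to false, and with a null record the status dereference fails. The loop now simply polls until a SUCCESS or FAILED run record appears. A sketch of the same poll-until-terminal pattern in isolation; Status and fetchLatestRun are illustrative stand-ins, not the OpenMetadata types:

import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

public class PollUntilTerminalSketch {
  enum Status { RUNNING, SUCCESS, FAILED }

  // Stand-in for appRepository.getLatestAppRuns(...): a record may not exist yet.
  static Optional<Status> fetchLatestRun() {
    int roll = ThreadLocalRandom.current().nextInt(4);
    if (roll == 0) return Optional.empty();
    return Optional.of(roll == 1 ? Status.RUNNING : roll == 2 ? Status.SUCCESS : Status.FAILED);
  }

  static boolean isRunCompleted(Optional<Status> run) {
    return run.map(s -> s == Status.SUCCESS || s == Status.FAILED).orElse(false);
  }

  public static void main(String[] args) throws InterruptedException {
    Optional<Status> run;
    do {
      run = fetchLatestRun();
      if (!isRunCompleted(run)) {
        Thread.sleep(500); // record missing or still running: wait and re-poll
      }
    } while (!isRunCompleted(run));
    System.out.println(
        "finished with " + run.get() + ", exit code " + (run.get() == Status.SUCCESS ? 0 : 1));
  }
}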
@Command(name = "deploy-pipelines", description = "Deploy all the service pipelines.")
public Integer deployPipelines() {
try {