From 0695398a64510f04d8ebc86f64107985d2b355f0 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 26 Apr 2024 05:17:11 -0700 Subject: [PATCH] Fix #15490: openmetadata-ops.sh reindex does not work (#15993) * Fix #15490: openmetadata-ops.sh reindex does not work * add status for reindex --------- Co-authored-by: Pere Miquel Brull --- .../scheduler/AbstractOmAppJobListener.java | 67 ++++++++++--------- .../service/apps/scheduler/AppScheduler.java | 3 + .../service/jdbi3/EntityRepository.java | 8 +-- .../service/resources/CollectionRegistry.java | 18 +++++ .../service/resources/types/TypeResource.java | 2 +- .../service/util/OpenMetadataOperations.java | 30 ++++++++- 6 files changed, 92 insertions(+), 36 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java index cadf140af8f..d2d959c1624 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java @@ -5,6 +5,7 @@ import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.exception.ExceptionUtils; import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppRunRecord; @@ -18,6 +19,7 @@ import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.JobListener; +@Slf4j public abstract class AbstractOmAppJobListener implements JobListener { private final CollectionDAO collectionDAO; private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun"; @@ -35,38 +37,43 @@ public abstract class AbstractOmAppJobListener implements JobListener { @Override public void jobToBeExecuted(JobExecutionContext jobExecutionContext) { - String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); - String appName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_NAME); - App jobApp = collectionDAO.applicationDAO().findEntityByName(appName); - ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); - JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); - long jobStartTime = System.currentTimeMillis(); - AppRunRecord runRecord = - new AppRunRecord() - .withAppId(jobApp.getId()) - .withStartTime(jobStartTime) - .withTimestamp(jobStartTime) - .withRunType(runType) - .withStatus(AppRunRecord.Status.RUNNING) - .withScheduleInfo(jobApp.getAppSchedule()); - ; - boolean update = false; - if (jobExecutionContext.isRecovering()) { - AppRunRecord latestRunRecord = - JsonUtils.readValue( - collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()), - AppRunRecord.class); - if (latestRunRecord != null) { - runRecord = latestRunRecord; + try { + String runType = + (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); + String appName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_NAME); + App jobApp = collectionDAO.applicationDAO().findEntityByName(appName); + ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); + JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + long jobStartTime = System.currentTimeMillis(); + AppRunRecord runRecord = + new AppRunRecord() + .withAppId(jobApp.getId()) + .withStartTime(jobStartTime) + .withTimestamp(jobStartTime) + .withRunType(runType) + .withStatus(AppRunRecord.Status.RUNNING) + .withScheduleInfo(jobApp.getAppSchedule()); + ; + boolean update = false; + if (jobExecutionContext.isRecovering()) { + AppRunRecord latestRunRecord = + JsonUtils.readValue( + collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()), + AppRunRecord.class); + if (latestRunRecord != null) { + runRecord = latestRunRecord; + } + update = true; } - update = true; - } - // Put the Context in the Job Data Map - dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); + // Put the Context in the Job Data Map + dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); - // Insert new Record Run - pushApplicationStatusUpdates(jobExecutionContext, runRecord, update); - this.doJobToBeExecuted(jobExecutionContext); + // Insert new Record Run + pushApplicationStatusUpdates(jobExecutionContext, runRecord, update); + this.doJobToBeExecuted(jobExecutionContext); + } catch (Exception e) { + LOG.info("Error while setting up the job context", e); + } } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index 98e84ccdf12..cb6c44f571d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -216,6 +216,9 @@ public class AppScheduler { } public void triggerOnDemandApplication(App application) { + if (application.getFullyQualifiedName() == null) { + throw new IllegalArgumentException("Application's fullyQualifiedName is null."); + } try { JobDetail jobDetailScheduled = scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index 72a7d8982ed..4e5be8c7ce8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -456,16 +456,16 @@ public abstract class EntityRepository { public final void initializeEntity(T entity) { T existingEntity = findByNameOrNull(entity.getFullyQualifiedName(), ALL); if (existingEntity != null) { - LOG.info("{} {} is already initialized", entityType, entity.getFullyQualifiedName()); + LOG.debug("{} {} is already initialized", entityType, entity.getFullyQualifiedName()); return; } - LOG.info("{} {} is not initialized", entityType, entity.getFullyQualifiedName()); + LOG.debug("{} {} is not initialized", entityType, entity.getFullyQualifiedName()); entity.setUpdatedBy(ADMIN_USER_NAME); entity.setUpdatedAt(System.currentTimeMillis()); entity.setId(UUID.randomUUID()); create(null, entity); - LOG.info("Created a new {} {}", entityType, entity.getFullyQualifiedName()); + LOG.debug("Created a new {} {}", entityType, entity.getFullyQualifiedName()); } public final T copy(T entity, CreateEntity request, String updatedBy) { @@ -2514,7 +2514,7 @@ public abstract class EntityRepository { } else if (fieldsChanged()) { newVersion = nextVersion(oldVersion); } - LOG.info( + LOG.debug( "{} {}->{} - Fields added {}, updated {}, deleted {}", original.getId(), oldVersion, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/CollectionRegistry.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/CollectionRegistry.java index 1ee3eba47f7..adddf7545d2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/CollectionRegistry.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/CollectionRegistry.java @@ -176,6 +176,24 @@ public final class CollectionRegistry { }); } + public void loadSeedData( + Jdbi jdbi, + OpenMetadataApplicationConfig config, + Authorizer authorizer, + AuthenticatorHandler authenticatorHandler) { + // Build list of ResourceDescriptors + for (Map.Entry e : collectionMap.entrySet()) { + CollectionDetails details = e.getValue(); + String resourceClass = details.resourceClass; + try { + Object resource = + createResource(jdbi, resourceClass, config, authorizer, authenticatorHandler); + } catch (Exception ex) { + LOG.warn("Failed to create resource for class {} {}", resourceClass, ex); + } + } + } + /** Get collection details based on annotations in Resource classes */ private static CollectionDetails getCollection(Class cl) { int order = 0; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/types/TypeResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/types/TypeResource.java index 95104071064..b7b0ed2bbbc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/types/TypeResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/types/TypeResource.java @@ -100,7 +100,7 @@ public class TypeResource extends EntityResource { types.forEach( type -> { type.withId(UUID.randomUUID()).withUpdatedBy(ADMIN_USER_NAME).withUpdatedAt(now); - LOG.info("Loading type {}", type.getName()); + LOG.debug("Loading type {}", type.getName()); try { Fields fields = getFields(PROPERTIES_FIELD); try { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java index 56c60c1268d..3d3a5928f69 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -38,6 +38,7 @@ import org.jdbi.v3.sqlobject.SqlObjects; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.ServiceEntityInterface; import org.openmetadata.schema.entity.app.App; +import org.openmetadata.schema.entity.app.AppRunRecord; import org.openmetadata.schema.entity.app.AppSchedule; import org.openmetadata.schema.entity.app.ScheduleTimeline; import org.openmetadata.schema.entity.app.ScheduledExecutionContext; @@ -53,6 +54,7 @@ import org.openmetadata.service.apps.scheduler.AppScheduler; import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.fernet.Fernet; +import org.openmetadata.service.jdbi3.AppRepository; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.IngestionPipelineRepository; @@ -61,6 +63,7 @@ import org.openmetadata.service.jdbi3.MigrationDAO; import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator; import org.openmetadata.service.jdbi3.locator.ConnectionType; import org.openmetadata.service.migration.api.MigrationWorkflow; +import org.openmetadata.service.resources.CollectionRegistry; import org.openmetadata.service.resources.databases.DatasourceConfig; import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.secrets.SecretsManager; @@ -248,12 +251,20 @@ public class OpenMetadataOperations implements Callable { boolean recreateIndexes) { try { parseConfig(); + CollectionRegistry.initialize(); ApplicationHandler.initialize(config); + // load seed data so that repositories are initialized + CollectionRegistry.getInstance().loadSeedData(jdbi, config, null, null); + ApplicationHandler.initialize(config); + // creates the default search index application AppScheduler.initialize(config, collectionDAO, searchRepository); + + String appName = "SearchIndexingApplication"; App searchIndexApp = new App() .withId(UUID.randomUUID()) - .withName("SearchIndexApp") + .withName(appName) + .withFullyQualifiedName(appName) .withClassName("org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp") .withAppSchedule(new AppSchedule().withScheduleTimeline(ScheduleTimeline.DAILY)) .withAppConfiguration( @@ -268,6 +279,23 @@ public class OpenMetadataOperations implements Callable { do { Thread.sleep(3000l); } while (!AppScheduler.getInstance().getScheduler().getCurrentlyExecutingJobs().isEmpty()); + AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); + AppRunRecord appRunRecord = appRepository.getLatestAppRuns(searchIndexApp.getId()); + if (appRunRecord != null) { + List columns = + new ArrayList<>( + List.of("status", "startTime", "endTime", "executionTime", "success", "failure")); + List> rows = new ArrayList<>(); + rows.add( + Arrays.asList( + appRunRecord.getStatus().value(), + appRunRecord.getStartTime().toString(), + appRunRecord.getEndTime().toString(), + appRunRecord.getExecutionTime().toString(), + appRunRecord.getSuccessContext().toString(), + appRunRecord.getFailureContext().toString())); + printToAsciiTable(columns, rows, "No Search Indexing Application found"); + } return 0; } catch (Exception e) { LOG.error("Failed to reindex due to ", e);