mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-02 03:29:03 +00:00
* Fix #15490: openmetadata-ops.sh reindex does not work * add status for reindex --------- Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
parent
21d7a2ffba
commit
0695398a64
@ -5,6 +5,7 @@ import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.openmetadata.schema.entity.app.App;
|
||||
import org.openmetadata.schema.entity.app.AppRunRecord;
|
||||
@ -18,6 +19,7 @@ import org.quartz.JobExecutionContext;
|
||||
import org.quartz.JobExecutionException;
|
||||
import org.quartz.JobListener;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractOmAppJobListener implements JobListener {
|
||||
private final CollectionDAO collectionDAO;
|
||||
private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun";
|
||||
@ -35,38 +37,43 @@ public abstract class AbstractOmAppJobListener implements JobListener {
|
||||
|
||||
@Override
|
||||
public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {
|
||||
String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
|
||||
String appName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_NAME);
|
||||
App jobApp = collectionDAO.applicationDAO().findEntityByName(appName);
|
||||
ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp);
|
||||
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
|
||||
long jobStartTime = System.currentTimeMillis();
|
||||
AppRunRecord runRecord =
|
||||
new AppRunRecord()
|
||||
.withAppId(jobApp.getId())
|
||||
.withStartTime(jobStartTime)
|
||||
.withTimestamp(jobStartTime)
|
||||
.withRunType(runType)
|
||||
.withStatus(AppRunRecord.Status.RUNNING)
|
||||
.withScheduleInfo(jobApp.getAppSchedule());
|
||||
;
|
||||
boolean update = false;
|
||||
if (jobExecutionContext.isRecovering()) {
|
||||
AppRunRecord latestRunRecord =
|
||||
JsonUtils.readValue(
|
||||
collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()),
|
||||
AppRunRecord.class);
|
||||
if (latestRunRecord != null) {
|
||||
runRecord = latestRunRecord;
|
||||
try {
|
||||
String runType =
|
||||
(String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
|
||||
String appName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_NAME);
|
||||
App jobApp = collectionDAO.applicationDAO().findEntityByName(appName);
|
||||
ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp);
|
||||
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
|
||||
long jobStartTime = System.currentTimeMillis();
|
||||
AppRunRecord runRecord =
|
||||
new AppRunRecord()
|
||||
.withAppId(jobApp.getId())
|
||||
.withStartTime(jobStartTime)
|
||||
.withTimestamp(jobStartTime)
|
||||
.withRunType(runType)
|
||||
.withStatus(AppRunRecord.Status.RUNNING)
|
||||
.withScheduleInfo(jobApp.getAppSchedule());
|
||||
;
|
||||
boolean update = false;
|
||||
if (jobExecutionContext.isRecovering()) {
|
||||
AppRunRecord latestRunRecord =
|
||||
JsonUtils.readValue(
|
||||
collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()),
|
||||
AppRunRecord.class);
|
||||
if (latestRunRecord != null) {
|
||||
runRecord = latestRunRecord;
|
||||
}
|
||||
update = true;
|
||||
}
|
||||
update = true;
|
||||
}
|
||||
// Put the Context in the Job Data Map
|
||||
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
|
||||
// Put the Context in the Job Data Map
|
||||
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
|
||||
|
||||
// Insert new Record Run
|
||||
pushApplicationStatusUpdates(jobExecutionContext, runRecord, update);
|
||||
this.doJobToBeExecuted(jobExecutionContext);
|
||||
// Insert new Record Run
|
||||
pushApplicationStatusUpdates(jobExecutionContext, runRecord, update);
|
||||
this.doJobToBeExecuted(jobExecutionContext);
|
||||
} catch (Exception e) {
|
||||
LOG.info("Error while setting up the job context", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -216,6 +216,9 @@ public class AppScheduler {
|
||||
}
|
||||
|
||||
public void triggerOnDemandApplication(App application) {
|
||||
if (application.getFullyQualifiedName() == null) {
|
||||
throw new IllegalArgumentException("Application's fullyQualifiedName is null.");
|
||||
}
|
||||
try {
|
||||
JobDetail jobDetailScheduled =
|
||||
scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP));
|
||||
|
||||
@ -456,16 +456,16 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
public final void initializeEntity(T entity) {
|
||||
T existingEntity = findByNameOrNull(entity.getFullyQualifiedName(), ALL);
|
||||
if (existingEntity != null) {
|
||||
LOG.info("{} {} is already initialized", entityType, entity.getFullyQualifiedName());
|
||||
LOG.debug("{} {} is already initialized", entityType, entity.getFullyQualifiedName());
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("{} {} is not initialized", entityType, entity.getFullyQualifiedName());
|
||||
LOG.debug("{} {} is not initialized", entityType, entity.getFullyQualifiedName());
|
||||
entity.setUpdatedBy(ADMIN_USER_NAME);
|
||||
entity.setUpdatedAt(System.currentTimeMillis());
|
||||
entity.setId(UUID.randomUUID());
|
||||
create(null, entity);
|
||||
LOG.info("Created a new {} {}", entityType, entity.getFullyQualifiedName());
|
||||
LOG.debug("Created a new {} {}", entityType, entity.getFullyQualifiedName());
|
||||
}
|
||||
|
||||
public final T copy(T entity, CreateEntity request, String updatedBy) {
|
||||
@ -2514,7 +2514,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
} else if (fieldsChanged()) {
|
||||
newVersion = nextVersion(oldVersion);
|
||||
}
|
||||
LOG.info(
|
||||
LOG.debug(
|
||||
"{} {}->{} - Fields added {}, updated {}, deleted {}",
|
||||
original.getId(),
|
||||
oldVersion,
|
||||
|
||||
@ -176,6 +176,24 @@ public final class CollectionRegistry {
|
||||
});
|
||||
}
|
||||
|
||||
public void loadSeedData(
|
||||
Jdbi jdbi,
|
||||
OpenMetadataApplicationConfig config,
|
||||
Authorizer authorizer,
|
||||
AuthenticatorHandler authenticatorHandler) {
|
||||
// Build list of ResourceDescriptors
|
||||
for (Map.Entry<String, CollectionDetails> e : collectionMap.entrySet()) {
|
||||
CollectionDetails details = e.getValue();
|
||||
String resourceClass = details.resourceClass;
|
||||
try {
|
||||
Object resource =
|
||||
createResource(jdbi, resourceClass, config, authorizer, authenticatorHandler);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Failed to create resource for class {} {}", resourceClass, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Get collection details based on annotations in Resource classes */
|
||||
private static CollectionDetails getCollection(Class<?> cl) {
|
||||
int order = 0;
|
||||
|
||||
@ -100,7 +100,7 @@ public class TypeResource extends EntityResource<Type, TypeRepository> {
|
||||
types.forEach(
|
||||
type -> {
|
||||
type.withId(UUID.randomUUID()).withUpdatedBy(ADMIN_USER_NAME).withUpdatedAt(now);
|
||||
LOG.info("Loading type {}", type.getName());
|
||||
LOG.debug("Loading type {}", type.getName());
|
||||
try {
|
||||
Fields fields = getFields(PROPERTIES_FIELD);
|
||||
try {
|
||||
|
||||
@ -38,6 +38,7 @@ import org.jdbi.v3.sqlobject.SqlObjects;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.ServiceEntityInterface;
|
||||
import org.openmetadata.schema.entity.app.App;
|
||||
import org.openmetadata.schema.entity.app.AppRunRecord;
|
||||
import org.openmetadata.schema.entity.app.AppSchedule;
|
||||
import org.openmetadata.schema.entity.app.ScheduleTimeline;
|
||||
import org.openmetadata.schema.entity.app.ScheduledExecutionContext;
|
||||
@ -53,6 +54,7 @@ import org.openmetadata.service.apps.scheduler.AppScheduler;
|
||||
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
|
||||
import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.fernet.Fernet;
|
||||
import org.openmetadata.service.jdbi3.AppRepository;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository;
|
||||
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
|
||||
@ -61,6 +63,7 @@ import org.openmetadata.service.jdbi3.MigrationDAO;
|
||||
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
|
||||
import org.openmetadata.service.jdbi3.locator.ConnectionType;
|
||||
import org.openmetadata.service.migration.api.MigrationWorkflow;
|
||||
import org.openmetadata.service.resources.CollectionRegistry;
|
||||
import org.openmetadata.service.resources.databases.DatasourceConfig;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.secrets.SecretsManager;
|
||||
@ -248,12 +251,20 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
||||
boolean recreateIndexes) {
|
||||
try {
|
||||
parseConfig();
|
||||
CollectionRegistry.initialize();
|
||||
ApplicationHandler.initialize(config);
|
||||
// load seed data so that repositories are initialized
|
||||
CollectionRegistry.getInstance().loadSeedData(jdbi, config, null, null);
|
||||
ApplicationHandler.initialize(config);
|
||||
// creates the default search index application
|
||||
AppScheduler.initialize(config, collectionDAO, searchRepository);
|
||||
|
||||
String appName = "SearchIndexingApplication";
|
||||
App searchIndexApp =
|
||||
new App()
|
||||
.withId(UUID.randomUUID())
|
||||
.withName("SearchIndexApp")
|
||||
.withName(appName)
|
||||
.withFullyQualifiedName(appName)
|
||||
.withClassName("org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp")
|
||||
.withAppSchedule(new AppSchedule().withScheduleTimeline(ScheduleTimeline.DAILY))
|
||||
.withAppConfiguration(
|
||||
@ -268,6 +279,23 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
||||
do {
|
||||
Thread.sleep(3000l);
|
||||
} while (!AppScheduler.getInstance().getScheduler().getCurrentlyExecutingJobs().isEmpty());
|
||||
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
|
||||
AppRunRecord appRunRecord = appRepository.getLatestAppRuns(searchIndexApp.getId());
|
||||
if (appRunRecord != null) {
|
||||
List<String> columns =
|
||||
new ArrayList<>(
|
||||
List.of("status", "startTime", "endTime", "executionTime", "success", "failure"));
|
||||
List<List<String>> rows = new ArrayList<>();
|
||||
rows.add(
|
||||
Arrays.asList(
|
||||
appRunRecord.getStatus().value(),
|
||||
appRunRecord.getStartTime().toString(),
|
||||
appRunRecord.getEndTime().toString(),
|
||||
appRunRecord.getExecutionTime().toString(),
|
||||
appRunRecord.getSuccessContext().toString(),
|
||||
appRunRecord.getFailureContext().toString()));
|
||||
printToAsciiTable(columns, rows, "No Search Indexing Application found");
|
||||
}
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to reindex due to ", e);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user