format cron from unix to quartz for internal apps , for others use unix (#15070)

* format cron from unix to quartz for internal apps , for others use unix

* Add Migration Files for 131

* Add Migrations

* Fix Json Issue and Add Exception Handling for force

---------

Co-authored-by: Ashish Gupta <ashish@getcollate.io>
This commit is contained in:
Mohit Yadav 2024-02-14 19:59:14 +05:30 committed by GitHub
parent 268c36aab6
commit 9fae19dd7a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 122 additions and 19 deletions

View File

@ -1,16 +1,11 @@
package org.openmetadata.service.apps;
import static com.cronutils.model.CronType.QUARTZ;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
import static org.openmetadata.service.apps.scheduler.AppScheduler.COLLECTION_DAO_KEY;
import static org.openmetadata.service.apps.scheduler.AppScheduler.SEARCH_CLIENT_KEY;
import static org.openmetadata.service.exception.CatalogExceptionMessage.LIVE_APP_SCHEDULE_ERR;
import com.cronutils.mapper.CronMapper;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
import java.util.List;
import lombok.Getter;
import lombok.SneakyThrows;
@ -51,9 +46,6 @@ public class AbstractNativeApplication implements NativeApplication {
protected CollectionDAO collectionDAO;
private @Getter App app;
protected SearchRepository searchRepository;
private final @Getter CronMapper cronMapper = CronMapper.fromQuartzToUnix();
private final @Getter CronParser cronParser =
new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
// Default service that contains external apps' Ingestion Pipelines
private static final String SERVICE_NAME = "OpenMetadata";
@ -149,9 +141,6 @@ public class AbstractNativeApplication implements NativeApplication {
.getByName(null, SERVICE_NAME, serviceEntityRepository.getFields("id"))
.getEntityReference();
Cron quartzCron =
this.getCronParser().parse(this.getApp().getAppSchedule().getCronExpression());
CreateIngestionPipeline createPipelineRequest =
new CreateIngestionPipeline()
.withName(this.getApp().getName())
@ -167,7 +156,7 @@ public class AbstractNativeApplication implements NativeApplication {
.withAppPrivateConfig(this.getApp().getPrivateConfiguration())))
.withAirflowConfig(
new AirflowConfig()
.withScheduleInterval(this.getCronMapper().map(quartzCron).asString()))
.withScheduleInterval(this.getApp().getAppSchedule().getCronExpression()))
.withService(service);
// Get Pipeline

View File

@ -1,6 +1,5 @@
package org.openmetadata.service.apps.bundles.insights;
import com.cronutils.model.Cron;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
@ -82,8 +81,6 @@ public class DataInsightsApp extends AbstractNativeApplication {
.getByName(null, SERVICE_NAME, serviceRepository.getFields("id"))
.getEntityReference();
Cron quartzCron = getCronParser().parse(getApp().getAppSchedule().getCronExpression());
CreateIngestionPipeline createPipelineRequest =
new CreateIngestionPipeline()
.withName(INGESTION_PIPELINE_NAME)
@ -94,7 +91,7 @@ public class DataInsightsApp extends AbstractNativeApplication {
.withSourceConfig(new SourceConfig().withConfig(new MetadataToElasticSearchPipeline()))
.withAirflowConfig(
new AirflowConfig()
.withScheduleInterval(getCronMapper().map(quartzCron).asString()))
.withScheduleInterval(getApp().getAppSchedule().getCronExpression()))
.withService(service);
// Get Pipeline

View File

@ -1,8 +1,13 @@
package org.openmetadata.service.apps.scheduler;
import static com.cronutils.model.CronType.UNIX;
import static org.openmetadata.service.apps.AbstractNativeApplication.getAppRuntime;
import static org.quartz.impl.matchers.GroupMatcher.jobGroupEquals;
import com.cronutils.mapper.CronMapper;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -42,6 +47,9 @@ public class AppScheduler {
private static final ConcurrentHashMap<UUID, JobDetail> appJobsKeyMap = new ConcurrentHashMap<>();
private final CollectionDAO collectionDAO;
private final SearchRepository searchClient;
private static final @Getter CronMapper cronMapper = CronMapper.fromUnixToQuartz();
private static final @Getter CronParser cronParser =
new CronParser(CronDefinitionBuilder.instanceDefinitionFor(UNIX));
private AppScheduler(CollectionDAO dao, SearchRepository searchClient) throws SchedulerException {
this.collectionDAO = dao;
@ -142,7 +150,8 @@ public class AppScheduler {
return CronScheduleBuilder.monthlyOnDayAndHourAndMinute(1, 0, 0);
case CUSTOM:
if (!CommonUtil.nullOrEmpty(scheduleInfo.getCronExpression())) {
return CronScheduleBuilder.cronSchedule(scheduleInfo.getCronExpression());
Cron unixCron = getCronParser().parse(scheduleInfo.getCronExpression());
return CronScheduleBuilder.cronSchedule(getCronMapper().map(unixCron).asString());
} else {
throw new IllegalArgumentException("Missing Cron Expression for Custom Schedule.");
}

View File

@ -0,0 +1,31 @@
package org.openmetadata.service.migration.mysql.v131;
import static org.openmetadata.service.migration.utils.v131.MigrationUtil.migrateCronExpression;
import lombok.SneakyThrows;
import org.jdbi.v3.core.Handle;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
public class Migration extends MigrationProcessImpl {
private CollectionDAO collectionDAO;
private Handle handle;
public Migration(MigrationFile migrationFile) {
super(migrationFile);
}
@Override
public void initialize(Handle handle) {
super.initialize(handle);
this.handle = handle;
this.collectionDAO = handle.attach(CollectionDAO.class);
}
@Override
@SneakyThrows
public void runDataMigration() {
migrateCronExpression(collectionDAO);
}
}

View File

@ -0,0 +1,31 @@
package org.openmetadata.service.migration.postgres.v131;
import static org.openmetadata.service.migration.utils.v131.MigrationUtil.migrateCronExpression;
import lombok.SneakyThrows;
import org.jdbi.v3.core.Handle;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
public class Migration extends MigrationProcessImpl {
private CollectionDAO collectionDAO;
private Handle handle;
public Migration(MigrationFile migrationFile) {
super(migrationFile);
}
@Override
public void initialize(Handle handle) {
super.initialize(handle);
this.handle = handle;
this.collectionDAO = handle.attach(CollectionDAO.class);
}
@Override
@SneakyThrows
public void runDataMigration() {
migrateCronExpression(collectionDAO);
}
}

View File

@ -0,0 +1,46 @@
package org.openmetadata.service.migration.utils.v131;
import static com.cronutils.model.CronType.QUARTZ;
import com.cronutils.mapper.CronMapper;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.util.JsonUtils;
@Slf4j
public class MigrationUtil {
private MigrationUtil() {
/* Cannot create object util class*/
}
public static void migrateCronExpression(CollectionDAO daoCollection) {
try {
CronMapper quartzToUnixMapper = CronMapper.fromQuartzToUnix();
CronParser quartzParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
ListFilter filter = new ListFilter(Include.ALL);
List<String> jsons = daoCollection.applicationDAO().listAfter(filter, Integer.MAX_VALUE, "");
for (String jsonStr : jsons) {
App application = JsonUtils.readValue(jsonStr, App.class);
String cronExpression = application.getAppSchedule().getCronExpression();
Cron quartzCronExpression = quartzParser.parse(cronExpression);
String unixCron = quartzToUnixMapper.map(quartzCronExpression).asString();
application.getAppSchedule().setCronExpression(unixCron);
daoCollection.applicationDAO().update(application);
}
} catch (IllegalArgumentException e) {
LOG.warn(
"Got IllegalArgumentExpr Cron Expression might already be Migrated. Message : {}",
e.getMessage());
} catch (Exception ex) {
LOG.error("Error while migrating cron expression, Logging and moving further", ex);
}
}
}

View File

@ -4,6 +4,6 @@
"appConfiguration": {},
"appSchedule": {
"scheduleType": "Custom",
"cronExpression": "0 0 0 1/1 * ? *"
"cronExpression": "0 0 1/1 * *"
}
}

View File

@ -44,6 +44,6 @@
},
"appSchedule": {
"scheduleType": "Custom",
"cronExpression": "0 0 0 1/1 * ? *"
"cronExpression": "0 0 * * *"
}
}