From 9fae19dd7a97fb642f11c18c38cbca097a046f99 Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Wed, 14 Feb 2024 19:59:14 +0530 Subject: [PATCH] format cron from unix to quartz for internal apps , for others use unix (#15070) * format cron from unix to quartz for internal apps , for others use unix * Add Migration Files for 131 * Add Migrations * Fix Json Issue and Add Exception Handling for force --------- Co-authored-by: Ashish Gupta --- .../mysql/postDataMigrationSQLScript.sql | 0 .../native/1.3.1/mysql/schemaChanges.sql | 0 .../postgres/postDataMigrationSQLScript.sql | 0 .../native/1.3.1/postgres/schemaChanges.sql | 0 .../apps/AbstractNativeApplication.java | 13 +----- .../bundles/insights/DataInsightsApp.java | 5 +- .../service/apps/scheduler/AppScheduler.java | 11 ++++- .../migration/mysql/v131/Migration.java | 31 +++++++++++++ .../migration/postgres/v131/Migration.java | 31 +++++++++++++ .../migration/utils/v131/MigrationUtil.java | 46 +++++++++++++++++++ .../data/app/DataInsightsApplication.json | 2 +- .../data/app/SearchIndexingApplication.json | 2 +- 12 files changed, 122 insertions(+), 19 deletions(-) create mode 100644 bootstrap/sql/migrations/native/1.3.1/mysql/postDataMigrationSQLScript.sql create mode 100644 bootstrap/sql/migrations/native/1.3.1/mysql/schemaChanges.sql create mode 100644 bootstrap/sql/migrations/native/1.3.1/postgres/postDataMigrationSQLScript.sql create mode 100644 bootstrap/sql/migrations/native/1.3.1/postgres/schemaChanges.sql create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v131/Migration.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v131/Migration.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v131/MigrationUtil.java diff --git a/bootstrap/sql/migrations/native/1.3.1/mysql/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.3.1/mysql/postDataMigrationSQLScript.sql new file mode 100644 index 00000000000..e69de29bb2d diff --git a/bootstrap/sql/migrations/native/1.3.1/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.3.1/mysql/schemaChanges.sql new file mode 100644 index 00000000000..e69de29bb2d diff --git a/bootstrap/sql/migrations/native/1.3.1/postgres/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.3.1/postgres/postDataMigrationSQLScript.sql new file mode 100644 index 00000000000..e69de29bb2d diff --git a/bootstrap/sql/migrations/native/1.3.1/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.3.1/postgres/schemaChanges.sql new file mode 100644 index 00000000000..e69de29bb2d diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java index cd01c50638f..a8fd6a2228e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java @@ -1,16 +1,11 @@ package org.openmetadata.service.apps; -import static com.cronutils.model.CronType.QUARTZ; import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME; import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; import static org.openmetadata.service.apps.scheduler.AppScheduler.COLLECTION_DAO_KEY; import static org.openmetadata.service.apps.scheduler.AppScheduler.SEARCH_CLIENT_KEY; import static org.openmetadata.service.exception.CatalogExceptionMessage.LIVE_APP_SCHEDULE_ERR; -import com.cronutils.mapper.CronMapper; -import com.cronutils.model.Cron; -import com.cronutils.model.definition.CronDefinitionBuilder; -import com.cronutils.parser.CronParser; import java.util.List; import lombok.Getter; import lombok.SneakyThrows; @@ -51,9 +46,6 @@ public class AbstractNativeApplication implements NativeApplication { protected CollectionDAO collectionDAO; private @Getter App app; protected SearchRepository searchRepository; - private final @Getter CronMapper cronMapper = CronMapper.fromQuartzToUnix(); - private final @Getter CronParser cronParser = - new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ)); // Default service that contains external apps' Ingestion Pipelines private static final String SERVICE_NAME = "OpenMetadata"; @@ -149,9 +141,6 @@ public class AbstractNativeApplication implements NativeApplication { .getByName(null, SERVICE_NAME, serviceEntityRepository.getFields("id")) .getEntityReference(); - Cron quartzCron = - this.getCronParser().parse(this.getApp().getAppSchedule().getCronExpression()); - CreateIngestionPipeline createPipelineRequest = new CreateIngestionPipeline() .withName(this.getApp().getName()) @@ -167,7 +156,7 @@ public class AbstractNativeApplication implements NativeApplication { .withAppPrivateConfig(this.getApp().getPrivateConfiguration()))) .withAirflowConfig( new AirflowConfig() - .withScheduleInterval(this.getCronMapper().map(quartzCron).asString())) + .withScheduleInterval(this.getApp().getAppSchedule().getCronExpression())) .withService(service); // Get Pipeline diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java index f50c977df56..86b078b84da 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java @@ -1,6 +1,5 @@ package org.openmetadata.service.apps.bundles.insights; -import com.cronutils.model.Cron; import java.util.List; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline; @@ -82,8 +81,6 @@ public class DataInsightsApp extends AbstractNativeApplication { .getByName(null, SERVICE_NAME, serviceRepository.getFields("id")) .getEntityReference(); - Cron quartzCron = getCronParser().parse(getApp().getAppSchedule().getCronExpression()); - CreateIngestionPipeline createPipelineRequest = new CreateIngestionPipeline() .withName(INGESTION_PIPELINE_NAME) @@ -94,7 +91,7 @@ public class DataInsightsApp extends AbstractNativeApplication { .withSourceConfig(new SourceConfig().withConfig(new MetadataToElasticSearchPipeline())) .withAirflowConfig( new AirflowConfig() - .withScheduleInterval(getCronMapper().map(quartzCron).asString())) + .withScheduleInterval(getApp().getAppSchedule().getCronExpression())) .withService(service); // Get Pipeline diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index 7f37ddfc753..cbe830d6289 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -1,8 +1,13 @@ package org.openmetadata.service.apps.scheduler; +import static com.cronutils.model.CronType.UNIX; import static org.openmetadata.service.apps.AbstractNativeApplication.getAppRuntime; import static org.quartz.impl.matchers.GroupMatcher.jobGroupEquals; +import com.cronutils.mapper.CronMapper; +import com.cronutils.model.Cron; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.parser.CronParser; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -42,6 +47,9 @@ public class AppScheduler { private static final ConcurrentHashMap appJobsKeyMap = new ConcurrentHashMap<>(); private final CollectionDAO collectionDAO; private final SearchRepository searchClient; + private static final @Getter CronMapper cronMapper = CronMapper.fromUnixToQuartz(); + private static final @Getter CronParser cronParser = + new CronParser(CronDefinitionBuilder.instanceDefinitionFor(UNIX)); private AppScheduler(CollectionDAO dao, SearchRepository searchClient) throws SchedulerException { this.collectionDAO = dao; @@ -142,7 +150,8 @@ public class AppScheduler { return CronScheduleBuilder.monthlyOnDayAndHourAndMinute(1, 0, 0); case CUSTOM: if (!CommonUtil.nullOrEmpty(scheduleInfo.getCronExpression())) { - return CronScheduleBuilder.cronSchedule(scheduleInfo.getCronExpression()); + Cron unixCron = getCronParser().parse(scheduleInfo.getCronExpression()); + return CronScheduleBuilder.cronSchedule(getCronMapper().map(unixCron).asString()); } else { throw new IllegalArgumentException("Missing Cron Expression for Custom Schedule."); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v131/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v131/Migration.java new file mode 100644 index 00000000000..04a27b28dec --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v131/Migration.java @@ -0,0 +1,31 @@ +package org.openmetadata.service.migration.mysql.v131; + +import static org.openmetadata.service.migration.utils.v131.MigrationUtil.migrateCronExpression; + +import lombok.SneakyThrows; +import org.jdbi.v3.core.Handle; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.migration.api.MigrationProcessImpl; +import org.openmetadata.service.migration.utils.MigrationFile; + +public class Migration extends MigrationProcessImpl { + private CollectionDAO collectionDAO; + private Handle handle; + + public Migration(MigrationFile migrationFile) { + super(migrationFile); + } + + @Override + public void initialize(Handle handle) { + super.initialize(handle); + this.handle = handle; + this.collectionDAO = handle.attach(CollectionDAO.class); + } + + @Override + @SneakyThrows + public void runDataMigration() { + migrateCronExpression(collectionDAO); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v131/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v131/Migration.java new file mode 100644 index 00000000000..b5570372ba3 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v131/Migration.java @@ -0,0 +1,31 @@ +package org.openmetadata.service.migration.postgres.v131; + +import static org.openmetadata.service.migration.utils.v131.MigrationUtil.migrateCronExpression; + +import lombok.SneakyThrows; +import org.jdbi.v3.core.Handle; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.migration.api.MigrationProcessImpl; +import org.openmetadata.service.migration.utils.MigrationFile; + +public class Migration extends MigrationProcessImpl { + private CollectionDAO collectionDAO; + private Handle handle; + + public Migration(MigrationFile migrationFile) { + super(migrationFile); + } + + @Override + public void initialize(Handle handle) { + super.initialize(handle); + this.handle = handle; + this.collectionDAO = handle.attach(CollectionDAO.class); + } + + @Override + @SneakyThrows + public void runDataMigration() { + migrateCronExpression(collectionDAO); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v131/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v131/MigrationUtil.java new file mode 100644 index 00000000000..3febb3e68a9 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v131/MigrationUtil.java @@ -0,0 +1,46 @@ +package org.openmetadata.service.migration.utils.v131; + +import static com.cronutils.model.CronType.QUARTZ; + +import com.cronutils.mapper.CronMapper; +import com.cronutils.model.Cron; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.parser.CronParser; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.entity.app.App; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.util.JsonUtils; + +@Slf4j +public class MigrationUtil { + + private MigrationUtil() { + /* Cannot create object util class*/ + } + + public static void migrateCronExpression(CollectionDAO daoCollection) { + try { + CronMapper quartzToUnixMapper = CronMapper.fromQuartzToUnix(); + CronParser quartzParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ)); + ListFilter filter = new ListFilter(Include.ALL); + List jsons = daoCollection.applicationDAO().listAfter(filter, Integer.MAX_VALUE, ""); + for (String jsonStr : jsons) { + App application = JsonUtils.readValue(jsonStr, App.class); + String cronExpression = application.getAppSchedule().getCronExpression(); + Cron quartzCronExpression = quartzParser.parse(cronExpression); + String unixCron = quartzToUnixMapper.map(quartzCronExpression).asString(); + application.getAppSchedule().setCronExpression(unixCron); + daoCollection.applicationDAO().update(application); + } + } catch (IllegalArgumentException e) { + LOG.warn( + "Got IllegalArgumentExpr Cron Expression might already be Migrated. Message : {}", + e.getMessage()); + } catch (Exception ex) { + LOG.error("Error while migrating cron expression, Logging and moving further", ex); + } + } +} diff --git a/openmetadata-service/src/main/resources/json/data/app/DataInsightsApplication.json b/openmetadata-service/src/main/resources/json/data/app/DataInsightsApplication.json index 12019f3fd4d..590d467790a 100644 --- a/openmetadata-service/src/main/resources/json/data/app/DataInsightsApplication.json +++ b/openmetadata-service/src/main/resources/json/data/app/DataInsightsApplication.json @@ -4,6 +4,6 @@ "appConfiguration": {}, "appSchedule": { "scheduleType": "Custom", - "cronExpression": "0 0 0 1/1 * ? *" + "cronExpression": "0 0 1/1 * *" } } \ No newline at end of file diff --git a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json index d32c4ace2ec..179fc674cdd 100644 --- a/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json +++ b/openmetadata-service/src/main/resources/json/data/app/SearchIndexingApplication.json @@ -44,6 +44,6 @@ }, "appSchedule": { "scheduleType": "Custom", - "cronExpression": "0 0 0 1/1 * ? *" + "cronExpression": "0 0 * * *" } } \ No newline at end of file