getExpectedFlywayVersions() {
+ try {
+ // Query server_change_log for versions where migrationFileName contains 'flyway'
+ return migrationDAO.getFlywayMigrationVersions();
+ } catch (Exception e) {
+ // If there's an error (e.g., table doesn't exist yet), return empty list
+ LOG.debug("Could not fetch Flyway versions from SERVER_CHANGE_LOG: {}", e.getMessage());
+ return List.of();
+ }
+ }
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationProcess.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationProcess.java
index 0539aaa9ea4..86fa6e105d8 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationProcess.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationProcess.java
@@ -8,10 +8,9 @@ import org.openmetadata.service.migration.QueryStatus;
import org.openmetadata.service.migration.context.MigrationOps;
/**
- * Migration framework interface that supports three implementation approaches:
- * 1. Flyway (deprecated, do not add new migrations here)
- * 2. Native SQL migrations
- * 3. Java-based migrations
+ * Migration framework interface that supports two implementation approaches:
+ * 1. Native SQL migrations
+ * 2. Java-based migrations
*
* Migration Execution Order:
* Migrations are executed in a specific sequence that must be maintained:
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationProcessImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationProcessImpl.java
index 45265395cc0..e2cc99430ba 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationProcessImpl.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationProcessImpl.java
@@ -118,10 +118,22 @@ public class MigrationProcessImpl implements MigrationProcess {
if (!nullOrEmpty(queryList)) {
for (String sql : queryList) {
try {
- String previouslyRanSql = migrationDAO.getSqlQuery(hash(sql), version);
+ String previouslyRanSql = null;
+ try {
+ previouslyRanSql = migrationDAO.getSqlQuery(hash(sql), version);
+ } catch (Exception dbException) {
+ // If SERVER_MIGRATION_SQL_LOGS table doesn't exist yet, assume query hasn't run
+ previouslyRanSql = null;
+ }
+
if ((previouslyRanSql == null || previouslyRanSql.isEmpty())) {
handle.execute(sql);
- migrationDAO.upsertServerMigrationSQL(version, sql, hash(sql));
+ try {
+ migrationDAO.upsertServerMigrationSQL(version, sql, hash(sql));
+ } catch (Exception logException) {
+ // If logging fails (table doesn't exist yet), continue - the SQL was executed
+ // successfully
+ }
}
queryStatusMap.put(
sql, new QueryStatus(QueryStatus.Status.SUCCESS, "Successfully Executed Query"));
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java
index c46d50331c8..08d81f6ae0e 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java
@@ -23,6 +23,7 @@ import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.QueryStatus;
import org.openmetadata.service.migration.context.MigrationContext;
import org.openmetadata.service.migration.context.MigrationWorkflowContext;
+import org.openmetadata.service.migration.utils.FlywayMigrationFile;
import org.openmetadata.service.migration.utils.MigrationFile;
import org.openmetadata.service.util.AsciiTable;
@@ -35,6 +36,7 @@ public class MigrationWorkflow {
private final String nativeSQLScriptRootPath;
private final ConnectionType connectionType;
private final String extensionSQLScriptRootPath;
+ private final String flywayPath;
@Getter private final OpenMetadataApplicationConfig openMetadataApplicationConfig;
private final MigrationDAO migrationDAO;
private final Jdbi jdbi;
@@ -47,6 +49,7 @@ public class MigrationWorkflow {
String nativeSQLScriptRootPath,
ConnectionType connectionType,
String extensionSQLScriptRootPath,
+ String flywayPath,
OpenMetadataApplicationConfig config,
boolean forceMigrations) {
this.jdbi = jdbi;
@@ -55,6 +58,7 @@ public class MigrationWorkflow {
this.nativeSQLScriptRootPath = nativeSQLScriptRootPath;
this.connectionType = connectionType;
this.extensionSQLScriptRootPath = extensionSQLScriptRootPath;
+ this.flywayPath = flywayPath;
this.openMetadataApplicationConfig = config;
}
@@ -65,7 +69,8 @@ public class MigrationWorkflow {
nativeSQLScriptRootPath,
connectionType,
openMetadataApplicationConfig,
- extensionSQLScriptRootPath);
+ extensionSQLScriptRootPath,
+ flywayPath);
// Filter Migrations to Be Run
this.migrations = filterAndGetMigrationsToRun(availableMigrations);
}
@@ -84,27 +89,35 @@ public class MigrationWorkflow {
String nativeSQLScriptRootPath,
ConnectionType connectionType,
OpenMetadataApplicationConfig config,
- String extensionSQLScriptRootPath) {
+ String extensionSQLScriptRootPath,
+ String flywayPath) {
List availableOMNativeMigrations =
getMigrationFilesFromPath(nativeSQLScriptRootPath, connectionType, config, false);
- // If we only have OM migrations, return them
- if (extensionSQLScriptRootPath == null || extensionSQLScriptRootPath.isEmpty()) {
- return availableOMNativeMigrations;
+ // Get Flyway migrations first (they should run before native migrations)
+ List availableFlywayMigrations =
+ FlywayMigrationFile.getFlywayMigrationFiles(
+ flywayPath, connectionType, config, migrationDAO);
+
+ // Get extension migrations if available
+ List availableExtensionMigrations = new ArrayList<>();
+ if (extensionSQLScriptRootPath != null && !extensionSQLScriptRootPath.isEmpty()) {
+ availableExtensionMigrations =
+ getMigrationFilesFromPath(extensionSQLScriptRootPath, connectionType, config, true);
}
- // Otherwise, fetch the extension migrations and sort the executions
- List availableExtensionMigrations =
- getMigrationFilesFromPath(extensionSQLScriptRootPath, connectionType, config, true);
-
/*
- If we create migrations version as:
- - OpenMetadata: 1.1.0, 1.1.1, 1.2.0
- - Extension: 1.1.0-extension, 1.2.0-extension
- The end result will be 1.1.0, 1.1.0-extension, 1.1.1, 1.2.0, 1.2.0-extension
+ Combined execution order:
+ 1. Flyway migrations (legacy SQL files from Flyway)
+ 2. OpenMetadata native migrations
+ 3. Extension migrations
+ All sorted by version within their respective groups
*/
- return Stream.concat(
- availableOMNativeMigrations.stream(), availableExtensionMigrations.stream())
+ return Stream.of(
+ availableFlywayMigrations.stream().map(f -> (MigrationFile) f),
+ availableOMNativeMigrations.stream(),
+ availableExtensionMigrations.stream())
+ .flatMap(stream -> stream)
.sorted()
.toList();
}
@@ -123,7 +136,14 @@ public class MigrationWorkflow {
private List filterAndGetMigrationsToRun(
List availableMigrations) {
LOG.debug("Filtering Server Migrations");
- executedMigrations = migrationDAO.getMigrationVersions();
+ try {
+ executedMigrations = migrationDAO.getMigrationVersions();
+ } catch (Exception e) {
+ // SERVER_CHANGE_LOG table doesn't exist yet, run all migrations including Flyway
+ LOG.info(
+ "SERVER_CHANGE_LOG table doesn't exist yet, will run all migrations including Flyway");
+ executedMigrations = new ArrayList<>();
+ }
currentMaxMigrationVersion =
executedMigrations.stream().max(MigrationWorkflow::compareVersions);
List applyMigrations;
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v1110/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v1110/Migration.java
index 09e15db448a..0c19e83c798 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v1110/Migration.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v1110/Migration.java
@@ -1,7 +1,9 @@
package org.openmetadata.service.migration.mysql.v1110;
import java.util.Map;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.QueryStatus;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
@@ -13,7 +15,7 @@ public class Migration extends MigrationProcessImpl {
public Migration(MigrationFile migrationFile) {
super(migrationFile);
- this.migrationUtil = new MigrationUtil(migrationFile);
+ this.migrationUtil = new MigrationUtil(ConnectionType.MYSQL, migrationFile);
}
@Override
@@ -28,4 +30,10 @@ public class Migration extends MigrationProcessImpl {
isForceMigration));
return result;
}
+
+ @Override
+ @SneakyThrows
+ public void runDataMigration() {
+ this.migrationUtil.migrateFlywayHistory(handle);
+ }
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v1110/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v1110/Migration.java
index ffbf01614c3..cb47e6046e8 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v1110/Migration.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v1110/Migration.java
@@ -1,7 +1,9 @@
package org.openmetadata.service.migration.postgres.v1110;
import java.util.Map;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.QueryStatus;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
@@ -13,7 +15,7 @@ public class Migration extends MigrationProcessImpl {
public Migration(MigrationFile migrationFile) {
super(migrationFile);
- this.migrationUtil = new MigrationUtil(migrationFile);
+ this.migrationUtil = new MigrationUtil(ConnectionType.POSTGRES, migrationFile);
}
@Override
@@ -28,4 +30,10 @@ public class Migration extends MigrationProcessImpl {
isForceMigration));
return result;
}
+
+ @Override
+ @SneakyThrows
+ public void runDataMigration() {
+ this.migrationUtil.migrateFlywayHistory(handle);
+ }
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/FlywayMigrationFile.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/FlywayMigrationFile.java
new file mode 100644
index 00000000000..10284ec47c6
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/FlywayMigrationFile.java
@@ -0,0 +1,163 @@
+package org.openmetadata.service.migration.utils;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.flywaydb.core.api.configuration.ClassicConfiguration;
+import org.flywaydb.core.api.configuration.Configuration;
+import org.flywaydb.core.internal.database.postgresql.PostgreSQLParser;
+import org.flywaydb.core.internal.parser.Parser;
+import org.flywaydb.core.internal.parser.ParsingContext;
+import org.flywaydb.core.internal.resource.filesystem.FileSystemResource;
+import org.flywaydb.core.internal.sqlscript.SqlStatementIterator;
+import org.flywaydb.database.mysql.MySQLParser;
+import org.jetbrains.annotations.Nullable;
+import org.openmetadata.service.OpenMetadataApplicationConfig;
+import org.openmetadata.service.jdbi3.MigrationDAO;
+import org.openmetadata.service.jdbi3.locator.ConnectionType;
+import org.openmetadata.service.util.EntityUtil;
+
+public class FlywayMigrationFile extends MigrationFile {
+ private static final Pattern FLYWAY_FILE_PATTERN =
+ Pattern.compile("^[vV](\\d+(?:\\.\\d+)*)__.*\\.sql$");
+
+ private final File sqlFile;
+
+ public FlywayMigrationFile(
+ File sqlFile,
+ MigrationDAO migrationDAO,
+ ConnectionType connectionType,
+ OpenMetadataApplicationConfig config) {
+ super(createFlywayVersionDir(sqlFile), migrationDAO, connectionType, config, false);
+ this.sqlFile = sqlFile;
+ }
+
+ private static File createFlywayVersionDir(File sqlFile) {
+ // Create a virtual directory with semantic version name that doesn't need to exist
+ String flywayVersion = extractVersionFromFilename(sqlFile.getName());
+ String semanticVersion = String.format("0.0.%d", Integer.parseInt(flywayVersion));
+ // Return a virtual directory - it doesn't need to exist since we override file methods
+ return new File(sqlFile.getParent(), semanticVersion);
+ }
+
+ private static String extractVersionFromFilename(String filename) {
+ Matcher matcher = FLYWAY_FILE_PATTERN.matcher(filename);
+ if (matcher.matches()) {
+ return matcher.group(1);
+ }
+ throw new IllegalArgumentException("Invalid Flyway migration filename: " + filename);
+ }
+
+ @Override
+ public void parseSQLFiles() {
+ if (sqlFile.exists() && sqlFile.isFile()) {
+ try {
+ final ParsingContext parsingContext = new ParsingContext();
+ Configuration configuration = new ClassicConfiguration();
+ Parser parser = new PostgreSQLParser(configuration, parsingContext);
+ if (connectionType == ConnectionType.MYSQL) {
+ parser = new MySQLParser(configuration, parsingContext);
+ }
+
+ try (SqlStatementIterator sqlIterator =
+ parser.parse(
+ new FileSystemResource(
+ null, sqlFile.getAbsolutePath(), StandardCharsets.UTF_8, true))) {
+ while (sqlIterator.hasNext()) {
+ String sqlStatement = sqlIterator.next().getSql();
+ if (!checkIfQueryPreviouslyRan(sqlStatement)) {
+ schemaChanges.add(sqlStatement);
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to parse Flyway migration file: " + sqlFile.getPath(), e);
+ }
+ }
+ }
+
+ @Override
+ public String getSchemaChangesFile() {
+ return sqlFile.getAbsolutePath();
+ }
+
+ @Override
+ public String getPostDDLScriptFile() {
+ // Flyway files don't have separate post DDL scripts
+ return "";
+ }
+
+ @Override
+ public String getMigrationsFilePath() {
+ return sqlFile.getAbsolutePath();
+ }
+
+ public static boolean isFlywayMigrationFile(File file) {
+ return file.isFile() && FLYWAY_FILE_PATTERN.matcher(file.getName()).matches();
+ }
+
+ public static List getFlywayMigrationFiles(
+ String flywayPath,
+ ConnectionType connectionType,
+ OpenMetadataApplicationConfig config,
+ MigrationDAO migrationDAO) {
+ List flywayMigrations = new ArrayList<>();
+
+ if (flywayPath == null || flywayPath.isEmpty()) {
+ return flywayMigrations;
+ }
+
+ File flywayDir = new File(flywayPath);
+ if (!flywayDir.exists() || !flywayDir.isDirectory()) {
+ return flywayMigrations;
+ }
+
+ // Get database-specific subdirectory using the actual directory names
+ File[] sqlFiles = getFiles(connectionType, flywayDir);
+
+ if (sqlFiles != null) {
+ Arrays.stream(sqlFiles)
+ .map(file -> new FlywayMigrationFile(file, migrationDAO, connectionType, config))
+ .sorted()
+ .forEach(flywayMigrations::add);
+ }
+
+ return flywayMigrations;
+ }
+
+ private static File @Nullable [] getFiles(ConnectionType connectionType, File flywayDir) {
+ String dbSubDir =
+ connectionType == ConnectionType.MYSQL
+ ? "com.mysql.cj.jdbc.Driver"
+ : "org.postgresql.Driver";
+ File dbSpecificDir = new File(flywayDir, dbSubDir);
+
+ if (!dbSpecificDir.exists() || !dbSpecificDir.isDirectory()) {
+ // Try legacy naming convention
+ String legacyDbSubDir = connectionType == ConnectionType.MYSQL ? "mysql" : "postgresql";
+ dbSpecificDir = new File(flywayDir, legacyDbSubDir);
+ if (!dbSpecificDir.exists() || !dbSpecificDir.isDirectory()) {
+ // Try the root flyway directory
+ dbSpecificDir = flywayDir;
+ }
+ }
+
+ return dbSpecificDir.listFiles((dir1, name) -> FLYWAY_FILE_PATTERN.matcher(name).matches());
+ }
+
+ private boolean checkIfQueryPreviouslyRan(String query) {
+ try {
+ String checksum = EntityUtil.hash(query);
+ String sqlStatement = migrationDAO.checkIfQueryPreviouslyRan(checksum);
+ return sqlStatement != null;
+ } catch (Exception e) {
+ // If SERVER_MIGRATION_SQL_LOGS table doesn't exist yet, assume query hasn't run
+ return false;
+ }
+ }
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/MigrationFile.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/MigrationFile.java
index 15aab810361..202abe58454 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/MigrationFile.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/MigrationFile.java
@@ -28,9 +28,9 @@ public class MigrationFile implements Comparable {
public final Boolean isExtension;
public final String dbPackageName;
- private final MigrationDAO migrationDAO;
- private final List schemaChanges;
- private final List postDDLScripts;
+ protected final MigrationDAO migrationDAO;
+ protected final List schemaChanges;
+ protected final List postDDLScripts;
public static final String DEFAULT_MIGRATION_PROCESS_CLASS =
"org.openmetadata.service.migration.api.MigrationProcessImpl";
@@ -70,6 +70,7 @@ public class MigrationFile implements Comparable {
if (connectionType == ConnectionType.MYSQL) {
parser = new MySQLParser(configuration, parsingContext);
}
+
if (new File(getSchemaChangesFile()).isFile()) {
try (SqlStatementIterator schemaChangesIterator =
parser.parse(
@@ -80,18 +81,24 @@ public class MigrationFile implements Comparable {
schemaChanges.add(sqlStatement);
}
}
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to parse schema changes file: " + getSchemaChangesFile(), e);
}
}
if (new File(getPostDDLScriptFile()).isFile()) {
- try (SqlStatementIterator schemaChangesIterator =
+ try (SqlStatementIterator postDDLIterator =
parser.parse(
new FileSystemResource(null, getPostDDLScriptFile(), StandardCharsets.UTF_8, true))) {
- while (schemaChangesIterator.hasNext()) {
- String sqlStatement = schemaChangesIterator.next().getSql();
+ while (postDDLIterator.hasNext()) {
+ String sqlStatement = postDDLIterator.next().getSql();
if (!checkIfQueryPreviouslyRan(sqlStatement)) {
postDDLScripts.add(sqlStatement);
}
}
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to parse post DDL script file: " + getPostDDLScriptFile(), e);
}
}
}
@@ -188,8 +195,13 @@ public class MigrationFile implements Comparable {
}
private boolean checkIfQueryPreviouslyRan(String query) {
- String checksum = EntityUtil.hash(query);
- String sqlStatement = migrationDAO.checkIfQueryPreviouslyRan(checksum);
- return sqlStatement != null;
+ try {
+ String checksum = EntityUtil.hash(query);
+ String sqlStatement = migrationDAO.checkIfQueryPreviouslyRan(checksum);
+ return sqlStatement != null;
+ } catch (Exception e) {
+ // If SERVER_MIGRATION_SQL_LOGS table doesn't exist yet, assume query hasn't run
+ return false;
+ }
}
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1110/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1110/MigrationUtil.java
index 89ee113310e..cd86ff380c1 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1110/MigrationUtil.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1110/MigrationUtil.java
@@ -15,6 +15,7 @@ import org.openmetadata.schema.type.Recognizer;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.MigrationDAO;
+import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.QueryStatus;
import org.openmetadata.service.migration.utils.MigrationFile;
@@ -26,7 +27,11 @@ public class MigrationUtil {
"PII.NonSensitive", "data/tags/NonSensitive.json");
private final MigrationFile migrationFile;
- public MigrationUtil(MigrationFile migrationFile) {
+ private final ConnectionType connectionType;
+ public static final String FLYWAY_TABLE_NAME = "DATABASE_CHANGE_LOG";
+
+ public MigrationUtil(ConnectionType connectionType, MigrationFile migrationFile) {
+ this.connectionType = connectionType;
this.migrationFile = migrationFile;
}
@@ -96,4 +101,132 @@ public class MigrationUtil {
}
}
}
+
+ public void migrateFlywayHistory(Handle handle) {
+ try {
+ LOG.info("Starting v1100 migration of Flyway history to SERVER_CHANGE_LOG");
+
+ // Check if DATABASE_CHANGE_LOG table exists
+ boolean tableExists = checkTableExists(handle, "DATABASE_CHANGE_LOG");
+
+ if (!tableExists) {
+ LOG.info("Flyway DATABASE_CHANGE_LOG table does not exist, skipping migration");
+ return;
+ }
+
+ // Check if Flyway records have already been migrated
+ if (hasFlywayDataAlreadyMigrated(handle)) {
+ LOG.info(
+ "Flyway records have already been migrated to SERVER_CHANGE_LOG, skipping migration");
+ return;
+ }
+
+ // Insert missing v000 baseline record if not present
+ insertV000RecordIfMissing(handle);
+
+ // Migrate Flyway migration records to SERVER_CHANGE_LOG
+ int migratedCount = migrateFlywayHistoryRecords(handle);
+
+ if (migratedCount > 0) {
+ LOG.info(
+ "Successfully migrated {} Flyway migration records to SERVER_CHANGE_LOG",
+ migratedCount);
+ } else {
+ LOG.info("No new Flyway migration records to migrate");
+ }
+
+ } catch (Exception e) {
+ LOG.error("Error during Flyway history migration", e);
+ }
+ }
+
+ public boolean checkTableExists(Handle handle, String tableName) {
+ String query =
+ switch (connectionType) {
+ case MYSQL -> "SELECT COUNT(*) FROM information_schema.tables "
+ + "WHERE table_schema = DATABASE() AND table_name = ?";
+ case POSTGRES -> "SELECT COUNT(*) FROM information_schema.tables "
+ + "WHERE table_schema = current_schema() AND table_name = ?";
+ };
+
+ Integer count = handle.createQuery(query).bind(0, tableName).mapTo(Integer.class).one();
+
+ return count > 0;
+ }
+
+ public boolean hasFlywayDataAlreadyMigrated(Handle handle) {
+ String countQuery =
+ switch (connectionType) {
+ case MYSQL -> """
+ SELECT COUNT(*) FROM SERVER_CHANGE_LOG scl
+ INNER JOIN DATABASE_CHANGE_LOG dcl ON CONCAT('0.0.', CAST(dcl.version AS UNSIGNED)) = scl.version
+ WHERE scl.migrationfilename LIKE '%flyway%'
+ """;
+ case POSTGRES -> """
+ SELECT COUNT(*) FROM SERVER_CHANGE_LOG scl
+ INNER JOIN "DATABASE_CHANGE_LOG" dcl ON '0.0.' || CAST(dcl.version AS INTEGER) = scl.version
+ WHERE scl.migrationfilename LIKE '%flyway%'
+ """;
+ };
+
+ Integer count = handle.createQuery(countQuery).mapTo(Integer.class).one();
+
+ return count > 0;
+ }
+
+ private void insertV000RecordIfMissing(Handle handle) {
+ String insertQuery =
+ switch (connectionType) {
+ case MYSQL -> """
+ INSERT IGNORE INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
+ VALUES ('0.0.0', 'bootstrap/sql/migrations/flyway/mysql/v000__create_db_connection_info.sql', '0', NOW(), NULL)
+ """;
+ case POSTGRES -> """
+ INSERT INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
+ VALUES ('0.0.0', 'bootstrap/sql/migrations/flyway/postgres/v000__create_db_connection_info.sql', '0', current_timestamp, NULL)
+ ON CONFLICT (version) DO NOTHING
+ """;
+ };
+
+ int inserted = handle.createUpdate(insertQuery).execute();
+ if (inserted > 0) {
+ LOG.info("Inserted missing v0.0.0 baseline record");
+ }
+ }
+
+ private int migrateFlywayHistoryRecords(Handle handle) {
+ String insertQuery =
+ switch (connectionType) {
+ case MYSQL -> """
+ INSERT INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
+ SELECT CONCAT('0.0.', CAST(version AS UNSIGNED)) as version,
+ CASE
+ WHEN script LIKE 'v%__.sql' THEN CONCAT('bootstrap/sql/migrations/flyway/mysql/', script)
+ ELSE CONCAT('bootstrap/sql/migrations/flyway/mysql/v', version, '__', REPLACE(LOWER(description), ' ', '_'), '.sql')
+ END as migrationfilename,
+ '0' as checksum,
+ installed_on,
+ NULL as metrics
+ FROM DATABASE_CHANGE_LOG
+ WHERE CONCAT('0.0.', CAST(version AS UNSIGNED)) NOT IN (SELECT version FROM SERVER_CHANGE_LOG)
+ AND success = true
+ """;
+ case POSTGRES -> """
+ INSERT INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
+ SELECT '0.0.' || CAST(version AS INTEGER) as version,
+ CASE
+ WHEN script LIKE 'v%__.sql' THEN 'bootstrap/sql/migrations/flyway/postgres/' || script
+ ELSE 'bootstrap/sql/migrations/flyway/postgres/v' || version || '__' || REPLACE(LOWER(description), ' ', '_') || '.sql'
+ END as migrationfilename,
+ '0' as checksum,
+ installed_on,
+ NULL as metrics
+ FROM "DATABASE_CHANGE_LOG"
+ WHERE '0.0.' || CAST(version AS INTEGER) NOT IN (SELECT version FROM SERVER_CHANGE_LOG)
+ AND success = true
+ """;
+ };
+
+ return handle.createUpdate(insertQuery).execute();
+ }
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
index 402b9f32f68..4186be30b9c 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
@@ -1,6 +1,5 @@
package org.openmetadata.service.util;
-import static org.flywaydb.core.internal.info.MigrationInfoDumper.dumpToAsciiTable;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.ADMIN_USER_NAME;
import static org.openmetadata.service.Entity.FIELD_OWNERS;
@@ -25,7 +24,6 @@ import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.jackson.Jackson;
import io.dropwizard.jersey.validation.Validators;
import jakarta.validation.Validator;
-import java.io.File;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -40,8 +38,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.flywaydb.core.Flyway;
-import org.flywaydb.core.api.MigrationVersion;
+import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.ServiceEntityInterface;
@@ -129,7 +126,6 @@ import picocli.CommandLine.Option;
public class OpenMetadataOperations implements Callable {
private OpenMetadataApplicationConfig config;
- private Flyway flyway;
private Jdbi jdbi;
private SearchRepository searchRepository;
private String nativeSQLScriptRootPath;
@@ -158,54 +154,6 @@ public class OpenMetadataOperations implements Callable {
return 0;
}
- @Command(
- name = "info",
- description =
- "Shows the list of migrations applied and the pending migration "
- + "waiting to be applied on the target database")
- public Integer info() {
- try {
- parseConfig();
- LOG.info(dumpToAsciiTable(flyway.info().all()));
- return 0;
- } catch (Exception e) {
- LOG.error("Failed due to ", e);
- return 1;
- }
- }
-
- @Command(
- name = "validate",
- description =
- "Checks if the all the migrations haven been applied " + "on the target database.")
- public Integer validate() {
- try {
- parseConfig();
- flyway.validate();
- return 0;
- } catch (Exception e) {
- LOG.error("Database migration validation failed due to ", e);
- return 1;
- }
- }
-
- @Command(
- name = "repair",
- description =
- "Repairs the DATABASE_CHANGE_LOG table which is used to track"
- + "all the migrations on the target database This involves removing entries for the failed migrations and update"
- + "the checksum of migrations already applied on the target database")
- public Integer repair() {
- try {
- parseConfig();
- flyway.repair();
- return 0;
- } catch (Exception e) {
- LOG.error("Repair of CHANGE_LOG failed due to ", e);
- return 1;
- }
- }
-
@Command(
name = "setOpenMetadataUrl",
description = "Set or update the OpenMetadata URL in the system repository")
@@ -561,7 +509,7 @@ public class OpenMetadataOperations implements Callable {
public Integer checkConnection() {
try {
parseConfig();
- flyway.getConfiguration().getDataSource().getConnection();
+ jdbi.open().getConnection();
return 0;
} catch (Exception e) {
LOG.error("Failed to check connection due to ", e);
@@ -579,9 +527,7 @@ public class OpenMetadataOperations implements Callable {
promptUserForDelete();
parseConfig();
LOG.info("Deleting all the OpenMetadata tables.");
- flyway.clean();
- LOG.info("Creating the OpenMetadata Schema.");
- flyway.migrate();
+ dropAllTables();
LOG.info("Running the Native Migrations.");
validateAndRunSystemDataMigrations(true);
LOG.info("OpenMetadata Database Schema is Updated.");
@@ -660,7 +606,6 @@ public class OpenMetadataOperations implements Callable {
try {
LOG.info("Migrating the OpenMetadata Schema.");
parseConfig();
- flyway.migrate();
validateAndRunSystemDataMigrations(force);
LOG.info("Update Search Indexes.");
searchRepository.updateIndexes();
@@ -1725,30 +1670,6 @@ public class OpenMetadataOperations implements Callable {
dataSourceFactory.setPassword(token);
});
- String jdbcUrl = dataSourceFactory.getUrl();
- String user = dataSourceFactory.getUser();
- String password = dataSourceFactory.getPassword();
- assert user != null && password != null;
- String flywayRootPath = config.getMigrationConfiguration().getFlywayPath();
- String location =
- "filesystem:"
- + flywayRootPath
- + File.separator
- + config.getDataSourceFactory().getDriverClass();
- flyway =
- Flyway.configure()
- .encoding(StandardCharsets.UTF_8)
- .table("DATABASE_CHANGE_LOG")
- .sqlMigrationPrefix("v")
- .validateOnMigrate(false)
- .outOfOrder(false)
- .baselineOnMigrate(true)
- .baselineVersion(MigrationVersion.fromVersion("000"))
- .cleanOnValidationError(false)
- .locations(location)
- .dataSource(jdbcUrl, user, password)
- .cleanDisabled(false)
- .load();
nativeSQLScriptRootPath = config.getMigrationConfiguration().getNativePath();
extensionSQLScriptRootPath = config.getMigrationConfiguration().getExtensionPath();
@@ -1773,6 +1694,43 @@ public class OpenMetadataOperations implements Callable {
DatasourceConfig.initialize(connType.label);
}
+ // This was before handled via flyway's clean command.
+ private void dropAllTables() {
+ try (Handle handle = jdbi.open()) {
+ ConnectionType connType = ConnectionType.from(config.getDataSourceFactory().getDriverClass());
+ if (connType == ConnectionType.MYSQL) {
+ handle.execute("SET FOREIGN_KEY_CHECKS = 0");
+ handle
+ .createQuery("SHOW TABLES")
+ .mapTo(String.class)
+ .list()
+ .forEach(
+ tableName -> {
+ try {
+ handle.execute("DROP TABLE IF EXISTS " + tableName);
+ } catch (Exception e) {
+ LOG.warn("Failed to drop table: " + tableName, e);
+ }
+ });
+ handle.execute("SET FOREIGN_KEY_CHECKS = 1");
+ } else if (connType == ConnectionType.POSTGRES) {
+ handle
+ .createQuery(
+ "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
+ .mapTo(String.class)
+ .list()
+ .forEach(
+ tableName -> {
+ try {
+ handle.execute("DROP TABLE IF EXISTS \"" + tableName + "\" CASCADE");
+ } catch (Exception e) {
+ LOG.warn("Failed to drop table: " + tableName, e);
+ }
+ });
+ }
+ }
+ }
+
private void promptUserForDelete() {
LOG.info(
"""
@@ -1797,7 +1755,13 @@ public class OpenMetadataOperations implements Callable {
DatasourceConfig.initialize(connType.label);
MigrationWorkflow workflow =
new MigrationWorkflow(
- jdbi, nativeSQLScriptRootPath, connType, extensionSQLScriptRootPath, config, force);
+ jdbi,
+ nativeSQLScriptRootPath,
+ connType,
+ extensionSQLScriptRootPath,
+ config.getMigrationConfiguration().getFlywayPath(),
+ config,
+ force);
workflow.loadMigrations();
workflow.printMigrationInfo();
workflow.runMigrationWorkflows(true);
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/JwtAuthOpenMetadataApplicationTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/JwtAuthOpenMetadataApplicationTest.java
index ceadaae1996..39f88f64cce 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/JwtAuthOpenMetadataApplicationTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/JwtAuthOpenMetadataApplicationTest.java
@@ -273,6 +273,7 @@ public abstract class JwtAuthOpenMetadataApplicationTest {
nativeMigrationSQLPath,
connType,
extensionSQLScriptRootPath,
+ "", // flywayPath - empty string as placeholder
config,
forceMigrations);
// Initialize search repository
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java
index c39a7ceb659..18f4e44f055 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java
@@ -45,7 +45,6 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.eclipse.jetty.client.HttpClient;
-import org.flywaydb.core.Flyway;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.jetty.connector.JettyClientProperties;
@@ -176,9 +175,6 @@ public abstract class OpenMetadataApplicationTest {
dataSourceFactory.setDriverClass(sqlContainer.getDriverClassName());
config.setDataSourceFactory(dataSourceFactory);
- final String flyWayMigrationScriptsLocation =
- ResourceHelpers.resourceFilePath(
- "db/sql/migrations/flyway/" + sqlContainer.getDriverClassName());
final String nativeMigrationScriptsLocation =
ResourceHelpers.resourceFilePath("db/sql/migrations/native/");
@@ -193,17 +189,6 @@ public abstract class OpenMetadataApplicationTest {
} catch (Exception ex) {
LOG.info("Extension migrations not found");
}
- Flyway flyway =
- Flyway.configure()
- .dataSource(
- sqlContainer.getJdbcUrl(), sqlContainer.getUsername(), sqlContainer.getPassword())
- .table("DATABASE_CHANGE_LOG")
- .locations("filesystem:" + flyWayMigrationScriptsLocation)
- .sqlMigrationPrefix("v")
- .cleanDisabled(false)
- .load();
- flyway.clean();
- flyway.migrate();
ELASTIC_SEARCH_CONTAINER = new ElasticsearchContainer(elasticSearchContainerImage);
ELASTIC_SEARCH_CONTAINER.withPassword("password");
@@ -227,8 +212,6 @@ public abstract class OpenMetadataApplicationTest {
IndexMappingLoader.init(getEsConfig());
// Migration overrides
- configOverrides.add(
- ConfigOverride.config("migrationConfiguration.flywayPath", flyWayMigrationScriptsLocation));
configOverrides.add(
ConfigOverride.config("migrationConfiguration.nativePath", nativeMigrationScriptsLocation));
@@ -274,6 +257,7 @@ public abstract class OpenMetadataApplicationTest {
nativeMigrationSQLPath,
connType,
extensionSQLScriptRootPath,
+ config.getMigrationConfiguration().getFlywayPath(),
config,
forceMigrations);
// Initialize search repository
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/FlywayMigrationIntegrationTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/FlywayMigrationIntegrationTest.java
new file mode 100644
index 00000000000..bd68666ddde
--- /dev/null
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/FlywayMigrationIntegrationTest.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2021 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openmetadata.service.migration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import org.jdbi.v3.core.Handle;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.openmetadata.service.OpenMetadataApplicationTest;
+import org.openmetadata.service.jdbi3.MigrationDAO;
+import org.openmetadata.service.jdbi3.locator.ConnectionType;
+import org.openmetadata.service.migration.utils.v1110.MigrationUtil;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class FlywayMigrationIntegrationTest extends OpenMetadataApplicationTest {
+
+ private static MigrationDAO migrationDAO;
+ private static ConnectionType connectionType;
+ private static MigrationUtil migrationUtil;
+
+ @BeforeAll
+ public static void setup() {
+ migrationDAO = jdbi.onDemand(MigrationDAO.class);
+ connectionType =
+ ConnectionType.from(APP.getConfiguration().getDataSourceFactory().getDriverClass());
+ migrationUtil = new MigrationUtil(connectionType, null);
+ }
+
+ @Test
+ public void testFlywayMigrationVersionsQuery() {
+ List flywayVersions = migrationDAO.getFlywayMigrationVersions();
+ assertNotNull(flywayVersions, "Flyway migration versions list should not be null");
+
+ // Verify that versions are ordered correctly
+ if (flywayVersions.size() > 1) {
+ for (int i = 1; i < flywayVersions.size(); i++) {
+ String previous = flywayVersions.get(i - 1);
+ String current = flywayVersions.get(i);
+ assertTrue(
+ previous.compareTo(current) <= 0,
+ String.format(
+ "Flyway versions should be ordered: %s should be <= %s", previous, current));
+ }
+ }
+ }
+
+ @Test
+ public void testServerChangeLogFlywayRecords() {
+ List flywayRecords = migrationDAO.getFlywayMigrationRecords();
+
+ for (MigrationDAO.ServerChangeLog record : flywayRecords) {
+ // Verify that the record has the expected Flyway characteristics
+ assertNotNull(record.getVersion(), "Version should not be null");
+ assertNotNull(record.getMigrationFileName(), "Migration file name should not be null");
+ assertTrue(
+ record.getMigrationFileName().contains("flyway"),
+ "Migration file name should contain 'flyway'");
+
+ // Verify version format for Flyway migrations (should be 0.0.X format)
+ if (record.getMigrationFileName().contains("flyway")) {
+ assertTrue(
+ record.getVersion().matches("^0\\.0\\.\\d+$"),
+ String.format(
+ "Flyway version should match 0.0.X format, but was: %s", record.getVersion()));
+ }
+
+ // Verify that the file name includes the full path for proper identification
+ assertTrue(
+ record.getMigrationFileName().contains("bootstrap/sql/migrations/flyway")
+ || record.getMigrationFileName().contains("v0")
+ || record.getMigrationFileName().endsWith(".sql"),
+ "Migration file name should contain proper path or be a valid SQL file");
+ }
+ }
+
+ @Test
+ public void testMigrationDAOFlywayVersionsConsistency() {
+ List flywayVersions = migrationDAO.getFlywayMigrationVersions();
+ List flywayRecords = migrationDAO.getFlywayMigrationRecords();
+
+ assertEquals(
+ flywayRecords.size(),
+ flywayVersions.size(),
+ "Flyway versions and records should have same count");
+
+ // Verify that each version from getFlywayMigrationVersions has a corresponding record
+ for (int i = 0; i < flywayVersions.size(); i++) {
+ String version = flywayVersions.get(i);
+ MigrationDAO.ServerChangeLog record = flywayRecords.get(i);
+ assertEquals(version, record.getVersion(), "Version should match between methods");
+ }
+ }
+
+ @Test
+ public void testFlywayMigrationFilePathFormat() {
+ List flywayRecords = migrationDAO.getFlywayMigrationRecords();
+
+ for (MigrationDAO.ServerChangeLog record : flywayRecords) {
+ String filePath = record.getMigrationFileName();
+
+ assertFalse(
+ filePath.endsWith("org.postgresql.Driver")
+ || filePath.endsWith("com.mysql.cj.jdbc.Driver"),
+ String.format("File path should not end with driver name: %s", filePath));
+
+ // Should contain proper Flyway path structure
+ assertTrue(
+ filePath.contains("flyway") || filePath.startsWith("v0"),
+ String.format("File path should contain 'flyway' or be a versioned file: %s", filePath));
+ }
+ }
+
+ @Test
+ public void testMigrationUtilFlywayDataCheck() {
+ try (Handle handle = jdbi.open()) {
+ boolean flywayTableExists =
+ migrationUtil.checkTableExists(handle, MigrationUtil.FLYWAY_TABLE_NAME);
+ assertFalse(flywayTableExists, "Flyway schema history table should not exist for new runs");
+
+ boolean serverChangeLogExists = migrationUtil.checkTableExists(handle, "SERVER_CHANGE_LOG");
+ assertTrue(serverChangeLogExists, "SERVER_CHANGE_LOG table should exist after migrations");
+ }
+ }
+
+ @Test
+ public void testVersionFormatValidation() {
+ List flywayVersions = migrationDAO.getFlywayMigrationVersions();
+
+ for (String version : flywayVersions) {
+ assertTrue(
+ version.matches("^\\d+\\.\\d+\\.\\d+.*$"),
+ String.format("Version should follow semantic versioning format: %s", version));
+
+ if (version.startsWith("0.0.")) {
+ String[] parts = version.split("\\.");
+ assertTrue(
+ parts.length >= 3,
+ String.format("0.0.X version should have at least 3 parts: %s", version));
+
+ try {
+ Integer.parseInt(parts[2]);
+ } catch (NumberFormatException e) {
+ assertTrue(
+ parts[2].matches("\\d+.*"),
+ String.format("Third version part should start with a number: %s", version));
+ }
+ }
+ }
+ }
+}
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/util/MigrationWorkflowTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/util/MigrationWorkflowTest.java
index cbc68ea387a..5fb6ae56763 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/util/MigrationWorkflowTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/util/MigrationWorkflowTest.java
@@ -28,7 +28,13 @@ public class MigrationWorkflowTest extends OpenMetadataApplicationTest {
migrationWorkflow =
spy(
new MigrationWorkflow(
- jdbi, "nativePath", ConnectionType.MYSQL, "extensionPath", null, false));
+ jdbi,
+ "nativePath",
+ ConnectionType.MYSQL,
+ "extensionPath",
+ "flywayPath",
+ null,
+ false));
omMigrationList =
List.of(
@@ -83,7 +89,8 @@ public class MigrationWorkflowTest extends OpenMetadataApplicationTest {
"nativePath",
ConnectionType.MYSQL,
migrationWorkflow.getOpenMetadataApplicationConfig(),
- "extensionPath");
+ "extensionPath",
+ "flywayPath");
assertEquals(
List.of("1.1.0", "1.1.0-collate", "1.2.0", "1.2.1", "1.2.2-collate"),
diff --git a/pom.xml b/pom.xml
index f2d81742ad5..3e5836c08c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,7 +101,6 @@
3.37.1
1.9.0
2.17.0
- 9.22.3
2.1.0.30
2.11.0
9.3.0
@@ -161,6 +160,7 @@
5.0.0-M1
0.11.2
6.7.1.RELEASE
+ 9.22.3
2.0.4
@@ -359,17 +359,6 @@
commons-io
${commons-io.version}