MINOR - Remove flyway (#23179)

* test

* test

* format

* pom

* pom

* format

* Handle migration

* Handle migration

* Handle migration

* fix merge

* bump main

* bump main

* undo unnecessary changes

* simplify checksum for migration

* format

* FIX
This commit is contained in:
Pere Miquel Brull 2025-10-28 04:41:03 +01:00 committed by GitHub
parent a846d3ad84
commit b3a590fe22
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 890 additions and 248 deletions

View File

@ -0,0 +1,163 @@
# OpenMetadata Migration System
This document describes the migration system architecture and execution order for OpenMetadata database schema and data migrations.
## Migration System Overview
OpenMetadata uses a hybrid migration system that combines:
1. **Legacy Flyway migrations** (being phased out)
2. **Native OpenMetadata migrations** (current system)
3. **Extension migrations** (for custom/plugin functionality)
## Migration Execution Order
The migration system executes in a specific order to ensure database consistency:
```
1. Flyway Migrations (Legacy)
├── v000__create_server_change_log.sql (Creates migration tracking tables)
├── v001__*.sql
├── v002__*.sql
└── ...
2. Native OpenMetadata Migrations
├── 1.1.0/
├── 1.1.1/
├── 1.2.0/
└── ...
3. Extension Migrations
├── custom-extension-1.0.0/
└── ...
```
## Migration Tracking Tables
### SERVER_CHANGE_LOG
Primary table for tracking all migration executions:
- `installed_rank`: Auto-increment sequence number
- `version`: Migration version identifier (PRIMARY KEY)
- `migrationFileName`: Path to the migration file
- `checksum`: Hash of migration content for integrity validation
- `installed_on`: Timestamp of migration execution
- `metrics`: JSON/JSONB field for migration execution metrics
### SERVER_MIGRATION_SQL_LOGS
Detailed SQL execution logs:
- `version`: Migration version identifier
- `sqlStatement`: Individual SQL statement executed
- `checksum`: Hash of the SQL statement (PRIMARY KEY)
- `executedAt`: Timestamp of SQL execution
## Migration Logic
The migration workflow follows this decision tree:
```
IF native migrations are already executed:
└── Skip all Flyway migrations (they've already run)
└── Execute remaining native migrations
└── Execute extension migrations
ELSE IF no native migrations executed:
├── Execute Flyway migrations (creates SERVER_CHANGE_LOG tables)
├── Execute native migrations
└── Execute extension migrations
```
## File Structure
```
bootstrap/sql/migrations/
├── flyway/
│ ├── com.mysql.cj.jdbc.Driver/ # MySQL-specific Flyway migrations
│ │ ├── v000__create_server_change_log.sql
│ │ ├── v001__*.sql
│ │ └── ...
│ └── org.postgresql.Driver/ # PostgreSQL-specific Flyway migrations
│ ├── v000__create_server_change_log.sql
│ ├── v001__*.sql
│ └── ...
├── native/
│ ├── 1.1.0/
│ │ ├── mysql/schemaChanges.sql
│ │ └── postgres/schemaChanges.sql
│ ├── 1.1.1/
│ └── ...
└── extensions/ # Custom extension migrations
└── [extension-name]/
├── mysql/
└── postgres/
```
## Migration Implementation Classes
- `MigrationWorkflow`: Orchestrates the entire migration process
- `FlywayMigrationFile`: Adapter for legacy Flyway migrations
- `MigrationFile`: Handler for native OpenMetadata migrations
- `MigrationProcess`: Executes individual migration steps
## SQL Statement Parsing
**Important**: While OpenMetadata has removed Flyway as the migration framework, we still use **Flyway's SQL parsers** for reliable statement splitting:
- **MySQL**: Uses `org.flywaydb.database.mysql.MySQLParser`
- **PostgreSQL**: Uses `org.flywaydb.database.postgresql.PostgreSQLParser`
This ensures proper handling of:
- Complex SQL statements with string literals containing semicolons
- Comments (both `--` and `/* */` style)
- Escaped characters and quotes
- Database-specific SQL syntax
The parsers split SQL files into individual statements via `SqlStatementIterator`, which is far more reliable than simple string splitting.
**Dependencies**: Requires `flyway-core` and `flyway-mysql` for SQL parsing only (not migration management).
## Key Design Decisions
1. **Hybrid Approach**: Custom migration management + Flyway SQL parsing for reliability
2. **Backward Compatibility**: Flyway migrations continue to work during transition period
3. **Single Source of Truth**: All migrations are tracked in `SERVER_CHANGE_LOG` regardless of type
4. **Database Agnostic**: Separate migration files for MySQL and PostgreSQL
5. **Execution Order**: Flyway → Native → Extensions ensures proper dependency resolution
6. **Migration Tracking**: v000 Flyway migration creates the tracking infrastructure before any other migrations
## Troubleshooting
### Common Issues
1. **Missing SERVER_CHANGE_LOG table**:
- Ensure v000 Flyway migration has executed
- Check database permissions
2. **Migration version conflicts**:
- Verify no duplicate version numbers across migration types
- Check migration file naming conventions
3. **Database-specific failures**:
- Ensure correct SQL syntax for target database (MySQL vs PostgreSQL)
- Validate database-specific features (JSON vs JSONB, AUTO_INCREMENT vs SERIAL)
### Migration Recovery
If migrations fail:
1. Check `SERVER_CHANGE_LOG` table for last successful migration
2. Review `SERVER_MIGRATION_SQL_LOGS` for failed SQL statements
3. Fix underlying issues and restart migration process
4. Use `--force` flag only if absolutely necessary
## Configuration
Migration paths are configured in `MigrationConfiguration`:
- `nativePath`: Path to native OpenMetadata migrations
- `flywayPath`: Path to legacy Flyway migrations
- `extensionPath`: Path to extension migrations
Example:
```yaml
migrationConfiguration:
nativePath: "bootstrap/sql/migrations/native"
flywayPath: "bootstrap/sql/migrations/flyway"
extensionPath: "bootstrap/sql/migrations/extensions"
```

View File

@ -0,0 +1,20 @@
-- Create tables for tracking server migrations
-- This migration runs before all other migrations to ensure migration tracking tables exist
CREATE TABLE IF NOT EXISTS SERVER_CHANGE_LOG (
installed_rank BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
version VARCHAR(256) NOT NULL,
migrationFileName VARCHAR(256) NOT NULL,
checksum VARCHAR(256) NOT NULL,
installed_on TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP,
metrics JSON,
PRIMARY KEY (version),
UNIQUE KEY installed_rank (installed_rank)
);
CREATE TABLE IF NOT EXISTS SERVER_MIGRATION_SQL_LOGS (
version VARCHAR(256) NOT NULL,
sqlStatement VARCHAR(10000) NOT NULL,
checksum VARCHAR(256) PRIMARY KEY,
executedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

View File

@ -1 +1,2 @@
ALTER TABLE SERVER_CHANGE_LOG ADD COLUMN metrics JSON; -- This column is already created in v000__create_server_change_log.sql
-- ALTER TABLE SERVER_CHANGE_LOG ADD COLUMN metrics JSON;

View File

@ -0,0 +1,18 @@
-- Create tables for tracking server migrations
-- This migration runs before all other migrations to ensure migration tracking tables exist
CREATE TABLE IF NOT EXISTS SERVER_CHANGE_LOG (
installed_rank SERIAL,
version VARCHAR(256) PRIMARY KEY,
migrationFileName VARCHAR(256) NOT NULL,
checksum VARCHAR(256) NOT NULL,
installed_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metrics JSONB
);
CREATE TABLE IF NOT EXISTS SERVER_MIGRATION_SQL_LOGS (
version VARCHAR(256) NOT NULL,
sqlStatement VARCHAR(10000) NOT NULL,
checksum VARCHAR(256) PRIMARY KEY,
executedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

View File

@ -1 +1,2 @@
ALTER TABLE SERVER_CHANGE_LOG ADD COLUMN metrics jsonb; -- This column is already created in v000__create_server_change_log.sql
-- ALTER TABLE SERVER_CHANGE_LOG ADD COLUMN metrics jsonb;

View File

@ -375,14 +375,6 @@
<groupId>commons-cli</groupId> <groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId> <artifactId>commons-cli</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-maven-plugin</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-mysql</artifactId>
</dependency>
<dependency> <dependency>
<groupId>io.github.classgraph</groupId> <groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId> <artifactId>classgraph</artifactId>
@ -498,6 +490,16 @@
<scope>import</scope> <scope>import</scope>
</dependency> </dependency>
<!-- Flyway dependencies - only for SQL parsing, not migration management -->
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-mysql</artifactId>
</dependency>
<!-- Micrometer Core --> <!-- Micrometer Core -->
<dependency> <dependency>
<groupId>io.micrometer</groupId> <groupId>io.micrometer</groupId>

View File

@ -54,7 +54,6 @@ import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import javax.naming.ConfigurationException; import javax.naming.ConfigurationException;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -103,7 +102,6 @@ import org.openmetadata.service.jobs.JobDAO;
import org.openmetadata.service.jobs.JobHandlerRegistry; import org.openmetadata.service.jobs.JobHandlerRegistry;
import org.openmetadata.service.limits.DefaultLimits; import org.openmetadata.service.limits.DefaultLimits;
import org.openmetadata.service.limits.Limits; import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.migration.Migration;
import org.openmetadata.service.migration.MigrationValidationClient; import org.openmetadata.service.migration.MigrationValidationClient;
import org.openmetadata.service.migration.api.MigrationWorkflow; import org.openmetadata.service.migration.api.MigrationWorkflow;
import org.openmetadata.service.monitoring.EventMonitor; import org.openmetadata.service.monitoring.EventMonitor;
@ -271,7 +269,7 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
.buildValidatorFactory() .buildValidatorFactory()
.getValidator()); .getValidator());
// Validate flyway Migrations // Validate native migrations
validateMigrations(jdbi, catalogConfig); validateMigrations(jdbi, catalogConfig);
// Register Authorizer // Register Authorizer
@ -579,21 +577,6 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
private void validateMigrations(Jdbi jdbi, OpenMetadataApplicationConfig conf) private void validateMigrations(Jdbi jdbi, OpenMetadataApplicationConfig conf)
throws IOException { throws IOException {
LOG.info("Validating Flyway migrations");
Optional<String> lastMigrated = Migration.lastMigrated(jdbi);
String maxMigration = Migration.lastMigrationFile(conf.getMigrationConfiguration());
if (lastMigrated.isEmpty()) {
throw new IllegalStateException(
"Could not validate Flyway migrations in the database. Make sure you have run `./bootstrap/openmetadata-ops.sh migrate` at least once.");
}
if (lastMigrated.get().compareTo(maxMigration) < 0) {
throw new IllegalStateException(
"There are pending migrations to be run on the database."
+ " Please backup your data and run `./bootstrap/openmetadata-ops.sh migrate`."
+ " You can find more information on upgrading OpenMetadata at"
+ " https://docs.open-metadata.org/deployment/upgrade ");
}
LOG.info("Validating native migrations"); LOG.info("Validating native migrations");
ConnectionType connectionType = ConnectionType connectionType =
ConnectionType.from(conf.getDataSourceFactory().getDriverClass()); ConnectionType.from(conf.getDataSourceFactory().getDriverClass());
@ -603,6 +586,7 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
conf.getMigrationConfiguration().getNativePath(), conf.getMigrationConfiguration().getNativePath(),
connectionType, connectionType,
conf.getMigrationConfiguration().getExtensionPath(), conf.getMigrationConfiguration().getExtensionPath(),
conf.getMigrationConfiguration().getFlywayPath(),
conf, conf,
false); false);
migrationWorkflow.loadMigrations(); migrationWorkflow.loadMigrations();

View File

@ -61,7 +61,7 @@ public interface MigrationDAO {
connectionType = MYSQL) connectionType = MYSQL)
@ConnectionAwareSqlUpdate( @ConnectionAwareSqlUpdate(
value = value =
"INSERT INTO server_change_log (version, migrationFileName, checksum, metrics, installed_on)" "INSERT INTO SERVER_CHANGE_LOG (version, migrationFileName, checksum, metrics, installed_on)"
+ "VALUES (:version, :migrationFileName, :checksum, (:metrics :: jsonb), current_timestamp) " + "VALUES (:version, :migrationFileName, :checksum, (:metrics :: jsonb), current_timestamp) "
+ "ON CONFLICT (version) DO UPDATE SET " + "ON CONFLICT (version) DO UPDATE SET "
+ "migrationFileName = EXCLUDED.migrationFileName, " + "migrationFileName = EXCLUDED.migrationFileName, "
@ -123,6 +123,15 @@ public interface MigrationDAO {
@SqlQuery("SELECT version FROM SERVER_CHANGE_LOG") @SqlQuery("SELECT version FROM SERVER_CHANGE_LOG")
List<String> getMigrationVersions(); List<String> getMigrationVersions();
@SqlQuery(
"SELECT version FROM SERVER_CHANGE_LOG WHERE migrationFileName LIKE '%/migrations/flyway/%' ORDER BY version")
List<String> getFlywayMigrationVersions();
@SqlQuery(
"SELECT installed_rank, version, migrationFileName, checksum, installed_on, metrics FROM SERVER_CHANGE_LOG WHERE migrationFileName LIKE '%/migrations/flyway/%' ORDER BY version")
@RegisterRowMapper(FromServerChangeLogMapper.class)
List<ServerChangeLog> getFlywayMigrationRecords();
@Getter @Getter
@Setter @Setter
class ServerMigrationSQLTable { class ServerMigrationSQLTable {

View File

@ -1,60 +0,0 @@
package org.openmetadata.service.migration;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.statement.StatementException;
import org.openmetadata.service.jdbi3.MigrationDAO;
@Slf4j
public final class Migration {
private Migration() {}
/**
* Run a query to MySQL to retrieve the last migrated Flyway version. If the Flyway table DATABASE_CHANGE_LOG does not
* exist, we will stop the Catalog App and inform users how to run Flyway.
*/
public static Optional<String> lastMigrated(Jdbi jdbi) {
try {
return jdbi.withExtension(MigrationDAO.class, MigrationDAO::getMaxVersion);
} catch (StatementException e) {
throw new IllegalArgumentException(
"Exception encountered when trying to obtain last migrated Flyway version."
+ " Make sure you have run `./bootstrap/openmetadata-ops.sh migrate` at least once.",
e);
}
}
public static String lastMigrationFile(MigrationConfiguration conf) throws IOException {
List<String> migrationFiles = getMigrationVersions(conf);
return Collections.max(migrationFiles);
}
/** Read the migrations path from the Catalog YAML config and return a list of all the files' versions. */
private static List<String> getMigrationVersions(MigrationConfiguration conf) throws IOException {
try (Stream<String> names =
Files.walk(Paths.get(conf.getFlywayPath()))
.filter(Files::isRegularFile)
.map(Path::toFile)
.map(File::getName)
.map(Migration::cleanName)) {
return names.collect(Collectors.toList());
}
}
/** Given a Flyway migration filename, e.g., v001__my_file.sql, return the version information "001". */
private static String cleanName(String name) {
return Arrays.asList(name.split("_")).get(0).replace("v", "");
}
}

View File

@ -5,7 +5,7 @@ import lombok.Getter;
import lombok.Setter; import lombok.Setter;
public class MigrationConfiguration { public class MigrationConfiguration {
@NotEmpty @Getter @Setter private String flywayPath;
@NotEmpty @Getter @Setter private String nativePath; @NotEmpty @Getter @Setter private String nativePath;
@NotEmpty @Getter @Setter private String extensionPath; @NotEmpty @Getter @Setter private String extensionPath;
@NotEmpty @Getter @Setter private String flywayPath;
} }

View File

@ -45,16 +45,24 @@ public class MigrationValidationClient {
List<String> availableOMNativeMigrations = getMigrationFilesFromPath(nativePath); List<String> availableOMNativeMigrations = getMigrationFilesFromPath(nativePath);
// If we only have OM migrations, return them // Get Flyway versions from server_change_log (they have metrics = NULL)
List<String> expectedFlywayVersions = getExpectedFlywayVersions();
// If we only have OM and Flyway migrations, return them
if (extensionPath == null || extensionPath.isEmpty()) { if (extensionPath == null || extensionPath.isEmpty()) {
return availableOMNativeMigrations; return Stream.concat(expectedFlywayVersions.stream(), availableOMNativeMigrations.stream())
.sorted()
.toList();
} }
// Otherwise, fetch the extension migration and sort the results // Otherwise, fetch the extension migration and sort all results
List<String> availableOMExtensionMigrations = getMigrationFilesFromPath(extensionPath); List<String> availableOMExtensionMigrations = getMigrationFilesFromPath(extensionPath);
return Stream.concat( return Stream.of(
availableOMNativeMigrations.stream(), availableOMExtensionMigrations.stream()) expectedFlywayVersions.stream(),
availableOMNativeMigrations.stream(),
availableOMExtensionMigrations.stream())
.flatMap(s -> s)
.sorted() .sorted()
.toList(); .toList();
} catch (Exception e) { } catch (Exception e) {
@ -69,4 +77,15 @@ public class MigrationValidationClient {
.sorted() .sorted()
.toList(); .toList();
} }
private List<String> getExpectedFlywayVersions() {
try {
// Query server_change_log for versions where migrationFileName contains 'flyway'
return migrationDAO.getFlywayMigrationVersions();
} catch (Exception e) {
// If there's an error (e.g., table doesn't exist yet), return empty list
LOG.debug("Could not fetch Flyway versions from SERVER_CHANGE_LOG: {}", e.getMessage());
return List.of();
}
}
} }

View File

@ -8,10 +8,9 @@ import org.openmetadata.service.migration.QueryStatus;
import org.openmetadata.service.migration.context.MigrationOps; import org.openmetadata.service.migration.context.MigrationOps;
/** /**
* Migration framework interface that supports three implementation approaches: * Migration framework interface that supports two implementation approaches:
* 1. Flyway (deprecated, do not add new migrations here) * 1. Native SQL migrations
* 2. Native SQL migrations * 2. Java-based migrations
* 3. Java-based migrations
* *
* <p><strong>Migration Execution Order:</strong> * <p><strong>Migration Execution Order:</strong>
* Migrations are executed in a specific sequence that must be maintained: * Migrations are executed in a specific sequence that must be maintained:

View File

@ -118,10 +118,22 @@ public class MigrationProcessImpl implements MigrationProcess {
if (!nullOrEmpty(queryList)) { if (!nullOrEmpty(queryList)) {
for (String sql : queryList) { for (String sql : queryList) {
try { try {
String previouslyRanSql = migrationDAO.getSqlQuery(hash(sql), version); String previouslyRanSql = null;
try {
previouslyRanSql = migrationDAO.getSqlQuery(hash(sql), version);
} catch (Exception dbException) {
// If SERVER_MIGRATION_SQL_LOGS table doesn't exist yet, assume query hasn't run
previouslyRanSql = null;
}
if ((previouslyRanSql == null || previouslyRanSql.isEmpty())) { if ((previouslyRanSql == null || previouslyRanSql.isEmpty())) {
handle.execute(sql); handle.execute(sql);
migrationDAO.upsertServerMigrationSQL(version, sql, hash(sql)); try {
migrationDAO.upsertServerMigrationSQL(version, sql, hash(sql));
} catch (Exception logException) {
// If logging fails (table doesn't exist yet), continue - the SQL was executed
// successfully
}
} }
queryStatusMap.put( queryStatusMap.put(
sql, new QueryStatus(QueryStatus.Status.SUCCESS, "Successfully Executed Query")); sql, new QueryStatus(QueryStatus.Status.SUCCESS, "Successfully Executed Query"));

View File

@ -23,6 +23,7 @@ import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.QueryStatus; import org.openmetadata.service.migration.QueryStatus;
import org.openmetadata.service.migration.context.MigrationContext; import org.openmetadata.service.migration.context.MigrationContext;
import org.openmetadata.service.migration.context.MigrationWorkflowContext; import org.openmetadata.service.migration.context.MigrationWorkflowContext;
import org.openmetadata.service.migration.utils.FlywayMigrationFile;
import org.openmetadata.service.migration.utils.MigrationFile; import org.openmetadata.service.migration.utils.MigrationFile;
import org.openmetadata.service.util.AsciiTable; import org.openmetadata.service.util.AsciiTable;
@ -35,6 +36,7 @@ public class MigrationWorkflow {
private final String nativeSQLScriptRootPath; private final String nativeSQLScriptRootPath;
private final ConnectionType connectionType; private final ConnectionType connectionType;
private final String extensionSQLScriptRootPath; private final String extensionSQLScriptRootPath;
private final String flywayPath;
@Getter private final OpenMetadataApplicationConfig openMetadataApplicationConfig; @Getter private final OpenMetadataApplicationConfig openMetadataApplicationConfig;
private final MigrationDAO migrationDAO; private final MigrationDAO migrationDAO;
private final Jdbi jdbi; private final Jdbi jdbi;
@ -47,6 +49,7 @@ public class MigrationWorkflow {
String nativeSQLScriptRootPath, String nativeSQLScriptRootPath,
ConnectionType connectionType, ConnectionType connectionType,
String extensionSQLScriptRootPath, String extensionSQLScriptRootPath,
String flywayPath,
OpenMetadataApplicationConfig config, OpenMetadataApplicationConfig config,
boolean forceMigrations) { boolean forceMigrations) {
this.jdbi = jdbi; this.jdbi = jdbi;
@ -55,6 +58,7 @@ public class MigrationWorkflow {
this.nativeSQLScriptRootPath = nativeSQLScriptRootPath; this.nativeSQLScriptRootPath = nativeSQLScriptRootPath;
this.connectionType = connectionType; this.connectionType = connectionType;
this.extensionSQLScriptRootPath = extensionSQLScriptRootPath; this.extensionSQLScriptRootPath = extensionSQLScriptRootPath;
this.flywayPath = flywayPath;
this.openMetadataApplicationConfig = config; this.openMetadataApplicationConfig = config;
} }
@ -65,7 +69,8 @@ public class MigrationWorkflow {
nativeSQLScriptRootPath, nativeSQLScriptRootPath,
connectionType, connectionType,
openMetadataApplicationConfig, openMetadataApplicationConfig,
extensionSQLScriptRootPath); extensionSQLScriptRootPath,
flywayPath);
// Filter Migrations to Be Run // Filter Migrations to Be Run
this.migrations = filterAndGetMigrationsToRun(availableMigrations); this.migrations = filterAndGetMigrationsToRun(availableMigrations);
} }
@ -84,27 +89,35 @@ public class MigrationWorkflow {
String nativeSQLScriptRootPath, String nativeSQLScriptRootPath,
ConnectionType connectionType, ConnectionType connectionType,
OpenMetadataApplicationConfig config, OpenMetadataApplicationConfig config,
String extensionSQLScriptRootPath) { String extensionSQLScriptRootPath,
String flywayPath) {
List<MigrationFile> availableOMNativeMigrations = List<MigrationFile> availableOMNativeMigrations =
getMigrationFilesFromPath(nativeSQLScriptRootPath, connectionType, config, false); getMigrationFilesFromPath(nativeSQLScriptRootPath, connectionType, config, false);
// If we only have OM migrations, return them // Get Flyway migrations first (they should run before native migrations)
if (extensionSQLScriptRootPath == null || extensionSQLScriptRootPath.isEmpty()) { List<FlywayMigrationFile> availableFlywayMigrations =
return availableOMNativeMigrations; FlywayMigrationFile.getFlywayMigrationFiles(
flywayPath, connectionType, config, migrationDAO);
// Get extension migrations if available
List<MigrationFile> availableExtensionMigrations = new ArrayList<>();
if (extensionSQLScriptRootPath != null && !extensionSQLScriptRootPath.isEmpty()) {
availableExtensionMigrations =
getMigrationFilesFromPath(extensionSQLScriptRootPath, connectionType, config, true);
} }
// Otherwise, fetch the extension migrations and sort the executions
List<MigrationFile> availableExtensionMigrations =
getMigrationFilesFromPath(extensionSQLScriptRootPath, connectionType, config, true);
/* /*
If we create migrations version as: Combined execution order:
- OpenMetadata: 1.1.0, 1.1.1, 1.2.0 1. Flyway migrations (legacy SQL files from Flyway)
- Extension: 1.1.0-extension, 1.2.0-extension 2. OpenMetadata native migrations
The end result will be 1.1.0, 1.1.0-extension, 1.1.1, 1.2.0, 1.2.0-extension 3. Extension migrations
All sorted by version within their respective groups
*/ */
return Stream.concat( return Stream.of(
availableOMNativeMigrations.stream(), availableExtensionMigrations.stream()) availableFlywayMigrations.stream().map(f -> (MigrationFile) f),
availableOMNativeMigrations.stream(),
availableExtensionMigrations.stream())
.flatMap(stream -> stream)
.sorted() .sorted()
.toList(); .toList();
} }
@ -123,7 +136,14 @@ public class MigrationWorkflow {
private List<MigrationProcess> filterAndGetMigrationsToRun( private List<MigrationProcess> filterAndGetMigrationsToRun(
List<MigrationFile> availableMigrations) { List<MigrationFile> availableMigrations) {
LOG.debug("Filtering Server Migrations"); LOG.debug("Filtering Server Migrations");
executedMigrations = migrationDAO.getMigrationVersions(); try {
executedMigrations = migrationDAO.getMigrationVersions();
} catch (Exception e) {
// SERVER_CHANGE_LOG table doesn't exist yet, run all migrations including Flyway
LOG.info(
"SERVER_CHANGE_LOG table doesn't exist yet, will run all migrations including Flyway");
executedMigrations = new ArrayList<>();
}
currentMaxMigrationVersion = currentMaxMigrationVersion =
executedMigrations.stream().max(MigrationWorkflow::compareVersions); executedMigrations.stream().max(MigrationWorkflow::compareVersions);
List<MigrationFile> applyMigrations; List<MigrationFile> applyMigrations;

View File

@ -1,7 +1,9 @@
package org.openmetadata.service.migration.mysql.v1110; package org.openmetadata.service.migration.mysql.v1110;
import java.util.Map; import java.util.Map;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.QueryStatus; import org.openmetadata.service.migration.QueryStatus;
import org.openmetadata.service.migration.api.MigrationProcessImpl; import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile; import org.openmetadata.service.migration.utils.MigrationFile;
@ -13,7 +15,7 @@ public class Migration extends MigrationProcessImpl {
public Migration(MigrationFile migrationFile) { public Migration(MigrationFile migrationFile) {
super(migrationFile); super(migrationFile);
this.migrationUtil = new MigrationUtil(migrationFile); this.migrationUtil = new MigrationUtil(ConnectionType.MYSQL, migrationFile);
} }
@Override @Override
@ -28,4 +30,10 @@ public class Migration extends MigrationProcessImpl {
isForceMigration)); isForceMigration));
return result; return result;
} }
@Override
@SneakyThrows
public void runDataMigration() {
this.migrationUtil.migrateFlywayHistory(handle);
}
} }

View File

@ -1,7 +1,9 @@
package org.openmetadata.service.migration.postgres.v1110; package org.openmetadata.service.migration.postgres.v1110;
import java.util.Map; import java.util.Map;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.QueryStatus; import org.openmetadata.service.migration.QueryStatus;
import org.openmetadata.service.migration.api.MigrationProcessImpl; import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile; import org.openmetadata.service.migration.utils.MigrationFile;
@ -13,7 +15,7 @@ public class Migration extends MigrationProcessImpl {
public Migration(MigrationFile migrationFile) { public Migration(MigrationFile migrationFile) {
super(migrationFile); super(migrationFile);
this.migrationUtil = new MigrationUtil(migrationFile); this.migrationUtil = new MigrationUtil(ConnectionType.POSTGRES, migrationFile);
} }
@Override @Override
@ -28,4 +30,10 @@ public class Migration extends MigrationProcessImpl {
isForceMigration)); isForceMigration));
return result; return result;
} }
@Override
@SneakyThrows
public void runDataMigration() {
this.migrationUtil.migrateFlywayHistory(handle);
}
} }

View File

@ -0,0 +1,163 @@
package org.openmetadata.service.migration.utils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.flywaydb.core.api.configuration.ClassicConfiguration;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.internal.database.postgresql.PostgreSQLParser;
import org.flywaydb.core.internal.parser.Parser;
import org.flywaydb.core.internal.parser.ParsingContext;
import org.flywaydb.core.internal.resource.filesystem.FileSystemResource;
import org.flywaydb.core.internal.sqlscript.SqlStatementIterator;
import org.flywaydb.database.mysql.MySQLParser;
import org.jetbrains.annotations.Nullable;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.util.EntityUtil;
public class FlywayMigrationFile extends MigrationFile {
private static final Pattern FLYWAY_FILE_PATTERN =
Pattern.compile("^[vV](\\d+(?:\\.\\d+)*)__.*\\.sql$");
private final File sqlFile;
public FlywayMigrationFile(
File sqlFile,
MigrationDAO migrationDAO,
ConnectionType connectionType,
OpenMetadataApplicationConfig config) {
super(createFlywayVersionDir(sqlFile), migrationDAO, connectionType, config, false);
this.sqlFile = sqlFile;
}
private static File createFlywayVersionDir(File sqlFile) {
// Create a virtual directory with semantic version name that doesn't need to exist
String flywayVersion = extractVersionFromFilename(sqlFile.getName());
String semanticVersion = String.format("0.0.%d", Integer.parseInt(flywayVersion));
// Return a virtual directory - it doesn't need to exist since we override file methods
return new File(sqlFile.getParent(), semanticVersion);
}
private static String extractVersionFromFilename(String filename) {
Matcher matcher = FLYWAY_FILE_PATTERN.matcher(filename);
if (matcher.matches()) {
return matcher.group(1);
}
throw new IllegalArgumentException("Invalid Flyway migration filename: " + filename);
}
@Override
public void parseSQLFiles() {
if (sqlFile.exists() && sqlFile.isFile()) {
try {
final ParsingContext parsingContext = new ParsingContext();
Configuration configuration = new ClassicConfiguration();
Parser parser = new PostgreSQLParser(configuration, parsingContext);
if (connectionType == ConnectionType.MYSQL) {
parser = new MySQLParser(configuration, parsingContext);
}
try (SqlStatementIterator sqlIterator =
parser.parse(
new FileSystemResource(
null, sqlFile.getAbsolutePath(), StandardCharsets.UTF_8, true))) {
while (sqlIterator.hasNext()) {
String sqlStatement = sqlIterator.next().getSql();
if (!checkIfQueryPreviouslyRan(sqlStatement)) {
schemaChanges.add(sqlStatement);
}
}
}
} catch (Exception e) {
throw new RuntimeException(
"Failed to parse Flyway migration file: " + sqlFile.getPath(), e);
}
}
}
@Override
public String getSchemaChangesFile() {
return sqlFile.getAbsolutePath();
}
@Override
public String getPostDDLScriptFile() {
// Flyway files don't have separate post DDL scripts
return "";
}
@Override
public String getMigrationsFilePath() {
return sqlFile.getAbsolutePath();
}
public static boolean isFlywayMigrationFile(File file) {
return file.isFile() && FLYWAY_FILE_PATTERN.matcher(file.getName()).matches();
}
public static List<FlywayMigrationFile> getFlywayMigrationFiles(
String flywayPath,
ConnectionType connectionType,
OpenMetadataApplicationConfig config,
MigrationDAO migrationDAO) {
List<FlywayMigrationFile> flywayMigrations = new ArrayList<>();
if (flywayPath == null || flywayPath.isEmpty()) {
return flywayMigrations;
}
File flywayDir = new File(flywayPath);
if (!flywayDir.exists() || !flywayDir.isDirectory()) {
return flywayMigrations;
}
// Get database-specific subdirectory using the actual directory names
File[] sqlFiles = getFiles(connectionType, flywayDir);
if (sqlFiles != null) {
Arrays.stream(sqlFiles)
.map(file -> new FlywayMigrationFile(file, migrationDAO, connectionType, config))
.sorted()
.forEach(flywayMigrations::add);
}
return flywayMigrations;
}
private static File @Nullable [] getFiles(ConnectionType connectionType, File flywayDir) {
String dbSubDir =
connectionType == ConnectionType.MYSQL
? "com.mysql.cj.jdbc.Driver"
: "org.postgresql.Driver";
File dbSpecificDir = new File(flywayDir, dbSubDir);
if (!dbSpecificDir.exists() || !dbSpecificDir.isDirectory()) {
// Try legacy naming convention
String legacyDbSubDir = connectionType == ConnectionType.MYSQL ? "mysql" : "postgresql";
dbSpecificDir = new File(flywayDir, legacyDbSubDir);
if (!dbSpecificDir.exists() || !dbSpecificDir.isDirectory()) {
// Try the root flyway directory
dbSpecificDir = flywayDir;
}
}
return dbSpecificDir.listFiles((dir1, name) -> FLYWAY_FILE_PATTERN.matcher(name).matches());
}
private boolean checkIfQueryPreviouslyRan(String query) {
try {
String checksum = EntityUtil.hash(query);
String sqlStatement = migrationDAO.checkIfQueryPreviouslyRan(checksum);
return sqlStatement != null;
} catch (Exception e) {
// If SERVER_MIGRATION_SQL_LOGS table doesn't exist yet, assume query hasn't run
return false;
}
}
}

View File

@ -28,9 +28,9 @@ public class MigrationFile implements Comparable<MigrationFile> {
public final Boolean isExtension; public final Boolean isExtension;
public final String dbPackageName; public final String dbPackageName;
private final MigrationDAO migrationDAO; protected final MigrationDAO migrationDAO;
private final List<String> schemaChanges; protected final List<String> schemaChanges;
private final List<String> postDDLScripts; protected final List<String> postDDLScripts;
public static final String DEFAULT_MIGRATION_PROCESS_CLASS = public static final String DEFAULT_MIGRATION_PROCESS_CLASS =
"org.openmetadata.service.migration.api.MigrationProcessImpl"; "org.openmetadata.service.migration.api.MigrationProcessImpl";
@ -70,6 +70,7 @@ public class MigrationFile implements Comparable<MigrationFile> {
if (connectionType == ConnectionType.MYSQL) { if (connectionType == ConnectionType.MYSQL) {
parser = new MySQLParser(configuration, parsingContext); parser = new MySQLParser(configuration, parsingContext);
} }
if (new File(getSchemaChangesFile()).isFile()) { if (new File(getSchemaChangesFile()).isFile()) {
try (SqlStatementIterator schemaChangesIterator = try (SqlStatementIterator schemaChangesIterator =
parser.parse( parser.parse(
@ -80,18 +81,24 @@ public class MigrationFile implements Comparable<MigrationFile> {
schemaChanges.add(sqlStatement); schemaChanges.add(sqlStatement);
} }
} }
} catch (Exception e) {
throw new RuntimeException(
"Failed to parse schema changes file: " + getSchemaChangesFile(), e);
} }
} }
if (new File(getPostDDLScriptFile()).isFile()) { if (new File(getPostDDLScriptFile()).isFile()) {
try (SqlStatementIterator schemaChangesIterator = try (SqlStatementIterator postDDLIterator =
parser.parse( parser.parse(
new FileSystemResource(null, getPostDDLScriptFile(), StandardCharsets.UTF_8, true))) { new FileSystemResource(null, getPostDDLScriptFile(), StandardCharsets.UTF_8, true))) {
while (schemaChangesIterator.hasNext()) { while (postDDLIterator.hasNext()) {
String sqlStatement = schemaChangesIterator.next().getSql(); String sqlStatement = postDDLIterator.next().getSql();
if (!checkIfQueryPreviouslyRan(sqlStatement)) { if (!checkIfQueryPreviouslyRan(sqlStatement)) {
postDDLScripts.add(sqlStatement); postDDLScripts.add(sqlStatement);
} }
} }
} catch (Exception e) {
throw new RuntimeException(
"Failed to parse post DDL script file: " + getPostDDLScriptFile(), e);
} }
} }
} }
@ -188,8 +195,13 @@ public class MigrationFile implements Comparable<MigrationFile> {
} }
private boolean checkIfQueryPreviouslyRan(String query) { private boolean checkIfQueryPreviouslyRan(String query) {
String checksum = EntityUtil.hash(query); try {
String sqlStatement = migrationDAO.checkIfQueryPreviouslyRan(checksum); String checksum = EntityUtil.hash(query);
return sqlStatement != null; String sqlStatement = migrationDAO.checkIfQueryPreviouslyRan(checksum);
return sqlStatement != null;
} catch (Exception e) {
// If SERVER_MIGRATION_SQL_LOGS table doesn't exist yet, assume query hasn't run
return false;
}
} }
} }

View File

@ -15,6 +15,7 @@ import org.openmetadata.schema.type.Recognizer;
import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.MigrationDAO; import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.QueryStatus; import org.openmetadata.service.migration.QueryStatus;
import org.openmetadata.service.migration.utils.MigrationFile; import org.openmetadata.service.migration.utils.MigrationFile;
@ -26,7 +27,11 @@ public class MigrationUtil {
"PII.NonSensitive", "data/tags/NonSensitive.json"); "PII.NonSensitive", "data/tags/NonSensitive.json");
private final MigrationFile migrationFile; private final MigrationFile migrationFile;
public MigrationUtil(MigrationFile migrationFile) { private final ConnectionType connectionType;
public static final String FLYWAY_TABLE_NAME = "DATABASE_CHANGE_LOG";
public MigrationUtil(ConnectionType connectionType, MigrationFile migrationFile) {
this.connectionType = connectionType;
this.migrationFile = migrationFile; this.migrationFile = migrationFile;
} }
@ -96,4 +101,132 @@ public class MigrationUtil {
} }
} }
} }
public void migrateFlywayHistory(Handle handle) {
try {
LOG.info("Starting v1100 migration of Flyway history to SERVER_CHANGE_LOG");
// Check if DATABASE_CHANGE_LOG table exists
boolean tableExists = checkTableExists(handle, "DATABASE_CHANGE_LOG");
if (!tableExists) {
LOG.info("Flyway DATABASE_CHANGE_LOG table does not exist, skipping migration");
return;
}
// Check if Flyway records have already been migrated
if (hasFlywayDataAlreadyMigrated(handle)) {
LOG.info(
"Flyway records have already been migrated to SERVER_CHANGE_LOG, skipping migration");
return;
}
// Insert missing v000 baseline record if not present
insertV000RecordIfMissing(handle);
// Migrate Flyway migration records to SERVER_CHANGE_LOG
int migratedCount = migrateFlywayHistoryRecords(handle);
if (migratedCount > 0) {
LOG.info(
"Successfully migrated {} Flyway migration records to SERVER_CHANGE_LOG",
migratedCount);
} else {
LOG.info("No new Flyway migration records to migrate");
}
} catch (Exception e) {
LOG.error("Error during Flyway history migration", e);
}
}
public boolean checkTableExists(Handle handle, String tableName) {
String query =
switch (connectionType) {
case MYSQL -> "SELECT COUNT(*) FROM information_schema.tables "
+ "WHERE table_schema = DATABASE() AND table_name = ?";
case POSTGRES -> "SELECT COUNT(*) FROM information_schema.tables "
+ "WHERE table_schema = current_schema() AND table_name = ?";
};
Integer count = handle.createQuery(query).bind(0, tableName).mapTo(Integer.class).one();
return count > 0;
}
public boolean hasFlywayDataAlreadyMigrated(Handle handle) {
String countQuery =
switch (connectionType) {
case MYSQL -> """
SELECT COUNT(*) FROM SERVER_CHANGE_LOG scl
INNER JOIN DATABASE_CHANGE_LOG dcl ON CONCAT('0.0.', CAST(dcl.version AS UNSIGNED)) = scl.version
WHERE scl.migrationfilename LIKE '%flyway%'
""";
case POSTGRES -> """
SELECT COUNT(*) FROM SERVER_CHANGE_LOG scl
INNER JOIN "DATABASE_CHANGE_LOG" dcl ON '0.0.' || CAST(dcl.version AS INTEGER) = scl.version
WHERE scl.migrationfilename LIKE '%flyway%'
""";
};
Integer count = handle.createQuery(countQuery).mapTo(Integer.class).one();
return count > 0;
}
private void insertV000RecordIfMissing(Handle handle) {
String insertQuery =
switch (connectionType) {
case MYSQL -> """
INSERT IGNORE INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
VALUES ('0.0.0', 'bootstrap/sql/migrations/flyway/mysql/v000__create_db_connection_info.sql', '0', NOW(), NULL)
""";
case POSTGRES -> """
INSERT INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
VALUES ('0.0.0', 'bootstrap/sql/migrations/flyway/postgres/v000__create_db_connection_info.sql', '0', current_timestamp, NULL)
ON CONFLICT (version) DO NOTHING
""";
};
int inserted = handle.createUpdate(insertQuery).execute();
if (inserted > 0) {
LOG.info("Inserted missing v0.0.0 baseline record");
}
}
private int migrateFlywayHistoryRecords(Handle handle) {
String insertQuery =
switch (connectionType) {
case MYSQL -> """
INSERT INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
SELECT CONCAT('0.0.', CAST(version AS UNSIGNED)) as version,
CASE
WHEN script LIKE 'v%__.sql' THEN CONCAT('bootstrap/sql/migrations/flyway/mysql/', script)
ELSE CONCAT('bootstrap/sql/migrations/flyway/mysql/v', version, '__', REPLACE(LOWER(description), ' ', '_'), '.sql')
END as migrationfilename,
'0' as checksum,
installed_on,
NULL as metrics
FROM DATABASE_CHANGE_LOG
WHERE CONCAT('0.0.', CAST(version AS UNSIGNED)) NOT IN (SELECT version FROM SERVER_CHANGE_LOG)
AND success = true
""";
case POSTGRES -> """
INSERT INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
SELECT '0.0.' || CAST(version AS INTEGER) as version,
CASE
WHEN script LIKE 'v%__.sql' THEN 'bootstrap/sql/migrations/flyway/postgres/' || script
ELSE 'bootstrap/sql/migrations/flyway/postgres/v' || version || '__' || REPLACE(LOWER(description), ' ', '_') || '.sql'
END as migrationfilename,
'0' as checksum,
installed_on,
NULL as metrics
FROM "DATABASE_CHANGE_LOG"
WHERE '0.0.' || CAST(version AS INTEGER) NOT IN (SELECT version FROM SERVER_CHANGE_LOG)
AND success = true
""";
};
return handle.createUpdate(insertQuery).execute();
}
} }

View File

@ -1,6 +1,5 @@
package org.openmetadata.service.util; package org.openmetadata.service.util;
import static org.flywaydb.core.internal.info.MigrationInfoDumper.dumpToAsciiTable;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.ADMIN_USER_NAME; import static org.openmetadata.service.Entity.ADMIN_USER_NAME;
import static org.openmetadata.service.Entity.FIELD_OWNERS; import static org.openmetadata.service.Entity.FIELD_OWNERS;
@ -25,7 +24,6 @@ import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.jackson.Jackson; import io.dropwizard.jackson.Jackson;
import io.dropwizard.jersey.validation.Validators; import io.dropwizard.jersey.validation.Validators;
import jakarta.validation.Validator; import jakarta.validation.Validator;
import java.io.File;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
@ -40,8 +38,7 @@ import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flywaydb.core.Flyway; import org.jdbi.v3.core.Handle;
import org.flywaydb.core.api.MigrationVersion;
import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.core.Jdbi;
import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.ServiceEntityInterface; import org.openmetadata.schema.ServiceEntityInterface;
@ -129,7 +126,6 @@ import picocli.CommandLine.Option;
public class OpenMetadataOperations implements Callable<Integer> { public class OpenMetadataOperations implements Callable<Integer> {
private OpenMetadataApplicationConfig config; private OpenMetadataApplicationConfig config;
private Flyway flyway;
private Jdbi jdbi; private Jdbi jdbi;
private SearchRepository searchRepository; private SearchRepository searchRepository;
private String nativeSQLScriptRootPath; private String nativeSQLScriptRootPath;
@ -158,54 +154,6 @@ public class OpenMetadataOperations implements Callable<Integer> {
return 0; return 0;
} }
@Command(
name = "info",
description =
"Shows the list of migrations applied and the pending migration "
+ "waiting to be applied on the target database")
public Integer info() {
try {
parseConfig();
LOG.info(dumpToAsciiTable(flyway.info().all()));
return 0;
} catch (Exception e) {
LOG.error("Failed due to ", e);
return 1;
}
}
@Command(
name = "validate",
description =
"Checks if the all the migrations haven been applied " + "on the target database.")
public Integer validate() {
try {
parseConfig();
flyway.validate();
return 0;
} catch (Exception e) {
LOG.error("Database migration validation failed due to ", e);
return 1;
}
}
@Command(
name = "repair",
description =
"Repairs the DATABASE_CHANGE_LOG table which is used to track"
+ "all the migrations on the target database This involves removing entries for the failed migrations and update"
+ "the checksum of migrations already applied on the target database")
public Integer repair() {
try {
parseConfig();
flyway.repair();
return 0;
} catch (Exception e) {
LOG.error("Repair of CHANGE_LOG failed due to ", e);
return 1;
}
}
@Command( @Command(
name = "setOpenMetadataUrl", name = "setOpenMetadataUrl",
description = "Set or update the OpenMetadata URL in the system repository") description = "Set or update the OpenMetadata URL in the system repository")
@ -561,7 +509,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
public Integer checkConnection() { public Integer checkConnection() {
try { try {
parseConfig(); parseConfig();
flyway.getConfiguration().getDataSource().getConnection(); jdbi.open().getConnection();
return 0; return 0;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to check connection due to ", e); LOG.error("Failed to check connection due to ", e);
@ -579,9 +527,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
promptUserForDelete(); promptUserForDelete();
parseConfig(); parseConfig();
LOG.info("Deleting all the OpenMetadata tables."); LOG.info("Deleting all the OpenMetadata tables.");
flyway.clean(); dropAllTables();
LOG.info("Creating the OpenMetadata Schema.");
flyway.migrate();
LOG.info("Running the Native Migrations."); LOG.info("Running the Native Migrations.");
validateAndRunSystemDataMigrations(true); validateAndRunSystemDataMigrations(true);
LOG.info("OpenMetadata Database Schema is Updated."); LOG.info("OpenMetadata Database Schema is Updated.");
@ -660,7 +606,6 @@ public class OpenMetadataOperations implements Callable<Integer> {
try { try {
LOG.info("Migrating the OpenMetadata Schema."); LOG.info("Migrating the OpenMetadata Schema.");
parseConfig(); parseConfig();
flyway.migrate();
validateAndRunSystemDataMigrations(force); validateAndRunSystemDataMigrations(force);
LOG.info("Update Search Indexes."); LOG.info("Update Search Indexes.");
searchRepository.updateIndexes(); searchRepository.updateIndexes();
@ -1725,30 +1670,6 @@ public class OpenMetadataOperations implements Callable<Integer> {
dataSourceFactory.setPassword(token); dataSourceFactory.setPassword(token);
}); });
String jdbcUrl = dataSourceFactory.getUrl();
String user = dataSourceFactory.getUser();
String password = dataSourceFactory.getPassword();
assert user != null && password != null;
String flywayRootPath = config.getMigrationConfiguration().getFlywayPath();
String location =
"filesystem:"
+ flywayRootPath
+ File.separator
+ config.getDataSourceFactory().getDriverClass();
flyway =
Flyway.configure()
.encoding(StandardCharsets.UTF_8)
.table("DATABASE_CHANGE_LOG")
.sqlMigrationPrefix("v")
.validateOnMigrate(false)
.outOfOrder(false)
.baselineOnMigrate(true)
.baselineVersion(MigrationVersion.fromVersion("000"))
.cleanOnValidationError(false)
.locations(location)
.dataSource(jdbcUrl, user, password)
.cleanDisabled(false)
.load();
nativeSQLScriptRootPath = config.getMigrationConfiguration().getNativePath(); nativeSQLScriptRootPath = config.getMigrationConfiguration().getNativePath();
extensionSQLScriptRootPath = config.getMigrationConfiguration().getExtensionPath(); extensionSQLScriptRootPath = config.getMigrationConfiguration().getExtensionPath();
@ -1773,6 +1694,43 @@ public class OpenMetadataOperations implements Callable<Integer> {
DatasourceConfig.initialize(connType.label); DatasourceConfig.initialize(connType.label);
} }
// This was before handled via flyway's clean command.
private void dropAllTables() {
try (Handle handle = jdbi.open()) {
ConnectionType connType = ConnectionType.from(config.getDataSourceFactory().getDriverClass());
if (connType == ConnectionType.MYSQL) {
handle.execute("SET FOREIGN_KEY_CHECKS = 0");
handle
.createQuery("SHOW TABLES")
.mapTo(String.class)
.list()
.forEach(
tableName -> {
try {
handle.execute("DROP TABLE IF EXISTS " + tableName);
} catch (Exception e) {
LOG.warn("Failed to drop table: " + tableName, e);
}
});
handle.execute("SET FOREIGN_KEY_CHECKS = 1");
} else if (connType == ConnectionType.POSTGRES) {
handle
.createQuery(
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
.mapTo(String.class)
.list()
.forEach(
tableName -> {
try {
handle.execute("DROP TABLE IF EXISTS \"" + tableName + "\" CASCADE");
} catch (Exception e) {
LOG.warn("Failed to drop table: " + tableName, e);
}
});
}
}
}
private void promptUserForDelete() { private void promptUserForDelete() {
LOG.info( LOG.info(
""" """
@ -1797,7 +1755,13 @@ public class OpenMetadataOperations implements Callable<Integer> {
DatasourceConfig.initialize(connType.label); DatasourceConfig.initialize(connType.label);
MigrationWorkflow workflow = MigrationWorkflow workflow =
new MigrationWorkflow( new MigrationWorkflow(
jdbi, nativeSQLScriptRootPath, connType, extensionSQLScriptRootPath, config, force); jdbi,
nativeSQLScriptRootPath,
connType,
extensionSQLScriptRootPath,
config.getMigrationConfiguration().getFlywayPath(),
config,
force);
workflow.loadMigrations(); workflow.loadMigrations();
workflow.printMigrationInfo(); workflow.printMigrationInfo();
workflow.runMigrationWorkflows(true); workflow.runMigrationWorkflows(true);

View File

@ -273,6 +273,7 @@ public abstract class JwtAuthOpenMetadataApplicationTest {
nativeMigrationSQLPath, nativeMigrationSQLPath,
connType, connType,
extensionSQLScriptRootPath, extensionSQLScriptRootPath,
"", // flywayPath - empty string as placeholder
config, config,
forceMigrations); forceMigrations);
// Initialize search repository // Initialize search repository

View File

@ -45,7 +45,6 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider; import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.flywaydb.core.Flyway;
import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.jetty.connector.JettyClientProperties; import org.glassfish.jersey.jetty.connector.JettyClientProperties;
@ -176,9 +175,6 @@ public abstract class OpenMetadataApplicationTest {
dataSourceFactory.setDriverClass(sqlContainer.getDriverClassName()); dataSourceFactory.setDriverClass(sqlContainer.getDriverClassName());
config.setDataSourceFactory(dataSourceFactory); config.setDataSourceFactory(dataSourceFactory);
final String flyWayMigrationScriptsLocation =
ResourceHelpers.resourceFilePath(
"db/sql/migrations/flyway/" + sqlContainer.getDriverClassName());
final String nativeMigrationScriptsLocation = final String nativeMigrationScriptsLocation =
ResourceHelpers.resourceFilePath("db/sql/migrations/native/"); ResourceHelpers.resourceFilePath("db/sql/migrations/native/");
@ -193,17 +189,6 @@ public abstract class OpenMetadataApplicationTest {
} catch (Exception ex) { } catch (Exception ex) {
LOG.info("Extension migrations not found"); LOG.info("Extension migrations not found");
} }
Flyway flyway =
Flyway.configure()
.dataSource(
sqlContainer.getJdbcUrl(), sqlContainer.getUsername(), sqlContainer.getPassword())
.table("DATABASE_CHANGE_LOG")
.locations("filesystem:" + flyWayMigrationScriptsLocation)
.sqlMigrationPrefix("v")
.cleanDisabled(false)
.load();
flyway.clean();
flyway.migrate();
ELASTIC_SEARCH_CONTAINER = new ElasticsearchContainer(elasticSearchContainerImage); ELASTIC_SEARCH_CONTAINER = new ElasticsearchContainer(elasticSearchContainerImage);
ELASTIC_SEARCH_CONTAINER.withPassword("password"); ELASTIC_SEARCH_CONTAINER.withPassword("password");
@ -227,8 +212,6 @@ public abstract class OpenMetadataApplicationTest {
IndexMappingLoader.init(getEsConfig()); IndexMappingLoader.init(getEsConfig());
// Migration overrides // Migration overrides
configOverrides.add(
ConfigOverride.config("migrationConfiguration.flywayPath", flyWayMigrationScriptsLocation));
configOverrides.add( configOverrides.add(
ConfigOverride.config("migrationConfiguration.nativePath", nativeMigrationScriptsLocation)); ConfigOverride.config("migrationConfiguration.nativePath", nativeMigrationScriptsLocation));
@ -274,6 +257,7 @@ public abstract class OpenMetadataApplicationTest {
nativeMigrationSQLPath, nativeMigrationSQLPath,
connType, connType,
extensionSQLScriptRootPath, extensionSQLScriptRootPath,
config.getMigrationConfiguration().getFlywayPath(),
config, config,
forceMigrations); forceMigrations);
// Initialize search repository // Initialize search repository

View File

@ -0,0 +1,167 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.migration;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import org.jdbi.v3.core.Handle;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.openmetadata.service.OpenMetadataApplicationTest;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.utils.v1110.MigrationUtil;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class FlywayMigrationIntegrationTest extends OpenMetadataApplicationTest {
private static MigrationDAO migrationDAO;
private static ConnectionType connectionType;
private static MigrationUtil migrationUtil;
@BeforeAll
public static void setup() {
migrationDAO = jdbi.onDemand(MigrationDAO.class);
connectionType =
ConnectionType.from(APP.getConfiguration().getDataSourceFactory().getDriverClass());
migrationUtil = new MigrationUtil(connectionType, null);
}
@Test
public void testFlywayMigrationVersionsQuery() {
List<String> flywayVersions = migrationDAO.getFlywayMigrationVersions();
assertNotNull(flywayVersions, "Flyway migration versions list should not be null");
// Verify that versions are ordered correctly
if (flywayVersions.size() > 1) {
for (int i = 1; i < flywayVersions.size(); i++) {
String previous = flywayVersions.get(i - 1);
String current = flywayVersions.get(i);
assertTrue(
previous.compareTo(current) <= 0,
String.format(
"Flyway versions should be ordered: %s should be <= %s", previous, current));
}
}
}
@Test
public void testServerChangeLogFlywayRecords() {
List<MigrationDAO.ServerChangeLog> flywayRecords = migrationDAO.getFlywayMigrationRecords();
for (MigrationDAO.ServerChangeLog record : flywayRecords) {
// Verify that the record has the expected Flyway characteristics
assertNotNull(record.getVersion(), "Version should not be null");
assertNotNull(record.getMigrationFileName(), "Migration file name should not be null");
assertTrue(
record.getMigrationFileName().contains("flyway"),
"Migration file name should contain 'flyway'");
// Verify version format for Flyway migrations (should be 0.0.X format)
if (record.getMigrationFileName().contains("flyway")) {
assertTrue(
record.getVersion().matches("^0\\.0\\.\\d+$"),
String.format(
"Flyway version should match 0.0.X format, but was: %s", record.getVersion()));
}
// Verify that the file name includes the full path for proper identification
assertTrue(
record.getMigrationFileName().contains("bootstrap/sql/migrations/flyway")
|| record.getMigrationFileName().contains("v0")
|| record.getMigrationFileName().endsWith(".sql"),
"Migration file name should contain proper path or be a valid SQL file");
}
}
@Test
public void testMigrationDAOFlywayVersionsConsistency() {
List<String> flywayVersions = migrationDAO.getFlywayMigrationVersions();
List<MigrationDAO.ServerChangeLog> flywayRecords = migrationDAO.getFlywayMigrationRecords();
assertEquals(
flywayRecords.size(),
flywayVersions.size(),
"Flyway versions and records should have same count");
// Verify that each version from getFlywayMigrationVersions has a corresponding record
for (int i = 0; i < flywayVersions.size(); i++) {
String version = flywayVersions.get(i);
MigrationDAO.ServerChangeLog record = flywayRecords.get(i);
assertEquals(version, record.getVersion(), "Version should match between methods");
}
}
@Test
public void testFlywayMigrationFilePathFormat() {
List<MigrationDAO.ServerChangeLog> flywayRecords = migrationDAO.getFlywayMigrationRecords();
for (MigrationDAO.ServerChangeLog record : flywayRecords) {
String filePath = record.getMigrationFileName();
assertFalse(
filePath.endsWith("org.postgresql.Driver")
|| filePath.endsWith("com.mysql.cj.jdbc.Driver"),
String.format("File path should not end with driver name: %s", filePath));
// Should contain proper Flyway path structure
assertTrue(
filePath.contains("flyway") || filePath.startsWith("v0"),
String.format("File path should contain 'flyway' or be a versioned file: %s", filePath));
}
}
@Test
public void testMigrationUtilFlywayDataCheck() {
try (Handle handle = jdbi.open()) {
boolean flywayTableExists =
migrationUtil.checkTableExists(handle, MigrationUtil.FLYWAY_TABLE_NAME);
assertFalse(flywayTableExists, "Flyway schema history table should not exist for new runs");
boolean serverChangeLogExists = migrationUtil.checkTableExists(handle, "SERVER_CHANGE_LOG");
assertTrue(serverChangeLogExists, "SERVER_CHANGE_LOG table should exist after migrations");
}
}
@Test
public void testVersionFormatValidation() {
List<String> flywayVersions = migrationDAO.getFlywayMigrationVersions();
for (String version : flywayVersions) {
assertTrue(
version.matches("^\\d+\\.\\d+\\.\\d+.*$"),
String.format("Version should follow semantic versioning format: %s", version));
if (version.startsWith("0.0.")) {
String[] parts = version.split("\\.");
assertTrue(
parts.length >= 3,
String.format("0.0.X version should have at least 3 parts: %s", version));
try {
Integer.parseInt(parts[2]);
} catch (NumberFormatException e) {
assertTrue(
parts[2].matches("\\d+.*"),
String.format("Third version part should start with a number: %s", version));
}
}
}
}
}

View File

@ -28,7 +28,13 @@ public class MigrationWorkflowTest extends OpenMetadataApplicationTest {
migrationWorkflow = migrationWorkflow =
spy( spy(
new MigrationWorkflow( new MigrationWorkflow(
jdbi, "nativePath", ConnectionType.MYSQL, "extensionPath", null, false)); jdbi,
"nativePath",
ConnectionType.MYSQL,
"extensionPath",
"flywayPath",
null,
false));
omMigrationList = omMigrationList =
List.of( List.of(
@ -83,7 +89,8 @@ public class MigrationWorkflowTest extends OpenMetadataApplicationTest {
"nativePath", "nativePath",
ConnectionType.MYSQL, ConnectionType.MYSQL,
migrationWorkflow.getOpenMetadataApplicationConfig(), migrationWorkflow.getOpenMetadataApplicationConfig(),
"extensionPath"); "extensionPath",
"flywayPath");
assertEquals( assertEquals(
List.of("1.1.0", "1.1.0-collate", "1.2.0", "1.2.1", "1.2.2-collate"), List.of("1.1.0", "1.1.0-collate", "1.2.0", "1.2.1", "1.2.2-collate"),

41
pom.xml
View File

@ -101,7 +101,6 @@
<jdbi3.version>3.37.1</jdbi3.version> <jdbi3.version>3.37.1</jdbi3.version>
<commons-cli.version>1.9.0</commons-cli.version> <commons-cli.version>1.9.0</commons-cli.version>
<commons-io.version>2.17.0</commons-io.version> <commons-io.version>2.17.0</commons-io.version>
<flyway.version>9.22.3</flyway.version>
<redshift-jdbc.version>2.1.0.30</redshift-jdbc.version> <redshift-jdbc.version>2.1.0.30</redshift-jdbc.version>
<gson.version>2.11.0</gson.version> <gson.version>2.11.0</gson.version>
<mysql.connector.version>9.3.0</mysql.connector.version> <mysql.connector.version>9.3.0</mysql.connector.version>
@ -161,6 +160,7 @@
<jakarta-el.version>5.0.0-M1</jakarta-el.version> <jakarta-el.version>5.0.0-M1</jakarta-el.version>
<mcp-sdk.version>0.11.2</mcp-sdk.version> <mcp-sdk.version>0.11.2</mcp-sdk.version>
<lettuce.version>6.7.1.RELEASE</lettuce.version> <lettuce.version>6.7.1.RELEASE</lettuce.version>
<flyway.version>9.22.3</flyway.version>
<angus-mail.version>2.0.4</angus-mail.version> <angus-mail.version>2.0.4</angus-mail.version>
</properties> </properties>
@ -359,17 +359,6 @@
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>
<version>${commons-io.version}</version> <version>${commons-io.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-maven-plugin</artifactId>
<version>${flyway.version}</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>com.mysql</groupId> <groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId> <artifactId>mysql-connector-j</artifactId>
@ -381,11 +370,6 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-mysql</artifactId>
<version>${flyway.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.postgresql</groupId> <groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId> <artifactId>postgresql</artifactId>
@ -676,6 +660,29 @@
<artifactId>lettuce-core</artifactId> <artifactId>lettuce-core</artifactId>
<version>${lettuce.version}</version> <version>${lettuce.version}</version>
</dependency> </dependency>
<!-- Flyway dependencies - only for SQL parsing, not migration management -->
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
<version>${flyway.version}</version>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-maven-plugin</artifactId>
<version>${flyway.version}</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-mysql</artifactId>
<version>${flyway.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>