From 71b8c797e22f135036a1c3e06d34507a2d061e4d Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 15 May 2024 09:23:01 +0200 Subject: [PATCH] MINOR - Migrate extension separated from native (#16252) * MINOR - Migrate extension separated from native * cleanup --- .../service/jdbi3/MigrationDAO.java | 9 --- .../service/migration/Migration.java | 11 --- .../migration/api/MigrationWorkflow.java | 59 ++++++++++++--- .../migration/utils/MigrationFile.java | 5 +- .../service/util/MigrationWorkflowTest.java | 73 +++++++++++++++---- 5 files changed, 112 insertions(+), 45 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java index e47b7417ebb..2088513d1c0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java @@ -30,15 +30,6 @@ public interface MigrationDAO { @SingleValue Optional getMaxVersion() throws StatementException; - @ConnectionAwareSqlQuery( - value = "SELECT MAX(version) FROM SERVER_CHANGE_LOG", - connectionType = MYSQL) - @ConnectionAwareSqlQuery( - value = "SELECT max(version) FROM SERVER_CHANGE_LOG", - connectionType = POSTGRES) - @SingleValue - Optional getMaxServerMigrationVersion() throws StatementException; - @ConnectionAwareSqlQuery( value = "SELECT checksum FROM SERVER_CHANGE_LOG where version = :version", connectionType = MYSQL) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/Migration.java index f39640bbd14..eeaab97ad9f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/Migration.java @@ -35,17 +35,6 @@ public final class Migration { } } - public static Optional lastMigratedServer(Jdbi jdbi) { - try { - return jdbi.withExtension(MigrationDAO.class, MigrationDAO::getMaxServerMigrationVersion); - } catch (StatementException e) { - throw new IllegalArgumentException( - "Exception encountered when trying to obtain last migrated Server version." - + " Make sure you have run `./bootstrap/bootstrap_storage.sh migrate-all` at least once.", - e); - } - } - public static String lastMigrationFile(MigrationConfiguration conf) throws IOException { List migrationFiles = getMigrationVersions(conf); return Collections.max(migrationFiles); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java index e7de3257cbd..7a8ac99ec6a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java @@ -1,5 +1,6 @@ package org.openmetadata.service.migration.api; +import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.util.OpenMetadataOperations.printToAsciiTable; import java.io.File; @@ -34,6 +35,7 @@ public class MigrationWorkflow { private final MigrationDAO migrationDAO; private final Jdbi jdbi; private final boolean forceMigrations; + List executedMigrations; private Optional currentMaxMigrationVersion; public MigrationWorkflow( @@ -73,7 +75,7 @@ public class MigrationWorkflow { ConnectionType connectionType, String extensionSQLScriptRootPath) { List availableOMNativeMigrations = - getMigrationFilesFromPath(nativeSQLScriptRootPath, connectionType); + getMigrationFilesFromPath(nativeSQLScriptRootPath, connectionType, false); // If we only have OM migrations, return them if (extensionSQLScriptRootPath == null || extensionSQLScriptRootPath.isEmpty()) { @@ -82,7 +84,7 @@ public class MigrationWorkflow { // Otherwise, fetch the extension migrations and sort the executions List availableExtensionMigrations = - getMigrationFilesFromPath(extensionSQLScriptRootPath, connectionType); + getMigrationFilesFromPath(extensionSQLScriptRootPath, connectionType, true); /* If we create migrations version as: @@ -96,9 +98,10 @@ public class MigrationWorkflow { .toList(); } - public List getMigrationFilesFromPath(String path, ConnectionType connectionType) { + public List getMigrationFilesFromPath( + String path, ConnectionType connectionType, Boolean isExtension) { return Arrays.stream(Objects.requireNonNull(new File(path).listFiles(File::isDirectory))) - .map(dir -> new MigrationFile(dir, migrationDAO, connectionType)) + .map(dir -> new MigrationFile(dir, migrationDAO, connectionType, isExtension)) .sorted() .toList(); } @@ -106,13 +109,11 @@ public class MigrationWorkflow { private List filterAndGetMigrationsToRun( List availableMigrations) { LOG.debug("Filtering Server Migrations"); - currentMaxMigrationVersion = migrationDAO.getMaxServerMigrationVersion(); + executedMigrations = migrationDAO.getMigrationVersions(); + currentMaxMigrationVersion = executedMigrations.stream().max(String::compareTo); List applyMigrations; - if (currentMaxMigrationVersion.isPresent() && !forceMigrations) { - applyMigrations = - availableMigrations.stream() - .filter(migration -> migration.biggerThan(currentMaxMigrationVersion.get())) - .toList(); + if (!nullOrEmpty(executedMigrations) && !forceMigrations) { + applyMigrations = getMigrationsToApply(executedMigrations, availableMigrations); } else { applyMigrations = availableMigrations; } @@ -132,6 +133,44 @@ public class MigrationWorkflow { return processes; } + /** + * We'll take the max from native migrations and double-check if there's any extension migration + * pending to be applied + */ + public List getMigrationsToApply( + List executedMigrations, List availableMigrations) { + List migrationsToApply = new ArrayList<>(); + List nativeMigrationsToApply = + processNativeMigrations(executedMigrations, availableMigrations); + List extensionMigrationsToApply = + processExtensionMigrations(executedMigrations, availableMigrations); + + migrationsToApply.addAll(nativeMigrationsToApply); + migrationsToApply.addAll(extensionMigrationsToApply); + return migrationsToApply; + } + + private List processNativeMigrations( + List executedMigrations, List availableMigrations) { + Stream availableNativeMigrations = + availableMigrations.stream().filter(migration -> !migration.isExtension); + Optional maxMigration = executedMigrations.stream().max(String::compareTo); + if (maxMigration.isPresent()) { + return availableNativeMigrations + .filter(migration -> migration.biggerThan(maxMigration.get())) + .toList(); + } + return availableNativeMigrations.toList(); + } + + private List processExtensionMigrations( + List executedMigrations, List availableMigrations) { + return availableMigrations.stream() + .filter(migration -> migration.isExtension) + .filter(migration -> !executedMigrations.contains(migration.version)) + .toList(); + } + public void printMigrationInfo() { LOG.info("Following Migrations will be performed, with Force Migration : {}", forceMigrations); List columns = Arrays.asList("Version", "ConnectionType", "MigrationsFilePath"); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/MigrationFile.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/MigrationFile.java index 74c31d6d6e3..36be47ec9b1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/MigrationFile.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/MigrationFile.java @@ -22,6 +22,7 @@ public class MigrationFile implements Comparable { public final String version; public final ConnectionType connectionType; public final File dir; + public final Boolean isExtension; public final String dbPackageName; private final MigrationDAO migrationDAO; @@ -30,8 +31,10 @@ public class MigrationFile implements Comparable { public static final String DEFAULT_MIGRATION_PROCESS_CLASS = "org.openmetadata.service.migration.api.MigrationProcessImpl"; - public MigrationFile(File dir, MigrationDAO migrationDAO, ConnectionType connectionType) { + public MigrationFile( + File dir, MigrationDAO migrationDAO, ConnectionType connectionType, Boolean isExtension) { this.dir = dir; + this.isExtension = isExtension; this.version = dir.getName(); this.connectionType = connectionType; this.migrationDAO = migrationDAO; diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/util/MigrationWorkflowTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/util/MigrationWorkflowTest.java index a5cadda989d..0955d7f8f65 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/util/MigrationWorkflowTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/util/MigrationWorkflowTest.java @@ -6,8 +6,10 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.spy; import java.io.File; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.openmetadata.service.OpenMetadataApplicationTest; @@ -17,46 +19,89 @@ import org.openmetadata.service.migration.utils.MigrationFile; public class MigrationWorkflowTest extends OpenMetadataApplicationTest { - @Test - void test_getMigrationFiles() { + public static MigrationWorkflow migrationWorkflow; + public static List omMigrationList; + public static List collateMigrationList; - MigrationWorkflow migrationWorkflow = + @BeforeAll + public static void setup() { + migrationWorkflow = spy( new MigrationWorkflow( jdbi, "nativePath", ConnectionType.MYSQL, "extensionPath", false)); - List omMigrationList = + omMigrationList = List.of( new MigrationFile( - new File("/bootstrap/sql/migrations/native/1.1.0"), null, ConnectionType.MYSQL), + new File("/bootstrap/sql/migrations/native/1.1.0"), + null, + ConnectionType.MYSQL, + false), new MigrationFile( - new File("/bootstrap/sql/migrations/native/1.2.0"), null, ConnectionType.MYSQL), + new File("/bootstrap/sql/migrations/native/1.2.0"), + null, + ConnectionType.MYSQL, + false), new MigrationFile( - new File("/bootstrap/sql/migrations/native/1.2.1"), null, ConnectionType.MYSQL)); + new File("/bootstrap/sql/migrations/native/1.2.1"), + null, + ConnectionType.MYSQL, + false)); - List collateMigrationList = + collateMigrationList = List.of( new MigrationFile( new File("/bootstrap-collate/sql/migrations/native/1.1.0-collate"), null, - ConnectionType.MYSQL), + ConnectionType.MYSQL, + true), new MigrationFile( new File("/bootstrap-collate/sql/migrations/native/1.2.2-collate"), null, - ConnectionType.MYSQL)); + ConnectionType.MYSQL, + true)); + } + @Test + void test_getMigrationFiles() { Mockito.doReturn(omMigrationList) .when(migrationWorkflow) - .getMigrationFilesFromPath(eq("nativePath"), any(ConnectionType.class)); + .getMigrationFilesFromPath(eq("nativePath"), any(ConnectionType.class), eq(false)); Mockito.doReturn(collateMigrationList) .when(migrationWorkflow) - .getMigrationFilesFromPath(eq("extensionPath"), any(ConnectionType.class)); + .getMigrationFilesFromPath(eq("extensionPath"), any(ConnectionType.class), eq(true)); List foundList = migrationWorkflow.getMigrationFiles("nativePath", ConnectionType.MYSQL, "extensionPath"); assertEquals( - foundList.stream().map(f -> f.dir.getName()).collect(Collectors.toList()), - List.of("1.1.0", "1.1.0-collate", "1.2.0", "1.2.1", "1.2.2-collate")); + List.of("1.1.0", "1.1.0-collate", "1.2.0", "1.2.1", "1.2.2-collate"), + foundList.stream().map(f -> f.dir.getName()).collect(Collectors.toList())); + } + + @Test + void test_getMigrationsToApply() { + + List availableMigrations = new ArrayList<>(); + availableMigrations.addAll(omMigrationList); + availableMigrations.addAll(collateMigrationList); + + // If we only have executed native migrations, we'll execute the Collate ones + assertEquals( + List.of("1.1.0-collate", "1.2.2-collate"), + migrationWorkflow + .getMigrationsToApply(List.of("1.1.0", "1.2.0", "1.2.1"), availableMigrations) + .stream() + .map(f -> f.dir.getName()) + .collect(Collectors.toList())); + + // We might have some native migrations, but not all + assertEquals( + List.of("1.2.1", "1.2.2-collate"), + migrationWorkflow + .getMigrationsToApply(List.of("1.1.0", "1.1.0-collate", "1.2.0"), availableMigrations) + .stream() + .map(f -> f.dir.getName()) + .collect(Collectors.toList())); } }