mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-03 06:03:12 +00:00
MINOR - Migrate extension separated from native (#16252)
* MINOR - Migrate extension separated from native * cleanup
This commit is contained in:
parent
f231e552e1
commit
71b8c797e2
@ -30,15 +30,6 @@ public interface MigrationDAO {
|
|||||||
@SingleValue
|
@SingleValue
|
||||||
Optional<String> getMaxVersion() throws StatementException;
|
Optional<String> getMaxVersion() throws StatementException;
|
||||||
|
|
||||||
@ConnectionAwareSqlQuery(
|
|
||||||
value = "SELECT MAX(version) FROM SERVER_CHANGE_LOG",
|
|
||||||
connectionType = MYSQL)
|
|
||||||
@ConnectionAwareSqlQuery(
|
|
||||||
value = "SELECT max(version) FROM SERVER_CHANGE_LOG",
|
|
||||||
connectionType = POSTGRES)
|
|
||||||
@SingleValue
|
|
||||||
Optional<String> getMaxServerMigrationVersion() throws StatementException;
|
|
||||||
|
|
||||||
@ConnectionAwareSqlQuery(
|
@ConnectionAwareSqlQuery(
|
||||||
value = "SELECT checksum FROM SERVER_CHANGE_LOG where version = :version",
|
value = "SELECT checksum FROM SERVER_CHANGE_LOG where version = :version",
|
||||||
connectionType = MYSQL)
|
connectionType = MYSQL)
|
||||||
|
@ -35,17 +35,6 @@ public final class Migration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Optional<String> lastMigratedServer(Jdbi jdbi) {
|
|
||||||
try {
|
|
||||||
return jdbi.withExtension(MigrationDAO.class, MigrationDAO::getMaxServerMigrationVersion);
|
|
||||||
} catch (StatementException e) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"Exception encountered when trying to obtain last migrated Server version."
|
|
||||||
+ " Make sure you have run `./bootstrap/bootstrap_storage.sh migrate-all` at least once.",
|
|
||||||
e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String lastMigrationFile(MigrationConfiguration conf) throws IOException {
|
public static String lastMigrationFile(MigrationConfiguration conf) throws IOException {
|
||||||
List<String> migrationFiles = getMigrationVersions(conf);
|
List<String> migrationFiles = getMigrationVersions(conf);
|
||||||
return Collections.max(migrationFiles);
|
return Collections.max(migrationFiles);
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package org.openmetadata.service.migration.api;
|
package org.openmetadata.service.migration.api;
|
||||||
|
|
||||||
|
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||||
import static org.openmetadata.service.util.OpenMetadataOperations.printToAsciiTable;
|
import static org.openmetadata.service.util.OpenMetadataOperations.printToAsciiTable;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
@ -34,6 +35,7 @@ public class MigrationWorkflow {
|
|||||||
private final MigrationDAO migrationDAO;
|
private final MigrationDAO migrationDAO;
|
||||||
private final Jdbi jdbi;
|
private final Jdbi jdbi;
|
||||||
private final boolean forceMigrations;
|
private final boolean forceMigrations;
|
||||||
|
List<String> executedMigrations;
|
||||||
private Optional<String> currentMaxMigrationVersion;
|
private Optional<String> currentMaxMigrationVersion;
|
||||||
|
|
||||||
public MigrationWorkflow(
|
public MigrationWorkflow(
|
||||||
@ -73,7 +75,7 @@ public class MigrationWorkflow {
|
|||||||
ConnectionType connectionType,
|
ConnectionType connectionType,
|
||||||
String extensionSQLScriptRootPath) {
|
String extensionSQLScriptRootPath) {
|
||||||
List<MigrationFile> availableOMNativeMigrations =
|
List<MigrationFile> availableOMNativeMigrations =
|
||||||
getMigrationFilesFromPath(nativeSQLScriptRootPath, connectionType);
|
getMigrationFilesFromPath(nativeSQLScriptRootPath, connectionType, false);
|
||||||
|
|
||||||
// If we only have OM migrations, return them
|
// If we only have OM migrations, return them
|
||||||
if (extensionSQLScriptRootPath == null || extensionSQLScriptRootPath.isEmpty()) {
|
if (extensionSQLScriptRootPath == null || extensionSQLScriptRootPath.isEmpty()) {
|
||||||
@ -82,7 +84,7 @@ public class MigrationWorkflow {
|
|||||||
|
|
||||||
// Otherwise, fetch the extension migrations and sort the executions
|
// Otherwise, fetch the extension migrations and sort the executions
|
||||||
List<MigrationFile> availableExtensionMigrations =
|
List<MigrationFile> availableExtensionMigrations =
|
||||||
getMigrationFilesFromPath(extensionSQLScriptRootPath, connectionType);
|
getMigrationFilesFromPath(extensionSQLScriptRootPath, connectionType, true);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
If we create migrations version as:
|
If we create migrations version as:
|
||||||
@ -96,9 +98,10 @@ public class MigrationWorkflow {
|
|||||||
.toList();
|
.toList();
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<MigrationFile> getMigrationFilesFromPath(String path, ConnectionType connectionType) {
|
public List<MigrationFile> getMigrationFilesFromPath(
|
||||||
|
String path, ConnectionType connectionType, Boolean isExtension) {
|
||||||
return Arrays.stream(Objects.requireNonNull(new File(path).listFiles(File::isDirectory)))
|
return Arrays.stream(Objects.requireNonNull(new File(path).listFiles(File::isDirectory)))
|
||||||
.map(dir -> new MigrationFile(dir, migrationDAO, connectionType))
|
.map(dir -> new MigrationFile(dir, migrationDAO, connectionType, isExtension))
|
||||||
.sorted()
|
.sorted()
|
||||||
.toList();
|
.toList();
|
||||||
}
|
}
|
||||||
@ -106,13 +109,11 @@ public class MigrationWorkflow {
|
|||||||
private List<MigrationProcess> filterAndGetMigrationsToRun(
|
private List<MigrationProcess> filterAndGetMigrationsToRun(
|
||||||
List<MigrationFile> availableMigrations) {
|
List<MigrationFile> availableMigrations) {
|
||||||
LOG.debug("Filtering Server Migrations");
|
LOG.debug("Filtering Server Migrations");
|
||||||
currentMaxMigrationVersion = migrationDAO.getMaxServerMigrationVersion();
|
executedMigrations = migrationDAO.getMigrationVersions();
|
||||||
|
currentMaxMigrationVersion = executedMigrations.stream().max(String::compareTo);
|
||||||
List<MigrationFile> applyMigrations;
|
List<MigrationFile> applyMigrations;
|
||||||
if (currentMaxMigrationVersion.isPresent() && !forceMigrations) {
|
if (!nullOrEmpty(executedMigrations) && !forceMigrations) {
|
||||||
applyMigrations =
|
applyMigrations = getMigrationsToApply(executedMigrations, availableMigrations);
|
||||||
availableMigrations.stream()
|
|
||||||
.filter(migration -> migration.biggerThan(currentMaxMigrationVersion.get()))
|
|
||||||
.toList();
|
|
||||||
} else {
|
} else {
|
||||||
applyMigrations = availableMigrations;
|
applyMigrations = availableMigrations;
|
||||||
}
|
}
|
||||||
@ -132,6 +133,44 @@ public class MigrationWorkflow {
|
|||||||
return processes;
|
return processes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We'll take the max from native migrations and double-check if there's any extension migration
|
||||||
|
* pending to be applied
|
||||||
|
*/
|
||||||
|
public List<MigrationFile> getMigrationsToApply(
|
||||||
|
List<String> executedMigrations, List<MigrationFile> availableMigrations) {
|
||||||
|
List<MigrationFile> migrationsToApply = new ArrayList<>();
|
||||||
|
List<MigrationFile> nativeMigrationsToApply =
|
||||||
|
processNativeMigrations(executedMigrations, availableMigrations);
|
||||||
|
List<MigrationFile> extensionMigrationsToApply =
|
||||||
|
processExtensionMigrations(executedMigrations, availableMigrations);
|
||||||
|
|
||||||
|
migrationsToApply.addAll(nativeMigrationsToApply);
|
||||||
|
migrationsToApply.addAll(extensionMigrationsToApply);
|
||||||
|
return migrationsToApply;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<MigrationFile> processNativeMigrations(
|
||||||
|
List<String> executedMigrations, List<MigrationFile> availableMigrations) {
|
||||||
|
Stream<MigrationFile> availableNativeMigrations =
|
||||||
|
availableMigrations.stream().filter(migration -> !migration.isExtension);
|
||||||
|
Optional<String> maxMigration = executedMigrations.stream().max(String::compareTo);
|
||||||
|
if (maxMigration.isPresent()) {
|
||||||
|
return availableNativeMigrations
|
||||||
|
.filter(migration -> migration.biggerThan(maxMigration.get()))
|
||||||
|
.toList();
|
||||||
|
}
|
||||||
|
return availableNativeMigrations.toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<MigrationFile> processExtensionMigrations(
|
||||||
|
List<String> executedMigrations, List<MigrationFile> availableMigrations) {
|
||||||
|
return availableMigrations.stream()
|
||||||
|
.filter(migration -> migration.isExtension)
|
||||||
|
.filter(migration -> !executedMigrations.contains(migration.version))
|
||||||
|
.toList();
|
||||||
|
}
|
||||||
|
|
||||||
public void printMigrationInfo() {
|
public void printMigrationInfo() {
|
||||||
LOG.info("Following Migrations will be performed, with Force Migration : {}", forceMigrations);
|
LOG.info("Following Migrations will be performed, with Force Migration : {}", forceMigrations);
|
||||||
List<String> columns = Arrays.asList("Version", "ConnectionType", "MigrationsFilePath");
|
List<String> columns = Arrays.asList("Version", "ConnectionType", "MigrationsFilePath");
|
||||||
|
@ -22,6 +22,7 @@ public class MigrationFile implements Comparable<MigrationFile> {
|
|||||||
public final String version;
|
public final String version;
|
||||||
public final ConnectionType connectionType;
|
public final ConnectionType connectionType;
|
||||||
public final File dir;
|
public final File dir;
|
||||||
|
public final Boolean isExtension;
|
||||||
public final String dbPackageName;
|
public final String dbPackageName;
|
||||||
|
|
||||||
private final MigrationDAO migrationDAO;
|
private final MigrationDAO migrationDAO;
|
||||||
@ -30,8 +31,10 @@ public class MigrationFile implements Comparable<MigrationFile> {
|
|||||||
public static final String DEFAULT_MIGRATION_PROCESS_CLASS =
|
public static final String DEFAULT_MIGRATION_PROCESS_CLASS =
|
||||||
"org.openmetadata.service.migration.api.MigrationProcessImpl";
|
"org.openmetadata.service.migration.api.MigrationProcessImpl";
|
||||||
|
|
||||||
public MigrationFile(File dir, MigrationDAO migrationDAO, ConnectionType connectionType) {
|
public MigrationFile(
|
||||||
|
File dir, MigrationDAO migrationDAO, ConnectionType connectionType, Boolean isExtension) {
|
||||||
this.dir = dir;
|
this.dir = dir;
|
||||||
|
this.isExtension = isExtension;
|
||||||
this.version = dir.getName();
|
this.version = dir.getName();
|
||||||
this.connectionType = connectionType;
|
this.connectionType = connectionType;
|
||||||
this.migrationDAO = migrationDAO;
|
this.migrationDAO = migrationDAO;
|
||||||
|
@ -6,8 +6,10 @@ import static org.mockito.ArgumentMatchers.eq;
|
|||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.openmetadata.service.OpenMetadataApplicationTest;
|
import org.openmetadata.service.OpenMetadataApplicationTest;
|
||||||
@ -17,46 +19,89 @@ import org.openmetadata.service.migration.utils.MigrationFile;
|
|||||||
|
|
||||||
public class MigrationWorkflowTest extends OpenMetadataApplicationTest {
|
public class MigrationWorkflowTest extends OpenMetadataApplicationTest {
|
||||||
|
|
||||||
@Test
|
public static MigrationWorkflow migrationWorkflow;
|
||||||
void test_getMigrationFiles() {
|
public static List<MigrationFile> omMigrationList;
|
||||||
|
public static List<MigrationFile> collateMigrationList;
|
||||||
|
|
||||||
MigrationWorkflow migrationWorkflow =
|
@BeforeAll
|
||||||
|
public static void setup() {
|
||||||
|
migrationWorkflow =
|
||||||
spy(
|
spy(
|
||||||
new MigrationWorkflow(
|
new MigrationWorkflow(
|
||||||
jdbi, "nativePath", ConnectionType.MYSQL, "extensionPath", false));
|
jdbi, "nativePath", ConnectionType.MYSQL, "extensionPath", false));
|
||||||
|
|
||||||
List<MigrationFile> omMigrationList =
|
omMigrationList =
|
||||||
List.of(
|
List.of(
|
||||||
new MigrationFile(
|
new MigrationFile(
|
||||||
new File("/bootstrap/sql/migrations/native/1.1.0"), null, ConnectionType.MYSQL),
|
new File("/bootstrap/sql/migrations/native/1.1.0"),
|
||||||
|
null,
|
||||||
|
ConnectionType.MYSQL,
|
||||||
|
false),
|
||||||
new MigrationFile(
|
new MigrationFile(
|
||||||
new File("/bootstrap/sql/migrations/native/1.2.0"), null, ConnectionType.MYSQL),
|
new File("/bootstrap/sql/migrations/native/1.2.0"),
|
||||||
|
null,
|
||||||
|
ConnectionType.MYSQL,
|
||||||
|
false),
|
||||||
new MigrationFile(
|
new MigrationFile(
|
||||||
new File("/bootstrap/sql/migrations/native/1.2.1"), null, ConnectionType.MYSQL));
|
new File("/bootstrap/sql/migrations/native/1.2.1"),
|
||||||
|
null,
|
||||||
|
ConnectionType.MYSQL,
|
||||||
|
false));
|
||||||
|
|
||||||
List<MigrationFile> collateMigrationList =
|
collateMigrationList =
|
||||||
List.of(
|
List.of(
|
||||||
new MigrationFile(
|
new MigrationFile(
|
||||||
new File("/bootstrap-collate/sql/migrations/native/1.1.0-collate"),
|
new File("/bootstrap-collate/sql/migrations/native/1.1.0-collate"),
|
||||||
null,
|
null,
|
||||||
ConnectionType.MYSQL),
|
ConnectionType.MYSQL,
|
||||||
|
true),
|
||||||
new MigrationFile(
|
new MigrationFile(
|
||||||
new File("/bootstrap-collate/sql/migrations/native/1.2.2-collate"),
|
new File("/bootstrap-collate/sql/migrations/native/1.2.2-collate"),
|
||||||
null,
|
null,
|
||||||
ConnectionType.MYSQL));
|
ConnectionType.MYSQL,
|
||||||
|
true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test_getMigrationFiles() {
|
||||||
Mockito.doReturn(omMigrationList)
|
Mockito.doReturn(omMigrationList)
|
||||||
.when(migrationWorkflow)
|
.when(migrationWorkflow)
|
||||||
.getMigrationFilesFromPath(eq("nativePath"), any(ConnectionType.class));
|
.getMigrationFilesFromPath(eq("nativePath"), any(ConnectionType.class), eq(false));
|
||||||
Mockito.doReturn(collateMigrationList)
|
Mockito.doReturn(collateMigrationList)
|
||||||
.when(migrationWorkflow)
|
.when(migrationWorkflow)
|
||||||
.getMigrationFilesFromPath(eq("extensionPath"), any(ConnectionType.class));
|
.getMigrationFilesFromPath(eq("extensionPath"), any(ConnectionType.class), eq(true));
|
||||||
|
|
||||||
List<MigrationFile> foundList =
|
List<MigrationFile> foundList =
|
||||||
migrationWorkflow.getMigrationFiles("nativePath", ConnectionType.MYSQL, "extensionPath");
|
migrationWorkflow.getMigrationFiles("nativePath", ConnectionType.MYSQL, "extensionPath");
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
foundList.stream().map(f -> f.dir.getName()).collect(Collectors.toList()),
|
List.of("1.1.0", "1.1.0-collate", "1.2.0", "1.2.1", "1.2.2-collate"),
|
||||||
List.of("1.1.0", "1.1.0-collate", "1.2.0", "1.2.1", "1.2.2-collate"));
|
foundList.stream().map(f -> f.dir.getName()).collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test_getMigrationsToApply() {
|
||||||
|
|
||||||
|
List<MigrationFile> availableMigrations = new ArrayList<>();
|
||||||
|
availableMigrations.addAll(omMigrationList);
|
||||||
|
availableMigrations.addAll(collateMigrationList);
|
||||||
|
|
||||||
|
// If we only have executed native migrations, we'll execute the Collate ones
|
||||||
|
assertEquals(
|
||||||
|
List.of("1.1.0-collate", "1.2.2-collate"),
|
||||||
|
migrationWorkflow
|
||||||
|
.getMigrationsToApply(List.of("1.1.0", "1.2.0", "1.2.1"), availableMigrations)
|
||||||
|
.stream()
|
||||||
|
.map(f -> f.dir.getName())
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
|
||||||
|
// We might have some native migrations, but not all
|
||||||
|
assertEquals(
|
||||||
|
List.of("1.2.1", "1.2.2-collate"),
|
||||||
|
migrationWorkflow
|
||||||
|
.getMigrationsToApply(List.of("1.1.0", "1.1.0-collate", "1.2.0"), availableMigrations)
|
||||||
|
.stream()
|
||||||
|
.map(f -> f.dir.getName())
|
||||||
|
.collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user