MINOR: Fix Migrations (#19925)

* Fix Migrations

* Fix WorkflowHandler initialization

* Refactor how to fix the migrations by passing the Config object itself

* Fix Migrations

* Remove comments

* Remove comments
This commit is contained in:
IceS2 2025-02-26 08:23:08 +01:00 committed by GitHub
parent b587dfd344
commit fd7b00d246
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 74 additions and 97 deletions

View File

@ -1,7 +1,7 @@
UPDATE workflow_definition_entity
SET json = JSON_SET(json, '$.trigger.type', 'eventBasedEntity')
WHERE JSON_EXTRACT(json, '$.trigger.type') = 'eventBasedEntityTrigger';
WHERE JSON_EXTRACT(json, '$.trigger.type') in ('eventBasedEntityTrigger', 'eventBasedEntityWorkflow');
UPDATE workflow_definition_entity
SET json = JSON_SET(json, '$.trigger.type', 'periodicBatchEntity')
WHERE JSON_EXTRACT(json, '$.trigger.type') = 'periodicBatchEntityTrigger';
WHERE JSON_EXTRACT(json, '$.trigger.type') in ('periodicBatchEntityTrigger', 'periodicBatchEntityWorkflow');

View File

@ -1,7 +1,7 @@
UPDATE workflow_definition_entity
SET json = jsonb_set(json, '{trigger,type}', '"eventBasedEntity"')
WHERE json->'trigger'->>'type' = 'eventBasedEntityTrigger';
WHERE json->'trigger'->>'type' in ('eventBasedEntityTrigger', 'eventBasedEntityWorkflow');
UPDATE workflow_definition_entity
SET json = jsonb_set(json, '{trigger,type}', '"periodicBatchEntity"')
WHERE json->'trigger'->>'type' = 'periodicBatchEntityTrigger';
WHERE json->'trigger'->>'type' in ('periodicBatchEntityTrigger', 'periodicBatchEntityWorkflow');

View File

@ -473,8 +473,7 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
conf.getMigrationConfiguration().getNativePath(),
connectionType,
conf.getMigrationConfiguration().getExtensionPath(),
conf.getPipelineServiceClientConfiguration(),
conf.getAuthenticationConfiguration(),
conf,
false);
migrationWorkflow.loadMigrations();
migrationWorkflow.validateMigrationsForServer();

View File

@ -69,7 +69,7 @@ public class OpenMetadataApplicationConfig extends Configuration {
private static final String CERTIFICATE_PATH = "certificatePath";
public PipelineServiceClientConfiguration getPipelineServiceClientConfiguration() {
if (pipelineServiceClientConfiguration != null) {
LinkedHashMap<String, String> temporarySSLConfig =
(LinkedHashMap<String, String>) pipelineServiceClientConfiguration.getSslConfig();
if (temporarySSLConfig != null && temporarySSLConfig.containsKey(CERTIFICATE_PATH)) {
@ -77,6 +77,7 @@ public class OpenMetadataApplicationConfig extends Configuration {
temporarySSLConfig.remove(CERTIFICATE_PATH);
}
pipelineServiceClientConfiguration.setSslConfig(temporarySSLConfig);
}
return pipelineServiceClientConfiguration;
}

View File

@ -138,7 +138,9 @@ public class WorkflowHandler {
}
public static WorkflowHandler getInstance() {
if (initialized) return instance;
if (initialized) {
return instance;
}
throw new UnhandledServerException("WorkflowHandler is not initialized.");
}

View File

@ -12,7 +12,9 @@ import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.schema.api.security.AuthenticationConfiguration;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.migration.QueryStatus;
@ -26,9 +28,9 @@ public class MigrationProcessImpl implements MigrationProcess {
protected CollectionDAO collectionDAO;
protected Jdbi jdbi;
protected Handle handle;
protected PipelineServiceClientInterface pipelineServiceClient;
protected AuthenticationConfiguration authenticationConfiguration;
private final MigrationFile migrationFile;
private OpenMetadataApplicationConfig openMetadataApplicationConfig;
public @Getter MigrationContext context;
@ -42,10 +44,21 @@ public class MigrationProcessImpl implements MigrationProcess {
this.jdbi = jdbi;
this.collectionDAO = handle.attach(CollectionDAO.class);
this.migrationDAO = handle.attach(MigrationDAO.class);
this.pipelineServiceClient =
PipelineServiceClientFactory.createPipelineServiceClient(
this.migrationFile.pipelineServiceClientConfiguration);
this.authenticationConfiguration = migrationFile.authenticationConfiguration;
this.openMetadataApplicationConfig = this.migrationFile.openMetadataApplicationConfig;
this.authenticationConfiguration =
this.openMetadataApplicationConfig.getAuthenticationConfiguration();
}
public void initializeWorkflowHandler() {
WorkflowHandler.initialize(openMetadataApplicationConfig);
}
public PipelineServiceClientInterface getPipelineServiceClient() {
if (this.openMetadataApplicationConfig != null) {
return PipelineServiceClientFactory.createPipelineServiceClient(
this.openMetadataApplicationConfig.getPipelineServiceClientConfiguration());
}
return PipelineServiceClientFactory.createPipelineServiceClient(null);
}
@Override

View File

@ -17,8 +17,7 @@ import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.json.JSONObject;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.schema.api.security.AuthenticationConfiguration;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.migration.QueryStatus;
@ -35,8 +34,7 @@ public class MigrationWorkflow {
private final String nativeSQLScriptRootPath;
private final ConnectionType connectionType;
private final String extensionSQLScriptRootPath;
@Getter private final PipelineServiceClientConfiguration pipelineServiceClientConfiguration;
@Getter private final AuthenticationConfiguration authenticationConfiguration;
@Getter private final OpenMetadataApplicationConfig openMetadataApplicationConfig;
private final MigrationDAO migrationDAO;
private final Jdbi jdbi;
private final boolean forceMigrations;
@ -48,8 +46,7 @@ public class MigrationWorkflow {
String nativeSQLScriptRootPath,
ConnectionType connectionType,
String extensionSQLScriptRootPath,
PipelineServiceClientConfiguration pipelineServiceClientConfiguration,
AuthenticationConfiguration authenticationConfiguration,
OpenMetadataApplicationConfig config,
boolean forceMigrations) {
this.jdbi = jdbi;
this.migrationDAO = jdbi.onDemand(MigrationDAO.class);
@ -57,8 +54,7 @@ public class MigrationWorkflow {
this.nativeSQLScriptRootPath = nativeSQLScriptRootPath;
this.connectionType = connectionType;
this.extensionSQLScriptRootPath = extensionSQLScriptRootPath;
this.pipelineServiceClientConfiguration = pipelineServiceClientConfiguration;
this.authenticationConfiguration = authenticationConfiguration;
this.openMetadataApplicationConfig = config;
}
public void loadMigrations() {
@ -67,9 +63,8 @@ public class MigrationWorkflow {
getMigrationFiles(
nativeSQLScriptRootPath,
connectionType,
extensionSQLScriptRootPath,
pipelineServiceClientConfiguration,
authenticationConfiguration);
openMetadataApplicationConfig,
extensionSQLScriptRootPath);
// Filter Migrations to Be Run
this.migrations = filterAndGetMigrationsToRun(availableMigrations);
}
@ -87,16 +82,10 @@ public class MigrationWorkflow {
public List<MigrationFile> getMigrationFiles(
String nativeSQLScriptRootPath,
ConnectionType connectionType,
String extensionSQLScriptRootPath,
PipelineServiceClientConfiguration pipelineServiceClientConfiguration,
AuthenticationConfiguration authenticationConfiguration) {
OpenMetadataApplicationConfig config,
String extensionSQLScriptRootPath) {
List<MigrationFile> availableOMNativeMigrations =
getMigrationFilesFromPath(
nativeSQLScriptRootPath,
connectionType,
pipelineServiceClientConfiguration,
authenticationConfiguration,
false);
getMigrationFilesFromPath(nativeSQLScriptRootPath, connectionType, config, false);
// If we only have OM migrations, return them
if (extensionSQLScriptRootPath == null || extensionSQLScriptRootPath.isEmpty()) {
@ -105,12 +94,7 @@ public class MigrationWorkflow {
// Otherwise, fetch the extension migrations and sort the executions
List<MigrationFile> availableExtensionMigrations =
getMigrationFilesFromPath(
extensionSQLScriptRootPath,
connectionType,
pipelineServiceClientConfiguration,
authenticationConfiguration,
true);
getMigrationFilesFromPath(extensionSQLScriptRootPath, connectionType, config, true);
/*
If we create migrations version as:
@ -127,19 +111,10 @@ public class MigrationWorkflow {
public List<MigrationFile> getMigrationFilesFromPath(
String path,
ConnectionType connectionType,
PipelineServiceClientConfiguration pipelineServiceClientConfiguration,
AuthenticationConfiguration authenticationConfiguration,
OpenMetadataApplicationConfig config,
Boolean isExtension) {
return Arrays.stream(Objects.requireNonNull(new File(path).listFiles(File::isDirectory)))
.map(
dir ->
new MigrationFile(
dir,
migrationDAO,
connectionType,
pipelineServiceClientConfiguration,
authenticationConfiguration,
isExtension))
.map(dir -> new MigrationFile(dir, migrationDAO, connectionType, config, isExtension))
.sorted()
.toList();
}

View File

@ -23,7 +23,7 @@ public class Migration extends MigrationProcessImpl {
migratePolicies(handle, collectionDAO);
migrateTestCaseDimension(handle, collectionDAO);
createSystemDICharts();
deleteLegacyDataInsightPipelines(pipelineServiceClient);
deleteLegacyDataInsightPipelines(getPipelineServiceClient());
updateDataInsightsApplication();
migrateAutomatorOwner(handle, collectionDAO);
}

View File

@ -15,6 +15,7 @@ public class Migration extends MigrationProcessImpl {
@Override
@SneakyThrows
public void runDataMigration() {
initializeWorkflowHandler();
updateGovernanceWorkflowDefinitions();
}
}

View File

@ -23,7 +23,7 @@ public class Migration extends MigrationProcessImpl {
migratePolicies(handle, collectionDAO);
migrateTestCaseDimension(handle, collectionDAO);
createSystemDICharts();
deleteLegacyDataInsightPipelines(pipelineServiceClient);
deleteLegacyDataInsightPipelines(getPipelineServiceClient());
updateDataInsightsApplication();
migrateAutomatorOwner(handle, collectionDAO);
}

View File

@ -13,8 +13,7 @@ import org.flywaydb.core.internal.parser.ParsingContext;
import org.flywaydb.core.internal.resource.filesystem.FileSystemResource;
import org.flywaydb.core.internal.sqlscript.SqlStatementIterator;
import org.flywaydb.database.mysql.MySQLParser;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.schema.api.security.AuthenticationConfiguration;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.util.EntityUtil;
@ -23,8 +22,7 @@ public class MigrationFile implements Comparable<MigrationFile> {
public final int[] versionNumbers;
public final String version;
public final ConnectionType connectionType;
public final PipelineServiceClientConfiguration pipelineServiceClientConfiguration;
public final AuthenticationConfiguration authenticationConfiguration;
public final OpenMetadataApplicationConfig openMetadataApplicationConfig;
public final File dir;
public final Boolean isExtension;
@ -40,16 +38,14 @@ public class MigrationFile implements Comparable<MigrationFile> {
File dir,
MigrationDAO migrationDAO,
ConnectionType connectionType,
PipelineServiceClientConfiguration pipelineServiceClientConfiguration,
AuthenticationConfiguration authenticationConfiguration,
OpenMetadataApplicationConfig config,
Boolean isExtension) {
this.dir = dir;
this.isExtension = isExtension;
this.version = dir.getName();
this.connectionType = connectionType;
this.migrationDAO = migrationDAO;
this.pipelineServiceClientConfiguration = pipelineServiceClientConfiguration;
this.authenticationConfiguration = authenticationConfiguration;
this.openMetadataApplicationConfig = config;
this.dbPackageName = connectionType == ConnectionType.MYSQL ? "mysql" : "postgres";
versionNumbers = convertToNumber(version);
schemaChanges = new ArrayList<>();

View File

@ -1193,13 +1193,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
DatasourceConfig.initialize(connType.label);
MigrationWorkflow workflow =
new MigrationWorkflow(
jdbi,
nativeSQLScriptRootPath,
connType,
extensionSQLScriptRootPath,
config.getPipelineServiceClientConfiguration(),
config.getAuthenticationConfiguration(),
force);
jdbi, nativeSQLScriptRootPath, connType, extensionSQLScriptRootPath, config, force);
workflow.loadMigrations();
workflow.printMigrationInfo();
workflow.runMigrationWorkflows();

View File

@ -17,6 +17,7 @@ import static java.lang.String.format;
import es.org.elasticsearch.client.RestClient;
import es.org.elasticsearch.client.RestClientBuilder;
import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.jersey.jackson.JacksonFeature;
import io.dropwizard.testing.ConfigOverride;
import io.dropwizard.testing.ResourceHelpers;
@ -47,8 +48,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.schema.api.security.AuthenticationConfiguration;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.type.IndexMappingLanguage;
import org.openmetadata.service.jdbi3.CollectionDAO;
@ -143,6 +142,14 @@ public abstract class OpenMetadataApplicationTest {
sqlContainer.withUsername("username");
sqlContainer.start();
// Note: Added DataSourceFactory since this configuration is needed by the WorkflowHandler.
DataSourceFactory dataSourceFactory = new DataSourceFactory();
dataSourceFactory.setUrl(sqlContainer.getJdbcUrl());
dataSourceFactory.setUser(sqlContainer.getUsername());
dataSourceFactory.setPassword(sqlContainer.getPassword());
dataSourceFactory.setDriverClass(sqlContainer.getDriverClassName());
config.setDataSourceFactory(dataSourceFactory);
final String flyWayMigrationScriptsLocation =
ResourceHelpers.resourceFilePath(
"db/sql/migrations/flyway/" + sqlContainer.getDriverClassName());
@ -212,8 +219,6 @@ public abstract class OpenMetadataApplicationTest {
ConnectionType.from(sqlContainer.getDriverClassName()),
nativeMigrationScriptsLocation,
extensionMigrationScripsLocation,
null,
null,
false);
createIndices();
APP.before();
@ -226,8 +231,6 @@ public abstract class OpenMetadataApplicationTest {
ConnectionType connType,
String nativeMigrationSQLPath,
String extensionSQLScriptRootPath,
PipelineServiceClientConfiguration pipelineServiceClientConfiguration,
AuthenticationConfiguration authenticationConfiguration,
boolean forceMigrations) {
DatasourceConfig.initialize(connType.label);
MigrationWorkflow workflow =
@ -236,8 +239,7 @@ public abstract class OpenMetadataApplicationTest {
nativeMigrationSQLPath,
connType,
extensionSQLScriptRootPath,
pipelineServiceClientConfiguration,
authenticationConfiguration,
config,
forceMigrations);
// Initialize search repository
SearchRepository searchRepository = new SearchRepository(getEsConfig());

View File

@ -28,7 +28,7 @@ public class MigrationWorkflowTest extends OpenMetadataApplicationTest {
migrationWorkflow =
spy(
new MigrationWorkflow(
jdbi, "nativePath", ConnectionType.MYSQL, "extensionPath", null, null, false));
jdbi, "nativePath", ConnectionType.MYSQL, "extensionPath", null, false));
omMigrationList =
List.of(
@ -36,22 +36,19 @@ public class MigrationWorkflowTest extends OpenMetadataApplicationTest {
new File("/bootstrap/sql/migrations/native/1.1.0"),
null,
ConnectionType.MYSQL,
migrationWorkflow.getPipelineServiceClientConfiguration(),
migrationWorkflow.getAuthenticationConfiguration(),
migrationWorkflow.getOpenMetadataApplicationConfig(),
false),
new MigrationFile(
new File("/bootstrap/sql/migrations/native/1.2.0"),
null,
ConnectionType.MYSQL,
migrationWorkflow.getPipelineServiceClientConfiguration(),
migrationWorkflow.getAuthenticationConfiguration(),
migrationWorkflow.getOpenMetadataApplicationConfig(),
false),
new MigrationFile(
new File("/bootstrap/sql/migrations/native/1.2.1"),
null,
ConnectionType.MYSQL,
migrationWorkflow.getPipelineServiceClientConfiguration(),
migrationWorkflow.getAuthenticationConfiguration(),
migrationWorkflow.getOpenMetadataApplicationConfig(),
false));
collateMigrationList =
@ -60,15 +57,13 @@ public class MigrationWorkflowTest extends OpenMetadataApplicationTest {
new File("/bootstrap-collate/sql/migrations/native/1.1.0-collate"),
null,
ConnectionType.MYSQL,
migrationWorkflow.getPipelineServiceClientConfiguration(),
migrationWorkflow.getAuthenticationConfiguration(),
migrationWorkflow.getOpenMetadataApplicationConfig(),
true),
new MigrationFile(
new File("/bootstrap-collate/sql/migrations/native/1.2.2-collate"),
null,
ConnectionType.MYSQL,
migrationWorkflow.getPipelineServiceClientConfiguration(),
migrationWorkflow.getAuthenticationConfiguration(),
migrationWorkflow.getOpenMetadataApplicationConfig(),
true));
}
@ -77,19 +72,18 @@ public class MigrationWorkflowTest extends OpenMetadataApplicationTest {
Mockito.doReturn(omMigrationList)
.when(migrationWorkflow)
.getMigrationFilesFromPath(
eq("nativePath"), any(ConnectionType.class), eq(null), eq(null), eq(false));
eq("nativePath"), any(ConnectionType.class), eq(null), eq(false));
Mockito.doReturn(collateMigrationList)
.when(migrationWorkflow)
.getMigrationFilesFromPath(
eq("extensionPath"), any(ConnectionType.class), eq(null), eq(null), eq(true));
eq("extensionPath"), any(ConnectionType.class), eq(null), eq(true));
List<MigrationFile> foundList =
migrationWorkflow.getMigrationFiles(
"nativePath",
ConnectionType.MYSQL,
"extensionPath",
migrationWorkflow.getPipelineServiceClientConfiguration(),
migrationWorkflow.getAuthenticationConfiguration());
migrationWorkflow.getOpenMetadataApplicationConfig(),
"extensionPath");
assertEquals(
List.of("1.1.0", "1.1.0-collate", "1.2.0", "1.2.1", "1.2.2-collate"),