diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v150/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v150/Migration.java index f5ad63dd2df..260a3c17613 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v150/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v150/Migration.java @@ -2,6 +2,7 @@ package org.openmetadata.service.migration.mysql.v150; import static org.openmetadata.service.migration.utils.v150.MigrationUtil.createSystemDICharts; import static org.openmetadata.service.migration.utils.v150.MigrationUtil.deleteLegacyDataInsightPipelines; +import static org.openmetadata.service.migration.utils.v150.MigrationUtil.migrateAutomatorOwner; import static org.openmetadata.service.migration.utils.v150.MigrationUtil.migrateTestCaseDimension; import lombok.SneakyThrows; @@ -20,5 +21,6 @@ public class Migration extends MigrationProcessImpl { migrateTestCaseDimension(handle, collectionDAO); createSystemDICharts(); deleteLegacyDataInsightPipelines(pipelineServiceClient); + migrateAutomatorOwner(handle, collectionDAO); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v150/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v150/Migration.java index 956951cd9e5..d5b5e6c9097 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v150/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v150/Migration.java @@ -2,6 +2,7 @@ package org.openmetadata.service.migration.postgres.v150; import static org.openmetadata.service.migration.utils.v150.MigrationUtil.createSystemDICharts; import static org.openmetadata.service.migration.utils.v150.MigrationUtil.deleteLegacyDataInsightPipelines; +import static org.openmetadata.service.migration.utils.v150.MigrationUtil.migrateAutomatorOwner; import static org.openmetadata.service.migration.utils.v150.MigrationUtil.migrateTestCaseDimension; import lombok.SneakyThrows; @@ -20,5 +21,6 @@ public class Migration extends MigrationProcessImpl { migrateTestCaseDimension(handle, collectionDAO); createSystemDICharts(); deleteLegacyDataInsightPipelines(pipelineServiceClient); + migrateAutomatorOwner(handle, collectionDAO); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v150/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v150/MigrationUtil.java index 78f10da0cfd..fbcffaf05e4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v150/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v150/MigrationUtil.java @@ -3,6 +3,11 @@ package org.openmetadata.service.migration.utils.v150; import java.util.Map; import java.util.Optional; import java.util.UUID; +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.core.Handle; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; @@ -23,6 +28,78 @@ import org.openmetadata.service.util.JsonUtils; @Slf4j public class MigrationUtil { + + private static final String QUERY_AUTOMATOR = + "SELECT json FROM ingestion_pipeline_entity where appType = 'Automator'"; + private static final String ADD_OWNER_ACTION = "AddOwnerAction"; + + /** + * We need to update the `AddOwnerAction` action in the automator to have a list of owners + */ + public static void migrateAutomatorOwner(Handle handle, CollectionDAO collectionDAO) { + try { + + handle + .createQuery(QUERY_AUTOMATOR) + .mapToMap() + .forEach( + row -> { + try { + // Prepare the current json objects + JsonObject json = JsonUtils.readJson((String) row.get("json")).asJsonObject(); + JsonObject sourceConfig = json.getJsonObject("sourceConfig"); + JsonObject config = sourceConfig.getJsonObject("config"); + JsonObject appConfig = config.getJsonObject("appConfig"); + JsonArray actions = appConfig.getJsonArray("actions"); + + JsonArrayBuilder updatedActions = Json.createArrayBuilder(); + + // update the AddOwnerAction payloads to have a list of owners + actions.forEach( + action -> { + JsonObject actionObj = (JsonObject) action; + if (ADD_OWNER_ACTION.equals(actionObj.getString("type"))) { + JsonObject owner = actionObj.getJsonObject("owner"); + JsonArrayBuilder owners = Json.createArrayBuilder(); + owners.add(owner); + actionObj = + Json.createObjectBuilder(actionObj) + .add("owners", owners) + .remove("owner") + .build(); + } + updatedActions.add(actionObj); + }); + + // Recreate the json object + JsonObjectBuilder updatedAppConfig = + Json.createObjectBuilder(appConfig).add("actions", updatedActions); + + JsonObjectBuilder updatedConfig = + Json.createObjectBuilder(config).add("appConfig", updatedAppConfig); + + JsonObjectBuilder updatedSourceConfig = + Json.createObjectBuilder(sourceConfig).add("config", updatedConfig); + + JsonObject finalJsonObject = + Json.createObjectBuilder(json) + .add("sourceConfig", updatedSourceConfig) + .build(); + + // Update the Ingestion Pipeline + IngestionPipeline ingestionPipeline = + JsonUtils.readValue(finalJsonObject.toString(), IngestionPipeline.class); + collectionDAO.ingestionPipelineDAO().update(ingestionPipeline); + + } catch (Exception ex) { + LOG.warn(String.format("Error updating automator [%s] due to [%s]", row, ex)); + } + }); + } catch (Exception ex) { + LOG.warn("Error running the automator migration ", ex); + } + } + public static void deleteLegacyDataInsightPipelines( PipelineServiceClientInterface pipelineServiceClient) { // Delete Data Insights Pipeline