diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/run_automation.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/run_automation.py index 78b015b22bb..02bdbd944aa 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/run_automation.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/run_automation.py @@ -61,6 +61,14 @@ def get_fn(blueprint: Blueprint) -> Callable: json_request ) + # we need to instantiate the secret manager in case secrets are passed + SecretsManagerFactory( + automation_workflow.openMetadataServerConnection.secretsManagerProvider, + build_secrets_manager_credentials( + automation_workflow.openMetadataServerConnection.secretsManagerProvider + ), + ) + execute(automation_workflow) return ApiResponse.success( diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/credentials_builder.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/credentials_builder.py index 4bbc2f5520c..1632f95fe47 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/credentials_builder.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/credentials_builder.py @@ -11,10 +11,9 @@ from metadata.utils.secrets.secrets_manager import SECRET_MANAGER_AIRFLOW_CONF def build_aws_credentials() -> Optional[AWSCredentials]: - if conf.has_section(SECRET_MANAGER_AIRFLOW_CONF): - credentials = AWSCredentials( - awsRegion=conf.get(SECRET_MANAGER_AIRFLOW_CONF, "aws_region", fallback="") - ) + aws_region = conf.get(SECRET_MANAGER_AIRFLOW_CONF, "aws_region", fallback=None) + if aws_region: + credentials = AWSCredentials(awsRegion=aws_region) credentials.awsAccessKeyId = conf.get( SECRET_MANAGER_AIRFLOW_CONF, "aws_access_key_id", fallback="" ) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/secrets/SecretsManagerUpdateService.java b/openmetadata-service/src/main/java/org/openmetadata/service/secrets/SecretsManagerUpdateService.java index a7010fce34a..84e17042b96 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/secrets/SecretsManagerUpdateService.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/secrets/SecretsManagerUpdateService.java @@ -23,6 +23,7 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.ServiceConnectionEntityInterface; import org.openmetadata.schema.ServiceEntityInterface; +import org.openmetadata.schema.entity.automations.Workflow; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.schema.entity.teams.User; import org.openmetadata.service.Entity; @@ -31,6 +32,7 @@ import org.openmetadata.service.jdbi3.IngestionPipelineRepository; import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.ServiceEntityRepository; import org.openmetadata.service.jdbi3.UserRepository; +import org.openmetadata.service.jdbi3.WorkflowRepository; import org.openmetadata.service.resources.CollectionRegistry; import org.openmetadata.service.resources.CollectionRegistry.CollectionDetails; import org.openmetadata.service.resources.services.ServiceEntityResource; @@ -51,6 +53,7 @@ public class SecretsManagerUpdateService { private final SecretsManager oldSecretManager; private final UserRepository userRepository; private final IngestionPipelineRepository ingestionPipelineRepository; + private final WorkflowRepository workflowRepository; private final Map, ServiceEntityRepository> connectionTypeRepositoriesMap; @@ -61,6 +64,7 @@ public class SecretsManagerUpdateService { this.userRepository = (UserRepository) Entity.getEntityRepository(Entity.USER); this.ingestionPipelineRepository = (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + this.workflowRepository = (WorkflowRepository) Entity.getEntityRepository(Entity.WORKFLOW); // by default, it is going to be non-managed secrets manager since decrypt is the same for all of them this.oldSecretManager = SecretsManagerFactory.createSecretsManager(null, clusterName); } @@ -69,6 +73,7 @@ public class SecretsManagerUpdateService { updateServices(); updateBotUsers(); updateIngestionPipelines(); + updateWorkflows(); } private void updateServices() { @@ -90,9 +95,17 @@ public class SecretsManagerUpdateService { private void updateIngestionPipelines() { LOG.info( String.format( - "Updating bot users in case of an update on the JSON schema: [%s]", + "Updating ingestion pipelines in case of an update on the JSON schema: [%s]", secretManager.getSecretsManagerProvider().value())); - retrieveIngestionPipelines().forEach(this::updateIngestionPipelines); + retrieveIngestionPipelines().forEach(this::updateIngestionPipeline); + } + + private void updateWorkflows() { + LOG.info( + String.format( + "Updating workflows in case of an update on the JSON schema: [%s]", + secretManager.getSecretsManagerProvider().value())); + retrieveWorkflows().forEach(this::updateWorkflow); } private void updateService(ServiceEntityInterface serviceEntityInterface) { @@ -230,7 +243,22 @@ public class SecretsManagerUpdateService { } } - private void updateIngestionPipelines(IngestionPipeline ingestionPipeline) { + private List retrieveWorkflows() { + try { + return workflowRepository + .listAfter( + null, + EntityUtil.Fields.EMPTY_FIELDS, + new ListFilter(), + workflowRepository.dao.listCount(new ListFilter()), + null) + .getData(); + } catch (IOException e) { + throw new SecretsManagerUpdateException(e.getMessage(), e.getCause()); + } + } + + private void updateIngestionPipeline(IngestionPipeline ingestionPipeline) { try { IngestionPipeline ingestion = ingestionPipelineRepository.dao.findEntityById(ingestionPipeline.getId()); // we have to decrypt using the old secrets manager and encrypt again with the new one @@ -241,4 +269,16 @@ public class SecretsManagerUpdateService { throw new SecretsManagerUpdateException(e.getMessage(), e.getCause()); } } + + private void updateWorkflow(Workflow workflow) { + try { + Workflow workflowObject = workflowRepository.dao.findEntityById(workflow.getId()); + // we have to decrypt using the old secrets manager and encrypt again with the new one + workflowObject = oldSecretManager.encryptOrDecryptWorkflow(workflowObject, false); + workflowObject = secretManager.encryptOrDecryptWorkflow(workflowObject, true); + ingestionPipelineRepository.dao.update(workflowObject); + } catch (IOException e) { + throw new SecretsManagerUpdateException(e.getMessage(), e.getCause()); + } + } }