Fix: Secrets Manager not working with Airflow AWS config (#11459)

* Fix: Secrets Manager not working with Airflow config

* Address Python Checkstyle

* Minor change
This commit is contained in:
Nahuel 2023-05-08 09:54:31 +02:00 committed by GitHub
parent 057deeb0ca
commit 41ea49d11e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 7 deletions

View File

@ -61,6 +61,14 @@ def get_fn(blueprint: Blueprint) -> Callable:
json_request
)
# we need to instantiate the secret manager in case secrets are passed
SecretsManagerFactory(
automation_workflow.openMetadataServerConnection.secretsManagerProvider,
build_secrets_manager_credentials(
automation_workflow.openMetadataServerConnection.secretsManagerProvider
),
)
execute(automation_workflow)
return ApiResponse.success(

View File

@ -11,10 +11,9 @@ from metadata.utils.secrets.secrets_manager import SECRET_MANAGER_AIRFLOW_CONF
def build_aws_credentials() -> Optional[AWSCredentials]:
if conf.has_section(SECRET_MANAGER_AIRFLOW_CONF):
credentials = AWSCredentials(
awsRegion=conf.get(SECRET_MANAGER_AIRFLOW_CONF, "aws_region", fallback="")
)
aws_region = conf.get(SECRET_MANAGER_AIRFLOW_CONF, "aws_region", fallback=None)
if aws_region:
credentials = AWSCredentials(awsRegion=aws_region)
credentials.awsAccessKeyId = conf.get(
SECRET_MANAGER_AIRFLOW_CONF, "aws_access_key_id", fallback=""
)

View File

@ -23,6 +23,7 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.ServiceConnectionEntityInterface;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.service.Entity;
@ -31,6 +32,7 @@ import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.ServiceEntityRepository;
import org.openmetadata.service.jdbi3.UserRepository;
import org.openmetadata.service.jdbi3.WorkflowRepository;
import org.openmetadata.service.resources.CollectionRegistry;
import org.openmetadata.service.resources.CollectionRegistry.CollectionDetails;
import org.openmetadata.service.resources.services.ServiceEntityResource;
@ -51,6 +53,7 @@ public class SecretsManagerUpdateService {
private final SecretsManager oldSecretManager;
private final UserRepository userRepository;
private final IngestionPipelineRepository ingestionPipelineRepository;
private final WorkflowRepository workflowRepository;
private final Map<Class<? extends ServiceConnectionEntityInterface>, ServiceEntityRepository<?, ?>>
connectionTypeRepositoriesMap;
@ -61,6 +64,7 @@ public class SecretsManagerUpdateService {
this.userRepository = (UserRepository) Entity.getEntityRepository(Entity.USER);
this.ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
this.workflowRepository = (WorkflowRepository) Entity.getEntityRepository(Entity.WORKFLOW);
// by default, it is going to be non-managed secrets manager since decrypt is the same for all of them
this.oldSecretManager = SecretsManagerFactory.createSecretsManager(null, clusterName);
}
@ -69,6 +73,7 @@ public class SecretsManagerUpdateService {
updateServices();
updateBotUsers();
updateIngestionPipelines();
updateWorkflows();
}
private void updateServices() {
@ -90,9 +95,17 @@ public class SecretsManagerUpdateService {
private void updateIngestionPipelines() {
LOG.info(
String.format(
"Updating bot users in case of an update on the JSON schema: [%s]",
"Updating ingestion pipelines in case of an update on the JSON schema: [%s]",
secretManager.getSecretsManagerProvider().value()));
retrieveIngestionPipelines().forEach(this::updateIngestionPipelines);
retrieveIngestionPipelines().forEach(this::updateIngestionPipeline);
}
private void updateWorkflows() {
LOG.info(
String.format(
"Updating workflows in case of an update on the JSON schema: [%s]",
secretManager.getSecretsManagerProvider().value()));
retrieveWorkflows().forEach(this::updateWorkflow);
}
private void updateService(ServiceEntityInterface serviceEntityInterface) {
@ -230,7 +243,22 @@ public class SecretsManagerUpdateService {
}
}
private void updateIngestionPipelines(IngestionPipeline ingestionPipeline) {
private List<Workflow> retrieveWorkflows() {
try {
return workflowRepository
.listAfter(
null,
EntityUtil.Fields.EMPTY_FIELDS,
new ListFilter(),
workflowRepository.dao.listCount(new ListFilter()),
null)
.getData();
} catch (IOException e) {
throw new SecretsManagerUpdateException(e.getMessage(), e.getCause());
}
}
private void updateIngestionPipeline(IngestionPipeline ingestionPipeline) {
try {
IngestionPipeline ingestion = ingestionPipelineRepository.dao.findEntityById(ingestionPipeline.getId());
// we have to decrypt using the old secrets manager and encrypt again with the new one
@ -241,4 +269,16 @@ public class SecretsManagerUpdateService {
throw new SecretsManagerUpdateException(e.getMessage(), e.getCause());
}
}
private void updateWorkflow(Workflow workflow) {
try {
Workflow workflowObject = workflowRepository.dao.findEntityById(workflow.getId());
// we have to decrypt using the old secrets manager and encrypt again with the new one
workflowObject = oldSecretManager.encryptOrDecryptWorkflow(workflowObject, false);
workflowObject = secretManager.encryptOrDecryptWorkflow(workflowObject, true);
ingestionPipelineRepository.dao.update(workflowObject);
} catch (IOException e) {
throw new SecretsManagerUpdateException(e.getMessage(), e.getCause());
}
}
}