2022-11-11 09:59:15 +01:00
|
|
|
from typing import Optional
|
|
|
|
|
2022-07-19 14:51:44 +02:00
|
|
|
from airflow.configuration import conf
|
|
|
|
from pydantic import SecretStr
|
|
|
|
|
2022-11-18 11:35:08 +01:00
|
|
|
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
|
|
|
|
from metadata.generated.schema.security.secrets.secretsManagerProvider import (
|
2022-07-19 14:51:44 +02:00
|
|
|
SecretsManagerProvider,
|
|
|
|
)
|
2022-08-19 16:15:40 +02:00
|
|
|
from metadata.utils.secrets.secrets_manager import SECRET_MANAGER_AIRFLOW_CONF
|
2022-07-19 14:51:44 +02:00
|
|
|
|
|
|
|
|
2022-11-11 09:59:15 +01:00
|
|
|
def build_aws_credentials() -> Optional[AWSCredentials]:
    """Build AWS credentials from the secrets manager section of the Airflow config.

    Returns:
        An ``AWSCredentials`` object populated with the ``aws_region``,
        ``aws_access_key_id`` and ``aws_secret_access_key`` options read from
        the ``SECRET_MANAGER_AIRFLOW_CONF`` section, or ``None`` when that
        section is not present in the Airflow configuration. Any option that
        is missing from the section falls back to an empty string.
    """
    # Nothing to build when the section is not configured at all.
    if not conf.has_section(SECRET_MANAGER_AIRFLOW_CONF):
        return None

    def _read_option(option: str) -> str:
        # Every option is read from the same section with the same "" fallback.
        return conf.get(SECRET_MANAGER_AIRFLOW_CONF, option, fallback="")

    aws_creds = AWSCredentials(awsRegion=_read_option("aws_region"))
    # The key id and secret are wrapped in SecretStr and assigned after
    # construction, exactly as the region-only constructor call leaves them unset.
    aws_creds.awsAccessKeyId = SecretStr(_read_option("aws_access_key_id"))
    aws_creds.awsSecretAccessKey = SecretStr(_read_option("aws_secret_access_key"))
    return aws_creds
|
2022-07-19 14:51:44 +02:00
|
|
|
|
|
|
|
|
2022-11-11 09:59:15 +01:00
|
|
|
def build_secrets_manager_credentials(
    secrets_manager: SecretsManagerProvider,
) -> Optional[AWSCredentials]:
    """Return the credentials required by the given secrets manager provider.

    Args:
        secrets_manager: The secrets manager provider to build credentials for.

    Returns:
        The result of ``build_aws_credentials()`` when the provider is one of
        ``aws``, ``managed_aws``, ``aws_ssm`` or ``managed_aws_ssm``;
        ``None`` for any other provider.
    """
    # Checked in two steps so the comparison order matches the provider groups:
    # first the Secrets Manager variants, then the SSM variants.
    is_aws_provider = secrets_manager in (
        SecretsManagerProvider.aws,
        SecretsManagerProvider.managed_aws,
    )
    if is_aws_provider or secrets_manager in (
        SecretsManagerProvider.aws_ssm,
        SecretsManagerProvider.managed_aws_ssm,
    ):
        return build_aws_credentials()

    # Any other provider needs no credentials from this builder.
    return None
|