Minor fixes related to Secrets Manager implementation (#7046)

* Minor fixes related to Secrets Manager implementation

* Fix failing test
This commit is contained in:
Nahuel 2022-08-31 14:14:55 +02:00 committed by GitHub
parent d791fe8289
commit 0762e39172
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 221 additions and 173 deletions

View File

@ -13,40 +13,37 @@
package org.openmetadata.catalog.secrets;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.openmetadata.catalog.airflow.AirflowConfiguration;
import org.openmetadata.catalog.airflow.AuthConfiguration;
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.catalog.entity.services.ServiceType;
import org.openmetadata.catalog.exception.InvalidServiceConnectionException;
import org.openmetadata.catalog.exception.SecretsManagerException;
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
import org.apache.logging.log4j.util.Strings;
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
import org.openmetadata.catalog.util.JsonUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
public abstract class AWSBasedSecretsManager extends SecretsManager {
public abstract class AWSBasedSecretsManager extends ThirdPartySecretsManager {
public static final String AUTH_PROVIDER_SECRET_ID_PREFIX = "auth-provider";
public static final String DATABASE_METADATA_PIPELINE_SECRET_ID_PREFIX = "database-metadata-pipeline";
public static final String TEST_CONNECTION_TEMP_SECRET_ID_PREFIX = "test-connection-temp";
public static final String ACCESS_KEY_ID = "accessKeyId";
public static final String SECRET_ACCESS_KEY = "secretAccessKey";
public static final String REGION = "region";
public static final String NULL_SECRET_STRING = "null";
protected AWSBasedSecretsManager(
SecretsManagerProvider awsProvider, SecretsManagerConfiguration config, String clusterPrefix) {
super(awsProvider, clusterPrefix);
// initialize the secret client depending on the SecretsManagerConfiguration passed
if (config != null && config.getParameters() != null) {
if (config != null
&& config.getParameters() != null
&& !Strings.isBlank(config.getParameters().getOrDefault(REGION, ""))) {
String region = config.getParameters().getOrDefault(REGION, "");
String accessKeyId = config.getParameters().getOrDefault(ACCESS_KEY_ID, "");
String secretAccessKey = config.getParameters().getOrDefault(SECRET_ACCESS_KEY, "");
StaticCredentialsProvider staticCredentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
initClientWithCredentials(region, staticCredentialsProvider);
AwsCredentialsProvider credentialsProvider;
if (Strings.isBlank(accessKeyId) && Strings.isBlank(secretAccessKey)) {
credentialsProvider = DefaultCredentialsProvider.create();
} else {
credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
}
initClientWithCredentials(region, credentialsProvider);
} else {
// initialized with the region loaded from the DefaultAwsRegionProviderChain and credentials loaded from the
// DefaultCredentialsProvider
@ -56,129 +53,5 @@ public abstract class AWSBasedSecretsManager extends SecretsManager {
abstract void initClientWithoutCredentials();
abstract void initClientWithCredentials(String region, StaticCredentialsProvider staticCredentialsProvider);
@Override
public Object encryptOrDecryptServiceConnectionConfig(
Object connectionConfig, String connectionType, String connectionName, ServiceType serviceType, boolean encrypt) {
String secretName = buildSecretId("service", serviceType.value(), connectionType, connectionName);
try {
if (encrypt) {
String connectionConfigJson = JsonUtils.pojoToJson(connectionConfig);
if (connectionConfigJson != null) {
upsertSecret(secretName, connectionConfigJson);
}
return null;
} else {
Class<?> clazz = createConnectionConfigClass(connectionType, extractConnectionPackageName(serviceType));
return JsonUtils.readValue(getSecret(secretName), clazz);
}
} catch (ClassNotFoundException ex) {
throw InvalidServiceConnectionException.byMessage(
connectionType, String.format("Failed to construct connection instance of %s", connectionType));
} catch (Exception e) {
throw SecretsManagerException.byMessage(getClass().getSimpleName(), secretName, e.getMessage());
}
}
@Override
public Object storeTestConnectionObject(TestServiceConnection testServiceConnection) {
String secretName =
buildSecretId(TEST_CONNECTION_TEMP_SECRET_ID_PREFIX, testServiceConnection.getConnectionType().value());
try {
String connectionConfigJson = JsonUtils.pojoToJson(testServiceConnection.getConnection());
upsertSecret(secretName, connectionConfigJson);
} catch (JsonProcessingException e) {
throw new SecretsManagerException("Error parsing to JSON the service connection config: " + e.getMessage());
}
return null;
}
@Override
public AirflowConfiguration encryptAirflowConnection(AirflowConfiguration airflowConfiguration) {
OpenMetadataServerConnection.AuthProvider authProvider =
OpenMetadataServerConnection.AuthProvider.fromValue(airflowConfiguration.getAuthProvider());
AuthConfiguration authConfig = airflowConfiguration.getAuthConfig();
String authProviderJson = null;
try {
switch (authProvider) {
case GOOGLE:
authProviderJson = JsonUtils.pojoToJson(authConfig.getGoogle());
break;
case AUTH_0:
authProviderJson = JsonUtils.pojoToJson(authConfig.getAuth0());
break;
case OKTA:
authProviderJson = JsonUtils.pojoToJson(authConfig.getOkta());
break;
case AZURE:
authProviderJson = JsonUtils.pojoToJson(authConfig.getAzure());
break;
case CUSTOM_OIDC:
authProviderJson = JsonUtils.pojoToJson(authConfig.getCustomOidc());
break;
case OPENMETADATA:
authProviderJson = JsonUtils.pojoToJson(authConfig.getOpenmetadata());
break;
case NO_AUTH:
break;
default:
throw new IllegalArgumentException("OpenMetadata doesn't support auth provider type " + authProvider.value());
}
} catch (JsonProcessingException e) {
throw new SecretsManagerException("Error parsing to JSON the auth config :" + e.getMessage());
}
if (authProviderJson != null) {
upsertSecret(buildSecretId(AUTH_PROVIDER_SECRET_ID_PREFIX, authProvider.value()), authProviderJson);
}
airflowConfiguration.setAuthConfig(null);
return airflowConfiguration;
}
@Override
public Object encryptOrDecryptDbtConfigSource(Object dbtConfigSource, String serviceName, boolean encrypt) {
String secretName = buildSecretId(DATABASE_METADATA_PIPELINE_SECRET_ID_PREFIX, serviceName);
try {
if (encrypt) {
String dbtConfigSourceJson = JsonUtils.pojoToJson(dbtConfigSource);
upsertSecret(secretName, dbtConfigSourceJson);
return null;
} else {
String dbtConfigSourceJson = getSecret(secretName);
return NULL_SECRET_STRING.equals(dbtConfigSourceJson)
? null
: JsonUtils.readValue(dbtConfigSourceJson, Object.class);
}
} catch (Exception e) {
throw SecretsManagerException.byMessage(getClass().getSimpleName(), secretName, e.getMessage());
}
}
@Override
protected Object decryptAuthProviderConfig(
OpenMetadataServerConnection.AuthProvider authProvider, AuthConfiguration authConfig) {
return null;
}
private void upsertSecret(String secretName, String secretValue) {
if (existSecret(secretName)) {
updateSecret(secretName, secretValue != null ? secretValue : NULL_SECRET_STRING);
} else {
storeSecret(secretName, secretValue != null ? secretValue : NULL_SECRET_STRING);
}
}
public boolean existSecret(String secretName) {
try {
return getSecret(secretName) != null;
} catch (Exception e) {
return false;
}
}
abstract void storeSecret(String secretName, String secretValue);
abstract void updateSecret(String secretName, String secretValue);
abstract String getSecret(String secretName);
abstract void initClientWithCredentials(String region, AwsCredentialsProvider staticCredentialsProvider);
}

View File

@ -15,7 +15,7 @@ package org.openmetadata.catalog.secrets;
import static org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider.AWS_SSM;
import com.google.common.annotations.VisibleForTesting;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.ssm.SsmClient;
import software.amazon.awssdk.services.ssm.model.GetParameterRequest;
@ -38,7 +38,7 @@ public class AWSSSMSecretsManager extends AWSBasedSecretsManager {
}
@Override
void initClientWithCredentials(String region, StaticCredentialsProvider staticCredentialsProvider) {
void initClientWithCredentials(String region, AwsCredentialsProvider staticCredentialsProvider) {
this.ssmClient =
SsmClient.builder().region(Region.of(region)).credentialsProvider(staticCredentialsProvider).build();
}

View File

@ -17,7 +17,7 @@ import static org.openmetadata.catalog.services.connections.metadata.SecretsMana
import com.google.common.annotations.VisibleForTesting;
import java.util.Objects;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.CreateSecretRequest;
@ -40,7 +40,7 @@ public class AWSSecretsManager extends AWSBasedSecretsManager {
}
@Override
void initClientWithCredentials(String region, StaticCredentialsProvider staticCredentialsProvider) {
void initClientWithCredentials(String region, AwsCredentialsProvider staticCredentialsProvider) {
this.secretsClient =
SecretsManagerClient.builder().region(Region.of(region)).credentialsProvider(staticCredentialsProvider).build();
}

View File

@ -0,0 +1,161 @@
/*
* Copyright 2022 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.secrets;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.openmetadata.catalog.airflow.AirflowConfiguration;
import org.openmetadata.catalog.airflow.AuthConfiguration;
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.catalog.entity.services.ServiceType;
import org.openmetadata.catalog.exception.InvalidServiceConnectionException;
import org.openmetadata.catalog.exception.SecretsManagerException;
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
import org.openmetadata.catalog.util.JsonUtils;
public abstract class ThirdPartySecretsManager extends SecretsManager {
public static final String AUTH_PROVIDER_SECRET_ID_PREFIX = "auth-provider";
public static final String DATABASE_METADATA_PIPELINE_SECRET_ID_PREFIX = "database-metadata-pipeline";
public static final String TEST_CONNECTION_TEMP_SECRET_ID_PREFIX = "test-connection-temp";
public static final String NULL_SECRET_STRING = "null";
protected ThirdPartySecretsManager(SecretsManagerProvider secretsManagerProvider, String clusterPrefix) {
super(secretsManagerProvider, clusterPrefix);
}
@Override
public Object encryptOrDecryptServiceConnectionConfig(
Object connectionConfig, String connectionType, String connectionName, ServiceType serviceType, boolean encrypt) {
String secretName = buildSecretId("service", serviceType.value(), connectionType, connectionName);
try {
if (encrypt) {
String connectionConfigJson = JsonUtils.pojoToJson(connectionConfig);
if (connectionConfigJson != null) {
upsertSecret(secretName, connectionConfigJson);
}
return null;
} else {
Class<?> clazz = createConnectionConfigClass(connectionType, extractConnectionPackageName(serviceType));
return JsonUtils.readValue(getSecret(secretName), clazz);
}
} catch (ClassNotFoundException ex) {
throw InvalidServiceConnectionException.byMessage(
connectionType, String.format("Failed to construct connection instance of %s", connectionType));
} catch (Exception e) {
throw SecretsManagerException.byMessage(getClass().getSimpleName(), secretName, e.getMessage());
}
}
@Override
public Object storeTestConnectionObject(TestServiceConnection testServiceConnection) {
String secretName =
buildSecretId(TEST_CONNECTION_TEMP_SECRET_ID_PREFIX, testServiceConnection.getConnectionType().value());
try {
String connectionConfigJson = JsonUtils.pojoToJson(testServiceConnection.getConnection());
upsertSecret(secretName, connectionConfigJson);
} catch (JsonProcessingException e) {
throw new SecretsManagerException("Error parsing to JSON the service connection config: " + e.getMessage());
}
return null;
}
@Override
public AirflowConfiguration encryptAirflowConnection(AirflowConfiguration airflowConfiguration) {
OpenMetadataServerConnection.AuthProvider authProvider =
OpenMetadataServerConnection.AuthProvider.fromValue(airflowConfiguration.getAuthProvider());
AuthConfiguration authConfig = airflowConfiguration.getAuthConfig();
String authProviderJson = null;
try {
switch (authProvider) {
case GOOGLE:
authProviderJson = JsonUtils.pojoToJson(authConfig.getGoogle());
break;
case AUTH_0:
authProviderJson = JsonUtils.pojoToJson(authConfig.getAuth0());
break;
case OKTA:
authProviderJson = JsonUtils.pojoToJson(authConfig.getOkta());
break;
case AZURE:
authProviderJson = JsonUtils.pojoToJson(authConfig.getAzure());
break;
case CUSTOM_OIDC:
authProviderJson = JsonUtils.pojoToJson(authConfig.getCustomOidc());
break;
case OPENMETADATA:
authProviderJson = JsonUtils.pojoToJson(authConfig.getOpenmetadata());
break;
case NO_AUTH:
break;
default:
throw new IllegalArgumentException("OpenMetadata doesn't support auth provider type " + authProvider.value());
}
} catch (JsonProcessingException e) {
throw new SecretsManagerException("Error parsing to JSON the auth config :" + e.getMessage());
}
if (authProviderJson != null) {
upsertSecret(buildSecretId(AUTH_PROVIDER_SECRET_ID_PREFIX, authProvider.value()), authProviderJson);
}
airflowConfiguration.setAuthConfig(null);
return airflowConfiguration;
}
@Override
public Object encryptOrDecryptDbtConfigSource(Object dbtConfigSource, String serviceName, boolean encrypt) {
String secretName = buildSecretId(DATABASE_METADATA_PIPELINE_SECRET_ID_PREFIX, serviceName);
try {
if (encrypt) {
String dbtConfigSourceJson = JsonUtils.pojoToJson(dbtConfigSource);
upsertSecret(secretName, dbtConfigSourceJson);
return null;
} else {
String dbtConfigSourceJson = getSecret(secretName);
return NULL_SECRET_STRING.equals(dbtConfigSourceJson)
? null
: JsonUtils.readValue(dbtConfigSourceJson, Object.class);
}
} catch (Exception e) {
throw SecretsManagerException.byMessage(getClass().getSimpleName(), secretName, e.getMessage());
}
}
@Override
protected Object decryptAuthProviderConfig(
OpenMetadataServerConnection.AuthProvider authProvider, AuthConfiguration authConfig) {
return null;
}
private void upsertSecret(String secretName, String secretValue) {
if (existSecret(secretName)) {
updateSecret(secretName, secretValue != null ? secretValue : NULL_SECRET_STRING);
} else {
storeSecret(secretName, secretValue != null ? secretValue : NULL_SECRET_STRING);
}
}
public boolean existSecret(String secretName) {
try {
return getSecret(secretName) != null;
} catch (Exception e) {
return false;
}
}
abstract void storeSecret(String secretName, String secretValue);
abstract void updateSecret(String secretName, String secretValue);
abstract String getSecret(String secretName);
}

View File

@ -209,13 +209,12 @@ fernetConfiguration:
fernetKey: ${FERNET_KEY:-jJ/9sz0g0OHxsfxOoSfdFdmk3ysNmPRnH3TUAbz3IHA=}
secretsManagerConfiguration:
secretsManager: ${SECRET_MANAGER:-local} # Possible values are "local", "aws"
# secretsManager: aws
secretsManager: ${SECRET_MANAGER:-local} # Possible values are "local", "aws", "aws-ssm"
# it will use the default auth provider for the secrets' manager service if parameters are not set
# parameters:
# region:
# accessKeyId:
# secretAccessKey:
parameters:
region: ${OM_SM_REGION:-""}
accessKeyId: ${OM_SM_ACCESS_KEY_ID:-""}
secretAccessKey: ${OM_SM_ACCESS_KEY:-""}
health:
delayedShutdownHandlerEnabled: true

View File

@ -11,6 +11,7 @@
import importlib
import time
import traceback
from typing import Type, TypeVar
import click
@ -279,16 +280,23 @@ class Workflow:
if service_type is not ServiceType.Metadata and not self._is_sample_source(
self.config.source.type
):
service_name = self.config.source.serviceName
metadata = OpenMetadata(config=metadata_config)
service = metadata.get_by_name(
get_service_class_from_service_type(service_type),
self.config.source.serviceName,
)
if service:
self.config.source.serviceConnection = (
metadata.secrets_manager_client.retrieve_service_connection(
service, service_type.name.lower()
try:
service = metadata.get_by_name(
get_service_class_from_service_type(service_type),
service_name,
)
if service:
self.config.source.serviceConnection = (
metadata.secrets_manager_client.retrieve_service_connection(
service, service_type.name.lower()
)
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error getting dbtConfigSource for service name [{service_name}] using the secrets manager provider [{metadata.config.secretsManagerProvider}]: {exc}"
)
def _retrieve_dbt_config_source_if_needed(
@ -308,17 +316,23 @@ class Workflow:
and config.type == DatabaseMetadataConfigType.DatabaseMetadata
):
metadata = OpenMetadata(config=metadata_config)
dbt_config_source: object = (
metadata.secrets_manager_client.retrieve_dbt_source_config(
self.config.source.sourceConfig,
self.config.source.serviceName,
try:
dbt_config_source: object = (
metadata.secrets_manager_client.retrieve_dbt_source_config(
self.config.source.sourceConfig,
self.config.source.serviceName,
)
)
)
if dbt_config_source and self.config.source.sourceConfig.config:
config_dict = self.config.source.sourceConfig.config.dict()
config_dict["dbtConfigSource"] = dbt_config_source
self.config.source.sourceConfig.config = (
DatabaseServiceMetadataPipeline.parse_obj(config_dict)
if dbt_config_source:
config_dict = config.dict()
config_dict["dbtConfigSource"] = dbt_config_source
self.config.source.sourceConfig.config = (
DatabaseServiceMetadataPipeline.parse_obj(config_dict)
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error getting dbtConfigSource for config [{config}] using the secrets manager provider [{metadata.config.secretsManagerProvider}]: {exc}"
)
@staticmethod

View File

@ -70,7 +70,6 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.dbt_source import DBTMixin
from metadata.utils import fqn
from metadata.utils.dbt_config import get_dbt_details
from metadata.utils.helpers import pretty_print_time_duration
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()

View File

@ -550,7 +550,9 @@ const AddIngestion = ({
...data,
airflowConfig: {
...data.airflowConfig,
scheduleInterval: repeatFrequency,
scheduleInterval: isEmpty(repeatFrequency)
? undefined
: repeatFrequency,
},
loggerLevel: enableDebugLog ? LogLevels.Debug : LogLevels.Info,
sourceConfig: {