mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-25 06:28:22 +00:00
Fix#5917: Implementation of temp secret for testing connection (#6832)
* Implementation of temp secret for testing connection * Fix tests
This commit is contained in:
parent
75a80d237b
commit
4e176fbc66
@ -492,6 +492,11 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Valid TestServiceConnection testServiceConnection) {
|
||||
testServiceConnection =
|
||||
testServiceConnection
|
||||
.withConnection(secretsManager.storeTestConnectionObject(testServiceConnection))
|
||||
.withSecretsManagerProvider(secretsManager.getSecretsManagerProvider())
|
||||
.withClusterName(catalogApplicationConfig.getClusterName());
|
||||
HttpResponse<String> response = pipelineServiceClient.testConnection(testServiceConnection);
|
||||
return Response.status(200, response.body()).build();
|
||||
}
|
||||
|
||||
@ -14,30 +14,30 @@
|
||||
package org.openmetadata.catalog.secrets;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import java.util.Locale;
|
||||
import org.openmetadata.catalog.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.catalog.airflow.AuthConfiguration;
|
||||
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.catalog.entity.services.ServiceType;
|
||||
import org.openmetadata.catalog.exception.InvalidServiceConnectionException;
|
||||
import org.openmetadata.catalog.exception.SecretsManagerException;
|
||||
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
|
||||
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
|
||||
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
|
||||
|
||||
public abstract class AWSBasedSecretsManager extends SecretsManager {
|
||||
|
||||
public static final String AUTH_PROVIDER_SECRET_ID_SUFFIX = "auth-provider";
|
||||
public static final String AUTH_PROVIDER_SECRET_ID_PREFIX = "auth-provider";
|
||||
public static final String DATABASE_METADATA_PIPELINE_SECRET_ID_PREFIX = "database-metadata-pipeline";
|
||||
public static final String TEST_CONNECTION_TEMP_SECRET_ID_PREFIX = "test-connection-temp";
|
||||
public static final String ACCESS_KEY_ID = "accessKeyId";
|
||||
public static final String SECRET_ACCESS_KEY = "secretAccessKey";
|
||||
public static final String REGION = "region";
|
||||
public static final String DATABASE_METADATA_PIPELINE_SECRET_ID_SUFFIX = "database-metadata-pipeline";
|
||||
public static final String NULL_SECRET_STRING = "null";
|
||||
|
||||
protected AWSBasedSecretsManager(
|
||||
OpenMetadataServerConnection.SecretsManagerProvider awsProvider,
|
||||
SecretsManagerConfiguration config,
|
||||
String clusterPrefix) {
|
||||
SecretsManagerProvider awsProvider, SecretsManagerConfiguration config, String clusterPrefix) {
|
||||
super(awsProvider, clusterPrefix);
|
||||
// initialize the secret client depending on the SecretsManagerConfiguration passed
|
||||
if (config != null && config.getParameters() != null) {
|
||||
@ -61,8 +61,7 @@ public abstract class AWSBasedSecretsManager extends SecretsManager {
|
||||
@Override
|
||||
public Object encryptOrDecryptServiceConnectionConfig(
|
||||
Object connectionConfig, String connectionType, String connectionName, ServiceType serviceType, boolean encrypt) {
|
||||
String secretName =
|
||||
buildSecretId("service", serviceType.value().toLowerCase(Locale.ROOT), connectionType, connectionName);
|
||||
String secretName = buildSecretId("service", serviceType.value(), connectionType, connectionName);
|
||||
try {
|
||||
if (encrypt) {
|
||||
String connectionConfigJson = JsonUtils.pojoToJson(connectionConfig);
|
||||
@ -82,6 +81,19 @@ public abstract class AWSBasedSecretsManager extends SecretsManager {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object storeTestConnectionObject(TestServiceConnection testServiceConnection) {
|
||||
String secretName =
|
||||
buildSecretId(TEST_CONNECTION_TEMP_SECRET_ID_PREFIX, testServiceConnection.getConnectionType().value());
|
||||
try {
|
||||
String connectionConfigJson = JsonUtils.pojoToJson(testServiceConnection.getConnection());
|
||||
upsertSecret(secretName, connectionConfigJson);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new SecretsManagerException("Error parsing to JSON the service connection config: " + e.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AirflowConfiguration encryptAirflowConnection(AirflowConfiguration airflowConfiguration) {
|
||||
OpenMetadataServerConnection.AuthProvider authProvider =
|
||||
@ -117,7 +129,7 @@ public abstract class AWSBasedSecretsManager extends SecretsManager {
|
||||
throw new SecretsManagerException("Error parsing to JSON the auth config :" + e.getMessage());
|
||||
}
|
||||
if (authProviderJson != null) {
|
||||
upsertSecret(buildSecretId(AUTH_PROVIDER_SECRET_ID_SUFFIX, authProvider.value()), authProviderJson);
|
||||
upsertSecret(buildSecretId(AUTH_PROVIDER_SECRET_ID_PREFIX, authProvider.value()), authProviderJson);
|
||||
}
|
||||
airflowConfiguration.setAuthConfig(null);
|
||||
return airflowConfiguration;
|
||||
@ -125,7 +137,7 @@ public abstract class AWSBasedSecretsManager extends SecretsManager {
|
||||
|
||||
@Override
|
||||
public Object encryptOrDecryptDbtConfigSource(Object dbtConfigSource, String serviceName, boolean encrypt) {
|
||||
String secretName = buildSecretId(DATABASE_METADATA_PIPELINE_SECRET_ID_SUFFIX, serviceName);
|
||||
String secretName = buildSecretId(DATABASE_METADATA_PIPELINE_SECRET_ID_PREFIX, serviceName);
|
||||
try {
|
||||
if (encrypt) {
|
||||
String dbtConfigSourceJson = JsonUtils.pojoToJson(dbtConfigSource);
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
*/
|
||||
package org.openmetadata.catalog.secrets;
|
||||
|
||||
import static org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.SecretsManagerProvider.AWS_SSM;
|
||||
import static org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider.AWS_SSM;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
|
||||
|
||||
@ -13,7 +13,7 @@
|
||||
|
||||
package org.openmetadata.catalog.secrets;
|
||||
|
||||
import static org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.SecretsManagerProvider.AWS;
|
||||
import static org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider.AWS;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.util.Objects;
|
||||
|
||||
@ -13,13 +13,14 @@
|
||||
|
||||
package org.openmetadata.catalog.secrets;
|
||||
|
||||
import static org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.SecretsManagerProvider.LOCAL;
|
||||
import static org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider.LOCAL;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import org.openmetadata.catalog.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.catalog.airflow.AuthConfiguration;
|
||||
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.catalog.entity.services.ServiceType;
|
||||
import org.openmetadata.catalog.exception.InvalidServiceConnectionException;
|
||||
import org.openmetadata.catalog.fernet.Fernet;
|
||||
@ -89,6 +90,11 @@ public class LocalSecretsManager extends SecretsManager {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object storeTestConnectionObject(TestServiceConnection testServiceConnection) {
|
||||
return testServiceConnection.getConnection();
|
||||
}
|
||||
|
||||
private void encryptOrDecryptField(Object connConfig, String field, Class<?> clazz, boolean encrypt)
|
||||
throws InvocationTargetException, IllegalAccessException {
|
||||
try {
|
||||
|
||||
@ -21,12 +21,14 @@ import lombok.Getter;
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.catalog.airflow.AuthConfiguration;
|
||||
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.catalog.entity.services.ServiceType;
|
||||
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
|
||||
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
|
||||
import org.openmetadata.catalog.exception.SecretsManagerException;
|
||||
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline;
|
||||
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
|
||||
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
|
||||
@ -34,10 +36,9 @@ public abstract class SecretsManager {
|
||||
|
||||
@Getter private final String clusterPrefix;
|
||||
|
||||
@Getter private final OpenMetadataServerConnection.SecretsManagerProvider secretsManagerProvider;
|
||||
@Getter private final SecretsManagerProvider secretsManagerProvider;
|
||||
|
||||
protected SecretsManager(
|
||||
OpenMetadataServerConnection.SecretsManagerProvider secretsManagerProvider, String clusterPrefix) {
|
||||
protected SecretsManager(SecretsManagerProvider secretsManagerProvider, String clusterPrefix) {
|
||||
this.secretsManagerProvider = secretsManagerProvider;
|
||||
this.clusterPrefix = clusterPrefix;
|
||||
}
|
||||
@ -117,4 +118,6 @@ public abstract class SecretsManager {
|
||||
protected String extractConnectionPackageName(ServiceType serviceType) {
|
||||
return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_CAMEL, serviceType.value());
|
||||
}
|
||||
|
||||
public abstract Object storeTestConnectionObject(TestServiceConnection testServiceConnection);
|
||||
}
|
||||
|
||||
@ -14,10 +14,9 @@
|
||||
package org.openmetadata.catalog.secrets;
|
||||
|
||||
import java.util.Map;
|
||||
import javax.validation.constraints.NotEmpty;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.SecretsManagerProvider;
|
||||
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@ -25,7 +24,7 @@ public class SecretsManagerConfiguration {
|
||||
|
||||
public static final SecretsManagerProvider DEFAULT_SECRET_MANAGER = SecretsManagerProvider.LOCAL;
|
||||
|
||||
@NotEmpty private SecretsManagerProvider secretsManager;
|
||||
private SecretsManagerProvider secretsManager;
|
||||
|
||||
private Map<String, String> parameters;
|
||||
}
|
||||
|
||||
@ -13,7 +13,7 @@
|
||||
|
||||
package org.openmetadata.catalog.secrets;
|
||||
|
||||
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.SecretsManagerProvider;
|
||||
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
|
||||
|
||||
public class SecretsManagerFactory {
|
||||
|
||||
|
||||
@ -26,7 +26,7 @@
|
||||
]
|
||||
},
|
||||
"connectionType": {
|
||||
"description": "Type of database service such as MySQL, BigQuery, Snowflake, Redshift, Postgres...",
|
||||
"description": "Type of service such as Database, Dashboard, Messaging, etc.",
|
||||
"type": "string",
|
||||
"enum": ["Database", "Dashboard", "Messaging", "Pipeline", "MlModel"],
|
||||
"javaEnums": [
|
||||
@ -46,6 +46,14 @@
|
||||
"name": "MlModel"
|
||||
}
|
||||
]
|
||||
},
|
||||
"secretsManagerProvider": {
|
||||
"$ref": "../../../entity/services/connections/metadata/secretsManagerProvider.json"
|
||||
},
|
||||
"clusterName": {
|
||||
"description": "Cluster name to differentiate OpenMetadata Server instance",
|
||||
"type": "string",
|
||||
"default": "openmetadata"
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
|
||||
@ -68,10 +68,7 @@
|
||||
]
|
||||
},
|
||||
"secretsManagerProvider": {
|
||||
"description": "OpenMetadata Secrets Manager Provider. Make sure to configure the same secrets manager providers as the ones configured on the OpenMetadata server.",
|
||||
"type": "string",
|
||||
"enum": ["local", "aws", "aws-ssm"],
|
||||
"default": "local"
|
||||
"$ref": "./secretsManagerProvider.json"
|
||||
},
|
||||
"secretsManagerCredentials": {
|
||||
"description": "OpenMetadata Secrets Manager Client credentials",
|
||||
|
||||
@ -0,0 +1,10 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/entity/services/connections/metadata/secretsManagersProvider.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "Secrets Manager Provider",
|
||||
"description": "OpenMetadata Secrets Manager Provider. Make sure to configure the same secrets manager providers as the ones configured on the OpenMetadata server.",
|
||||
"type": "string",
|
||||
"javaType": "org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider",
|
||||
"enum": ["local", "aws", "aws-ssm"],
|
||||
"additionalProperties": false
|
||||
}
|
||||
@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.mockConstruction;
|
||||
import static org.mockito.Mockito.never;
|
||||
@ -29,6 +30,7 @@ import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
@ -46,6 +48,7 @@ import org.openmetadata.catalog.CatalogApplicationConfig;
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.EntityInterface;
|
||||
import org.openmetadata.catalog.airflow.AirflowRESTClient;
|
||||
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
|
||||
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
@ -85,11 +88,11 @@ public class IngestionPipelineResourceUnitTest {
|
||||
CollectionDAO.EntityRelationshipDAO relationshipDAO = mock(CollectionDAO.EntityRelationshipDAO.class);
|
||||
CollectionDAO.EntityRelationshipRecord entityRelationshipRecord =
|
||||
mock(CollectionDAO.EntityRelationshipRecord.class);
|
||||
when(entityRelationshipRecord.getId()).thenReturn(UUID.randomUUID());
|
||||
when(entityRelationshipRecord.getType()).thenReturn("ingestionPipeline");
|
||||
when(relationshipDAO.findFrom(any(), any(), anyInt())).thenReturn(List.of(entityRelationshipRecord));
|
||||
lenient().when(entityRelationshipRecord.getId()).thenReturn(UUID.randomUUID());
|
||||
lenient().when(entityRelationshipRecord.getType()).thenReturn("ingestionPipeline");
|
||||
lenient().when(relationshipDAO.findFrom(any(), any(), anyInt())).thenReturn(List.of(entityRelationshipRecord));
|
||||
when(collectionDAO.ingestionPipelineDAO()).thenReturn(entityDAO);
|
||||
when(collectionDAO.relationshipDAO()).thenReturn(relationshipDAO);
|
||||
lenient().when(collectionDAO.relationshipDAO()).thenReturn(relationshipDAO);
|
||||
ingestionPipelineResource = new IngestionPipelineResource(collectionDAO, authorizer, secretsManager);
|
||||
}
|
||||
|
||||
@ -110,6 +113,21 @@ public class IngestionPipelineResourceUnitTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testTestConnectionCallSecretsManager() {
|
||||
TestServiceConnection testServiceConnection = new TestServiceConnection();
|
||||
try (MockedConstruction<AirflowRESTClient> mocked =
|
||||
mockConstruction(AirflowRESTClient.class, this::preparePipelineServiceClient)) {
|
||||
ingestionPipelineResource.initialize(catalogApplicationConfig);
|
||||
PipelineServiceClient client = mocked.constructed().get(0);
|
||||
HttpResponse<String> httpResponse = mock(HttpResponse.class);
|
||||
when(client.testConnection(any())).thenReturn(httpResponse);
|
||||
ingestionPipelineResource.testIngestion(null, null, testServiceConnection);
|
||||
verify(client).testConnection(testServiceConnection);
|
||||
verify(secretsManager).storeTestConnectionObject(testServiceConnection);
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(
|
||||
"org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceUnitTestParams#params")
|
||||
|
||||
@ -15,6 +15,7 @@ package org.openmetadata.catalog.secrets;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.reset;
|
||||
@ -28,7 +29,9 @@ import java.util.Objects;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
|
||||
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.catalog.services.connections.database.MysqlConnection;
|
||||
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import software.amazon.awssdk.services.ssm.SsmClient;
|
||||
import software.amazon.awssdk.services.ssm.model.GetParameterRequest;
|
||||
@ -67,6 +70,26 @@ public class AWSSSMSecretsManagerTest extends ExternalSecretsManagerTest {
|
||||
assertEquals(EXPECTED_CONNECTION_JSON, updateSecretCaptor.getValue().value());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEncryptTestServiceConnection() {
|
||||
String expectedSecretId = String.format("/openmetadata/%s/database", TEST_CONNECTION_SECRET_ID_PREFIX);
|
||||
mockClientGetValue(null);
|
||||
TestServiceConnection testServiceConnection =
|
||||
new TestServiceConnection()
|
||||
.withConnection(new MysqlConnection())
|
||||
.withConnectionType(TestServiceConnection.ConnectionType.Database)
|
||||
.withSecretsManagerProvider(secretsManager.getSecretsManagerProvider());
|
||||
ArgumentCaptor<PutParameterRequest> createSecretCaptor = ArgumentCaptor.forClass(PutParameterRequest.class);
|
||||
Object serviceConnection = secretsManager.storeTestConnectionObject(testServiceConnection);
|
||||
verify(ssmClient).putParameter(createSecretCaptor.capture());
|
||||
verifySecretIdGetCalls(expectedSecretId, 1);
|
||||
assertEquals(expectedSecretId, createSecretCaptor.getValue().name());
|
||||
assertEquals(
|
||||
"{\"type\":\"Mysql\",\"scheme\":\"mysql+pymysql\",\"supportsMetadataExtraction\":true,\"supportsProfiler\":true}",
|
||||
createSecretCaptor.getValue().value());
|
||||
assertNull(serviceConnection);
|
||||
}
|
||||
|
||||
@Override
|
||||
void setUpSpecific(SecretsManagerConfiguration config) {
|
||||
secretsManager = AWSSSMSecretsManager.getInstance(config, "openmetadata");
|
||||
@ -107,7 +130,7 @@ public class AWSSSMSecretsManagerTest extends ExternalSecretsManagerTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
OpenMetadataServerConnection.SecretsManagerProvider expectedSecretManagerProvider() {
|
||||
return OpenMetadataServerConnection.SecretsManagerProvider.AWS_SSM;
|
||||
SecretsManagerProvider expectedSecretManagerProvider() {
|
||||
return SecretsManagerProvider.AWS_SSM;
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,6 +14,7 @@ package org.openmetadata.catalog.secrets;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.times;
|
||||
@ -26,7 +27,9 @@ import java.util.Objects;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
|
||||
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.catalog.services.connections.database.MysqlConnection;
|
||||
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
|
||||
import software.amazon.awssdk.services.secretsmanager.model.CreateSecretRequest;
|
||||
@ -49,6 +52,26 @@ public class AWSSecretsManagerTest extends ExternalSecretsManagerTest {
|
||||
assertEquals(EXPECTED_CONNECTION_JSON, createSecretCaptor.getValue().secretString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEncryptTestServiceConnection() {
|
||||
String expectedSecretId = String.format("/openmetadata/%s/database", TEST_CONNECTION_SECRET_ID_PREFIX);
|
||||
mockClientGetValue(null);
|
||||
TestServiceConnection testServiceConnection =
|
||||
new TestServiceConnection()
|
||||
.withConnection(new MysqlConnection())
|
||||
.withConnectionType(TestServiceConnection.ConnectionType.Database)
|
||||
.withSecretsManagerProvider(secretsManager.getSecretsManagerProvider());
|
||||
ArgumentCaptor<CreateSecretRequest> createSecretCaptor = ArgumentCaptor.forClass(CreateSecretRequest.class);
|
||||
Object serviceConnection = secretsManager.storeTestConnectionObject(testServiceConnection);
|
||||
verify(secretsManagerClient).createSecret(createSecretCaptor.capture());
|
||||
verifySecretIdGetCalls(expectedSecretId, 1);
|
||||
assertEquals(expectedSecretId, createSecretCaptor.getValue().name());
|
||||
assertEquals(
|
||||
"{\"type\":\"Mysql\",\"scheme\":\"mysql+pymysql\",\"supportsMetadataExtraction\":true,\"supportsProfiler\":true}",
|
||||
createSecretCaptor.getValue().secretString());
|
||||
assertNull(serviceConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEncryptDatabaseServiceConnectionConfigWhenAlreadyExist() {
|
||||
mockClientGetValue(EXPECTED_CONNECTION_JSON);
|
||||
@ -101,7 +124,7 @@ public class AWSSecretsManagerTest extends ExternalSecretsManagerTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
OpenMetadataServerConnection.SecretsManagerProvider expectedSecretManagerProvider() {
|
||||
return OpenMetadataServerConnection.SecretsManagerProvider.AWS;
|
||||
SecretsManagerProvider expectedSecretManagerProvider() {
|
||||
return SecretsManagerProvider.AWS;
|
||||
}
|
||||
}
|
||||
|
||||
@ -55,13 +55,15 @@ import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipelin
|
||||
import org.openmetadata.catalog.metadataIngestion.SourceConfig;
|
||||
import org.openmetadata.catalog.services.connections.database.MysqlConnection;
|
||||
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
|
||||
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public abstract class ExternalSecretsManagerTest {
|
||||
|
||||
static final boolean ENCRYPT = true;
|
||||
static final String AUTH_PROVIDER_SECRET_ID_SUFFIX = "auth-provider";
|
||||
static final String AUTH_PROVIDER_SECRET_ID_PREFIX = "auth-provider";
|
||||
static final String TEST_CONNECTION_SECRET_ID_PREFIX = "test-connection-temp";
|
||||
static final boolean DECRYPT = false;
|
||||
static final String EXPECTED_CONNECTION_JSON =
|
||||
"{\"type\":\"Mysql\",\"scheme\":\"mysql+pymysql\",\"password\":\"openmetadata-test\",\"supportsMetadataExtraction\":true,\"supportsProfiler\":true}";
|
||||
@ -112,7 +114,7 @@ public abstract class ExternalSecretsManagerTest {
|
||||
OpenMetadataServerConnection.AuthProvider authProvider,
|
||||
AuthConfiguration authConfig)
|
||||
throws JsonProcessingException {
|
||||
String expectedSecretId = String.format("/openmetadata/%s/%s", AUTH_PROVIDER_SECRET_ID_SUFFIX, authProvider);
|
||||
String expectedSecretId = String.format("/openmetadata/%s/%s", AUTH_PROVIDER_SECRET_ID_PREFIX, authProvider);
|
||||
AirflowConfiguration airflowConfiguration = ConfigurationFixtures.buildAirflowConfig(authProvider);
|
||||
airflowConfiguration.setAuthConfig(authConfig);
|
||||
AirflowConfiguration expectedAirflowConfiguration = ConfigurationFixtures.buildAirflowConfig(authProvider);
|
||||
@ -223,5 +225,5 @@ public abstract class ExternalSecretsManagerTest {
|
||||
ConfigurationFixtures.buildAzureClientConfig(), AZURE, ConfigurationFixtures.buildAzureAuthConfig()));
|
||||
}
|
||||
|
||||
abstract OpenMetadataServerConnection.SecretsManagerProvider expectedSecretManagerProvider();
|
||||
abstract SecretsManagerProvider expectedSecretManagerProvider();
|
||||
}
|
||||
|
||||
@ -48,6 +48,7 @@ import org.openmetadata.catalog.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.catalog.airflow.AuthConfiguration;
|
||||
import org.openmetadata.catalog.api.services.CreateDatabaseService;
|
||||
import org.openmetadata.catalog.api.services.CreateMlModelService;
|
||||
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.catalog.entity.services.ServiceType;
|
||||
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
|
||||
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
|
||||
@ -56,6 +57,7 @@ import org.openmetadata.catalog.fixtures.ConfigurationFixtures;
|
||||
import org.openmetadata.catalog.metadataIngestion.SourceConfig;
|
||||
import org.openmetadata.catalog.services.connections.database.MysqlConnection;
|
||||
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
|
||||
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
|
||||
import org.openmetadata.catalog.services.connections.mlModel.SklearnConnection;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
|
||||
@ -98,6 +100,17 @@ public class LocalSecretsManagerTest {
|
||||
testEncryptDecryptServiceConnection(ENCRYPTED_VALUE, DECRYPTED_VALUE, DECRYPT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEncryptTestServiceConnection() {
|
||||
TestServiceConnection testServiceConnection =
|
||||
new TestServiceConnection()
|
||||
.withConnection(new MysqlConnection())
|
||||
.withConnectionType(TestServiceConnection.ConnectionType.Database)
|
||||
.withSecretsManagerProvider(secretsManager.getSecretsManagerProvider());
|
||||
Object actualServiceConnection = secretsManager.storeTestConnectionObject(testServiceConnection);
|
||||
assertEquals(testServiceConnection.getConnection(), actualServiceConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEncryptServiceConnectionWithoutPassword() {
|
||||
testEncryptDecryptServiceConnectionWithoutPassword(ENCRYPT);
|
||||
@ -133,7 +146,7 @@ public class LocalSecretsManagerTest {
|
||||
|
||||
@Test
|
||||
void testReturnsExpectedSecretManagerProvider() {
|
||||
assertEquals(OpenMetadataServerConnection.SecretsManagerProvider.LOCAL, secretsManager.getSecretsManagerProvider());
|
||||
assertEquals(SecretsManagerProvider.LOCAL, secretsManager.getSecretsManagerProvider());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
||||
@ -17,7 +17,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import java.util.HashMap;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.SecretsManagerProvider;
|
||||
import org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider;
|
||||
|
||||
public class SecretsManagerFactoryTest {
|
||||
|
||||
|
||||
@ -19,7 +19,9 @@ from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union
|
||||
|
||||
from metadata.ingestion.ometa.mixins.dashboard_mixin import OMetaDashboardMixin
|
||||
from metadata.ingestion.ometa.mixins.patch_mixin import OMetaPatchMixin
|
||||
from metadata.utils.secrets.secrets_manager_factory import get_secrets_manager
|
||||
from metadata.utils.secrets.secrets_manager_factory import (
|
||||
get_secrets_manager_from_om_connection,
|
||||
)
|
||||
|
||||
try:
|
||||
from typing import get_args
|
||||
@ -167,7 +169,7 @@ class OpenMetadata(
|
||||
self.config = config
|
||||
|
||||
# Load the secrets' manager client
|
||||
self.secrets_manager_client = get_secrets_manager(
|
||||
self.secrets_manager_client = get_secrets_manager_from_om_connection(
|
||||
config, config.secretsManagerCredentials
|
||||
)
|
||||
|
||||
|
||||
@ -20,6 +20,8 @@ from metadata.clients.aws_client import AWSClient
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
AuthProvider,
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.serviceConnection import (
|
||||
@ -31,7 +33,9 @@ from metadata.utils.secrets.secrets_manager import (
|
||||
AUTH_PROVIDER_MAPPING,
|
||||
AUTH_PROVIDER_SECRET_PREFIX,
|
||||
DBT_SOURCE_CONFIG_SECRET_PREFIX,
|
||||
TEST_CONNECTION_TEMP_SECRET_PREFIX,
|
||||
SecretsManager,
|
||||
ServiceConnectionType,
|
||||
ServiceWithConnectionType,
|
||||
logger,
|
||||
)
|
||||
@ -118,6 +122,25 @@ class AWSBasedSecretsManager(SecretsManager, ABC):
|
||||
source_config_json = self.get_string_value(secret_id)
|
||||
return json.loads(source_config_json) if source_config_json else None
|
||||
|
||||
def retrieve_temp_service_test_connection(
|
||||
self,
|
||||
connection: ServiceConnectionType,
|
||||
service_type: str,
|
||||
) -> ServiceConnectionType:
|
||||
"""
|
||||
Retrieve the service connection from the AWS client stored in a temporary secret depending on the service
|
||||
type.
|
||||
:param connection: Connection of the service
|
||||
:param service_type: Service type e.g. Database
|
||||
"""
|
||||
secret_id = self.build_secret_id(
|
||||
TEST_CONNECTION_TEMP_SECRET_PREFIX, service_type
|
||||
)
|
||||
service_conn_class = self.get_service_connection_class(service_type)
|
||||
stored_connection = service_conn_class()
|
||||
source_config_json = self.get_string_value(secret_id)
|
||||
return stored_connection.parse_raw(source_config_json)
|
||||
|
||||
@abstractmethod
|
||||
def get_string_value(self, name: str) -> str:
|
||||
"""
|
||||
|
||||
@ -16,7 +16,7 @@ from typing import Optional
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
from metadata.generated.schema.entity.services.connections.metadata.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
|
||||
|
||||
@ -16,7 +16,7 @@ from typing import Optional
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
from metadata.generated.schema.entity.services.connections.metadata.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
|
||||
|
||||
@ -14,6 +14,8 @@ Secrets manager implementation for local secrets manager
|
||||
"""
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.serviceConnection import (
|
||||
@ -22,6 +24,7 @@ from metadata.generated.schema.entity.services.connections.serviceConnection imp
|
||||
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig
|
||||
from metadata.utils.secrets.secrets_manager import (
|
||||
SecretsManager,
|
||||
ServiceConnectionType,
|
||||
ServiceWithConnectionType,
|
||||
logger,
|
||||
)
|
||||
@ -51,7 +54,7 @@ class LocalSecretsManager(SecretsManager):
|
||||
service_type: str,
|
||||
) -> ServiceConnection:
|
||||
"""
|
||||
The LocalSecretsManager does not modify the ServiceConnection object
|
||||
Returns the ServiceConnection object with the service connection
|
||||
"""
|
||||
logger.debug(
|
||||
f"Retrieving service connection from {self.provider} secrets' manager for {service_type} - {service.name}"
|
||||
@ -62,7 +65,7 @@ class LocalSecretsManager(SecretsManager):
|
||||
self, source_config: SourceConfig, pipeline_name: str
|
||||
) -> object:
|
||||
"""
|
||||
Retrieve the DBT source config from the secret manager from a source config object.
|
||||
Retrieve the DBT source config if it is present in the source config object
|
||||
:param source_config: SourceConfig object
|
||||
:param pipeline_name: the pipeline's name
|
||||
:return:
|
||||
@ -77,3 +80,15 @@ class LocalSecretsManager(SecretsManager):
|
||||
and source_config.config.dbtConfigSource
|
||||
else None
|
||||
)
|
||||
|
||||
def retrieve_temp_service_test_connection(
|
||||
self,
|
||||
connection: ServiceConnectionType,
|
||||
service_type: str,
|
||||
) -> ServiceConnectionType:
|
||||
"""
|
||||
Returns the connection
|
||||
:param connection: Connection of the service
|
||||
:param service_type: Service type e.g. Database
|
||||
"""
|
||||
return connection
|
||||
|
||||
@ -17,6 +17,14 @@ from abc import abstractmethod
|
||||
from pydoc import locate
|
||||
from typing import Dict, NewType, Union
|
||||
|
||||
from metadata.generated.schema.entity.services import (
|
||||
dashboardService,
|
||||
databaseService,
|
||||
messagingService,
|
||||
metadataService,
|
||||
mlmodelService,
|
||||
pipelineService,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
AuthProvider,
|
||||
OpenMetadataConnection,
|
||||
@ -46,7 +54,7 @@ logger = ingestion_logger()
|
||||
|
||||
SECRET_MANAGER_AIRFLOW_CONF = "openmetadata_secrets_manager"
|
||||
|
||||
# new typing type wrapping types from the '__root__' field of 'ServiceConnection' class
|
||||
# new typing type wrapping services with connection field types
|
||||
ServiceWithConnectionType = NewType(
|
||||
"ServiceWithConnectionType",
|
||||
Union[
|
||||
@ -59,6 +67,19 @@ ServiceWithConnectionType = NewType(
|
||||
],
|
||||
)
|
||||
|
||||
# new typing type wrapping types from the '__root__' field of 'ServiceConnection' class
|
||||
ServiceConnectionType = NewType(
|
||||
"ServiceConnectionType",
|
||||
Union[
|
||||
dashboardService.DashboardConnection,
|
||||
databaseService.DatabaseConnection,
|
||||
messagingService.MessagingConnection,
|
||||
metadataService.MetadataConnection,
|
||||
pipelineService.PipelineConnection,
|
||||
mlmodelService.MlModelConnection,
|
||||
],
|
||||
)
|
||||
|
||||
# new typing type wrapping types from the 'securityConfig' field of 'OpenMetadataConnection' class
|
||||
AuthProviderClientType = NewType(
|
||||
"AuthProviderClientType", OpenMetadataConnection.__fields__["securityConfig"].type_
|
||||
@ -77,6 +98,8 @@ DBT_SOURCE_CONFIG_SECRET_PREFIX: str = "database-metadata-pipeline"
|
||||
|
||||
AUTH_PROVIDER_SECRET_PREFIX: str = "auth-provider"
|
||||
|
||||
TEST_CONNECTION_TEMP_SECRET_PREFIX: str = "test-connection-temp"
|
||||
|
||||
|
||||
class SecretsManager(metaclass=Singleton):
|
||||
"""
|
||||
@ -124,6 +147,20 @@ class SecretsManager(metaclass=Singleton):
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def retrieve_temp_service_test_connection(
|
||||
self,
|
||||
connection: ServiceConnectionType,
|
||||
service_type: str,
|
||||
) -> ServiceConnectionType:
|
||||
"""
|
||||
Retrieve the service connection from the secret manager stored in a temporary secret depending on the service
|
||||
type.
|
||||
:param connection: Connection of the service
|
||||
:param service_type: Service type e.g. Database
|
||||
"""
|
||||
pass
|
||||
|
||||
@property
|
||||
def secret_id_separator(self) -> str:
|
||||
"""
|
||||
@ -162,15 +199,15 @@ class SecretsManager(metaclass=Singleton):
|
||||
clazz[1]
|
||||
for clazz in inspect.getmembers(
|
||||
locate(
|
||||
f"metadata.generated.schema.entity.services.{service_type}Service"
|
||||
f"metadata.generated.schema.entity.services.{service_type.lower()}Service"
|
||||
),
|
||||
inspect.isclass,
|
||||
)
|
||||
if clazz[0].lower() == f"{service_type}connection"
|
||||
if clazz[0].lower() == f"{service_type.lower()}connection"
|
||||
)
|
||||
).__name__
|
||||
return locate(
|
||||
f"metadata.generated.schema.entity.services.{service_type}Service.{service_conn_name}"
|
||||
f"metadata.generated.schema.entity.services.{service_type.lower()}Service.{service_conn_name}"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
|
||||
@ -16,6 +16,8 @@ from typing import Optional, Union
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
|
||||
@ -25,7 +27,7 @@ from metadata.utils.secrets.local_secrets_manager import LocalSecretsManager
|
||||
from metadata.utils.secrets.secrets_manager import SecretsManager
|
||||
|
||||
|
||||
def get_secrets_manager(
|
||||
def get_secrets_manager_from_om_connection(
|
||||
open_metadata_config: OpenMetadataConnection,
|
||||
credentials: Optional[Union[AWSCredentials]] = None,
|
||||
) -> SecretsManager:
|
||||
@ -35,13 +37,33 @@ def get_secrets_manager(
|
||||
:param credentials: optional credentials that could be required by the clients of the secrets manager implementations
|
||||
:return: a secrets manager
|
||||
"""
|
||||
if open_metadata_config.secretsManagerProvider == SecretsManagerProvider.local:
|
||||
return LocalSecretsManager(open_metadata_config.clusterName)
|
||||
elif open_metadata_config.secretsManagerProvider == SecretsManagerProvider.aws:
|
||||
return AWSSecretsManager(credentials, open_metadata_config.clusterName)
|
||||
elif open_metadata_config.secretsManagerProvider == SecretsManagerProvider.aws_ssm:
|
||||
return AWSSSMSecretsManager(credentials, open_metadata_config.clusterName)
|
||||
return get_secrets_manager(
|
||||
open_metadata_config.secretsManagerProvider,
|
||||
open_metadata_config.clusterName,
|
||||
credentials,
|
||||
)
|
||||
|
||||
|
||||
def get_secrets_manager(
|
||||
secrets_manager_provider: SecretsManagerProvider,
|
||||
cluster_name: str,
|
||||
credentials: Optional[Union[AWSCredentials]] = None,
|
||||
) -> SecretsManager:
|
||||
"""
|
||||
Method to get the secrets manager based on the arguments passed
|
||||
:param secrets_manager_provider: the secrets manager provider
|
||||
:param cluster_name: the cluster name
|
||||
:param credentials: optional credentials that could be required by the clients of the secrets manager implementations
|
||||
:return: a secrets manager
|
||||
"""
|
||||
if (
|
||||
secrets_manager_provider is None
|
||||
or secrets_manager_provider == SecretsManagerProvider.local
|
||||
):
|
||||
return LocalSecretsManager(cluster_name)
|
||||
elif secrets_manager_provider == SecretsManagerProvider.aws:
|
||||
return AWSSecretsManager(credentials, cluster_name)
|
||||
elif secrets_manager_provider == SecretsManagerProvider.aws_ssm:
|
||||
return AWSSSMSecretsManager(credentials, cluster_name)
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
f"[{open_metadata_config.secretsManagerProvider}] is not implemented."
|
||||
)
|
||||
raise NotImplementedError(f"[{secrets_manager_provider}] is not implemented.")
|
||||
|
||||
@ -16,6 +16,8 @@ from unittest.mock import Mock, patch
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
AuthProvider,
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.generated.schema.security.client.googleSSOClientConfig import (
|
||||
@ -70,7 +72,7 @@ class OMetaSecretManagerTest(TestCase):
|
||||
assert type(self.metadata.secrets_manager_client) is AWSSecretsManager
|
||||
assert type(self.metadata._auth_provider) is NoOpAuthenticationProvider
|
||||
|
||||
@patch("metadata.ingestion.ometa.ometa_api.get_secrets_manager")
|
||||
@patch("metadata.ingestion.ometa.ometa_api.get_secrets_manager_from_om_connection")
|
||||
def test_ometa_with_aws_secret_manager_with_google_auth(self, secrets_manager_mock):
|
||||
security_config = copy(self.aws_server_config)
|
||||
security_config.securityConfig = GoogleSSOClientConfig(secretKey="/fake/path")
|
||||
|
||||
@ -20,6 +20,7 @@ from unittest.mock import Mock, patch
|
||||
from metadata.generated.schema.entity.services.connections.serviceConnection import (
|
||||
ServiceConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
|
||||
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
|
||||
DatabaseServiceMetadataPipeline,
|
||||
)
|
||||
@ -30,6 +31,7 @@ from metadata.utils.singleton import Singleton
|
||||
from .test_secrets_manager import (
|
||||
AUTH_PROVIDER_CONFIG,
|
||||
DATABASE_CONNECTION,
|
||||
DATABASE_CONNECTION_CONFIG,
|
||||
DBT_SOURCE_CONFIG,
|
||||
TestSecretsManager,
|
||||
)
|
||||
@ -45,7 +47,7 @@ class AWSBasedSecretsManager(object):
|
||||
def test_aws_manager_add_service_config_connection(self, mocked_get_client):
|
||||
|
||||
aws_manager = self.build_secret_manager(
|
||||
mocked_get_client, self.build_response_value(DATABASE_CONNECTION)
|
||||
mocked_get_client, self.build_response_value(DATABASE_CONNECTION_CONFIG)
|
||||
)
|
||||
expected_service_connection = self.service_connection
|
||||
|
||||
@ -69,9 +71,10 @@ class AWSBasedSecretsManager(object):
|
||||
|
||||
with self.assertRaises(ValueError) as value_error:
|
||||
aws_manager.retrieve_service_connection(self.service, self.service_type)
|
||||
self.assertEqual(
|
||||
"[SecretString] not present in the response.", value_error.exception
|
||||
)
|
||||
self.assertTrue(
|
||||
"/openmetadata/service/database/mysql/test_service"
|
||||
in str(value_error.exception)
|
||||
)
|
||||
|
||||
@patch("metadata.clients.aws_client.AWSClient.get_client")
|
||||
def test_aws_manager_add_auth_provider_security_config(self, mocked_get_client):
|
||||
@ -93,6 +96,18 @@ class AWSBasedSecretsManager(object):
|
||||
actual_om_connection.securityConfig
|
||||
)
|
||||
|
||||
@patch("metadata.clients.aws_client.AWSClient.get_client")
|
||||
def test_aws_manager_fails_add_auth_provider_security_config(
|
||||
self, mocked_get_client
|
||||
):
|
||||
aws_manager = self.build_secret_manager(mocked_get_client, {})
|
||||
|
||||
with self.assertRaises(ValueError) as value_error:
|
||||
aws_manager.add_auth_provider_security_config(self.om_connection)
|
||||
self.assertTrue(
|
||||
"/openmetadata/auth-provider/google" in str(value_error.exception)
|
||||
)
|
||||
|
||||
@patch("metadata.clients.aws_client.AWSClient.get_client")
|
||||
def test_aws_manager_retrieve_dbt_source_config(self, mocked_get_client):
|
||||
aws_manager = self.build_secret_manager(
|
||||
@ -112,18 +127,6 @@ class AWSBasedSecretsManager(object):
|
||||
)
|
||||
self.assertEqual(self.dbt_source_config.dict(), actual_dbt_source_config)
|
||||
|
||||
@patch("metadata.clients.aws_client.AWSClient.get_client")
|
||||
def test_aws_manager_fails_add_auth_provider_security_config(
|
||||
self, mocked_get_client
|
||||
):
|
||||
aws_manager = self.build_secret_manager(mocked_get_client, {})
|
||||
|
||||
with self.assertRaises(ValueError) as value_error:
|
||||
aws_manager.add_auth_provider_security_config(self.om_connection)
|
||||
self.assertEqual(
|
||||
"[SecretString] not present in the response.", value_error.exception
|
||||
)
|
||||
|
||||
@patch("metadata.clients.aws_client.AWSClient.get_client")
|
||||
def test_aws_manager_aws_manager_fails_retrieve_dbt_source_config_when_not_stored(
|
||||
self, mocked_get_client
|
||||
@ -137,9 +140,49 @@ class AWSBasedSecretsManager(object):
|
||||
|
||||
with self.assertRaises(ValueError) as value_error:
|
||||
aws_manager.retrieve_dbt_source_config(source_config, "test-pipeline")
|
||||
self.assertEqual(
|
||||
"[SecretString] not present in the response.", value_error.exception
|
||||
self.assertTrue(
|
||||
"/openmetadata/database-metadata-pipeline/test-pipeline"
|
||||
in str(value_error.exception)
|
||||
)
|
||||
|
||||
@patch("metadata.clients.aws_client.AWSClient.get_client")
|
||||
def test_aws_manager_retrieve_temp_service_test_connection(
|
||||
self, mocked_get_client
|
||||
):
|
||||
|
||||
aws_manager = self.build_secret_manager(
|
||||
mocked_get_client, self.build_response_value(DATABASE_CONNECTION)
|
||||
)
|
||||
expected_service_connection = self.service.connection
|
||||
|
||||
actual_service_connection: DatabaseConnection = (
|
||||
aws_manager.retrieve_temp_service_test_connection(
|
||||
self.service.connection, "Database"
|
||||
)
|
||||
)
|
||||
|
||||
self.assert_client_called_once(
|
||||
aws_manager, "/openmetadata/test-connection-temp/database"
|
||||
)
|
||||
self.assertEqual(expected_service_connection, actual_service_connection)
|
||||
assert id(actual_service_connection.config) != id(
|
||||
expected_service_connection.config
|
||||
)
|
||||
|
||||
@patch("metadata.clients.aws_client.AWSClient.get_client")
|
||||
def test_aws_manager_fails_retrieve_temp_service_test_connection(
|
||||
self, mocked_get_client
|
||||
):
|
||||
aws_manager = self.build_secret_manager(mocked_get_client, {})
|
||||
|
||||
with self.assertRaises(ValueError) as value_error:
|
||||
aws_manager.retrieve_temp_service_test_connection(
|
||||
self.service.connection, self.service_type
|
||||
)
|
||||
self.assertTrue(
|
||||
"/openmetadata/test-connection-temp/database"
|
||||
in str(value_error.exception)
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
def build_secret_manager(
|
||||
|
||||
@ -27,7 +27,7 @@ from .test_aws_based_secrets_manager import AWSBasedSecretsManager
|
||||
class TestAWSSecretsManager(AWSBasedSecretsManager.TestCase, ABC):
|
||||
def build_secret_manager(
|
||||
self, mocked_get_client: Mock, expected_json: Dict[str, Any]
|
||||
) -> AWSSecretsManager:
|
||||
) -> AWSSSMSecretsManager:
|
||||
self.init_mocked_get_client(mocked_get_client, expected_json)
|
||||
return AWSSSMSecretsManager(
|
||||
AWSCredentials(
|
||||
|
||||
@ -14,24 +14,27 @@ Test Local Secrets Manager
|
||||
"""
|
||||
from copy import deepcopy
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
from metadata.generated.schema.entity.services.connections.metadata.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.serviceConnection import (
|
||||
ServiceConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
|
||||
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
|
||||
DatabaseServiceMetadataPipeline,
|
||||
)
|
||||
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig
|
||||
from metadata.utils.secrets.secrets_manager_factory import get_secrets_manager
|
||||
from metadata.utils.secrets.secrets_manager_factory import (
|
||||
get_secrets_manager_from_om_connection,
|
||||
)
|
||||
|
||||
from .test_secrets_manager import TestSecretsManager
|
||||
|
||||
|
||||
class TestLocalSecretsManager(TestSecretsManager.External):
|
||||
def test_local_manager_add_service_config_connection(self):
|
||||
local_manager = get_secrets_manager(
|
||||
local_manager = get_secrets_manager_from_om_connection(
|
||||
self.build_open_metadata_connection(SecretsManagerProvider.local), None
|
||||
)
|
||||
expected_service_connection = self.service_connection
|
||||
@ -46,7 +49,7 @@ class TestLocalSecretsManager(TestSecretsManager.External):
|
||||
)
|
||||
|
||||
def test_local_manager_add_auth_provider_security_config(self):
|
||||
local_manager = get_secrets_manager(
|
||||
local_manager = get_secrets_manager_from_om_connection(
|
||||
self.build_open_metadata_connection(SecretsManagerProvider.local), None
|
||||
)
|
||||
actual_om_connection = deepcopy(self.om_connection)
|
||||
@ -58,7 +61,7 @@ class TestLocalSecretsManager(TestSecretsManager.External):
|
||||
assert id(self.auth_provider_config) == id(actual_om_connection.securityConfig)
|
||||
|
||||
def test_local_manager_retrieve_dbt_source_config(self):
|
||||
local_manager = get_secrets_manager(
|
||||
local_manager = get_secrets_manager_from_om_connection(
|
||||
self.build_open_metadata_connection(SecretsManagerProvider.local), None
|
||||
)
|
||||
source_config = SourceConfig()
|
||||
@ -71,3 +74,20 @@ class TestLocalSecretsManager(TestSecretsManager.External):
|
||||
)
|
||||
|
||||
self.assertEqual(self.dbt_source_config.dict(), actual_dbt_source_config)
|
||||
|
||||
def test_local_manager_retrieve_temp_service_test_connection(self):
|
||||
local_manager = get_secrets_manager_from_om_connection(
|
||||
self.build_open_metadata_connection(SecretsManagerProvider.local), None
|
||||
)
|
||||
expected_service_connection = self.service.connection
|
||||
|
||||
actual_service_connection: DatabaseConnection = (
|
||||
local_manager.retrieve_temp_service_test_connection(
|
||||
self.service.connection, "Database"
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(actual_service_connection, expected_service_connection)
|
||||
assert id(actual_service_connection.config) == id(
|
||||
expected_service_connection.config
|
||||
)
|
||||
|
||||
@ -21,6 +21,8 @@ from metadata.generated.schema.entity.services.connections.database.mysqlConnect
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
AuthProvider,
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.serviceConnection import (
|
||||
@ -39,7 +41,13 @@ from metadata.generated.schema.security.client.googleSSOClientConfig import (
|
||||
)
|
||||
from metadata.utils.secrets.secrets_manager import AUTH_PROVIDER_MAPPING
|
||||
|
||||
DATABASE_CONNECTION = {"username": "test", "hostPort": "localhost:3306"}
|
||||
DATABASE_CONNECTION_CONFIG = {
|
||||
"type": "Mysql",
|
||||
"username": "test",
|
||||
"hostPort": "localhost:3306",
|
||||
}
|
||||
|
||||
DATABASE_CONNECTION = {"config": DATABASE_CONNECTION_CONFIG}
|
||||
|
||||
DATABASE_SERVICE = {
|
||||
"id": uuid.uuid4(),
|
||||
@ -68,7 +76,7 @@ class TestSecretsManager(TestCase):
|
||||
service_type: str = "database"
|
||||
service: DatabaseService
|
||||
service_connection: ServiceConnection
|
||||
database_connection = MysqlConnection(**DATABASE_CONNECTION)
|
||||
database_connection = MysqlConnection(**DATABASE_CONNECTION_CONFIG)
|
||||
auth_provider_config = GoogleSSOClientConfig(**AUTH_PROVIDER_CONFIG)
|
||||
om_connection: OpenMetadataConnection
|
||||
dbt_source_config: DbtHttpConfig
|
||||
|
||||
@ -17,9 +17,14 @@ from unittest.mock import patch
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.utils.secrets.secrets_manager_factory import get_secrets_manager
|
||||
from metadata.utils.secrets.secrets_manager_factory import (
|
||||
get_secrets_manager,
|
||||
get_secrets_manager_from_om_connection,
|
||||
)
|
||||
from metadata.utils.singleton import Singleton
|
||||
|
||||
from .test_secrets_manager import TestSecretsManager
|
||||
@ -38,11 +43,26 @@ class TestSecretsManagerFactory(TestCase):
|
||||
)
|
||||
)
|
||||
om_connection.secretsManagerProvider = "aws"
|
||||
get_secrets_manager(om_connection)
|
||||
get_secrets_manager_from_om_connection(om_connection)
|
||||
self.assertEqual(
|
||||
"[any] is not implemented.", not_implemented_error.exception
|
||||
)
|
||||
|
||||
def test_get_none_secret_manager(self):
|
||||
om_connection: OpenMetadataConnection = (
|
||||
TestSecretsManager.External.build_open_metadata_connection(
|
||||
SecretsManagerProvider.local
|
||||
)
|
||||
)
|
||||
om_connection.secretsManagerProvider = None
|
||||
assert get_secrets_manager_from_om_connection(om_connection) is not None
|
||||
assert (
|
||||
get_secrets_manager(
|
||||
om_connection.secretsManagerProvider, om_connection.clusterName
|
||||
)
|
||||
is not None
|
||||
)
|
||||
|
||||
@patch("metadata.clients.aws_client.boto3")
|
||||
def test_all_providers_has_implementation(self, mocked_boto3):
|
||||
mocked_boto3.client.return_value = {}
|
||||
@ -55,4 +75,14 @@ class TestSecretsManagerFactory(TestCase):
|
||||
secretsManagerProvider=secret_manager_provider,
|
||||
hostPort="http://localhost:8585",
|
||||
)
|
||||
assert get_secrets_manager(open_metadata_connection) is not None
|
||||
assert (
|
||||
get_secrets_manager_from_om_connection(open_metadata_connection)
|
||||
is not None
|
||||
)
|
||||
assert (
|
||||
get_secrets_manager(
|
||||
open_metadata_connection.secretsManagerProvider,
|
||||
open_metadata_connection.clusterName,
|
||||
)
|
||||
is not None
|
||||
)
|
||||
|
||||
@ -14,6 +14,9 @@ from a WorkflowSource
|
||||
"""
|
||||
from flask import Response
|
||||
from openmetadata_managed_apis.api.response import ApiResponse
|
||||
from openmetadata_managed_apis.workflows.ingestion.credentials_builder import (
|
||||
build_secrets_manager_credentials,
|
||||
)
|
||||
|
||||
from metadata.generated.schema.api.services.ingestionPipelines.testServiceConnection import (
|
||||
TestServiceConnectionRequest,
|
||||
@ -23,6 +26,7 @@ from metadata.utils.connections import (
|
||||
get_connection,
|
||||
test_connection,
|
||||
)
|
||||
from metadata.utils.secrets.secrets_manager_factory import get_secrets_manager
|
||||
|
||||
|
||||
def test_source_connection(
|
||||
@ -30,9 +34,22 @@ def test_source_connection(
|
||||
) -> Response:
|
||||
"""
|
||||
Create the engine and test the connection
|
||||
:param workflow_source: Source to test
|
||||
:param test_service_connection: Service connection to test
|
||||
:return: None or exception
|
||||
"""
|
||||
secrets_manager = get_secrets_manager(
|
||||
test_service_connection.secretsManagerProvider,
|
||||
test_service_connection.clusterName,
|
||||
build_secrets_manager_credentials(
|
||||
test_service_connection.secretsManagerProvider
|
||||
),
|
||||
)
|
||||
test_service_connection.connection = (
|
||||
secrets_manager.retrieve_temp_service_test_connection(
|
||||
test_service_connection.connection,
|
||||
test_service_connection.connectionType.value,
|
||||
)
|
||||
)
|
||||
connection = get_connection(test_service_connection.connection.config)
|
||||
|
||||
try:
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
from airflow.configuration import conf
|
||||
from pydantic import SecretStr
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
from metadata.generated.schema.entity.services.connections.metadata.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
|
||||
|
||||
@ -23,7 +23,7 @@
|
||||
]
|
||||
},
|
||||
"connectionType": {
|
||||
"description": "Type of database service such as MySQL, BigQuery, Snowflake, Redshift, Postgres...",
|
||||
"description": "Type of service such as Database, Dashboard, Messaging, etc.",
|
||||
"type": "string",
|
||||
"enum": ["Database", "Dashboard", "Messaging", "Pipeline"],
|
||||
"javaEnums": [
|
||||
@ -40,6 +40,14 @@
|
||||
"name": "Pipeline"
|
||||
}
|
||||
]
|
||||
},
|
||||
"secretsManagerProvider": {
|
||||
"$ref": "../../../entity/services/connections/metadata/secretsManagerProvider.json"
|
||||
},
|
||||
"clusterName": {
|
||||
"description": "Cluster name to differentiate OpenMetadata Server instance",
|
||||
"type": "string",
|
||||
"default": "openmetadata"
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
|
||||
@ -68,10 +68,7 @@
|
||||
]
|
||||
},
|
||||
"secretsManagerProvider": {
|
||||
"description": "OpenMetadata Secrets Manager Provider. Make sure to configure the same secrets manager providers as the ones configured on the OpenMetadata server.",
|
||||
"type": "string",
|
||||
"enum": ["local", "aws", "aws-ssm"],
|
||||
"default": "local"
|
||||
"$ref": "./secretsManagerProvider.json"
|
||||
},
|
||||
"secretsManagerCredentials": {
|
||||
"description": "OpenMetadata Secrets Manager Client credentials",
|
||||
|
||||
@ -0,0 +1,10 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/entity/services/connections/metadata/secretsManagersProvider.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "Secrets Manager Provider",
|
||||
"description": "OpenMetadata Secrets Manager Provider. Make sure to configure the same secrets manager providers as the ones configured on the OpenMetadata server.",
|
||||
"type": "string",
|
||||
"javaType": "org.openmetadata.catalog.services.connections.metadata.SecretsManagerProvider",
|
||||
"enum": ["local", "aws", "aws-ssm"],
|
||||
"additionalProperties": false
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user