Fix #6643: Secure the dbtConfigSource object in the secrets manager (#6743)

* Secure the dbtConfigSource object in the secrets manager

* Add missing tests

* Fix minor bug

* Minor code refactor
Nahuel 2022-08-17 10:32:43 +02:00 committed by GitHub
parent 424239840f
commit 00ce67c01e
19 changed files with 802 additions and 67 deletions


@ -21,9 +21,10 @@ import org.json.JSONObject;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.catalog.metadataIngestion.LogLevels;
import org.openmetadata.catalog.metadataIngestion.SourceConfig;
import org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResource;
import org.openmetadata.catalog.secrets.SecretsManager;
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.Relationship;
@ -37,7 +38,9 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
private static final String PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled";
private static PipelineServiceClient pipelineServiceClient;
public IngestionPipelineRepository(CollectionDAO dao) {
private final SecretsManager secretsManager;
public IngestionPipelineRepository(CollectionDAO dao, SecretsManager secretsManager) {
super(
IngestionPipelineResource.COLLECTION_PATH,
Entity.INGESTION_PIPELINE,
@ -47,6 +50,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
PATCH_FIELDS,
UPDATE_FIELDS);
this.allowEdits = true;
this.secretsManager = secretsManager;
}
@Override
@ -78,7 +82,25 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
// Don't store owner. Build it on the fly based on relationships
ingestionPipeline.withOwner(null).withService(null).withHref(null);
store(ingestionPipeline.getId(), ingestionPipeline, update);
String serviceType = service.getType();
// encrypt config in case of local secret manager
if (secretsManager.isLocal()) {
secretsManager.encryptOrDecryptDbtConfigSource(ingestionPipeline, serviceType, true);
store(ingestionPipeline.getId(), ingestionPipeline, update);
} else {
// otherwise, nullify the config since it will be kept outside OM
DatabaseServiceMetadataPipeline databaseServiceMetadataPipeline =
JsonUtils.convertValue(
ingestionPipeline.getSourceConfig().getConfig(), DatabaseServiceMetadataPipeline.class);
Object dbtConfigSource = databaseServiceMetadataPipeline.getDbtConfigSource();
databaseServiceMetadataPipeline.setDbtConfigSource(null);
ingestionPipeline.getSourceConfig().setConfig(databaseServiceMetadataPipeline);
store(ingestionPipeline.getId(), ingestionPipeline, update);
// save config in the secret manager after storing the ingestion pipeline
databaseServiceMetadataPipeline.setDbtConfigSource(dbtConfigSource);
ingestionPipeline.getSourceConfig().setConfig(databaseServiceMetadataPipeline);
secretsManager.encryptOrDecryptDbtConfigSource(ingestionPipeline, serviceType, true);
}
// Restore the relationships
ingestionPipeline.withOwner(owner).withService(service);
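For reference, the repository tests added later in this commit pin down what each branch persists; a condensed sketch of the two stored payloads (abbreviated from IngestionPipelineRepositoryTest, ellipses mine):
// Local secrets manager: the dbt config stays inside the stored document, encrypted in place:
//   {"name":"testPipeline","sourceConfig":{"config":{...,"dbtConfigSource":{}}},...}
// External secrets manager: the stored document carries no dbtConfigSource at all:
//   {"name":"testPipeline","sourceConfig":{"config":{...}},...}
// and the dbt config lives only in the secrets manager under the pipeline's secret id.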
@ -119,7 +141,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
@Override
public void entitySpecificUpdate() throws IOException {
updateSourceConfig(original.getSourceConfig(), updated.getSourceConfig());
updateSourceConfig();
updateAirflowConfig(original.getAirflowConfig(), updated.getAirflowConfig());
updateOpenMetadataServerConnection(
original.getOpenMetadataServerConnection(), updated.getOpenMetadataServerConnection());
@ -127,13 +149,16 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
updateEnabled(original.getEnabled(), updated.getEnabled());
}
private void updateSourceConfig(SourceConfig origSource, SourceConfig updatedSource)
throws JsonProcessingException {
JSONObject origSourceConfig = new JSONObject(JsonUtils.pojoToJson(origSource.getConfig()));
JSONObject updatedSourceConfig = new JSONObject(JsonUtils.pojoToJson(updatedSource.getConfig()));
private void updateSourceConfig() throws JsonProcessingException {
secretsManager.encryptOrDecryptDbtConfigSource(original, false);
JSONObject origSourceConfig = new JSONObject(JsonUtils.pojoToJson(original.getSourceConfig().getConfig()));
JSONObject updatedSourceConfig = new JSONObject(JsonUtils.pojoToJson(updated.getSourceConfig().getConfig()));
original.getSourceConfig().setConfig(null);
if (!origSourceConfig.similar(updatedSourceConfig)) {
recordChange("sourceConfig", origSource, updatedSource);
recordChange("sourceConfig", "old-encrypted-value", "new-encrypted-value", true);
}
}
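Note that the updater no longer records the raw configs: it compares them via JSONObject.similar() and, when they differ, records only the placeholder strings shown above, which keeps the (potentially secret-bearing) dbt configuration out of the change-event payload.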


@ -65,6 +65,7 @@ import org.openmetadata.catalog.jdbi3.ListFilter;
import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.resources.EntityResource;
import org.openmetadata.catalog.secrets.SecretsManager;
import org.openmetadata.catalog.security.AuthorizationException;
import org.openmetadata.catalog.security.Authorizer;
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.catalog.type.EntityHistory;
@ -95,9 +96,9 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
}
public IngestionPipelineResource(CollectionDAO dao, Authorizer authorizer, SecretsManager secretsManager) {
super(IngestionPipeline.class, new IngestionPipelineRepository(dao), authorizer);
super(IngestionPipeline.class, new IngestionPipelineRepository(dao, secretsManager), authorizer);
this.secretsManager = secretsManager;
this.ingestionPipelineRepository = new IngestionPipelineRepository(dao);
this.ingestionPipelineRepository = new IngestionPipelineRepository(dao, secretsManager);
}
public void initialize(CatalogApplicationConfig config) {
@ -174,6 +175,8 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUSES)) {
addStatus(ingestionPipelines.getData());
}
listOrEmpty(ingestionPipelines.getData())
.forEach(ingestionPipeline -> decryptOrNullify(securityContext, ingestionPipeline));
return ingestionPipelines;
}
@ -233,7 +236,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUSES)) {
ingestionPipeline = addStatus(ingestionPipeline);
}
return ingestionPipeline;
return decryptOrNullify(securityContext, ingestionPipeline);
}
@GET
@ -301,7 +304,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUSES)) {
ingestionPipeline = addStatus(ingestionPipeline);
}
return ingestionPipeline;
return decryptOrNullify(securityContext, ingestionPipeline);
}
@POST
@ -322,7 +325,9 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateIngestionPipeline create)
throws IOException {
IngestionPipeline ingestionPipeline = getIngestionPipeline(create, securityContext.getUserPrincipal().getName());
return create(uriInfo, securityContext, ingestionPipeline, true);
Response response = create(uriInfo, securityContext, ingestionPipeline, true);
decryptOrNullify(securityContext, (IngestionPipeline) response.getEntity());
return response;
}
@PATCH
@ -348,7 +353,9 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
}))
JsonPatch patch)
throws IOException {
return patchInternal(uriInfo, securityContext, id, patch);
Response response = patchInternal(uriInfo, securityContext, id, patch);
decryptOrNullify(securityContext, (IngestionPipeline) response.getEntity());
return response;
}
@PUT
@ -369,7 +376,9 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateIngestionPipeline update)
throws IOException {
IngestionPipeline ingestionPipeline = getIngestionPipeline(update, securityContext.getUserPrincipal().getName());
return createOrUpdate(uriInfo, securityContext, ingestionPipeline, true);
Response response = createOrUpdate(uriInfo, securityContext, ingestionPipeline, true);
decryptOrNullify(securityContext, (IngestionPipeline) response.getEntity());
return response;
}
@POST
@ -389,9 +398,10 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
public IngestionPipeline deployIngestion(
@Context UriInfo uriInfo, @PathParam("id") UUID id, @Context SecurityContext securityContext) throws IOException {
Fields fields = getFields(FIELD_OWNER);
IngestionPipeline pipeline = dao.get(uriInfo, id, fields);
pipelineServiceClient.deployPipeline(pipeline);
return addHref(uriInfo, dao.get(uriInfo, id, fields));
IngestionPipeline ingestionPipeline = dao.get(uriInfo, id, fields);
pipelineServiceClient.deployPipeline(ingestionPipeline);
decryptOrNullify(securityContext, ingestionPipeline);
return addHref(uriInfo, ingestionPipeline);
}
@POST
@ -412,9 +422,10 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
public IngestionPipeline triggerIngestion(
@Context UriInfo uriInfo, @PathParam("id") UUID id, @Context SecurityContext securityContext) throws IOException {
Fields fields = getFields(FIELD_OWNER);
IngestionPipeline pipeline = dao.get(uriInfo, id, fields);
pipelineServiceClient.runPipeline(pipeline.getName());
return addHref(uriInfo, dao.get(uriInfo, id, fields));
IngestionPipeline ingestionPipeline = dao.get(uriInfo, id, fields);
pipelineServiceClient.runPipeline(ingestionPipeline.getName());
decryptOrNullify(securityContext, ingestionPipeline);
return addHref(uriInfo, ingestionPipeline);
}
@POST
@ -459,6 +470,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
public Response killIngestion(
@Context UriInfo uriInfo, @PathParam("id") UUID id, @Context SecurityContext securityContext) throws IOException {
IngestionPipeline ingestionPipeline = getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED);
decryptOrNullify(securityContext, ingestionPipeline);
HttpResponse<String> response = pipelineServiceClient.killIngestion(ingestionPipeline);
return Response.status(200, response.body()).build();
}
@ -574,4 +586,19 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
}
return ingestionPipeline;
}
private IngestionPipeline decryptOrNullify(SecurityContext securityContext, IngestionPipeline ingestionPipeline) {
try {
authorizer.authorize(
securityContext,
getOperationContext,
getResourceContextById(ingestionPipeline.getId()),
secretsManager.isLocal());
} catch (AuthorizationException | IOException e) {
ingestionPipeline.getSourceConfig().setConfig(null);
return ingestionPipeline;
}
secretsManager.encryptOrDecryptDbtConfigSource(ingestionPipeline, false);
return ingestionPipeline;
}
}
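In short, decryptOrNullify is the read-path counterpart of storeEntity: when the caller passes the authorization check, the dbt config is decrypted back into the source config (a pass-through for the local secrets manager, see LocalSecretsManager below); when it does not, sourceConfig.config is nulled so no secret material leaves the API.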


@ -18,6 +18,7 @@ import static org.openmetadata.catalog.services.connections.metadata.OpenMetadat
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import java.util.Locale;
import java.util.Objects;
import org.openmetadata.catalog.airflow.AirflowConfiguration;
import org.openmetadata.catalog.airflow.AuthConfiguration;
import org.openmetadata.catalog.entity.services.ServiceType;
@ -39,6 +40,8 @@ public class AWSSecretsManager extends SecretsManager {
public static final String ACCESS_KEY_ID = "accessKeyId";
public static final String SECRET_ACCESS_KEY = "secretAccessKey";
public static final String REGION = "region";
public static final String DATABASE_METADATA_PIPELINE_SECRET_ID_SUFFIX = "database-metadata-pipeline";
public static final String NULL_SECRET_STRING = "null";
private static AWSSecretsManager INSTANCE = null;
@ -72,13 +75,14 @@ public class AWSSecretsManager extends SecretsManager {
try {
if (encrypt) {
String connectionConfigJson = JsonUtils.pojoToJson(connectionConfig);
if (connectionConfigJson != null) {
upsertSecret(secretName, connectionConfigJson);
}
upsertSecret(secretName, connectionConfigJson);
return null;
} else {
Class<?> clazz = createConnectionConfigClass(connectionType, extractConnectionPackageName(serviceType));
return JsonUtils.readValue(getSecret(secretName), clazz);
String connectionConfigJson = getSecret(secretName);
return NULL_SECRET_STRING.equals(connectionConfigJson)
? null
: JsonUtils.readValue(getSecret(secretName), clazz);
}
} catch (ClassNotFoundException ex) {
throw InvalidServiceConnectionException.byMessage(
@ -88,6 +92,25 @@ public class AWSSecretsManager extends SecretsManager {
}
}
@Override
public Object encryptOrDecryptDbtConfigSource(Object dbtConfigSource, String ingestionPipelineName, boolean encrypt) {
String secretName = buildSecretId(DATABASE_METADATA_PIPELINE_SECRET_ID_SUFFIX, ingestionPipelineName);
try {
if (encrypt) {
String dbtConfigSourceJson = JsonUtils.pojoToJson(dbtConfigSource);
upsertSecret(secretName, dbtConfigSourceJson);
return null;
} else {
String dbtConfigSourceJson = getSecret(secretName);
return NULL_SECRET_STRING.equals(dbtConfigSourceJson)
? null
: JsonUtils.readValue(dbtConfigSourceJson, Object.class);
}
} catch (Exception e) {
throw SecretsManagerException.byMessage(getClass().getSimpleName(), secretName, e.getMessage());
}
}
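For orientation, and assuming the same cluster prefix the ingestion-side tests below use ("openmetadata"), the secret id built here takes this shape:
// buildSecretId(DATABASE_METADATA_PIPELINE_SECRET_ID_SUFFIX, "test-pipeline")
//   -> "/openmetadata/database-metadata-pipeline/test-pipeline"
// which is exactly the SecretId the Python test asserts against further down.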
@Override
public AirflowConfiguration encryptAirflowConnection(AirflowConfiguration airflowConfiguration) {
OpenMetadataServerConnection.AuthProvider authProvider =
@ -148,7 +171,7 @@ public class AWSSecretsManager extends SecretsManager {
CreateSecretRequest.builder()
.name(secretName)
.description("This secret was created by OpenMetadata")
.secretString(secretValue)
.secretString(Objects.isNull(secretValue) ? NULL_SECRET_STRING : secretValue)
.build();
this.secretsClient.createSecret(createSecretRequest);
}
@ -158,7 +181,7 @@ public class AWSSecretsManager extends SecretsManager {
UpdateSecretRequest.builder()
.secretId(secretName)
.description("This secret was created by OpenMetadata")
.secretString(secretValue)
.secretString(Objects.isNull(secretValue) ? NULL_SECRET_STRING : secretValue)
.build();
this.secretsClient.updateSecret(updateSecretRequest);
}


@ -56,6 +56,11 @@ public class LocalSecretsManager extends SecretsManager {
}
}
@Override
public Object encryptOrDecryptDbtConfigSource(Object dbtConfigSource, String ingestionPipelineName, boolean encrypt) {
return dbtConfigSource;
}
@Override
public AirflowConfiguration encryptAirflowConnection(AirflowConfiguration airflowConfiguration) {
return airflowConfiguration;


@ -18,11 +18,16 @@ import static java.util.Objects.isNull;
import com.google.common.base.CaseFormat;
import java.util.List;
import lombok.Getter;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.airflow.AirflowConfiguration;
import org.openmetadata.catalog.airflow.AuthConfiguration;
import org.openmetadata.catalog.entity.services.ServiceType;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.catalog.exception.SecretsManagerException;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.catalog.util.JsonUtils;
public abstract class SecretsManager {
@ -43,6 +48,28 @@ public abstract class SecretsManager {
public abstract Object encryptOrDecryptServiceConnectionConfig(
Object connectionConfig, String connectionType, String connectionName, ServiceType serviceType, boolean encrypt);
abstract Object encryptOrDecryptDbtConfigSource(
Object dbtConfigSource, String ingestionPipelineName, boolean encrypt);
public void encryptOrDecryptDbtConfigSource(IngestionPipeline ingestionPipeline, boolean encrypt) {
encryptOrDecryptDbtConfigSource(ingestionPipeline, ingestionPipeline.getService().getType(), encrypt);
}
public void encryptOrDecryptDbtConfigSource(
IngestionPipeline ingestionPipeline, String serviceType, boolean encrypt) {
// DatabaseServiceMetadataPipeline contains dbtConfigSource and must be encrypted
if (serviceType.equals(Entity.DATABASE_SERVICE)
&& ingestionPipeline.getPipelineType().equals(PipelineType.METADATA)) {
DatabaseServiceMetadataPipeline databaseServiceMetadataPipeline =
JsonUtils.convertValue(
ingestionPipeline.getSourceConfig().getConfig(), DatabaseServiceMetadataPipeline.class);
databaseServiceMetadataPipeline.setDbtConfigSource(
encryptOrDecryptDbtConfigSource(
databaseServiceMetadataPipeline.getDbtConfigSource(), ingestionPipeline.getName(), encrypt));
ingestionPipeline.getSourceConfig().setConfig(databaseServiceMetadataPipeline);
}
}
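A minimal usage sketch of the two public entry points added above, assuming an IngestionPipeline that already carries its service reference and pipeline type:
// Encrypt (or externalize) the dbt config before persisting or returning the pipeline:
secretsManager.encryptOrDecryptDbtConfigSource(ingestionPipeline, true);
// Decrypt it again before handing the pipeline to an authorized caller:
secretsManager.encryptOrDecryptDbtConfigSource(ingestionPipeline, false);
// The repository's storeEntity path calls the serviceType-explicit overload instead,
// presumably because the service reference is temporarily nulled while the entity is stored.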
public OpenMetadataServerConnection decryptServerConnection(AirflowConfiguration airflowConfiguration) {
OpenMetadataServerConnection.AuthProvider authProvider =
OpenMetadataServerConnection.AuthProvider.fromValue(airflowConfiguration.getAuthProvider());


@ -26,16 +26,19 @@ import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.ServiceConnectionEntityInterface;
import org.openmetadata.catalog.ServiceEntityInterface;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.catalog.exception.SecretsManagerMigrationException;
import org.openmetadata.catalog.jdbi3.ChangeEventRepository;
import org.openmetadata.catalog.jdbi3.IngestionPipelineRepository;
import org.openmetadata.catalog.jdbi3.ListFilter;
import org.openmetadata.catalog.jdbi3.ServiceEntityRepository;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.catalog.resources.CollectionRegistry;
import org.openmetadata.catalog.resources.events.EventResource;
import org.openmetadata.catalog.resources.services.ServiceEntityResource;
import org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResource;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.JsonUtils;
/**
* Migration service from LocalSecretManager to configured one.
@ -129,15 +132,17 @@ public class SecretsManagerMigrationService {
service.getName(),
repository.getServiceType(),
false));
newSecretManager.encryptOrDecryptServiceConnectionConfig(
service.getConnection().getConfig(),
service.getServiceType().value(),
service.getName(),
repository.getServiceType(),
true);
service
.getConnection()
.setConfig(
newSecretManager.encryptOrDecryptServiceConnectionConfig(
service.getConnection().getConfig(),
service.getServiceType().value(),
service.getName(),
repository.getServiceType(),
true));
// avoid reaching secrets manager quotas
Thread.sleep(100);
service.getConnection().setConfig(null);
repository.dao.update(service);
} catch (IOException | InterruptedException e) {
throw new SecretsManagerMigrationException(e.getMessage(), e.getCause());
@ -174,7 +179,14 @@ public class SecretsManagerMigrationService {
private void migrateIngestionPipelines(IngestionPipeline ingestionPipeline) {
try {
IngestionPipeline ingestion = ingestionPipelineRepository.dao.findEntityById(ingestionPipeline.getId());
ingestion.getOpenMetadataServerConnection().setSecurityConfig(null);
if (hasSecurityConfig(ingestionPipeline)) {
ingestion.getOpenMetadataServerConnection().setSecurityConfig(null);
}
if (hasDbtConfig(ingestionPipeline)) {
// we have to decrypt using the old secrets manager and encrypt again with the new one
oldSecretManager.encryptOrDecryptDbtConfigSource(ingestionPipeline, false);
newSecretManager.encryptOrDecryptDbtConfigSource(ingestionPipeline, true);
}
ingestionPipelineRepository.dao.update(ingestion);
} catch (IOException e) {
throw new SecretsManagerMigrationException(e.getMessage(), e.getCause());
@ -191,16 +203,28 @@ public class SecretsManagerMigrationService {
ingestionPipelineRepository.dao.listCount(new ListFilter()),
null)
.getData().stream()
.filter(
ingestionPipeline ->
!Objects.isNull(ingestionPipeline.getOpenMetadataServerConnection())
&& !Objects.isNull(ingestionPipeline.getOpenMetadataServerConnection().getSecurityConfig()))
.filter(this::hasSecurityConfig)
.filter(this::hasDbtConfig)
.collect(Collectors.toList());
} catch (IOException e) {
throw new SecretsManagerMigrationException(e.getMessage(), e.getCause());
}
}
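Note that the listing above now chains a filter on hasSecurityConfig with one on hasDbtConfig, so only pipelines satisfying both predicates are picked up for migration, even though migrateIngestionPipelines handles the two concerns independently.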
private boolean hasSecurityConfig(IngestionPipeline ingestionPipeline) {
return !Objects.isNull(ingestionPipeline.getOpenMetadataServerConnection())
&& !Objects.isNull(ingestionPipeline.getOpenMetadataServerConnection().getSecurityConfig());
}
private boolean hasDbtConfig(IngestionPipeline ingestionPipeline) {
return ingestionPipeline.getService().getType().equals(Entity.DATABASE_SERVICE)
&& ingestionPipeline.getPipelineType().equals(PipelineType.METADATA)
&& JsonUtils.convertValue(
ingestionPipeline.getSourceConfig().getConfig(), DatabaseServiceMetadataPipeline.class)
.getDbtConfigSource()
!= null;
}
/** This method deletes all the change events that could contain connection config parameters for services */
private void deleteChangeEventsForServices() {
connectionTypeRepositoriesMap.values().stream()


@ -154,6 +154,7 @@
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"dbtConfigSource": {
"mask": true,
"title": "DBT Configuration Source",
"description": "Available sources to fetch DBT catalog and manifest files.",
"oneOf": [


@ -11,7 +11,6 @@
"type": "object",
"properties": {
"config": {
"mask": true,
"oneOf": [
{
"$ref": "databaseServiceMetadataPipeline.json"


@ -0,0 +1,109 @@
/*
* Copyright 2022 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.LinkedHashMap;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.DatabaseService;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.catalog.metadataIngestion.SourceConfig;
import org.openmetadata.catalog.secrets.SecretsManager;
@ExtendWith(MockitoExtension.class)
public class IngestionPipelineRepositoryTest {
@Mock protected CollectionDAO collectionDAO;
@Mock protected SecretsManager secretsManager;
@Mock protected CollectionDAO.IngestionPipelineDAO dao;
protected IngestionPipelineRepository ingestionPipelineRepository;
@BeforeEach
void beforeEach() {
when(collectionDAO.ingestionPipelineDAO()).thenReturn(dao);
ingestionPipelineRepository = new IngestionPipelineRepository(collectionDAO, secretsManager);
}
@AfterEach
void afterEach() {
reset(secretsManager);
}
@Test
void testStoreEntityCallCorrectlyLocalSecretManager() throws IOException {
IngestionPipeline ingestionPipeline = initStoreEntityTest(true);
ArgumentCaptor<String> serviceTypeCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> ingestionPipelineStringCaptor = ArgumentCaptor.forClass(String.class);
ingestionPipelineRepository.storeEntity(ingestionPipeline, true);
verify(secretsManager)
.encryptOrDecryptDbtConfigSource(any(IngestionPipeline.class), serviceTypeCaptor.capture(), eq(true));
verify(dao).update(isNull(), ingestionPipelineStringCaptor.capture());
assertEquals("databaseService", serviceTypeCaptor.getValue());
assertEquals(
"{\"name\":\"testPipeline\",\"sourceConfig\":{\"config\":{\"type\":\"DatabaseMetadata\",\"markDeletedTables\":true,\"includeTables\":true,\"includeViews\":true,\"includeTags\":true,\"dbtConfigSource\":{}}},\"loggerLevel\":\"INFO\",\"enabled\":true,\"version\":0.1,\"deleted\":false}",
ingestionPipelineStringCaptor.getValue());
}
@Test
void testStoreEntityCallCorrectlyAWSSecretManager() throws IOException {
IngestionPipeline ingestionPipeline = initStoreEntityTest(false);
ArgumentCaptor<String> serviceTypeCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> ingestionPipelineStringCaptor = ArgumentCaptor.forClass(String.class);
ingestionPipelineRepository.storeEntity(ingestionPipeline, true);
verify(secretsManager)
.encryptOrDecryptDbtConfigSource(any(IngestionPipeline.class), serviceTypeCaptor.capture(), eq(true));
verify(dao).update(isNull(), ingestionPipelineStringCaptor.capture());
assertEquals("databaseService", serviceTypeCaptor.getValue());
assertEquals(
"{\"name\":\"testPipeline\",\"sourceConfig\":{\"config\":{\"type\":\"DatabaseMetadata\",\"markDeletedTables\":true,\"includeTables\":true,\"includeViews\":true,\"includeTags\":true}},\"loggerLevel\":\"INFO\",\"enabled\":true,\"version\":0.1,\"deleted\":false}",
ingestionPipelineStringCaptor.getValue());
}
private IngestionPipeline initStoreEntityTest(boolean isLocal) {
when(secretsManager.isLocal()).thenReturn(isLocal);
return new IngestionPipeline()
.withName("testPipeline")
.withService(new DatabaseService().getEntityReference().withType(Entity.DATABASE_SERVICE))
.withSourceConfig(
new SourceConfig()
.withConfig(new DatabaseServiceMetadataPipeline().withDbtConfigSource(new LinkedHashMap<>())));
}
}


@ -158,7 +158,11 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
IngestionPipeline expected, IngestionPipeline updated, Map<String, String> authHeaders) {
assertEquals(expected.getDisplayName(), updated.getDisplayName());
assertReference(expected.getService(), updated.getService());
assertNull(updated.getSourceConfig().getConfig());
if (Entity.DATABASE_SERVICE.equals(updated.getService().getType())) {
assertNull(
JsonUtils.convertValue(updated.getSourceConfig().getConfig(), DatabaseServiceMetadataPipeline.class)
.getDbtConfigSource());
}
}
@Override


@ -14,10 +14,17 @@
package org.openmetadata.catalog.resources.services.ingestionpipelines;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -29,14 +36,23 @@ import javax.ws.rs.core.SecurityContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedConstruction.Context;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.EntityInterface;
import org.openmetadata.catalog.airflow.AirflowRESTClient;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.EntityDAO;
import org.openmetadata.catalog.jdbi3.EntityRepository;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.catalog.metadataIngestion.SourceConfig;
import org.openmetadata.catalog.secrets.SecretsManager;
import org.openmetadata.catalog.security.Authorizer;
import org.openmetadata.catalog.type.EntityReference;
@ -47,6 +63,8 @@ public class IngestionPipelineResourceUnitTest {
private static final UUID DAG_ID = UUID.randomUUID();
private static final String PIPELINE_NAME = "service_test";
private IngestionPipelineResource ingestionPipelineResource;
@Mock SecurityContext securityContext;
@ -57,11 +75,13 @@ public class IngestionPipelineResourceUnitTest {
@Mock CatalogApplicationConfig catalogApplicationConfig;
@Mock IngestionPipeline ingestionPipeline;
@Mock SecretsManager secretsManager;
@Mock CollectionDAO.IngestionPipelineDAO entityDAO;
@BeforeEach
void setUp() throws IOException {
CollectionDAO.IngestionPipelineDAO entityDAO = mock(CollectionDAO.IngestionPipelineDAO.class);
void setUp() {
reset(entityDAO, collectionDAO, secretsManager, authorizer);
CollectionDAO.EntityRelationshipDAO relationshipDAO = mock(CollectionDAO.EntityRelationshipDAO.class);
CollectionDAO.EntityRelationshipRecord entityRelationshipRecord =
mock(CollectionDAO.EntityRelationshipRecord.class);
@ -70,14 +90,15 @@ public class IngestionPipelineResourceUnitTest {
when(relationshipDAO.findFrom(any(), any(), anyInt())).thenReturn(List.of(entityRelationshipRecord));
when(collectionDAO.ingestionPipelineDAO()).thenReturn(entityDAO);
when(collectionDAO.relationshipDAO()).thenReturn(relationshipDAO);
when(entityDAO.findEntityById(any(), any())).thenReturn(ingestionPipeline);
when(entityDAO.findEntityReferenceById(any(), any())).thenReturn(mock(EntityReference.class));
when(ingestionPipeline.getId()).thenReturn(DAG_ID);
ingestionPipelineResource = new IngestionPipelineResource(collectionDAO, authorizer, mock(SecretsManager.class));
ingestionPipelineResource = new IngestionPipelineResource(collectionDAO, authorizer, secretsManager);
}
@Test
public void testLastIngestionLogsAreRetrieved() throws IOException {
void testLastIngestionLogsAreRetrieved() throws IOException {
IngestionPipeline ingestionPipeline = mock(IngestionPipeline.class);
when(ingestionPipeline.getId()).thenReturn(DAG_ID);
when(entityDAO.findEntityById(any(), any())).thenReturn(ingestionPipeline);
when(entityDAO.findEntityReferenceById(any(), any())).thenReturn(mock(EntityReference.class));
Map<String, String> expectedMap = Map.of("task", "log");
try (MockedConstruction<AirflowRESTClient> mocked =
mockConstruction(AirflowRESTClient.class, this::preparePipelineServiceClient)) {
@ -89,7 +110,107 @@ public class IngestionPipelineResourceUnitTest {
}
}
@ParameterizedTest
@MethodSource(
"org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceUnitTestParams#params")
void testGetIsEncryptedWhenSecretManagerIsConfigured(
Object config,
EntityReference service,
Class<? extends EntityInterface> serviceClass,
PipelineType pipelineType,
boolean mustBeEncrypted)
throws IOException {
UUID id = UUID.randomUUID();
IngestionPipeline ingestionPipeline = buildIngestionPipeline(config, pipelineType, id);
Entity.registerEntity(serviceClass, service.getType(), mock(EntityDAO.class), mock(EntityRepository.class));
doAnswer(
invocation -> {
if (mustBeEncrypted) {
IngestionPipeline arg0 = invocation.getArgument(0);
((DatabaseServiceMetadataPipeline) arg0.getSourceConfig().getConfig()).setDbtConfigSource(null);
}
return null;
})
.when(secretsManager)
.encryptOrDecryptDbtConfigSource(any(IngestionPipeline.class), anyBoolean());
when(entityDAO.findEntityById(eq(id), any())).thenReturn(ingestionPipeline);
when(entityDAO.findEntityReferenceById(any(), any())).thenReturn(service);
IngestionPipeline actualIngestionPipeline = ingestionPipelineResource.get(null, securityContext, id, null, null);
verifySecretManagerIsCalled(mustBeEncrypted, ingestionPipeline);
assertIngestionPipelineDbtConfigIsEncrypted(mustBeEncrypted, actualIngestionPipeline);
}
@ParameterizedTest
@MethodSource(
"org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceUnitTestParams#params")
void testGetByNameIsEncryptedWhenSecretManagerIsConfigured(
Object config,
EntityReference service,
Class<? extends EntityInterface> serviceClass,
PipelineType pipelineType,
boolean mustBeEncrypted)
throws IOException {
UUID id = UUID.randomUUID();
IngestionPipeline ingestionPipeline = buildIngestionPipeline(config, pipelineType, id);
Entity.registerEntity(serviceClass, service.getType(), mock(EntityDAO.class), mock(EntityRepository.class));
when(entityDAO.findEntityByName(eq(PIPELINE_NAME), any())).thenReturn(ingestionPipeline);
when(entityDAO.findEntityReferenceById(any(), any())).thenReturn(service);
doAnswer(
invocation -> {
if (mustBeEncrypted) {
IngestionPipeline arg0 = invocation.getArgument(0);
((DatabaseServiceMetadataPipeline) arg0.getSourceConfig().getConfig()).setDbtConfigSource(null);
}
return null;
})
.when(secretsManager)
.encryptOrDecryptDbtConfigSource(any(IngestionPipeline.class), anyBoolean());
IngestionPipeline actualIngestionPipeline =
ingestionPipelineResource.getByName(null, PIPELINE_NAME, securityContext, null, null);
verifySecretManagerIsCalled(mustBeEncrypted, ingestionPipeline);
assertIngestionPipelineDbtConfigIsEncrypted(mustBeEncrypted, actualIngestionPipeline);
}
private void preparePipelineServiceClient(AirflowRESTClient mockPipelineServiceClient, Context context) {
when(mockPipelineServiceClient.getLastIngestionLogs(any())).thenReturn(Map.of("task", "log"));
}
private IngestionPipeline buildIngestionPipeline(Object config, PipelineType pipelineType, UUID id) {
return new IngestionPipeline()
.withId(id)
.withPipelineType(pipelineType)
.withSourceConfig(new SourceConfig().withConfig(config))
.withName(PIPELINE_NAME);
}
private void verifySecretManagerIsCalled(boolean mustBeEncrypted, IngestionPipeline ingestionPipeline) {
if (mustBeEncrypted) {
verify(secretsManager).encryptOrDecryptDbtConfigSource(ingestionPipeline, false);
} else {
verify(secretsManager, never()).encryptOrDecryptDbtConfigSource(any(), any(), anyBoolean());
}
}
private void assertIngestionPipelineDbtConfigIsEncrypted(
boolean mustBeEncrypted, IngestionPipeline actualIngestionPipeline) {
if (mustBeEncrypted) {
assertNull(
((DatabaseServiceMetadataPipeline) actualIngestionPipeline.getSourceConfig().getConfig())
.getDbtConfigSource());
} else {
assertNotNull(actualIngestionPipeline.getSourceConfig().getConfig());
}
}
}


@ -0,0 +1,124 @@
/*
* Copyright 2022 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.services.ingestionpipelines;
import java.util.LinkedHashMap;
import java.util.stream.Stream;
import org.junit.jupiter.params.provider.Arguments;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.services.CreateDashboardService;
import org.openmetadata.catalog.api.services.CreateDatabaseService;
import org.openmetadata.catalog.api.services.CreateMessagingService;
import org.openmetadata.catalog.api.services.CreateMlModelService;
import org.openmetadata.catalog.api.services.CreatePipelineService;
import org.openmetadata.catalog.entity.services.DashboardService;
import org.openmetadata.catalog.entity.services.DatabaseService;
import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.entity.services.MlModelService;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.catalog.metadataIngestion.DashboardServiceMetadataPipeline;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceProfilerPipeline;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceQueryLineagePipeline;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceQueryUsagePipeline;
import org.openmetadata.catalog.metadataIngestion.MessagingServiceMetadataPipeline;
import org.openmetadata.catalog.metadataIngestion.MlmodelServiceMetadataPipeline;
import org.openmetadata.catalog.metadataIngestion.PipelineServiceMetadataPipeline;
import org.openmetadata.catalog.type.EntityReference;
public class IngestionPipelineResourceUnitTestParams {
public static final EntityReference DATABASE_SERVICE_ENTITY =
new DatabaseService()
.withServiceType(CreateDatabaseService.DatabaseServiceType.Mysql)
.getEntityReference()
.withType(Entity.DATABASE_SERVICE);
public static final EntityReference PIPELINE_SERVICE_ENTITY =
new PipelineService()
.withServiceType(CreatePipelineService.PipelineServiceType.Airbyte)
.getEntityReference()
.withType(Entity.PIPELINE_SERVICE);
public static final EntityReference MESSAGING_SERVICE_ENTITY =
new MessagingService()
.withServiceType(CreateMessagingService.MessagingServiceType.Kafka)
.getEntityReference()
.withType(Entity.MESSAGING_SERVICE);
public static final EntityReference DASHBOARD_SERVICE_ENTITY =
new DashboardService()
.withServiceType(CreateDashboardService.DashboardServiceType.Looker)
.getEntityReference()
.withType(Entity.DASHBOARD_SERVICE);
public static final EntityReference MLMODEL_SERVICE_ENTITY =
new MlModelService()
.withServiceType(CreateMlModelService.MlModelServiceType.Mlflow)
.getEntityReference()
.withType(Entity.MLMODEL_SERVICE);
public static Stream<Arguments> params() {
return Stream.of(
Arguments.of(
new DatabaseServiceMetadataPipeline().withDbtConfigSource(new LinkedHashMap<>()),
DATABASE_SERVICE_ENTITY,
DatabaseService.class,
PipelineType.METADATA,
true),
Arguments.of(
new DatabaseServiceQueryUsagePipeline(),
DATABASE_SERVICE_ENTITY,
DatabaseService.class,
PipelineType.USAGE,
false),
Arguments.of(
new DatabaseServiceQueryLineagePipeline(),
DATABASE_SERVICE_ENTITY,
DatabaseService.class,
PipelineType.LINEAGE,
false),
Arguments.of(
new DashboardServiceMetadataPipeline(),
DASHBOARD_SERVICE_ENTITY,
DatabaseService.class,
PipelineType.METADATA,
false),
Arguments.of(
new MessagingServiceMetadataPipeline(),
MESSAGING_SERVICE_ENTITY,
DatabaseService.class,
PipelineType.METADATA,
false),
Arguments.of(
new DatabaseServiceProfilerPipeline(),
DATABASE_SERVICE_ENTITY,
DatabaseService.class,
PipelineType.PROFILER,
false),
Arguments.of(
new PipelineServiceMetadataPipeline(),
PIPELINE_SERVICE_ENTITY,
DatabaseService.class,
PipelineType.METADATA,
false),
Arguments.of(
new MlmodelServiceMetadataPipeline(),
MLMODEL_SERVICE_ENTITY,
DatabaseService.class,
PipelineType.METADATA,
false));
}
}


@ -18,8 +18,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
@ -45,13 +48,19 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.catalog.EntityInterface;
import org.openmetadata.catalog.airflow.AirflowConfiguration;
import org.openmetadata.catalog.airflow.AuthConfiguration;
import org.openmetadata.catalog.api.services.CreateDatabaseService;
import org.openmetadata.catalog.entity.services.ServiceType;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.catalog.fixtures.ConfigurationFixtures;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.catalog.metadataIngestion.SourceConfig;
import org.openmetadata.catalog.services.connections.database.MysqlConnection;
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.JsonUtils;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.CreateSecretRequest;
@ -178,6 +187,46 @@ public class AWSSecretsManagerTest {
assertEquals(OpenMetadataServerConnection.SecretsManagerProvider.AWS, secretsManager.getSecretsManagerProvider());
}
@ParameterizedTest
@MethodSource(
"org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceUnitTestParams#params")
public void testEncryptAndDecryptDbtConfigSource(
Object config,
EntityReference service,
Class<? extends EntityInterface> serviceClass,
PipelineType pipelineType,
boolean mustBeEncrypted) {
SourceConfig sourceConfigMock = mock(SourceConfig.class);
IngestionPipeline mockedIngestionPipeline = mock(IngestionPipeline.class);
when(mockedIngestionPipeline.getService()).thenReturn(service);
lenient().when(mockedIngestionPipeline.getPipelineType()).thenReturn(pipelineType);
if (mustBeEncrypted) {
when(mockedIngestionPipeline.getSourceConfig()).thenReturn(sourceConfigMock);
when(sourceConfigMock.getConfig()).thenReturn(config);
when(mockedIngestionPipeline.getName()).thenReturn("test-pipeline");
when(secretsManagerClient.getSecretValue(any(GetSecretValueRequest.class)))
.thenReturn(GetSecretValueResponse.builder().secretString("{}").build());
}
secretsManager.encryptOrDecryptDbtConfigSource(mockedIngestionPipeline, true);
secretsManager.encryptOrDecryptDbtConfigSource(mockedIngestionPipeline, false);
if (!mustBeEncrypted) {
verify(mockedIngestionPipeline, never()).setSourceConfig(any());
verify(sourceConfigMock, never()).setConfig(any());
} else {
ArgumentCaptor<Object> configCaptor = ArgumentCaptor.forClass(Object.class);
verify(mockedIngestionPipeline, times(4)).getSourceConfig();
verify(sourceConfigMock, times(2)).setConfig(configCaptor.capture());
assertNull(((DatabaseServiceMetadataPipeline) configCaptor.getAllValues().get(0)).getDbtConfigSource());
assertEquals(configCaptor.getAllValues().get(1), config);
assertNotSame(configCaptor.getAllValues().get(1), config);
}
}
private void testEncryptDecryptServiceConnection(boolean decrypt) {
MysqlConnection mysqlConnection = new MysqlConnection();
mysqlConnection.setPassword("openmetadata-test");


@ -15,10 +15,15 @@ package org.openmetadata.catalog.secrets;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.AuthProvider.AUTH_0;
import static org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.AuthProvider.AZURE;
import static org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.AuthProvider.CUSTOM_OIDC;
@ -35,18 +40,24 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.catalog.EntityInterface;
import org.openmetadata.catalog.airflow.AirflowConfiguration;
import org.openmetadata.catalog.airflow.AuthConfiguration;
import org.openmetadata.catalog.api.services.CreateDatabaseService;
import org.openmetadata.catalog.api.services.CreateMlModelService;
import org.openmetadata.catalog.entity.services.ServiceType;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.catalog.fernet.Fernet;
import org.openmetadata.catalog.fixtures.ConfigurationFixtures;
import org.openmetadata.catalog.metadataIngestion.SourceConfig;
import org.openmetadata.catalog.services.connections.database.MysqlConnection;
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.catalog.services.connections.mlModel.SklearnConnection;
import org.openmetadata.catalog.type.EntityReference;
@ExtendWith(MockitoExtension.class)
public class LocalSecretsManagerTest {
@ -125,6 +136,42 @@ public class LocalSecretsManagerTest {
assertEquals(OpenMetadataServerConnection.SecretsManagerProvider.LOCAL, secretsManager.getSecretsManagerProvider());
}
@ParameterizedTest
@MethodSource(
"org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceUnitTestParams#params")
public void testEncryptAndDecryptDbtConfigSource(
Object config,
EntityReference service,
Class<? extends EntityInterface> serviceClass,
PipelineType pipelineType,
boolean mustBeEncrypted) {
SourceConfig sourceConfigMock = mock(SourceConfig.class);
IngestionPipeline mockedIngestionPipeline = mock(IngestionPipeline.class);
when(mockedIngestionPipeline.getService()).thenReturn(service);
lenient().when(mockedIngestionPipeline.getPipelineType()).thenReturn(pipelineType);
if (mustBeEncrypted) {
when(mockedIngestionPipeline.getSourceConfig()).thenReturn(sourceConfigMock);
when(sourceConfigMock.getConfig()).thenReturn(config);
}
secretsManager.encryptOrDecryptDbtConfigSource(mockedIngestionPipeline, true);
secretsManager.encryptOrDecryptDbtConfigSource(mockedIngestionPipeline, false);
if (!mustBeEncrypted) {
verify(mockedIngestionPipeline, never()).setSourceConfig(any());
verify(sourceConfigMock, never()).setConfig(any());
} else {
ArgumentCaptor<Object> configCaptor = ArgumentCaptor.forClass(Object.class);
verify(mockedIngestionPipeline, times(4)).getSourceConfig();
verify(sourceConfigMock, times(2)).setConfig(configCaptor.capture());
assertEquals(configCaptor.getAllValues().get(0), config);
assertEquals(configCaptor.getAllValues().get(1), config);
}
}
@ParameterizedTest
@MethodSource("testDecryptAuthProviderConfigParams")
void testDecryptAuthProviderConfig(


@ -19,6 +19,10 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseMetadataConfigType,
DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
@ -81,6 +85,8 @@ class Workflow:
self._retrieve_service_connection_if_needed(metadata_config, service_type)
self._retrieve_dbt_config_source_if_needed(metadata_config, service_type)
self.source: Source = source_class.create(
self.config.source.dict(), metadata_config
)
@ -248,16 +254,13 @@ class Workflow:
) -> None:
"""
We override the current `serviceConnection` source config object if source workflow service already exists
in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it
from the service object itself through the default `SecretsManager`.
in OM. When secrets' manager is configured, we retrieve the service connection from the secrets' manager.
Otherwise, we get the service connection from the service object itself through the default `SecretsManager`.
:param metadata_config: OpenMetadata connection config
:param service_type: source workflow service type
:return:
"""
# We override the current serviceConnection source object if source workflow service already exists in OM.
# We retrieve the service connection from the secrets' manager when it is configured. Otherwise, we get it
# from the service object itself.
if service_type is not ServiceType.Metadata and not self._is_sample_source(
self.config.source.type
):
@ -273,6 +276,36 @@ class Workflow:
)
)
def _retrieve_dbt_config_source_if_needed(
self, metadata_config: OpenMetadataConnection, service_type: ServiceType
) -> None:
"""
We override the current `config` source config object if it is a metadata ingestion type. When secrets' manager
is configured, we retrieve the config from the secrets' manager. Otherwise, we get the config from the source
config object itself through the default `SecretsManager`.
:return:
"""
config = self.config.source.sourceConfig.config
if (
service_type is ServiceType.Database
and config
and config.type == DatabaseMetadataConfigType.DatabaseMetadata
):
metadata = OpenMetadata(config=metadata_config)
dbt_config_source: object = (
metadata.secrets_manager_client.retrieve_dbt_source_config(
self.config.source.sourceConfig,
f"{self.config.source.serviceName}_metadata",
)
)
if dbt_config_source and self.config.source.sourceConfig.config:
config_dict = self.config.source.sourceConfig.config.dict()
config_dict["dbtConfigSource"] = dbt_config_source
self.config.source.sourceConfig.config = (
DatabaseServiceMetadataPipeline.parse_obj(config_dict)
)
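This is the client-side mirror of the server change: for database metadata pipelines the workflow asks the configured secrets manager for the externalized dbt config and, if one is found, re-injects it into sourceConfig.config before the source is instantiated.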
@staticmethod
def _is_sample_source(service_type):
def _is_sample_source(service_type: str) -> bool:
return service_type in SAMPLE_SOURCE


@ -32,6 +32,7 @@ from metadata.generated.schema.entity.services.messagingService import Messaging
from metadata.generated.schema.entity.services.metadataService import MetadataService
from metadata.generated.schema.entity.services.mlmodelService import MlModelService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig
from metadata.generated.schema.security.client import (
auth0SSOClientConfig,
azureSSOClientConfig,
@ -96,7 +97,7 @@ class SecretsManager(metaclass=Singleton):
service_type: str,
) -> ServiceConnection:
"""
Retrieve the service connection from the secret manager to a given service connection object.
Retrieve the service connection from the secret manager from a given service connection object.
:param service: Service connection object e.g. DatabaseConnection
:param service_type: Service type e.g. databaseService
"""
@ -110,6 +111,17 @@ class SecretsManager(metaclass=Singleton):
"""
pass
@abstractmethod
def retrieve_dbt_source_config(
self, source_config: SourceConfig, pipeline_name: str
) -> object:
"""
Retrieve the DBT source config from the secret manager from a source config object.
:param source_config: SourceConfig object
:param pipeline_name: the pipeline's name
:return:
"""
@property
def secret_id_separator(self) -> str:
return "/"
@ -179,6 +191,9 @@ class LocalSecretsManager(SecretsManager):
"""
The LocalSecretsManager does not modify the OpenMetadataConnection object
"""
logger.debug(
f"Adding auth provider security config using {SecretsManagerProvider.local.name} secrets' manager"
)
pass
def retrieve_service_connection(
@ -189,8 +204,25 @@ class LocalSecretsManager(SecretsManager):
"""
The LocalSecretsManager does not modify the ServiceConnection object
"""
logger.debug(
f"Retrieving service connection from {SecretsManagerProvider.local.name} secrets' manager for {service_type} - {service.name}"
)
return ServiceConnection(__root__=service.connection)
def retrieve_dbt_source_config(
self, source_config: SourceConfig, pipeline_name: str
) -> object:
logger.debug(
f"Retrieving source_config from {SecretsManagerProvider.local.name} secrets' manager for {pipeline_name}"
)
return (
source_config.config.dbtConfigSource.dict()
if source_config
and source_config.config
and source_config.config.dbtConfigSource
else None
)
class AWSSecretsManager(SecretsManager):
def __init__(self, credentials: AWSCredentials, cluster_prefix: str):
@ -212,6 +244,9 @@ class AWSSecretsManager(SecretsManager):
service: ServiceWithConnectionType,
service_type: str,
) -> ServiceConnection:
logger.debug(
f"Retrieving service connection from {SecretsManagerProvider.aws.name} secrets' manager for {service_type} - {service.name}"
)
service_connection_type = service.serviceType.value
service_name = service.name.__root__
secret_id = self.build_secret_id(
@ -229,6 +264,9 @@ class AWSSecretsManager(SecretsManager):
return ServiceConnection(__root__=service_connection)
def add_auth_provider_security_config(self, config: OpenMetadataConnection) -> None:
logger.debug(
f"Adding auth provider security config using {SecretsManagerProvider.aws.name} secrets' manager"
)
if config.authProvider == AuthProvider.no_auth:
return config
secret_id = self.build_secret_id(
@ -244,6 +282,16 @@ class AWSSecretsManager(SecretsManager):
f"No client implemented for auth provider: [{config.authProvider}]"
)
def retrieve_dbt_source_config(
self, source_config: SourceConfig, pipeline_name: str
) -> object:
logger.debug(
f"Retrieving source_config from {SecretsManagerProvider.local.name} secrets' manager for {pipeline_name}"
)
secret_id = self.build_secret_id("database-metadata-pipeline", pipeline_name)
source_config_json = self._get_string_value(secret_id)
return json.loads(source_config_json) if source_config_json else None
def _get_string_value(self, name: str) -> str:
"""
:param name: The secret name to retrieve. Current stage is always retrieved.
@ -263,7 +311,11 @@ class AWSSecretsManager(SecretsManager):
raise
else:
if "SecretString" in response:
return response["SecretString"]
return (
response["SecretString"]
if response["SecretString"] != "null"
else None
)
else:
raise ValueError("[SecretString] not present in the response.")
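Returning None when the stored SecretString equals "null" mirrors the NULL_SECRET_STRING sentinel the Java AWSSecretsManager writes whenever it upserts a null value.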


@ -35,13 +35,18 @@ from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
DbtHttpConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig
from metadata.generated.schema.security.client.googleSSOClientConfig import (
GoogleSSOClientConfig,
)
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
from metadata.utils.secrets_manager import (
AUTH_PROVIDER_MAPPING,
SecretsManager,
AWSSecretsManager,
Singleton,
get_secrets_manager,
)
@ -57,6 +62,11 @@ DATABASE_SERVICE = {
AUTH_PROVIDER_CONFIG = {"secretKey": "/fake/path"}
DBT_SOURCE_CONFIG = {
"dbtCatalogHttpPath": "/fake/path",
"dbtManifestHttpPath": "/fake/path",
}
class TestSecretsManager(TestCase):
service_type: str = "database"
@ -65,6 +75,7 @@ class TestSecretsManager(TestCase):
database_connection = MysqlConnection(**DATABASE_CONNECTION)
auth_provider_config = GoogleSSOClientConfig(**AUTH_PROVIDER_CONFIG)
om_connection: OpenMetadataConnection
dbt_source_config: DbtHttpConfig
@classmethod
def setUpClass(cls) -> None:
@ -75,6 +86,7 @@ class TestSecretsManager(TestCase):
authProvider=AuthProvider.google,
hostPort="http://localhost:8585/api",
)
cls.dbt_source_config = DbtHttpConfig.parse_obj(DBT_SOURCE_CONFIG)
@classmethod
def setUp(cls) -> None:
@ -107,6 +119,21 @@ class TestSecretsManager(TestCase):
self.assertEqual(self.auth_provider_config, actual_om_connection.securityConfig)
assert id(self.auth_provider_config) == id(actual_om_connection.securityConfig)
def test_local_manager_retrieve_dbt_source_config(self):
local_manager = get_secrets_manager(
self._build_open_metadata_connection(SecretsManagerProvider.local), None
)
source_config = SourceConfig()
source_config.config = DatabaseServiceMetadataPipeline(
dbtConfigSource=self.dbt_source_config
)
actual_dbt_source_config = local_manager.retrieve_dbt_source_config(
source_config, "test-pipeline"
)
self.assertEqual(self.dbt_source_config.dict(), actual_dbt_source_config)
@patch("metadata.utils.secrets_manager.boto3")
def test_aws_manager_add_service_config_connection(self, boto3_mock):
aws_manager = self._build_secret_manager(
@ -130,7 +157,9 @@ class TestSecretsManager(TestCase):
)
@patch("metadata.utils.secrets_manager.boto3")
def test_aws_manager_fails_add_auth_provider_security_config(self, mocked_boto3):
def test_aws_manager_fails_add_service_config_connection_when_not_stored(
self, mocked_boto3
):
aws_manager = self._build_secret_manager(mocked_boto3, {})
with self.assertRaises(ValueError) as value_error:
@ -157,13 +186,50 @@ class TestSecretsManager(TestCase):
assert id(self.auth_provider_config) != id(actual_om_connection.securityConfig)
@patch("metadata.utils.secrets_manager.boto3")
def test_aws_manager_fails_add_service_config_connection_when_not_stored(
def test_aws_manager_retrieve_dbt_source_config(self, boto3_mock):
aws_manager = self._build_secret_manager(
boto3_mock, {"SecretString": json.dumps(DBT_SOURCE_CONFIG)}
)
source_config = SourceConfig()
source_config.config = DatabaseServiceMetadataPipeline(
dbtConfigSource=self.dbt_source_config
)
actual_dbt_source_config = aws_manager.retrieve_dbt_source_config(
source_config, "test-pipeline"
)
expected_call = {
"SecretId": "/openmetadata/database-metadata-pipeline/test-pipeline"
}
aws_manager.secretsmanager_client.get_secret_value.assert_called_once_with(
**expected_call
)
self.assertEqual(self.dbt_source_config.dict(), actual_dbt_source_config)
@patch("metadata.utils.secrets_manager.boto3")
def test_aws_manager_fails_add_auth_provider_security_config(self, mocked_boto3):
aws_manager = self._build_secret_manager(mocked_boto3, {})
with self.assertRaises(ValueError) as value_error:
aws_manager.add_auth_provider_security_config(self.om_connection)
self.assertEqual(
"[SecretString] not present in the response.", value_error.exception
)
@patch("metadata.utils.secrets_manager.boto3")
def test_aws_manager_aws_manager_fails_retrieve_dbt_source_config_when_not_stored(
self, mocked_boto3
):
aws_manager = self._build_secret_manager(mocked_boto3, {})
source_config = SourceConfig()
source_config.config = DatabaseServiceMetadataPipeline(
dbtConfigSource=self.dbt_source_config
)
with self.assertRaises(ValueError) as value_error:
aws_manager.retrieve_service_connection(self.service, self.service_type)
aws_manager.retrieve_dbt_source_config(source_config, "test-pipeline")
self.assertEqual(
"[SecretString] not present in the response.", value_error.exception
)
@ -188,17 +254,16 @@ class TestSecretsManager(TestCase):
def _build_secret_manager(
self, mocked_boto3: Mock, expected_json: Dict[str, Any]
) -> SecretsManager:
) -> AWSSecretsManager:
self._init_boto3_mock(mocked_boto3, expected_json)
aws_manager = get_secrets_manager(
self._build_open_metadata_connection(SecretsManagerProvider.aws),
return AWSSecretsManager(
AWSCredentials(
awsAccessKeyId="fake_key",
awsSecretAccessKey="fake_access",
awsRegion="fake-region",
),
"openmetadata",
)
return aws_manager
@staticmethod
def _build_open_metadata_connection(


@ -154,6 +154,7 @@
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"dbtConfigSource": {
"mask": true,
"title": "DBT Configuration Source",
"description": "Available sources to fetch DBT catalog and manifest files.",
"oneOf": [


@ -11,7 +11,6 @@
"type": "object",
"properties": {
"config": {
"mask": true,
"oneOf": [
{
"$ref": "databaseServiceMetadataPipeline.json"