diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java index 71e5ee7dc01..526f8876708 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java @@ -21,9 +21,10 @@ import org.json.JSONObject; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig; import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline; import org.openmetadata.catalog.metadataIngestion.LogLevels; -import org.openmetadata.catalog.metadataIngestion.SourceConfig; import org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResource; +import org.openmetadata.catalog.secrets.SecretsManager; import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.Relationship; @@ -37,7 +38,9 @@ public class IngestionPipelineRepository extends EntityRepository decryptOrNullify(securityContext, ingestionPipeline)); return ingestionPipelines; } @@ -233,7 +236,7 @@ public class IngestionPipelineResource extends EntityResource response = pipelineServiceClient.killIngestion(ingestionPipeline); return Response.status(200, response.body()).build(); } @@ -574,4 +586,19 @@ public class IngestionPipelineResource extends EntityResource clazz = createConnectionConfigClass(connectionType, extractConnectionPackageName(serviceType)); - return JsonUtils.readValue(getSecret(secretName), clazz); + String connectionConfigJson = getSecret(secretName); + return NULL_SECRET_STRING.equals(connectionConfigJson) + ? null + : JsonUtils.readValue(getSecret(secretName), clazz); } } catch (ClassNotFoundException ex) { throw InvalidServiceConnectionException.byMessage( @@ -88,6 +92,25 @@ public class AWSSecretsManager extends SecretsManager { } } + @Override + public Object encryptOrDecryptDbtConfigSource(Object dbtConfigSource, String ingestionPipelineName, boolean encrypt) { + String secretName = buildSecretId(DATABASE_METADATA_PIPELINE_SECRET_ID_SUFFIX, ingestionPipelineName); + try { + if (encrypt) { + String dbtConfigSourceJson = JsonUtils.pojoToJson(dbtConfigSource); + upsertSecret(secretName, dbtConfigSourceJson); + return null; + } else { + String dbtConfigSourceJson = getSecret(secretName); + return NULL_SECRET_STRING.equals(dbtConfigSourceJson) + ? null + : JsonUtils.readValue(dbtConfigSourceJson, Object.class); + } + } catch (Exception e) { + throw SecretsManagerException.byMessage(getClass().getSimpleName(), secretName, e.getMessage()); + } + } + @Override public AirflowConfiguration encryptAirflowConnection(AirflowConfiguration airflowConfiguration) { OpenMetadataServerConnection.AuthProvider authProvider = @@ -148,7 +171,7 @@ public class AWSSecretsManager extends SecretsManager { CreateSecretRequest.builder() .name(secretName) .description("This secret was created by OpenMetadata") - .secretString(secretValue) + .secretString(Objects.isNull(secretValue) ? NULL_SECRET_STRING : secretValue) .build(); this.secretsClient.createSecret(createSecretRequest); } @@ -158,7 +181,7 @@ public class AWSSecretsManager extends SecretsManager { UpdateSecretRequest.builder() .secretId(secretName) .description("This secret was created by OpenMetadata") - .secretString(secretValue) + .secretString(Objects.isNull(secretValue) ? NULL_SECRET_STRING : secretValue) .build(); this.secretsClient.updateSecret(updateSecretRequest); } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/LocalSecretsManager.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/LocalSecretsManager.java index d20c5d14c7f..aca3e5fce02 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/LocalSecretsManager.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/LocalSecretsManager.java @@ -56,6 +56,11 @@ public class LocalSecretsManager extends SecretsManager { } } + @Override + public Object encryptOrDecryptDbtConfigSource(Object dbtConfigSource, String ingestionPipelineName, boolean encrypt) { + return dbtConfigSource; + } + @Override public AirflowConfiguration encryptAirflowConnection(AirflowConfiguration airflowConfiguration) { return airflowConfiguration; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/SecretsManager.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/SecretsManager.java index a965930d7ab..d2e68ce4812 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/SecretsManager.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/SecretsManager.java @@ -18,11 +18,16 @@ import static java.util.Objects.isNull; import com.google.common.base.CaseFormat; import java.util.List; import lombok.Getter; +import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.airflow.AirflowConfiguration; import org.openmetadata.catalog.airflow.AuthConfiguration; import org.openmetadata.catalog.entity.services.ServiceType; +import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType; import org.openmetadata.catalog.exception.SecretsManagerException; +import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline; import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection; +import org.openmetadata.catalog.util.JsonUtils; public abstract class SecretsManager { @@ -43,6 +48,28 @@ public abstract class SecretsManager { public abstract Object encryptOrDecryptServiceConnectionConfig( Object connectionConfig, String connectionType, String connectionName, ServiceType serviceType, boolean encrypt); + abstract Object encryptOrDecryptDbtConfigSource( + Object dbtConfigSource, String ingestionPipelineName, boolean encrypt); + + public void encryptOrDecryptDbtConfigSource(IngestionPipeline ingestionPipeline, boolean encrypt) { + encryptOrDecryptDbtConfigSource(ingestionPipeline, ingestionPipeline.getService().getType(), encrypt); + } + + public void encryptOrDecryptDbtConfigSource( + IngestionPipeline ingestionPipeline, String serviceType, boolean encrypt) { + // DatabaseServiceMetadataPipeline contains dbtConfigSource and must be encrypted + if (serviceType.equals(Entity.DATABASE_SERVICE) + && ingestionPipeline.getPipelineType().equals(PipelineType.METADATA)) { + DatabaseServiceMetadataPipeline databaseServiceMetadataPipeline = + JsonUtils.convertValue( + ingestionPipeline.getSourceConfig().getConfig(), DatabaseServiceMetadataPipeline.class); + databaseServiceMetadataPipeline.setDbtConfigSource( + encryptOrDecryptDbtConfigSource( + databaseServiceMetadataPipeline.getDbtConfigSource(), ingestionPipeline.getName(), encrypt)); + ingestionPipeline.getSourceConfig().setConfig(databaseServiceMetadataPipeline); + } + } + public OpenMetadataServerConnection decryptServerConnection(AirflowConfiguration airflowConfiguration) { OpenMetadataServerConnection.AuthProvider authProvider = OpenMetadataServerConnection.AuthProvider.fromValue(airflowConfiguration.getAuthProvider()); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/SecretsManagerMigrationService.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/SecretsManagerMigrationService.java index 8e2facc5d2e..ee6cd202b9f 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/SecretsManagerMigrationService.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/secrets/SecretsManagerMigrationService.java @@ -26,16 +26,19 @@ import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.ServiceConnectionEntityInterface; import org.openmetadata.catalog.ServiceEntityInterface; import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType; import org.openmetadata.catalog.exception.SecretsManagerMigrationException; import org.openmetadata.catalog.jdbi3.ChangeEventRepository; import org.openmetadata.catalog.jdbi3.IngestionPipelineRepository; import org.openmetadata.catalog.jdbi3.ListFilter; import org.openmetadata.catalog.jdbi3.ServiceEntityRepository; +import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline; import org.openmetadata.catalog.resources.CollectionRegistry; import org.openmetadata.catalog.resources.events.EventResource; import org.openmetadata.catalog.resources.services.ServiceEntityResource; import org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResource; import org.openmetadata.catalog.util.EntityUtil; +import org.openmetadata.catalog.util.JsonUtils; /** * Migration service from LocalSecretManager to configured one. @@ -129,15 +132,17 @@ public class SecretsManagerMigrationService { service.getName(), repository.getServiceType(), false)); - newSecretManager.encryptOrDecryptServiceConnectionConfig( - service.getConnection().getConfig(), - service.getServiceType().value(), - service.getName(), - repository.getServiceType(), - true); + service + .getConnection() + .setConfig( + newSecretManager.encryptOrDecryptServiceConnectionConfig( + service.getConnection().getConfig(), + service.getServiceType().value(), + service.getName(), + repository.getServiceType(), + true)); // avoid reaching secrets manager quotas Thread.sleep(100); - service.getConnection().setConfig(null); repository.dao.update(service); } catch (IOException | InterruptedException e) { throw new SecretsManagerMigrationException(e.getMessage(), e.getCause()); @@ -174,7 +179,14 @@ public class SecretsManagerMigrationService { private void migrateIngestionPipelines(IngestionPipeline ingestionPipeline) { try { IngestionPipeline ingestion = ingestionPipelineRepository.dao.findEntityById(ingestionPipeline.getId()); - ingestion.getOpenMetadataServerConnection().setSecurityConfig(null); + if (hasSecurityConfig(ingestionPipeline)) { + ingestion.getOpenMetadataServerConnection().setSecurityConfig(null); + } + if (hasDbtConfig(ingestionPipeline)) { + // we have to decrypt using the old secrets manager and encrypt again with the new one + oldSecretManager.encryptOrDecryptDbtConfigSource(ingestionPipeline, false); + newSecretManager.encryptOrDecryptDbtConfigSource(ingestionPipeline, true); + } ingestionPipelineRepository.dao.update(ingestion); } catch (IOException e) { throw new SecretsManagerMigrationException(e.getMessage(), e.getCause()); @@ -191,16 +203,28 @@ public class SecretsManagerMigrationService { ingestionPipelineRepository.dao.listCount(new ListFilter()), null) .getData().stream() - .filter( - ingestionPipeline -> - !Objects.isNull(ingestionPipeline.getOpenMetadataServerConnection()) - && !Objects.isNull(ingestionPipeline.getOpenMetadataServerConnection().getSecurityConfig())) + .filter(this::hasSecurityConfig) + .filter(this::hasDbtConfig) .collect(Collectors.toList()); } catch (IOException e) { throw new SecretsManagerMigrationException(e.getMessage(), e.getCause()); } } + private boolean hasSecurityConfig(IngestionPipeline ingestionPipeline) { + return !Objects.isNull(ingestionPipeline.getOpenMetadataServerConnection()) + && !Objects.isNull(ingestionPipeline.getOpenMetadataServerConnection().getSecurityConfig()); + } + + private boolean hasDbtConfig(IngestionPipeline ingestionPipeline) { + return ingestionPipeline.getService().getType().equals(Entity.DATABASE_SERVICE) + && ingestionPipeline.getPipelineType().equals(PipelineType.METADATA) + && JsonUtils.convertValue( + ingestionPipeline.getSourceConfig().getConfig(), DatabaseServiceMetadataPipeline.class) + .getDbtConfigSource() + != null; + } + /** This method delete all the change events which could contain connection config parameters for services */ private void deleteChangeEventsForServices() { connectionTypeRepositoriesMap.values().stream() diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json index bf523db48a5..c3d651e1e32 100644 --- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json @@ -154,6 +154,7 @@ "$ref": "../type/filterPattern.json#/definitions/filterPattern" }, "dbtConfigSource": { + "mask": true, "title": "DBT Configuration Source", "description": "Available sources to fetch DBT catalog and manifest files.", "oneOf": [ diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/workflow.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/workflow.json index d20b4374b02..0e3e2d4cb81 100644 --- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/workflow.json +++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/workflow.json @@ -11,7 +11,6 @@ "type": "object", "properties": { "config": { - "mask": true, "oneOf": [ { "$ref": "databaseServiceMetadataPipeline.json" diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepositoryTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepositoryTest.java new file mode 100644 index 00000000000..201614c6ded --- /dev/null +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepositoryTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2022 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.LinkedHashMap; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.entity.services.DatabaseService; +import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline; +import org.openmetadata.catalog.metadataIngestion.SourceConfig; +import org.openmetadata.catalog.secrets.SecretsManager; + +@ExtendWith(MockitoExtension.class) +public class IngestionPipelineRepositoryTest { + + @Mock protected CollectionDAO collectionDAO; + + @Mock protected SecretsManager secretsManager; + + @Mock protected CollectionDAO.IngestionPipelineDAO dao; + + protected IngestionPipelineRepository ingestionPipelineRepository; + + @BeforeEach + void beforeEach() { + when(collectionDAO.ingestionPipelineDAO()).thenReturn(dao); + ingestionPipelineRepository = new IngestionPipelineRepository(collectionDAO, secretsManager); + } + + @AfterEach + void afterEach() { + reset(secretsManager); + } + + @Test + void testStoreEntityCallCorrectlyLocalSecretManager() throws IOException { + IngestionPipeline ingestionPipeline = initStoreEntityTest(true); + + ArgumentCaptor serviceTypeCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor ingestionPipelineStringCaptor = ArgumentCaptor.forClass(String.class); + + ingestionPipelineRepository.storeEntity(ingestionPipeline, true); + + verify(secretsManager) + .encryptOrDecryptDbtConfigSource(any(IngestionPipeline.class), serviceTypeCaptor.capture(), eq(true)); + verify(dao).update(isNull(), ingestionPipelineStringCaptor.capture()); + + assertEquals("databaseService", serviceTypeCaptor.getValue()); + assertEquals( + "{\"name\":\"testPipeline\",\"sourceConfig\":{\"config\":{\"type\":\"DatabaseMetadata\",\"markDeletedTables\":true,\"includeTables\":true,\"includeViews\":true,\"includeTags\":true,\"dbtConfigSource\":{}}},\"loggerLevel\":\"INFO\",\"enabled\":true,\"version\":0.1,\"deleted\":false}", + ingestionPipelineStringCaptor.getValue()); + } + + @Test + void testStoreEntityCallCorrectlyAWSSecretManager() throws IOException { + IngestionPipeline ingestionPipeline = initStoreEntityTest(false); + + ArgumentCaptor serviceTypeCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor ingestionPipelineStringCaptor = ArgumentCaptor.forClass(String.class); + + ingestionPipelineRepository.storeEntity(ingestionPipeline, true); + + verify(secretsManager) + .encryptOrDecryptDbtConfigSource(any(IngestionPipeline.class), serviceTypeCaptor.capture(), eq(true)); + verify(dao).update(isNull(), ingestionPipelineStringCaptor.capture()); + + assertEquals("databaseService", serviceTypeCaptor.getValue()); + assertEquals( + "{\"name\":\"testPipeline\",\"sourceConfig\":{\"config\":{\"type\":\"DatabaseMetadata\",\"markDeletedTables\":true,\"includeTables\":true,\"includeViews\":true,\"includeTags\":true}},\"loggerLevel\":\"INFO\",\"enabled\":true,\"version\":0.1,\"deleted\":false}", + ingestionPipelineStringCaptor.getValue()); + } + + private IngestionPipeline initStoreEntityTest(boolean isLocal) { + when(secretsManager.isLocal()).thenReturn(isLocal); + return new IngestionPipeline() + .withName("testPipeline") + .withService(new DatabaseService().getEntityReference().withType(Entity.DATABASE_SERVICE)) + .withSourceConfig( + new SourceConfig() + .withConfig(new DatabaseServiceMetadataPipeline().withDbtConfigSource(new LinkedHashMap<>()))); + } +} diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceTest.java index 10f0d193b50..77e90eb0a23 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceTest.java @@ -158,7 +158,11 @@ public class IngestionPipelineResourceTest extends EntityResourceTest authHeaders) { assertEquals(expected.getDisplayName(), updated.getDisplayName()); assertReference(expected.getService(), updated.getService()); - assertNull(updated.getSourceConfig().getConfig()); + if (Entity.DATABASE_SERVICE.equals(updated.getService().getType())) { + assertNull( + JsonUtils.convertValue(updated.getSourceConfig().getConfig(), DatabaseServiceMetadataPipeline.class) + .getDbtConfigSource()); + } } @Override diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTest.java index 1703e91733b..7ebf06f813d 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTest.java @@ -14,10 +14,17 @@ package org.openmetadata.catalog.resources.services.ingestionpipelines; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -29,14 +36,23 @@ import javax.ws.rs.core.SecurityContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.MockedConstruction; import org.mockito.MockedConstruction.Context; import org.mockito.junit.jupiter.MockitoExtension; import org.openmetadata.catalog.CatalogApplicationConfig; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.EntityInterface; import org.openmetadata.catalog.airflow.AirflowRESTClient; import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType; import org.openmetadata.catalog.jdbi3.CollectionDAO; +import org.openmetadata.catalog.jdbi3.EntityDAO; +import org.openmetadata.catalog.jdbi3.EntityRepository; +import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline; +import org.openmetadata.catalog.metadataIngestion.SourceConfig; import org.openmetadata.catalog.secrets.SecretsManager; import org.openmetadata.catalog.security.Authorizer; import org.openmetadata.catalog.type.EntityReference; @@ -47,6 +63,8 @@ public class IngestionPipelineResourceUnitTest { private static final UUID DAG_ID = UUID.randomUUID(); + private static final String PIPELINE_NAME = "service_test"; + private IngestionPipelineResource ingestionPipelineResource; @Mock SecurityContext securityContext; @@ -57,11 +75,13 @@ public class IngestionPipelineResourceUnitTest { @Mock CatalogApplicationConfig catalogApplicationConfig; - @Mock IngestionPipeline ingestionPipeline; + @Mock SecretsManager secretsManager; + + @Mock CollectionDAO.IngestionPipelineDAO entityDAO; @BeforeEach - void setUp() throws IOException { - CollectionDAO.IngestionPipelineDAO entityDAO = mock(CollectionDAO.IngestionPipelineDAO.class); + void setUp() { + reset(entityDAO, collectionDAO, secretsManager, authorizer); CollectionDAO.EntityRelationshipDAO relationshipDAO = mock(CollectionDAO.EntityRelationshipDAO.class); CollectionDAO.EntityRelationshipRecord entityRelationshipRecord = mock(CollectionDAO.EntityRelationshipRecord.class); @@ -70,14 +90,15 @@ public class IngestionPipelineResourceUnitTest { when(relationshipDAO.findFrom(any(), any(), anyInt())).thenReturn(List.of(entityRelationshipRecord)); when(collectionDAO.ingestionPipelineDAO()).thenReturn(entityDAO); when(collectionDAO.relationshipDAO()).thenReturn(relationshipDAO); - when(entityDAO.findEntityById(any(), any())).thenReturn(ingestionPipeline); - when(entityDAO.findEntityReferenceById(any(), any())).thenReturn(mock(EntityReference.class)); - when(ingestionPipeline.getId()).thenReturn(DAG_ID); - ingestionPipelineResource = new IngestionPipelineResource(collectionDAO, authorizer, mock(SecretsManager.class)); + ingestionPipelineResource = new IngestionPipelineResource(collectionDAO, authorizer, secretsManager); } @Test - public void testLastIngestionLogsAreRetrieved() throws IOException { + void testLastIngestionLogsAreRetrieved() throws IOException { + IngestionPipeline ingestionPipeline = mock(IngestionPipeline.class); + when(ingestionPipeline.getId()).thenReturn(DAG_ID); + when(entityDAO.findEntityById(any(), any())).thenReturn(ingestionPipeline); + when(entityDAO.findEntityReferenceById(any(), any())).thenReturn(mock(EntityReference.class)); Map expectedMap = Map.of("task", "log"); try (MockedConstruction mocked = mockConstruction(AirflowRESTClient.class, this::preparePipelineServiceClient)) { @@ -89,7 +110,107 @@ public class IngestionPipelineResourceUnitTest { } } + @ParameterizedTest + @MethodSource( + "org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceUnitTestParams#params") + void testGetIsEncryptedWhenSecretManagerIsConfigured( + Object config, + EntityReference service, + Class serviceClass, + PipelineType pipelineType, + boolean mustBeEncrypted) + throws IOException { + UUID id = UUID.randomUUID(); + + IngestionPipeline ingestionPipeline = buildIngestionPipeline(config, pipelineType, id); + + Entity.registerEntity(serviceClass, service.getType(), mock(EntityDAO.class), mock(EntityRepository.class)); + + doAnswer( + invocation -> { + if (mustBeEncrypted) { + IngestionPipeline arg0 = invocation.getArgument(0); + ((DatabaseServiceMetadataPipeline) arg0.getSourceConfig().getConfig()).setDbtConfigSource(null); + } + return null; + }) + .when(secretsManager) + .encryptOrDecryptDbtConfigSource(any(IngestionPipeline.class), anyBoolean()); + + when(entityDAO.findEntityById(eq(id), any())).thenReturn(ingestionPipeline); + when(entityDAO.findEntityReferenceById(any(), any())).thenReturn(service); + + IngestionPipeline actualIngestionPipeline = ingestionPipelineResource.get(null, securityContext, id, null, null); + + verifySecretManagerIsCalled(mustBeEncrypted, ingestionPipeline); + assertIngestionPipelineDbtConfigIsEncrypted(mustBeEncrypted, actualIngestionPipeline); + } + + @ParameterizedTest + @MethodSource( + "org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceUnitTestParams#params") + void testGetByNameIsEncryptedWhenSecretManagerIsConfigured( + Object config, + EntityReference service, + Class serviceClass, + PipelineType pipelineType, + boolean mustBeEncrypted) + throws IOException { + UUID id = UUID.randomUUID(); + + IngestionPipeline ingestionPipeline = buildIngestionPipeline(config, pipelineType, id); + + Entity.registerEntity(serviceClass, service.getType(), mock(EntityDAO.class), mock(EntityRepository.class)); + + when(entityDAO.findEntityByName(eq(PIPELINE_NAME), any())).thenReturn(ingestionPipeline); + when(entityDAO.findEntityReferenceById(any(), any())).thenReturn(service); + + doAnswer( + invocation -> { + if (mustBeEncrypted) { + IngestionPipeline arg0 = invocation.getArgument(0); + ((DatabaseServiceMetadataPipeline) arg0.getSourceConfig().getConfig()).setDbtConfigSource(null); + } + return null; + }) + .when(secretsManager) + .encryptOrDecryptDbtConfigSource(any(IngestionPipeline.class), anyBoolean()); + + IngestionPipeline actualIngestionPipeline = + ingestionPipelineResource.getByName(null, PIPELINE_NAME, securityContext, null, null); + + verifySecretManagerIsCalled(mustBeEncrypted, ingestionPipeline); + assertIngestionPipelineDbtConfigIsEncrypted(mustBeEncrypted, actualIngestionPipeline); + } + private void preparePipelineServiceClient(AirflowRESTClient mockPipelineServiceClient, Context context) { when(mockPipelineServiceClient.getLastIngestionLogs(any())).thenReturn(Map.of("task", "log")); } + + private IngestionPipeline buildIngestionPipeline(Object config, PipelineType pipelineType, UUID id) { + return new IngestionPipeline() + .withId(id) + .withPipelineType(pipelineType) + .withSourceConfig(new SourceConfig().withConfig(config)) + .withName(PIPELINE_NAME); + } + + private void verifySecretManagerIsCalled(boolean mustBeEncrypted, IngestionPipeline ingestionPipeline) { + if (mustBeEncrypted) { + verify(secretsManager).encryptOrDecryptDbtConfigSource(ingestionPipeline, false); + } else { + verify(secretsManager, never()).encryptOrDecryptDbtConfigSource(any(), any(), anyBoolean()); + } + } + + private void assertIngestionPipelineDbtConfigIsEncrypted( + boolean mustBeEncrypted, IngestionPipeline actualIngestionPipeline) { + if (mustBeEncrypted) { + assertNull( + ((DatabaseServiceMetadataPipeline) actualIngestionPipeline.getSourceConfig().getConfig()) + .getDbtConfigSource()); + } else { + assertNotNull(actualIngestionPipeline.getSourceConfig().getConfig()); + } + } } diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTestParams.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTestParams.java new file mode 100644 index 00000000000..259310c8b78 --- /dev/null +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/ingestionpipelines/IngestionPipelineResourceUnitTestParams.java @@ -0,0 +1,124 @@ +/* + * Copyright 2022 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.resources.services.ingestionpipelines; + +import java.util.LinkedHashMap; +import java.util.stream.Stream; +import org.junit.jupiter.params.provider.Arguments; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.api.services.CreateDashboardService; +import org.openmetadata.catalog.api.services.CreateDatabaseService; +import org.openmetadata.catalog.api.services.CreateMessagingService; +import org.openmetadata.catalog.api.services.CreateMlModelService; +import org.openmetadata.catalog.api.services.CreatePipelineService; +import org.openmetadata.catalog.entity.services.DashboardService; +import org.openmetadata.catalog.entity.services.DatabaseService; +import org.openmetadata.catalog.entity.services.MessagingService; +import org.openmetadata.catalog.entity.services.MlModelService; +import org.openmetadata.catalog.entity.services.PipelineService; +import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType; +import org.openmetadata.catalog.metadataIngestion.DashboardServiceMetadataPipeline; +import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline; +import org.openmetadata.catalog.metadataIngestion.DatabaseServiceProfilerPipeline; +import org.openmetadata.catalog.metadataIngestion.DatabaseServiceQueryLineagePipeline; +import org.openmetadata.catalog.metadataIngestion.DatabaseServiceQueryUsagePipeline; +import org.openmetadata.catalog.metadataIngestion.MessagingServiceMetadataPipeline; +import org.openmetadata.catalog.metadataIngestion.MlmodelServiceMetadataPipeline; +import org.openmetadata.catalog.metadataIngestion.PipelineServiceMetadataPipeline; +import org.openmetadata.catalog.type.EntityReference; + +public class IngestionPipelineResourceUnitTestParams { + + public static final EntityReference DATABASE_SERVICE_ENTITY = + new DatabaseService() + .withServiceType(CreateDatabaseService.DatabaseServiceType.Mysql) + .getEntityReference() + .withType(Entity.DATABASE_SERVICE); + + public static final EntityReference PIPELINE_SERVICE_ENTITY = + new PipelineService() + .withServiceType(CreatePipelineService.PipelineServiceType.Airbyte) + .getEntityReference() + .withType(Entity.PIPELINE_SERVICE); + + public static final EntityReference MESSAGING_SERVICE_ENTITY = + new MessagingService() + .withServiceType(CreateMessagingService.MessagingServiceType.Kafka) + .getEntityReference() + .withType(Entity.MESSAGING_SERVICE); + + public static final EntityReference DASHBOARD_SERVICE_ENTITY = + new DashboardService() + .withServiceType(CreateDashboardService.DashboardServiceType.Looker) + .getEntityReference() + .withType(Entity.DASHBOARD_SERVICE); + + public static final EntityReference MLMODEL_SERVICE_ENTITY = + new MlModelService() + .withServiceType(CreateMlModelService.MlModelServiceType.Mlflow) + .getEntityReference() + .withType(Entity.MLMODEL_SERVICE); + + public static Stream params() { + return Stream.of( + Arguments.of( + new DatabaseServiceMetadataPipeline().withDbtConfigSource(new LinkedHashMap<>()), + DATABASE_SERVICE_ENTITY, + DatabaseService.class, + PipelineType.METADATA, + true), + Arguments.of( + new DatabaseServiceQueryUsagePipeline(), + DATABASE_SERVICE_ENTITY, + DatabaseService.class, + PipelineType.USAGE, + false), + Arguments.of( + new DatabaseServiceQueryLineagePipeline(), + DATABASE_SERVICE_ENTITY, + DatabaseService.class, + PipelineType.LINEAGE, + false), + Arguments.of( + new DashboardServiceMetadataPipeline(), + DASHBOARD_SERVICE_ENTITY, + DatabaseService.class, + PipelineType.METADATA, + false), + Arguments.of( + new MessagingServiceMetadataPipeline(), + MESSAGING_SERVICE_ENTITY, + DatabaseService.class, + PipelineType.METADATA, + false), + Arguments.of( + new DatabaseServiceProfilerPipeline(), + DATABASE_SERVICE_ENTITY, + DatabaseService.class, + PipelineType.PROFILER, + false), + Arguments.of( + new PipelineServiceMetadataPipeline(), + PIPELINE_SERVICE_ENTITY, + DatabaseService.class, + PipelineType.METADATA, + false), + Arguments.of( + new MlmodelServiceMetadataPipeline(), + MLMODEL_SERVICE_ENTITY, + DatabaseService.class, + PipelineType.METADATA, + false)); + } +} diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/secrets/AWSSecretsManagerTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/secrets/AWSSecretsManagerTest.java index b6648a32c07..322eb485e09 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/secrets/AWSSecretsManagerTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/secrets/AWSSecretsManagerTest.java @@ -18,8 +18,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -45,13 +48,19 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.openmetadata.catalog.EntityInterface; import org.openmetadata.catalog.airflow.AirflowConfiguration; import org.openmetadata.catalog.airflow.AuthConfiguration; import org.openmetadata.catalog.api.services.CreateDatabaseService; import org.openmetadata.catalog.entity.services.ServiceType; +import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType; import org.openmetadata.catalog.fixtures.ConfigurationFixtures; +import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline; +import org.openmetadata.catalog.metadataIngestion.SourceConfig; import org.openmetadata.catalog.services.connections.database.MysqlConnection; import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection; +import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.util.JsonUtils; import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient; import software.amazon.awssdk.services.secretsmanager.model.CreateSecretRequest; @@ -178,6 +187,46 @@ public class AWSSecretsManagerTest { assertEquals(OpenMetadataServerConnection.SecretsManagerProvider.AWS, secretsManager.getSecretsManagerProvider()); } + @ParameterizedTest + @MethodSource( + "org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceUnitTestParams#params") + public void testEncryptAndDecryptDbtConfigSource( + Object config, + EntityReference service, + Class serviceClass, + PipelineType pipelineType, + boolean mustBeEncrypted) { + SourceConfig sourceConfigMock = mock(SourceConfig.class); + IngestionPipeline mockedIngestionPipeline = mock(IngestionPipeline.class); + + when(mockedIngestionPipeline.getService()).thenReturn(service); + lenient().when(mockedIngestionPipeline.getPipelineType()).thenReturn(pipelineType); + + if (mustBeEncrypted) { + when(mockedIngestionPipeline.getSourceConfig()).thenReturn(sourceConfigMock); + when(sourceConfigMock.getConfig()).thenReturn(config); + when(mockedIngestionPipeline.getName()).thenReturn("test-pipeline"); + when(secretsManagerClient.getSecretValue(any(GetSecretValueRequest.class))) + .thenReturn(GetSecretValueResponse.builder().secretString("{}}").build()); + } + + secretsManager.encryptOrDecryptDbtConfigSource(mockedIngestionPipeline, true); + + secretsManager.encryptOrDecryptDbtConfigSource(mockedIngestionPipeline, false); + + if (!mustBeEncrypted) { + verify(mockedIngestionPipeline, never()).setSourceConfig(any()); + verify(sourceConfigMock, never()).setConfig(any()); + } else { + ArgumentCaptor configCaptor = ArgumentCaptor.forClass(Object.class); + verify(mockedIngestionPipeline, times(4)).getSourceConfig(); + verify(sourceConfigMock, times(2)).setConfig(configCaptor.capture()); + assertNull(((DatabaseServiceMetadataPipeline) configCaptor.getAllValues().get(0)).getDbtConfigSource()); + assertEquals(configCaptor.getAllValues().get(1), config); + assertNotSame(configCaptor.getAllValues().get(1), config); + } + } + private void testEncryptDecryptServiceConnection(boolean decrypt) { MysqlConnection mysqlConnection = new MysqlConnection(); mysqlConnection.setPassword("openmetadata-test"); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/secrets/LocalSecretsManagerTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/secrets/LocalSecretsManagerTest.java index 89c295278aa..da57fecc3a4 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/secrets/LocalSecretsManagerTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/secrets/LocalSecretsManagerTest.java @@ -15,10 +15,15 @@ package org.openmetadata.catalog.secrets; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; import static org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.AuthProvider.AUTH_0; import static org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.AuthProvider.AZURE; import static org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection.AuthProvider.CUSTOM_OIDC; @@ -35,18 +40,24 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.openmetadata.catalog.EntityInterface; import org.openmetadata.catalog.airflow.AirflowConfiguration; import org.openmetadata.catalog.airflow.AuthConfiguration; import org.openmetadata.catalog.api.services.CreateDatabaseService; import org.openmetadata.catalog.api.services.CreateMlModelService; import org.openmetadata.catalog.entity.services.ServiceType; +import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType; import org.openmetadata.catalog.fernet.Fernet; import org.openmetadata.catalog.fixtures.ConfigurationFixtures; +import org.openmetadata.catalog.metadataIngestion.SourceConfig; import org.openmetadata.catalog.services.connections.database.MysqlConnection; import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection; import org.openmetadata.catalog.services.connections.mlModel.SklearnConnection; +import org.openmetadata.catalog.type.EntityReference; @ExtendWith(MockitoExtension.class) public class LocalSecretsManagerTest { @@ -125,6 +136,42 @@ public class LocalSecretsManagerTest { assertEquals(OpenMetadataServerConnection.SecretsManagerProvider.LOCAL, secretsManager.getSecretsManagerProvider()); } + @ParameterizedTest + @MethodSource( + "org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResourceUnitTestParams#params") + public void testEncryptAndDecryptDbtConfigSource( + Object config, + EntityReference service, + Class serviceClass, + PipelineType pipelineType, + boolean mustBeEncrypted) { + SourceConfig sourceConfigMock = mock(SourceConfig.class); + IngestionPipeline mockedIngestionPipeline = mock(IngestionPipeline.class); + + when(mockedIngestionPipeline.getService()).thenReturn(service); + lenient().when(mockedIngestionPipeline.getPipelineType()).thenReturn(pipelineType); + + if (mustBeEncrypted) { + when(mockedIngestionPipeline.getSourceConfig()).thenReturn(sourceConfigMock); + when(sourceConfigMock.getConfig()).thenReturn(config); + } + + secretsManager.encryptOrDecryptDbtConfigSource(mockedIngestionPipeline, true); + + secretsManager.encryptOrDecryptDbtConfigSource(mockedIngestionPipeline, false); + + if (!mustBeEncrypted) { + verify(mockedIngestionPipeline, never()).setSourceConfig(any()); + verify(sourceConfigMock, never()).setConfig(any()); + } else { + ArgumentCaptor configCaptor = ArgumentCaptor.forClass(Object.class); + verify(mockedIngestionPipeline, times(4)).getSourceConfig(); + verify(sourceConfigMock, times(2)).setConfig(configCaptor.capture()); + assertEquals(configCaptor.getAllValues().get(0), config); + assertEquals(configCaptor.getAllValues().get(1), config); + } + } + @ParameterizedTest @MethodSource("testDecryptAuthProviderConfigParams") void testDecryptAuthProviderConfig( diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py index b1b798db898..0787347202c 100644 --- a/ingestion/src/metadata/ingestion/api/workflow.py +++ b/ingestion/src/metadata/ingestion/api/workflow.py @@ -19,6 +19,10 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata OpenMetadataConnection, ) from metadata.generated.schema.entity.services.serviceType import ServiceType +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseMetadataConfigType, + DatabaseServiceMetadataPipeline, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) @@ -81,6 +85,8 @@ class Workflow: self._retrieve_service_connection_if_needed(metadata_config, service_type) + self._retrieve_dbt_config_source_if_needed(metadata_config, service_type) + self.source: Source = source_class.create( self.config.source.dict(), metadata_config ) @@ -248,16 +254,13 @@ class Workflow: ) -> None: """ We override the current `serviceConnection` source config object if source workflow service already exists - in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it - from the service object itself through the default `SecretsManager`. + in OM. When secrets' manager is configured, we retrieve the service connection from the secrets' manager. + Otherwise, we get the service connection from the service object itself through the default `SecretsManager`. :param metadata_config: OpenMetadata connection config :param service_type: source workflow service type :return: """ - # We override the current serviceConnection source object if source workflow service already exists in OM. - # We retrieve the service connection from the secrets' manager when it is configured. Otherwise, we get it - # from the service object itself. if service_type is not ServiceType.Metadata and not self._is_sample_source( self.config.source.type ): @@ -273,6 +276,36 @@ class Workflow: ) ) + def _retrieve_dbt_config_source_if_needed( + self, metadata_config: OpenMetadataConnection, service_type: ServiceType + ) -> None: + """ + We override the current `config` source config object if it is a metadata ingestion type. When secrets' manager + is configured, we retrieve the config from the secrets' manager. Otherwise, we get the config from the source + config object itself through the default `SecretsManager`. + + :return: + """ + config = self.config.source.sourceConfig.config + if ( + service_type is ServiceType.Database + and config + and config.type == DatabaseMetadataConfigType.DatabaseMetadata + ): + metadata = OpenMetadata(config=metadata_config) + dbt_config_source: object = ( + metadata.secrets_manager_client.retrieve_dbt_source_config( + self.config.source.sourceConfig, + f"{self.config.source.serviceName}_metadata", + ) + ) + if dbt_config_source and self.config.source.sourceConfig.config: + config_dict = self.config.source.sourceConfig.config.dict() + config_dict["dbtConfigSource"] = dbt_config_source + self.config.source.sourceConfig.config = ( + DatabaseServiceMetadataPipeline.parse_obj(config_dict) + ) + @staticmethod - def _is_sample_source(service_type): + def _is_sample_source(service_type: str) -> bool: return service_type in SAMPLE_SOURCE diff --git a/ingestion/src/metadata/utils/secrets_manager.py b/ingestion/src/metadata/utils/secrets_manager.py index 4885aa91f74..52cdef99315 100644 --- a/ingestion/src/metadata/utils/secrets_manager.py +++ b/ingestion/src/metadata/utils/secrets_manager.py @@ -32,6 +32,7 @@ from metadata.generated.schema.entity.services.messagingService import Messaging from metadata.generated.schema.entity.services.metadataService import MetadataService from metadata.generated.schema.entity.services.mlmodelService import MlModelService from metadata.generated.schema.entity.services.pipelineService import PipelineService +from metadata.generated.schema.metadataIngestion.workflow import SourceConfig from metadata.generated.schema.security.client import ( auth0SSOClientConfig, azureSSOClientConfig, @@ -96,7 +97,7 @@ class SecretsManager(metaclass=Singleton): service_type: str, ) -> ServiceConnection: """ - Retrieve the service connection from the secret manager to a given service connection object. + Retrieve the service connection from the secret manager from a given service connection object. :param service: Service connection object e.g. DatabaseConnection :param service_type: Service type e.g. databaseService """ @@ -110,6 +111,17 @@ class SecretsManager(metaclass=Singleton): """ pass + @abstractmethod + def retrieve_dbt_source_config( + self, source_config: SourceConfig, pipeline_name: str + ) -> object: + """ + Retrieve the DBT source config from the secret manager from a source config object. + :param source_config: SourceConfig object + :param pipeline_name: the pipeline's name + :return: + """ + @property def secret_id_separator(self) -> str: return "/" @@ -179,6 +191,9 @@ class LocalSecretsManager(SecretsManager): """ The LocalSecretsManager does not modify the OpenMetadataConnection object """ + logger.debug( + f"Adding auth provider security config using {SecretsManagerProvider.local.name} secrets' manager" + ) pass def retrieve_service_connection( @@ -189,8 +204,25 @@ class LocalSecretsManager(SecretsManager): """ The LocalSecretsManager does not modify the ServiceConnection object """ + logger.debug( + f"Retrieving service connection from {SecretsManagerProvider.local.name} secrets' manager for {service_type} - {service.name}" + ) return ServiceConnection(__root__=service.connection) + def retrieve_dbt_source_config( + self, source_config: SourceConfig, pipeline_name: str + ) -> object: + logger.debug( + f"Retrieving source_config from {SecretsManagerProvider.local.name} secrets' manager for {pipeline_name}" + ) + return ( + source_config.config.dbtConfigSource.dict() + if source_config + and source_config.config + and source_config.config.dbtConfigSource + else None + ) + class AWSSecretsManager(SecretsManager): def __init__(self, credentials: AWSCredentials, cluster_prefix: str): @@ -212,6 +244,9 @@ class AWSSecretsManager(SecretsManager): service: ServiceWithConnectionType, service_type: str, ) -> ServiceConnection: + logger.debug( + f"Retrieving service connection from {SecretsManagerProvider.aws.name} secrets' manager for {service_type} - {service.name}" + ) service_connection_type = service.serviceType.value service_name = service.name.__root__ secret_id = self.build_secret_id( @@ -229,6 +264,9 @@ class AWSSecretsManager(SecretsManager): return ServiceConnection(__root__=service_connection) def add_auth_provider_security_config(self, config: OpenMetadataConnection) -> None: + logger.debug( + f"Adding auth provider security config using {SecretsManagerProvider.aws.name} secrets' manager" + ) if config.authProvider == AuthProvider.no_auth: return config secret_id = self.build_secret_id( @@ -244,6 +282,16 @@ class AWSSecretsManager(SecretsManager): f"No client implemented for auth provider: [{config.authProvider}]" ) + def retrieve_dbt_source_config( + self, source_config: SourceConfig, pipeline_name: str + ) -> object: + logger.debug( + f"Retrieving source_config from {SecretsManagerProvider.local.name} secrets' manager for {pipeline_name}" + ) + secret_id = self.build_secret_id("database-metadata-pipeline", pipeline_name) + source_config_json = self._get_string_value(secret_id) + return json.loads(source_config_json) if source_config_json else None + def _get_string_value(self, name: str) -> str: """ :param name: The secret name to retrieve. Current stage is always retrieved. @@ -263,7 +311,11 @@ class AWSSecretsManager(SecretsManager): raise else: if "SecretString" in response: - return response["SecretString"] + return ( + response["SecretString"] + if response["SecretString"] != "null" + else None + ) else: raise ValueError("[SecretString] not present in the response.") diff --git a/ingestion/tests/unit/metadata/utils/test_secrets_manager.py b/ingestion/tests/unit/metadata/utils/test_secrets_manager.py index 382fe5e74de..c60bb2d5375 100644 --- a/ingestion/tests/unit/metadata/utils/test_secrets_manager.py +++ b/ingestion/tests/unit/metadata/utils/test_secrets_manager.py @@ -35,13 +35,18 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseService, DatabaseServiceType, ) +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, + DbtHttpConfig, +) +from metadata.generated.schema.metadataIngestion.workflow import SourceConfig from metadata.generated.schema.security.client.googleSSOClientConfig import ( GoogleSSOClientConfig, ) from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials from metadata.utils.secrets_manager import ( AUTH_PROVIDER_MAPPING, - SecretsManager, + AWSSecretsManager, Singleton, get_secrets_manager, ) @@ -57,6 +62,11 @@ DATABASE_SERVICE = { AUTH_PROVIDER_CONFIG = {"secretKey": "/fake/path"} +DBT_SOURCE_CONFIG = { + "dbtCatalogHttpPath": "/fake/path", + "dbtManifestHttpPath": "/fake/path", +} + class TestSecretsManager(TestCase): service_type: str = "database" @@ -65,6 +75,7 @@ class TestSecretsManager(TestCase): database_connection = MysqlConnection(**DATABASE_CONNECTION) auth_provider_config = GoogleSSOClientConfig(**AUTH_PROVIDER_CONFIG) om_connection: OpenMetadataConnection + dbt_source_config: DbtHttpConfig @classmethod def setUpClass(cls) -> None: @@ -75,6 +86,7 @@ class TestSecretsManager(TestCase): authProvider=AuthProvider.google, hostPort="http://localhost:8585/api", ) + cls.dbt_source_config = DbtHttpConfig.parse_obj(DBT_SOURCE_CONFIG) @classmethod def setUp(cls) -> None: @@ -107,6 +119,21 @@ class TestSecretsManager(TestCase): self.assertEqual(self.auth_provider_config, actual_om_connection.securityConfig) assert id(self.auth_provider_config) == id(actual_om_connection.securityConfig) + def test_local_manager_retrieve_dbt_source_config(self): + local_manager = get_secrets_manager( + self._build_open_metadata_connection(SecretsManagerProvider.local), None + ) + source_config = SourceConfig() + source_config.config = DatabaseServiceMetadataPipeline( + dbtConfigSource=self.dbt_source_config + ) + + actual_dbt_source_config = local_manager.retrieve_dbt_source_config( + source_config, "test-pipeline" + ) + + self.assertEqual(self.dbt_source_config.dict(), actual_dbt_source_config) + @patch("metadata.utils.secrets_manager.boto3") def test_aws_manager_add_service_config_connection(self, boto3_mock): aws_manager = self._build_secret_manager( @@ -130,7 +157,9 @@ class TestSecretsManager(TestCase): ) @patch("metadata.utils.secrets_manager.boto3") - def test_aws_manager_fails_add_auth_provider_security_config(self, mocked_boto3): + def test_aws_manager_fails_add_service_config_connection_when_not_stored( + self, mocked_boto3 + ): aws_manager = self._build_secret_manager(mocked_boto3, {}) with self.assertRaises(ValueError) as value_error: @@ -157,13 +186,50 @@ class TestSecretsManager(TestCase): assert id(self.auth_provider_config) != id(actual_om_connection.securityConfig) @patch("metadata.utils.secrets_manager.boto3") - def test_aws_manager_fails_add_service_config_connection_when_not_stored( + def test_aws_manager_retrieve_dbt_source_config(self, boto3_mock): + aws_manager = self._build_secret_manager( + boto3_mock, {"SecretString": json.dumps(DBT_SOURCE_CONFIG)} + ) + source_config = SourceConfig() + source_config.config = DatabaseServiceMetadataPipeline( + dbtConfigSource=self.dbt_source_config + ) + + actual_dbt_source_config = aws_manager.retrieve_dbt_source_config( + source_config, "test-pipeline" + ) + + expected_call = { + "SecretId": "/openmetadata/database-metadata-pipeline/test-pipeline" + } + aws_manager.secretsmanager_client.get_secret_value.assert_called_once_with( + **expected_call + ) + self.assertEqual(self.dbt_source_config.dict(), actual_dbt_source_config) + + @patch("metadata.utils.secrets_manager.boto3") + def test_aws_manager_fails_add_auth_provider_security_config(self, mocked_boto3): + aws_manager = self._build_secret_manager(mocked_boto3, {}) + + with self.assertRaises(ValueError) as value_error: + aws_manager.add_auth_provider_security_config(self.om_connection) + self.assertEqual( + "[SecretString] not present in the response.", value_error.exception + ) + + @patch("metadata.utils.secrets_manager.boto3") + def test_aws_manager_aws_manager_fails_retrieve_dbt_source_config_when_not_stored( self, mocked_boto3 ): aws_manager = self._build_secret_manager(mocked_boto3, {}) + source_config = SourceConfig() + source_config.config = DatabaseServiceMetadataPipeline( + dbtConfigSource=self.dbt_source_config + ) + with self.assertRaises(ValueError) as value_error: - aws_manager.retrieve_service_connection(self.service, self.service_type) + aws_manager.retrieve_dbt_source_config(source_config, "test-pipeline") self.assertEqual( "[SecretString] not present in the response.", value_error.exception ) @@ -188,17 +254,16 @@ class TestSecretsManager(TestCase): def _build_secret_manager( self, mocked_boto3: Mock, expected_json: Dict[str, Any] - ) -> SecretsManager: + ) -> AWSSecretsManager: self._init_boto3_mock(mocked_boto3, expected_json) - aws_manager = get_secrets_manager( - self._build_open_metadata_connection(SecretsManagerProvider.aws), + return AWSSecretsManager( AWSCredentials( awsAccessKeyId="fake_key", awsSecretAccessKey="fake_access", awsRegion="fake-region", ), + "openmetadata", ) - return aws_manager @staticmethod def _build_open_metadata_connection( diff --git a/openmetadata-core/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/openmetadata-core/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json index bf523db48a5..c3d651e1e32 100644 --- a/openmetadata-core/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json +++ b/openmetadata-core/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json @@ -154,6 +154,7 @@ "$ref": "../type/filterPattern.json#/definitions/filterPattern" }, "dbtConfigSource": { + "mask": true, "title": "DBT Configuration Source", "description": "Available sources to fetch DBT catalog and manifest files.", "oneOf": [ diff --git a/openmetadata-core/src/main/resources/json/schema/metadataIngestion/workflow.json b/openmetadata-core/src/main/resources/json/schema/metadataIngestion/workflow.json index e2ff769fbc0..955f38c7b90 100644 --- a/openmetadata-core/src/main/resources/json/schema/metadataIngestion/workflow.json +++ b/openmetadata-core/src/main/resources/json/schema/metadataIngestion/workflow.json @@ -11,7 +11,6 @@ "type": "object", "properties": { "config": { - "mask": true, "oneOf": [ { "$ref": "databaseServiceMetadataPipeline.json"