mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-02 12:26:42 +00:00
* See #2486. Add fernet Java library to encrypt DatabaseService.DatabaseConnection.password * Add support for rotation. * Add support for CLI to rotate the key. * The fernet key can be null. No encryption will be done. * Add FernetConfiguration * Add env variable for Fernet. * Convert base64 to base64url * Decrypt (admin or bot) or nullify tokens for all methods including ListVersions and GetVersion * Fix test with owner USER to handle null in DatabaseConnection
This commit is contained in:
parent
f9b5901d69
commit
0e73de4658
@ -53,7 +53,7 @@ execute() {
|
||||
|
||||
printUsage() {
|
||||
cat <<-EOF
|
||||
USAGE: $0 [create|migrate|info|validate|drop|drop-create|es-drop|es-create|drop-create-all|migrate-all|repair|check-connection]
|
||||
USAGE: $0 [create|migrate|info|validate|drop|drop-create|es-drop|es-create|drop-create-all|migrate-all|repair|check-connection|rotate]
|
||||
create : Creates the tables. The target database should be empty
|
||||
migrate : Migrates the database to the latest version or creates the tables if the database is empty. Use "info" to see the current version and the pending migrations
|
||||
info : Shows the list of migrations applied and the pending migration waiting to be applied on the target database
|
||||
@ -67,6 +67,7 @@ USAGE: $0 [create|migrate|info|validate|drop|drop-create|es-drop|es-create|drop-
|
||||
repair : Repairs the DATABASE_CHANGE_LOG table which is used to track all the migrations on the target database
|
||||
This involves removing entries for the failed migrations and update the checksum of migrations already applied on the target database
|
||||
check-connection : Checks if a connection can be successfully obtained for the target database
|
||||
rotate : Rotate the Fernet Key defined in $FERNET_KEY
|
||||
EOF
|
||||
}
|
||||
|
||||
@ -80,7 +81,7 @@ fi
|
||||
opt="$1"
|
||||
|
||||
case "${opt}" in
|
||||
create | drop | migrate | info | validate | repair | check-connection | es-drop | es-create )
|
||||
create | drop | migrate | info | validate | repair | check-connection | es-drop | es-create | rotate)
|
||||
execute "${opt}"
|
||||
;;
|
||||
drop-create )
|
||||
@ -92,6 +93,9 @@ drop-create-all )
|
||||
migrate-all )
|
||||
execute "migrate" && execute "es-migrate"
|
||||
;;
|
||||
rotate )
|
||||
execute "rotate"
|
||||
;;
|
||||
*)
|
||||
printUsage
|
||||
exit 1
|
||||
|
@ -22,6 +22,10 @@
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.macasaet.fernet</groupId>
|
||||
<artifactId>fernet-java8</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.openmetadata</groupId>
|
||||
<artifactId>common</artifactId>
|
||||
|
@ -52,6 +52,7 @@ import org.openmetadata.catalog.events.EventPubSub;
|
||||
import org.openmetadata.catalog.exception.CatalogGenericExceptionMapper;
|
||||
import org.openmetadata.catalog.exception.ConstraintViolationExceptionMapper;
|
||||
import org.openmetadata.catalog.exception.JsonMappingExceptionMapper;
|
||||
import org.openmetadata.catalog.fernet.Fernet;
|
||||
import org.openmetadata.catalog.migration.Migration;
|
||||
import org.openmetadata.catalog.migration.MigrationConfiguration;
|
||||
import org.openmetadata.catalog.resources.CollectionRegistry;
|
||||
@ -95,6 +96,9 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
|
||||
jdbi.setSqlLogger(sqlLogger);
|
||||
}
|
||||
|
||||
// Configure the Fernet instance
|
||||
Fernet.getInstance().setFernetKey(catalogConfig);
|
||||
|
||||
// Validate flyway Migrations
|
||||
validateMigrations(jdbi, catalogConfig.getMigrationConfiguration());
|
||||
|
||||
|
@ -26,6 +26,7 @@ import lombok.Setter;
|
||||
import org.openmetadata.catalog.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.catalog.elasticsearch.ElasticSearchConfiguration;
|
||||
import org.openmetadata.catalog.events.EventHandlerConfiguration;
|
||||
import org.openmetadata.catalog.fernet.FernetConfiguration;
|
||||
import org.openmetadata.catalog.migration.MigrationConfiguration;
|
||||
import org.openmetadata.catalog.security.AuthenticationConfiguration;
|
||||
import org.openmetadata.catalog.security.AuthorizerConfiguration;
|
||||
@ -80,6 +81,11 @@ public class CatalogApplicationConfig extends Configuration {
|
||||
@Setter
|
||||
private MigrationConfiguration migrationConfiguration;
|
||||
|
||||
@JsonProperty("fernetConfiguration")
|
||||
@Getter
|
||||
@Setter
|
||||
private FernetConfiguration fernetConfiguration;
|
||||
|
||||
@JsonProperty("health")
|
||||
@NotNull
|
||||
@Valid
|
||||
|
@ -82,10 +82,10 @@ public class AirflowRESTClient {
|
||||
throw new AirflowException("Failed to get access_token. Please check AirflowConfiguration username, password");
|
||||
}
|
||||
|
||||
public String deploy(AirflowPipeline airflowPipeline, CatalogApplicationConfig config) {
|
||||
public String deploy(AirflowPipeline airflowPipeline, CatalogApplicationConfig config, Boolean decrypt) {
|
||||
try {
|
||||
IngestionAirflowPipeline pipeline =
|
||||
AirflowUtils.toIngestionPipeline(airflowPipeline, config.getAirflowConfiguration());
|
||||
AirflowUtils.toIngestionPipeline(airflowPipeline, config.getAirflowConfiguration(), decrypt);
|
||||
String token = authenticate();
|
||||
String authToken = String.format(AUTH_TOKEN, token);
|
||||
String pipelinePayload = JsonUtils.pojoToJson(pipeline);
|
||||
|
@ -15,6 +15,7 @@ package org.openmetadata.catalog.airflow;
|
||||
|
||||
import static org.openmetadata.catalog.Entity.DATABASE_SERVICE;
|
||||
import static org.openmetadata.catalog.Entity.helper;
|
||||
import static org.openmetadata.catalog.fernet.Fernet.decryptIfTokenized;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.text.ParseException;
|
||||
@ -60,15 +61,16 @@ public final class AirflowUtils {
|
||||
|
||||
private AirflowUtils() {}
|
||||
|
||||
public static OpenMetadataIngestionComponent makeOpenMetadataDatasourceComponent(AirflowPipeline airflowPipeline)
|
||||
throws IOException, ParseException {
|
||||
public static OpenMetadataIngestionComponent makeOpenMetadataDatasourceComponent(
|
||||
AirflowPipeline airflowPipeline, Boolean decrypt) throws IOException, ParseException {
|
||||
DatabaseService databaseService = helper(airflowPipeline).findEntity("service", DATABASE_SERVICE);
|
||||
DatabaseConnection databaseConnection = databaseService.getDatabaseConnection();
|
||||
PipelineConfig pipelineConfig = airflowPipeline.getPipelineConfig();
|
||||
Map<String, Object> dbConfig = new HashMap<>();
|
||||
dbConfig.put(INGESTION_HOST_PORT, databaseConnection.getHostPort());
|
||||
dbConfig.put(INGESTION_USERNAME, databaseConnection.getUsername());
|
||||
dbConfig.put(INGESTION_PASSWORD, databaseConnection.getPassword());
|
||||
String password = decrypt ? decryptIfTokenized(databaseConnection.getPassword()) : databaseConnection.getPassword();
|
||||
dbConfig.put(INGESTION_PASSWORD, password);
|
||||
dbConfig.put(INGESTION_DATABASE, databaseConnection.getDatabase());
|
||||
dbConfig.put(INGESTION_SERVICE_NAME, databaseService.getName());
|
||||
if (databaseConnection.getConnectionOptions() != null
|
||||
@ -125,18 +127,20 @@ public final class AirflowUtils {
|
||||
}
|
||||
|
||||
public static OpenMetadataIngestionConfig buildDatabaseIngestion(
|
||||
AirflowPipeline airflowPipeline, AirflowConfiguration airflowConfiguration) throws IOException, ParseException {
|
||||
AirflowPipeline airflowPipeline, AirflowConfiguration airflowConfiguration, Boolean decrypt)
|
||||
throws IOException, ParseException {
|
||||
return OpenMetadataIngestionConfig.builder()
|
||||
.source(makeOpenMetadataDatasourceComponent(airflowPipeline))
|
||||
.source(makeOpenMetadataDatasourceComponent(airflowPipeline, decrypt))
|
||||
.sink(makeOpenMetadataSinkComponent(airflowPipeline))
|
||||
.metadataServer(makeOpenMetadataConfigComponent(airflowConfiguration))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static IngestionAirflowPipeline toIngestionPipeline(
|
||||
AirflowPipeline airflowPipeline, AirflowConfiguration airflowConfiguration) throws IOException, ParseException {
|
||||
AirflowPipeline airflowPipeline, AirflowConfiguration airflowConfiguration, Boolean decrypt)
|
||||
throws IOException, ParseException {
|
||||
Map<String, Object> taskParams = new HashMap<>();
|
||||
taskParams.put("workflow_config", buildDatabaseIngestion(airflowPipeline, airflowConfiguration));
|
||||
taskParams.put("workflow_config", buildDatabaseIngestion(airflowPipeline, airflowConfiguration, decrypt));
|
||||
IngestionTaskConfig taskConfig = IngestionTaskConfig.builder().opKwargs(taskParams).build();
|
||||
OpenMetadataIngestionTask task =
|
||||
OpenMetadataIngestionTask.builder().name(airflowPipeline.getName()).config(taskConfig).build();
|
||||
|
@ -60,4 +60,16 @@ public final class CatalogExceptionMessage {
|
||||
public static String invalidServiceEntity(String serviceType, String entityType, String expected) {
|
||||
return String.format("Invalid service type `%s` for %s. Expected %s.", serviceType, entityType, expected);
|
||||
}
|
||||
|
||||
public static String fernetKeyNotDefined() {
|
||||
return "The Fernet Key is null";
|
||||
}
|
||||
|
||||
public static String isNotTokenized() {
|
||||
return "The field is not tokenized";
|
||||
}
|
||||
|
||||
public static String isAlreadyTokenized() {
|
||||
return "The field is already tokenized";
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,124 @@
|
||||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.catalog.fernet;
|
||||
|
||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.fernetKeyNotDefined;
|
||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.isAlreadyTokenized;
|
||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.isNotTokenized;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.macasaet.fernet.Key;
|
||||
import com.macasaet.fernet.StringValidator;
|
||||
import com.macasaet.fernet.Token;
|
||||
import com.macasaet.fernet.Validator;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.TemporalAmount;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.NonNull;
|
||||
import org.openmetadata.catalog.CatalogApplicationConfig;
|
||||
|
||||
public class Fernet {
|
||||
private static Fernet instance;
|
||||
private String fernetKey;
|
||||
public static String FERNET_PREFIX = "fernet:";
|
||||
public static String FERNET_NO_ENCRYPTION = "no_encryption_at_rest";
|
||||
private Validator<String> validator =
|
||||
new StringValidator() {
|
||||
public TemporalAmount getTimeToLive() {
|
||||
return Duration.ofSeconds(Instant.MAX.getEpochSecond());
|
||||
}
|
||||
};
|
||||
|
||||
private Fernet() {}
|
||||
|
||||
public static Fernet getInstance() {
|
||||
if (instance == null) {
|
||||
instance = new Fernet();
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public void setFernetKey(CatalogApplicationConfig config) {
|
||||
FernetConfiguration fernetConfiguration = config.getFernetConfiguration();
|
||||
if (fernetConfiguration != null && !FERNET_NO_ENCRYPTION.equals(fernetConfiguration.getFernetKey())) {
|
||||
setFernetKey(fernetConfiguration.getFernetKey());
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Fernet(String fernetKey) {
|
||||
this.setFernetKey(fernetKey);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setFernetKey(String fernetKey) {
|
||||
if (fernetKey != null) {
|
||||
// convert base64 to base64url
|
||||
this.fernetKey = fernetKey.replace("/", "_").replace("+", "-").replace("=", "");
|
||||
} else {
|
||||
this.fernetKey = null;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getFernetKey() {
|
||||
return this.fernetKey;
|
||||
}
|
||||
|
||||
public Boolean isKeyDefined() {
|
||||
return fernetKey != null;
|
||||
}
|
||||
|
||||
public String encrypt(@NonNull String secret) {
|
||||
if (secret.startsWith(FERNET_PREFIX)) {
|
||||
throw new IllegalArgumentException(isAlreadyTokenized());
|
||||
}
|
||||
if (isKeyDefined()) {
|
||||
Key key = new Key(fernetKey.split(",")[0]);
|
||||
return FERNET_PREFIX + Token.generate(key, secret).serialise();
|
||||
}
|
||||
throw new IllegalArgumentException(fernetKeyNotDefined());
|
||||
}
|
||||
|
||||
public static Boolean isTokenized(String tokenized) {
|
||||
return tokenized.startsWith(FERNET_PREFIX);
|
||||
}
|
||||
|
||||
public String decrypt(String tokenized) {
|
||||
if (!isKeyDefined()) {
|
||||
throw new IllegalArgumentException(fernetKeyNotDefined());
|
||||
}
|
||||
if (tokenized != null && tokenized.startsWith(FERNET_PREFIX)) {
|
||||
String str = tokenized.split(FERNET_PREFIX, 2)[1];
|
||||
Token token = Token.fromString(str);
|
||||
List<Key> keys = Arrays.stream(fernetKey.split(",")).map(Key::new).collect(Collectors.toList());
|
||||
return token.validateAndDecrypt(keys, validator);
|
||||
}
|
||||
throw new IllegalArgumentException(isNotTokenized());
|
||||
}
|
||||
|
||||
public static String decryptIfTokenized(String tokenized) {
|
||||
if (tokenized == null) {
|
||||
return null;
|
||||
}
|
||||
Fernet fernet = Fernet.getInstance();
|
||||
if (fernet.isKeyDefined() && isTokenized(tokenized)) {
|
||||
return fernet.decrypt(tokenized);
|
||||
}
|
||||
return tokenized;
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.catalog.fernet;
|
||||
|
||||
import javax.validation.constraints.NotEmpty;
|
||||
|
||||
public class FernetConfiguration {
|
||||
|
||||
@NotEmpty private String fernetKey;
|
||||
|
||||
public String getFernetKey() {
|
||||
return fernetKey;
|
||||
}
|
||||
|
||||
public void setFernetKey(String fernetKey) {
|
||||
this.fernetKey = fernetKey;
|
||||
}
|
||||
}
|
@ -14,26 +14,35 @@
|
||||
package org.openmetadata.catalog.jdbi3;
|
||||
|
||||
import static org.openmetadata.catalog.Entity.helper;
|
||||
import static org.openmetadata.catalog.fernet.Fernet.decryptIfTokenized;
|
||||
import static org.openmetadata.catalog.fernet.Fernet.isTokenized;
|
||||
import static org.openmetadata.catalog.util.EntityUtil.toBoolean;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.text.ParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.entity.services.DatabaseService;
|
||||
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.catalog.fernet.Fernet;
|
||||
import org.openmetadata.catalog.resources.services.database.DatabaseServiceResource;
|
||||
import org.openmetadata.catalog.type.ChangeDescription;
|
||||
import org.openmetadata.catalog.type.DatabaseConnection;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.type.Include;
|
||||
import org.openmetadata.catalog.util.EntityInterface;
|
||||
import org.openmetadata.catalog.util.EntityUtil.Fields;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
|
||||
public class DatabaseServiceRepository extends EntityRepository<DatabaseService> {
|
||||
private static final Fields UPDATE_FIELDS = new Fields(DatabaseServiceResource.FIELD_LIST, "owner");
|
||||
private final Fernet fernet;
|
||||
|
||||
public DatabaseServiceRepository(CollectionDAO dao) {
|
||||
super(
|
||||
@ -47,6 +56,24 @@ public class DatabaseServiceRepository extends EntityRepository<DatabaseService>
|
||||
false,
|
||||
true,
|
||||
false);
|
||||
fernet = Fernet.getInstance();
|
||||
}
|
||||
|
||||
public void rotate() throws GeneralSecurityException, IOException, ParseException {
|
||||
if (!fernet.isKeyDefined()) {
|
||||
throw new IllegalArgumentException(CatalogExceptionMessage.fernetKeyNotDefined());
|
||||
}
|
||||
List<String> jsons = dao.listAfter(null, Integer.MAX_VALUE, "", Include.ALL);
|
||||
for (String json : jsons) {
|
||||
DatabaseService databaseService = JsonUtils.readValue(json, DatabaseService.class);
|
||||
DatabaseConnection databaseConnection = databaseService.getDatabaseConnection();
|
||||
if (databaseConnection != null && databaseConnection.getPassword() != null) {
|
||||
String password = decryptIfTokenized(databaseConnection.getPassword());
|
||||
String tokenized = fernet.encrypt(password);
|
||||
databaseConnection.setPassword(tokenized);
|
||||
storeEntity(databaseService, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -89,9 +116,17 @@ public class DatabaseServiceRepository extends EntityRepository<DatabaseService>
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare(DatabaseService entity) throws IOException, ParseException {
|
||||
public void prepare(DatabaseService databaseService) throws IOException, ParseException {
|
||||
// Check if owner is valid and set the relationship
|
||||
entity.setOwner(helper(entity).validateOwnerOrNull());
|
||||
databaseService.setOwner(helper(databaseService).validateOwnerOrNull());
|
||||
DatabaseConnection databaseConnection = databaseService.getDatabaseConnection();
|
||||
if (fernet.isKeyDefined()
|
||||
&& databaseConnection != null
|
||||
&& databaseConnection.getPassword() != null
|
||||
&& !isTokenized(databaseConnection.getPassword())) {
|
||||
String tokenized = fernet.encrypt(databaseConnection.getPassword());
|
||||
databaseConnection.setPassword(tokenized);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -252,6 +287,14 @@ public class DatabaseServiceRepository extends EntityRepository<DatabaseService>
|
||||
private void updateDatabaseConnectionConfig() throws JsonProcessingException {
|
||||
DatabaseConnection origConn = original.getEntity().getDatabaseConnection();
|
||||
DatabaseConnection updatedConn = updated.getEntity().getDatabaseConnection();
|
||||
if (origConn != null
|
||||
&& updatedConn != null
|
||||
&& Objects.equals(
|
||||
Fernet.decryptIfTokenized(origConn.getPassword()),
|
||||
Fernet.decryptIfTokenized(updatedConn.getPassword()))) {
|
||||
// Password in clear didn't change. The tokenized changed because it's time-dependent.
|
||||
updatedConn.setPassword(origConn.getPassword());
|
||||
}
|
||||
recordChange("databaseConnection", origConn, updatedConn, true);
|
||||
}
|
||||
}
|
||||
|
@ -337,7 +337,7 @@ public class AirflowPipelineResource {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
AirflowPipeline airflowPipeline = getAirflowPipeline(securityContext, create);
|
||||
airflowPipeline = addHref(uriInfo, dao.create(uriInfo, airflowPipeline));
|
||||
deploy(airflowPipeline);
|
||||
deploy(airflowPipeline, true);
|
||||
return Response.created(airflowPipeline.getHref()).entity(airflowPipeline).build();
|
||||
}
|
||||
|
||||
@ -393,7 +393,7 @@ public class AirflowPipelineResource {
|
||||
AirflowPipeline pipeline = getAirflowPipeline(securityContext, update);
|
||||
SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, dao.getOriginalOwner(pipeline));
|
||||
PutResponse<AirflowPipeline> response = dao.createOrUpdate(uriInfo, pipeline);
|
||||
deploy(pipeline);
|
||||
deploy(pipeline, SecurityUtil.isAdminOrBotRole(authorizer, securityContext));
|
||||
addHref(uriInfo, response.getEntity());
|
||||
return response.toResponse();
|
||||
}
|
||||
@ -462,9 +462,9 @@ public class AirflowPipelineResource {
|
||||
.withUpdatedAt(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
private void deploy(AirflowPipeline airflowPipeline) {
|
||||
private void deploy(AirflowPipeline airflowPipeline, Boolean decrypt) {
|
||||
if (Boolean.TRUE.equals(airflowPipeline.getForceDeploy())) {
|
||||
airflowRESTClient.deploy(airflowPipeline, config);
|
||||
airflowRESTClient.deploy(airflowPipeline, config, decrypt);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -13,6 +13,8 @@
|
||||
|
||||
package org.openmetadata.catalog.resources.services.database;
|
||||
|
||||
import static org.openmetadata.catalog.fernet.Fernet.isTokenized;
|
||||
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
@ -29,6 +31,7 @@ import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.validation.Valid;
|
||||
import javax.validation.constraints.Max;
|
||||
import javax.validation.constraints.Min;
|
||||
@ -50,14 +53,19 @@ import javax.ws.rs.core.UriInfo;
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.api.services.CreateDatabaseService;
|
||||
import org.openmetadata.catalog.entity.services.DatabaseService;
|
||||
import org.openmetadata.catalog.fernet.Fernet;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository;
|
||||
import org.openmetadata.catalog.resources.Collection;
|
||||
import org.openmetadata.catalog.security.AuthorizationException;
|
||||
import org.openmetadata.catalog.security.Authorizer;
|
||||
import org.openmetadata.catalog.security.SecurityUtil;
|
||||
import org.openmetadata.catalog.type.DatabaseConnection;
|
||||
import org.openmetadata.catalog.type.EntityHistory;
|
||||
import org.openmetadata.catalog.type.Include;
|
||||
import org.openmetadata.catalog.type.MetadataOperation;
|
||||
import org.openmetadata.catalog.util.EntityUtil;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import org.openmetadata.catalog.util.RestUtil;
|
||||
import org.openmetadata.catalog.util.RestUtil.DeleteResponse;
|
||||
import org.openmetadata.catalog.util.RestUtil.PutResponse;
|
||||
@ -75,6 +83,7 @@ public class DatabaseServiceResource {
|
||||
|
||||
static final String FIELDS = "airflowPipeline,owner";
|
||||
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replace(" ", "").split(","));
|
||||
private final Fernet fernet;
|
||||
|
||||
public static ResultList<DatabaseService> addHref(UriInfo uriInfo, ResultList<DatabaseService> dbServices) {
|
||||
Optional.ofNullable(dbServices.getData()).orElse(Collections.emptyList()).forEach(i -> addHref(uriInfo, i));
|
||||
@ -91,6 +100,7 @@ public class DatabaseServiceResource {
|
||||
Objects.requireNonNull(dao, "DatabaseServiceRepository must not be null");
|
||||
this.dao = new DatabaseServiceRepository(dao);
|
||||
this.authorizer = authorizer;
|
||||
this.fernet = Fernet.getInstance();
|
||||
}
|
||||
|
||||
public static class DatabaseServiceList extends ResultList<DatabaseService> {
|
||||
@ -117,6 +127,7 @@ public class DatabaseServiceResource {
|
||||
})
|
||||
public ResultList<DatabaseService> list(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(
|
||||
description = "Fields requested in the returned resource",
|
||||
schema = @Schema(type = "string", example = FIELDS))
|
||||
@ -146,7 +157,7 @@ public class DatabaseServiceResource {
|
||||
} else {
|
||||
dbServices = dao.listAfter(uriInfo, fields, null, limitParam, after, include);
|
||||
}
|
||||
return addHref(uriInfo, dbServices);
|
||||
return addHref(uriInfo, decryptOrNullify(securityContext, dbServices));
|
||||
}
|
||||
|
||||
@GET
|
||||
@ -180,7 +191,7 @@ public class DatabaseServiceResource {
|
||||
Include include)
|
||||
throws IOException, ParseException {
|
||||
EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, fieldsParam);
|
||||
return addHref(uriInfo, dao.get(uriInfo, id, fields, include));
|
||||
return addHref(uriInfo, decryptOrNullify(securityContext, dao.get(uriInfo, id, fields, include)));
|
||||
}
|
||||
|
||||
@GET
|
||||
@ -214,7 +225,7 @@ public class DatabaseServiceResource {
|
||||
Include include)
|
||||
throws IOException, ParseException {
|
||||
EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, fieldsParam);
|
||||
return addHref(uriInfo, dao.getByName(uriInfo, name, fields, include));
|
||||
return addHref(uriInfo, decryptOrNullify(securityContext, dao.getByName(uriInfo, name, fields, include)));
|
||||
}
|
||||
|
||||
@GET
|
||||
@ -234,7 +245,21 @@ public class DatabaseServiceResource {
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "database service Id", schema = @Schema(type = "string")) @PathParam("id") String id)
|
||||
throws IOException, ParseException {
|
||||
return dao.listVersions(id);
|
||||
EntityHistory entityHistory = dao.listVersions(id);
|
||||
List<Object> versions =
|
||||
entityHistory.getVersions().stream()
|
||||
.map(
|
||||
json -> {
|
||||
try {
|
||||
DatabaseService databaseService = JsonUtils.readValue((String) json, DatabaseService.class);
|
||||
return JsonUtils.pojoToJson(decryptOrNullify(securityContext, databaseService));
|
||||
} catch (IOException e) {
|
||||
return json;
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
entityHistory.setVersions(versions);
|
||||
return entityHistory;
|
||||
}
|
||||
|
||||
@GET
|
||||
@ -263,7 +288,7 @@ public class DatabaseServiceResource {
|
||||
@PathParam("version")
|
||||
String version)
|
||||
throws IOException, ParseException {
|
||||
return dao.getVersion(id, version);
|
||||
return decryptOrNullify(securityContext, dao.getVersion(id, version));
|
||||
}
|
||||
|
||||
@POST
|
||||
@ -286,7 +311,7 @@ public class DatabaseServiceResource {
|
||||
throws IOException, ParseException {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
DatabaseService service = getService(create, securityContext);
|
||||
service = addHref(uriInfo, dao.create(uriInfo, service));
|
||||
service = addHref(uriInfo, decryptOrNullify(securityContext, dao.create(uriInfo, service)));
|
||||
return Response.created(service.getHref()).entity(service).build();
|
||||
}
|
||||
|
||||
@ -311,7 +336,7 @@ public class DatabaseServiceResource {
|
||||
DatabaseService service = getService(update, securityContext);
|
||||
SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, dao.getOriginalOwner(service));
|
||||
PutResponse<DatabaseService> response = dao.createOrUpdate(uriInfo, service, true);
|
||||
addHref(uriInfo, response.getEntity());
|
||||
addHref(uriInfo, decryptOrNullify(securityContext, response.getEntity()));
|
||||
return response.toResponse();
|
||||
}
|
||||
|
||||
@ -338,9 +363,33 @@ public class DatabaseServiceResource {
|
||||
throws IOException, ParseException {
|
||||
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
|
||||
DeleteResponse<DatabaseService> response = dao.delete(securityContext.getUserPrincipal().getName(), id, recursive);
|
||||
decryptOrNullify(securityContext, response.getEntity());
|
||||
return response.toResponse();
|
||||
}
|
||||
|
||||
private ResultList<DatabaseService> decryptOrNullify(
|
||||
SecurityContext securityContext, ResultList<DatabaseService> databaseServices) {
|
||||
Optional.ofNullable(databaseServices.getData())
|
||||
.orElse(Collections.emptyList())
|
||||
.forEach(databaseService -> decryptOrNullify(securityContext, databaseService));
|
||||
return databaseServices;
|
||||
}
|
||||
|
||||
private DatabaseService decryptOrNullify(SecurityContext securityContext, DatabaseService databaseService) {
|
||||
try {
|
||||
SecurityUtil.checkAdminRoleOrPermissions(authorizer, securityContext, null, MetadataOperation.DecryptTokens);
|
||||
} catch (AuthorizationException e) {
|
||||
return databaseService.withDatabaseConnection(null);
|
||||
}
|
||||
DatabaseConnection databaseConnection = databaseService.getDatabaseConnection();
|
||||
if (databaseConnection != null
|
||||
&& databaseConnection.getPassword() != null
|
||||
&& isTokenized(databaseConnection.getPassword())) {
|
||||
databaseConnection.setPassword(fernet.decrypt(databaseConnection.getPassword()));
|
||||
}
|
||||
return databaseService;
|
||||
}
|
||||
|
||||
private DatabaseService getService(CreateDatabaseService create, SecurityContext securityContext) {
|
||||
return new DatabaseService()
|
||||
.withId(UUID.randomUUID())
|
||||
|
@ -37,6 +37,15 @@ public final class SecurityUtil {
|
||||
}
|
||||
}
|
||||
|
||||
public static Boolean isAdminOrBotRole(Authorizer authorizer, SecurityContext securityContext) {
|
||||
try {
|
||||
checkAdminOrBotRole(authorizer, securityContext);
|
||||
return true;
|
||||
} catch (AuthorizationException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public static void checkAdminOrBotRole(Authorizer authorizer, SecurityContext securityContext) {
|
||||
Principal principal = securityContext.getUserPrincipal();
|
||||
AuthenticationContext authenticationCtx = SecurityUtil.getAuthenticationContext(principal);
|
||||
|
@ -209,6 +209,10 @@ public final class RestUtil {
|
||||
this.changeType = changeType;
|
||||
}
|
||||
|
||||
public T getEntity() {
|
||||
return entity;
|
||||
}
|
||||
|
||||
public Response toResponse() {
|
||||
ResponseBuilder responseBuilder = Response.status(Status.OK).header(CHANGE_CUSTOM_HEADER, changeType);
|
||||
return responseBuilder.entity(entity).build();
|
||||
|
@ -24,11 +24,14 @@ import io.dropwizard.db.DataSourceFactory;
|
||||
import io.dropwizard.jackson.Jackson;
|
||||
import io.dropwizard.jersey.validation.Validators;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DatabaseMetaData;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.text.ParseException;
|
||||
import javax.validation.Validator;
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.CommandLineParser;
|
||||
@ -38,9 +41,14 @@ import org.apache.commons.cli.Options;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.flywaydb.core.Flyway;
|
||||
import org.flywaydb.core.api.MigrationVersion;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.jdbi.v3.sqlobject.SqlObjectPlugin;
|
||||
import org.openmetadata.catalog.CatalogApplicationConfig;
|
||||
import org.openmetadata.catalog.elasticsearch.ElasticSearchConfiguration;
|
||||
import org.openmetadata.catalog.elasticsearch.ElasticSearchIndexDefinition;
|
||||
import org.openmetadata.catalog.fernet.Fernet;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository;
|
||||
|
||||
public final class TablesInitializer {
|
||||
private static final String OPTION_SCRIPT_ROOT_PATH = "script-root";
|
||||
@ -84,6 +92,7 @@ public final class TablesInitializer {
|
||||
OPTIONS.addOption(
|
||||
null, SchemaMigrationOption.ES_DROP.toString(), false, "Drop all the indexes in the elastic search");
|
||||
OPTIONS.addOption(null, SchemaMigrationOption.ES_MIGRATE.toString(), false, "Update Elastic Search index mapping");
|
||||
OPTIONS.addOption(null, SchemaMigrationOption.ROTATE.toString(), false, "Rotate the Fernet Key");
|
||||
}
|
||||
|
||||
private TablesInitializer() {}
|
||||
@ -128,6 +137,7 @@ public final class TablesInitializer {
|
||||
new SubstitutingSourceProvider(
|
||||
new FileConfigurationSourceProvider(), new EnvironmentVariableSubstitutor(false)),
|
||||
confFilePath);
|
||||
Fernet.getInstance().setFernetKey(config);
|
||||
DataSourceFactory dataSourceFactory = config.getDataSourceFactory();
|
||||
ElasticSearchConfiguration esConfig = config.getElasticSearchConfiguration();
|
||||
if (dataSourceFactory == null) {
|
||||
@ -144,7 +154,7 @@ public final class TablesInitializer {
|
||||
Flyway flyway = get(jdbcUrl, user, password, scriptRootPath, !disableValidateOnMigrate);
|
||||
RestHighLevelClient client = ElasticSearchClientUtils.createElasticSearchClient(esConfig);
|
||||
try {
|
||||
execute(flyway, client, schemaMigrationOptionSpecified);
|
||||
execute(flyway, client, schemaMigrationOptionSpecified, dataSourceFactory);
|
||||
System.out.printf("\"%s\" option successful%n", schemaMigrationOptionSpecified);
|
||||
} catch (Exception e) {
|
||||
System.err.printf("\"%s\" option failed : %s%n", schemaMigrationOptionSpecified, e);
|
||||
@ -172,8 +182,12 @@ public final class TablesInitializer {
|
||||
.load();
|
||||
}
|
||||
|
||||
private static void execute(Flyway flyway, RestHighLevelClient client, SchemaMigrationOption schemaMigrationOption)
|
||||
throws SQLException {
|
||||
private static void execute(
|
||||
Flyway flyway,
|
||||
RestHighLevelClient client,
|
||||
SchemaMigrationOption schemaMigrationOption,
|
||||
DataSourceFactory dataSourceFactory)
|
||||
throws SQLException, GeneralSecurityException, IOException, ParseException {
|
||||
ElasticSearchIndexDefinition esIndexDefinition;
|
||||
switch (schemaMigrationOption) {
|
||||
case CREATE:
|
||||
@ -228,6 +242,9 @@ public final class TablesInitializer {
|
||||
esIndexDefinition = new ElasticSearchIndexDefinition(client);
|
||||
esIndexDefinition.dropIndexes();
|
||||
break;
|
||||
case ROTATE:
|
||||
rotate(dataSourceFactory);
|
||||
break;
|
||||
default:
|
||||
throw new SQLException("SchemaMigrationHelper unable to execute the option : " + schemaMigrationOption);
|
||||
}
|
||||
@ -238,6 +255,17 @@ public final class TablesInitializer {
|
||||
formatter.printHelp("TableInitializer [options]", TablesInitializer.OPTIONS);
|
||||
}
|
||||
|
||||
public static void rotate(DataSourceFactory dataSourceFactory)
|
||||
throws GeneralSecurityException, IOException, ParseException {
|
||||
String user = dataSourceFactory.getUser() != null ? dataSourceFactory.getUser() : "";
|
||||
String password = dataSourceFactory.getPassword() != null ? dataSourceFactory.getPassword() : "";
|
||||
Jdbi jdbi = Jdbi.create(dataSourceFactory.getUrl(), user, password);
|
||||
jdbi.installPlugin(new SqlObjectPlugin());
|
||||
CollectionDAO daoObject = jdbi.onDemand(CollectionDAO.class);
|
||||
DatabaseServiceRepository databaseServiceRepository = new DatabaseServiceRepository(daoObject);
|
||||
databaseServiceRepository.rotate();
|
||||
}
|
||||
|
||||
enum SchemaMigrationOption {
|
||||
CHECK_CONNECTION("check-connection"),
|
||||
CREATE("create"),
|
||||
@ -248,7 +276,8 @@ public final class TablesInitializer {
|
||||
REPAIR("repair"),
|
||||
ES_DROP("es-drop"),
|
||||
ES_CREATE("es-create"),
|
||||
ES_MIGRATE("es-migrate");
|
||||
ES_MIGRATE("es-migrate"),
|
||||
ROTATE("rotate");
|
||||
|
||||
private final String value;
|
||||
|
||||
|
@ -16,7 +16,8 @@
|
||||
"UpdateDescription",
|
||||
"UpdateOwner",
|
||||
"UpdateTags",
|
||||
"UpdateLineage"
|
||||
"UpdateLineage",
|
||||
"DecryptTokens"
|
||||
],
|
||||
"javaEnums": [
|
||||
{ "name": "SuggestDescription" },
|
||||
@ -24,7 +25,8 @@
|
||||
{ "name": "UpdateDescription" },
|
||||
{ "name": "UpdateOwner" },
|
||||
{ "name": "UpdateTags" },
|
||||
{ "name": "UpdateLineage" }
|
||||
{ "name": "UpdateLineage" },
|
||||
{ "name": "DecryptTokens" }
|
||||
]
|
||||
}
|
||||
},
|
||||
|
@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.glassfish.jersey.client.ClientProperties;
|
||||
import org.glassfish.jersey.client.HttpUrlConnectorProvider;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.openmetadata.catalog.fernet.Fernet;
|
||||
import org.openmetadata.catalog.resources.CollectionRegistry;
|
||||
import org.openmetadata.catalog.resources.EmbeddedMySqlSupport;
|
||||
import org.openmetadata.catalog.resources.events.WebhookCallbackResource;
|
||||
@ -35,6 +36,8 @@ public abstract class CatalogApplicationTest {
|
||||
public static final DropwizardAppExtension<CatalogApplicationConfig> APP;
|
||||
private static final Client client;
|
||||
protected static final WebhookCallbackResource webhookCallbackResource = new WebhookCallbackResource();
|
||||
public static final String FERNET_KEY_1 = "ihZpp5gmmDvVsgoOG6OVivKWwC9vd5JQ";
|
||||
public static final String FERNET_KEY_2 = "0cDdxg2rlodhcsjtmuFsOOvWpRRTW9ZJ";
|
||||
|
||||
static {
|
||||
CollectionRegistry.addTestResource(webhookCallbackResource);
|
||||
@ -44,6 +47,7 @@ public abstract class CatalogApplicationTest {
|
||||
client.property(ClientProperties.CONNECT_TIMEOUT, 0);
|
||||
client.property(ClientProperties.READ_TIMEOUT, 0);
|
||||
client.property(HttpUrlConnectorProvider.SET_METHOD_WORKAROUND, true);
|
||||
Fernet.getInstance().setFernetKey(FERNET_KEY_1);
|
||||
}
|
||||
|
||||
public static WebTarget getResource(String collection) {
|
||||
|
@ -140,7 +140,7 @@ import org.openmetadata.catalog.util.TestUtils;
|
||||
public abstract class EntityResourceTest<T, K> extends CatalogApplicationTest {
|
||||
private static final Map<String, EntityResourceTest<?, ?>> ENTITY_RESOURCE_TEST_MAP = new HashMap<>();
|
||||
private final String entityType;
|
||||
private final Class<T> entityClass;
|
||||
protected final Class<T> entityClass;
|
||||
private final Class<? extends ResultList<T>> entityListClass;
|
||||
protected final String collectionName;
|
||||
private final String allFields;
|
||||
|
@ -28,6 +28,7 @@ import static org.openmetadata.catalog.airflow.AirflowUtils.INGESTION_OPTIONS;
|
||||
import static org.openmetadata.catalog.airflow.AirflowUtils.INGESTION_PASSWORD;
|
||||
import static org.openmetadata.catalog.airflow.AirflowUtils.INGESTION_SERVICE_NAME;
|
||||
import static org.openmetadata.catalog.airflow.AirflowUtils.INGESTION_USERNAME;
|
||||
import static org.openmetadata.catalog.fernet.Fernet.decryptIfTokenized;
|
||||
import static org.openmetadata.catalog.util.TestUtils.ADMIN_AUTH_HEADERS;
|
||||
import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE;
|
||||
import static org.openmetadata.catalog.util.TestUtils.assertListNotNull;
|
||||
@ -542,7 +543,8 @@ public class AirflowPipelineResourceTest extends EntityOperationsResourceTest<Ai
|
||||
|
||||
private void validateGeneratedAirflowPipelineConfig(AirflowPipeline airflowPipeline)
|
||||
throws IOException, ParseException {
|
||||
IngestionAirflowPipeline ingestionPipeline = AirflowUtils.toIngestionPipeline(airflowPipeline, AIRFLOW_CONFIG);
|
||||
IngestionAirflowPipeline ingestionPipeline =
|
||||
AirflowUtils.toIngestionPipeline(airflowPipeline, AIRFLOW_CONFIG, true);
|
||||
DatabaseService databaseService = helper(airflowPipeline).findEntity("service", DATABASE_SERVICE);
|
||||
DatabaseConnection databaseConnection = databaseService.getDatabaseConnection();
|
||||
DatabaseServiceMetadataPipeline metadataPipeline =
|
||||
@ -564,7 +566,7 @@ public class AirflowPipelineResourceTest extends EntityOperationsResourceTest<Ai
|
||||
assertEquals(databaseService.getName(), source.getConfig().get(INGESTION_SERVICE_NAME));
|
||||
assertEquals(databaseConnection.getHostPort(), source.getConfig().get(INGESTION_HOST_PORT));
|
||||
assertEquals(databaseConnection.getUsername(), source.getConfig().get(INGESTION_USERNAME));
|
||||
assertEquals(databaseConnection.getPassword(), source.getConfig().get(INGESTION_PASSWORD));
|
||||
assertEquals(decryptIfTokenized(databaseConnection.getPassword()), source.getConfig().get(INGESTION_PASSWORD));
|
||||
assertEquals(databaseConnection.getDatabase(), source.getConfig().get(INGESTION_DATABASE));
|
||||
if (databaseConnection.getConnectionArguments() != null) {
|
||||
assertEquals(
|
||||
|
@ -91,6 +91,7 @@ class PermisssionsResourceTest extends CatalogApplicationTest {
|
||||
put(MetadataOperation.UpdateLineage, Boolean.TRUE);
|
||||
put(MetadataOperation.UpdateOwner, Boolean.TRUE);
|
||||
put(MetadataOperation.UpdateTags, Boolean.TRUE);
|
||||
put(MetadataOperation.DecryptTokens, Boolean.TRUE);
|
||||
}
|
||||
}),
|
||||
Arguments.of(
|
||||
@ -103,6 +104,7 @@ class PermisssionsResourceTest extends CatalogApplicationTest {
|
||||
put(MetadataOperation.UpdateLineage, Boolean.TRUE);
|
||||
put(MetadataOperation.UpdateOwner, Boolean.TRUE);
|
||||
put(MetadataOperation.UpdateTags, Boolean.TRUE);
|
||||
put(MetadataOperation.DecryptTokens, Boolean.FALSE);
|
||||
}
|
||||
}),
|
||||
Arguments.of(
|
||||
@ -115,6 +117,7 @@ class PermisssionsResourceTest extends CatalogApplicationTest {
|
||||
put(MetadataOperation.UpdateLineage, Boolean.FALSE);
|
||||
put(MetadataOperation.UpdateOwner, Boolean.FALSE);
|
||||
put(MetadataOperation.UpdateTags, Boolean.FALSE);
|
||||
put(MetadataOperation.DecryptTokens, Boolean.FALSE);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
@ -13,17 +13,23 @@
|
||||
|
||||
package org.openmetadata.catalog.resources.services;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
|
||||
import static javax.ws.rs.core.Response.Status.OK;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.openmetadata.catalog.Entity.helper;
|
||||
import static org.openmetadata.catalog.util.TestUtils.ADMIN_AUTH_HEADERS;
|
||||
import static org.openmetadata.catalog.util.TestUtils.TEST_AUTH_HEADERS;
|
||||
import static org.openmetadata.catalog.util.TestUtils.getPrincipal;
|
||||
|
||||
import io.dropwizard.db.DataSourceFactory;
|
||||
import java.io.IOException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.text.ParseException;
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.http.client.HttpResponseException;
|
||||
@ -35,6 +41,7 @@ import org.openmetadata.catalog.api.operations.pipelines.PipelineConfig;
|
||||
import org.openmetadata.catalog.api.services.CreateDatabaseService;
|
||||
import org.openmetadata.catalog.api.services.CreateDatabaseService.DatabaseServiceType;
|
||||
import org.openmetadata.catalog.entity.services.DatabaseService;
|
||||
import org.openmetadata.catalog.fernet.Fernet;
|
||||
import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceEntityInterface;
|
||||
import org.openmetadata.catalog.operations.pipelines.AirflowPipeline;
|
||||
import org.openmetadata.catalog.operations.pipelines.DatabaseServiceMetadataPipeline;
|
||||
@ -46,11 +53,13 @@ import org.openmetadata.catalog.type.ChangeDescription;
|
||||
import org.openmetadata.catalog.type.ConnectionArguments;
|
||||
import org.openmetadata.catalog.type.ConnectionOptions;
|
||||
import org.openmetadata.catalog.type.DatabaseConnection;
|
||||
import org.openmetadata.catalog.type.EntityHistory;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.type.FieldChange;
|
||||
import org.openmetadata.catalog.type.Schedule;
|
||||
import org.openmetadata.catalog.util.EntityInterface;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import org.openmetadata.catalog.util.TablesInitializer;
|
||||
import org.openmetadata.catalog.util.TestUtils;
|
||||
import org.openmetadata.catalog.util.TestUtils.UpdateType;
|
||||
|
||||
@ -156,8 +165,8 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseServ
|
||||
new DatabaseServiceMetadataPipeline()
|
||||
.withMarkDeletedTables(true)
|
||||
.withIncludeViews(true)
|
||||
.withSchemaFilterPattern(new FilterPattern().withExcludes(Arrays.asList("information_schema.*", "test.*")))
|
||||
.withTableFilterPattern(new FilterPattern().withIncludes(Arrays.asList("sales.*", "users.*")));
|
||||
.withSchemaFilterPattern(new FilterPattern().withExcludes(asList("information_schema.*", "test.*")))
|
||||
.withTableFilterPattern(new FilterPattern().withIncludes(asList("sales.*", "users.*")));
|
||||
PipelineConfig pipelineConfig =
|
||||
new PipelineConfig()
|
||||
.withSchema(PipelineConfig.Schema.DATABASE_SERVICE_METADATA_PIPELINE)
|
||||
@ -172,6 +181,76 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseServ
|
||||
assertEquals(airflowPipeline.getFullyQualifiedName(), expectedPipeline.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
void fernet_createDatabaseService(TestInfo test) throws IOException {
|
||||
Fernet.getInstance().setFernetKey(FERNET_KEY_1);
|
||||
|
||||
DatabaseConnection databaseConnection =
|
||||
new DatabaseConnection()
|
||||
.withDatabase("test")
|
||||
.withHostPort("host:9000")
|
||||
.withPassword("password")
|
||||
.withUsername("username");
|
||||
createAndCheckEntity(createRequest(test, 0).withDatabaseConnection(databaseConnection), ADMIN_AUTH_HEADERS);
|
||||
Fernet.getInstance().setFernetKey(FERNET_KEY_1 + ",old_key_not_to_be_used");
|
||||
createAndCheckEntity(createRequest(test, 1).withDatabaseConnection(databaseConnection), ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
@Test
|
||||
void fernet_rotateDatabaseService(TestInfo test) throws IOException, GeneralSecurityException, ParseException {
|
||||
DatabaseConnection databaseConnection =
|
||||
new DatabaseConnection()
|
||||
.withDatabase("test")
|
||||
.withHostPort("host:9000")
|
||||
.withPassword("password")
|
||||
.withUsername("username");
|
||||
int i = 0;
|
||||
List<String> keys = asList(null, FERNET_KEY_1, FERNET_KEY_2);
|
||||
List<DatabaseService> services = new ArrayList<>();
|
||||
for (String key : keys) {
|
||||
Fernet.getInstance().setFernetKey(key);
|
||||
services.add(
|
||||
createAndCheckEntity(createRequest(test, i).withDatabaseConnection(databaseConnection), ADMIN_AUTH_HEADERS));
|
||||
i++;
|
||||
}
|
||||
Fernet.getInstance().setFernetKey(FERNET_KEY_2 + "," + FERNET_KEY_1);
|
||||
DataSourceFactory dataSourceFactory = APP.getConfiguration().getDataSourceFactory();
|
||||
TablesInitializer.rotate(dataSourceFactory);
|
||||
Fernet.getInstance().setFernetKey(FERNET_KEY_2);
|
||||
for (DatabaseService service : services) {
|
||||
DatabaseService rotated = getEntity(service.getId(), ADMIN_AUTH_HEADERS);
|
||||
assertEquals(databaseConnection, rotated.getDatabaseConnection());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void fernet_removeDatabaseConnection(TestInfo test) throws IOException {
|
||||
DatabaseConnection databaseConnection =
|
||||
new DatabaseConnection()
|
||||
.withDatabase("test")
|
||||
.withHostPort("host:9000")
|
||||
.withPassword("password")
|
||||
.withUsername("username");
|
||||
DatabaseService service =
|
||||
createAndCheckEntity(createRequest(test).withDatabaseConnection(databaseConnection), ADMIN_AUTH_HEADERS);
|
||||
CreateDatabaseService update = createRequest(test).withDescription("description1");
|
||||
updateEntity(update, OK, ADMIN_AUTH_HEADERS);
|
||||
update.withDescription("description2");
|
||||
updateEntity(update, OK, ADMIN_AUTH_HEADERS);
|
||||
EntityHistory history = getVersionList(service.getId(), TEST_AUTH_HEADERS);
|
||||
for (Object version : history.getVersions()) {
|
||||
DatabaseService databaseService = JsonUtils.readValue((String) version, entityClass);
|
||||
assertNull(databaseService.getDatabaseConnection());
|
||||
databaseService = getVersion(databaseService.getId(), databaseService.getVersion(), TEST_AUTH_HEADERS);
|
||||
assertNull(databaseService.getDatabaseConnection());
|
||||
}
|
||||
}
|
||||
|
||||
private void validatePassword(String fernetKey, String expected, String tokenized) {
|
||||
Fernet fernet = new Fernet(fernetKey);
|
||||
assertEquals(expected, fernet.decrypt(tokenized));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CreateDatabaseService createRequest(
|
||||
String name, String description, String displayName, EntityReference owner) {
|
||||
@ -193,8 +272,10 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseServ
|
||||
createRequest.getOwner());
|
||||
assertEquals(createRequest.getName(), service.getName());
|
||||
|
||||
// Validate Database Connection
|
||||
assertEquals(createRequest.getDatabaseConnection(), service.getDatabaseConnection());
|
||||
// Validate Database Connection if available. We nullify when not admin or bot
|
||||
if (service.getDatabaseConnection() != null) {
|
||||
assertEquals(createRequest.getDatabaseConnection(), service.getDatabaseConnection());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -147,6 +147,12 @@ slackEventPublishers:
|
||||
- eventType: "entityDeleted"
|
||||
entities:
|
||||
- "*"
|
||||
|
||||
# no_encryption_at_rest is the default value, and it does what it says. Please read the manual on how
|
||||
# to secure your instance of OpenMetadata with TLS and encryption at rest.
|
||||
fernetConfiguration:
|
||||
fernetKey: ${FERNET_KEY:-no_encryption_at_rest}
|
||||
|
||||
health:
|
||||
delayedShutdownHandlerEnabled: true
|
||||
shutdownWaitPeriod: 1s
|
||||
|
6
pom.xml
6
pom.xml
@ -78,6 +78,7 @@
|
||||
<log4j.version>2.17.0</log4j.version>
|
||||
<org.junit.jupiter.version>5.8.2</org.junit.jupiter.version>
|
||||
<dropwizard-health.version>1.7.1</dropwizard-health.version>
|
||||
<fernet.version>1.4.2</fernet.version>
|
||||
|
||||
<!-- sonar -Dsonar.login=XXX -->
|
||||
<sonar.projectKey>open-metadata_OpenMetadata</sonar.projectKey>
|
||||
@ -93,6 +94,11 @@
|
||||
</properties>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.macasaet.fernet</groupId>
|
||||
<artifactId>fernet-java8</artifactId>
|
||||
<version>${fernet.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
|
Loading…
x
Reference in New Issue
Block a user