mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-29 02:48:24 +00:00
feat(metadata-service): add Schema Registry support for TLS (#3804)
Co-authored-by: Mike Schlosser <michael.a.schlosser@optum.com>
This commit is contained in:
parent
286b42663c
commit
101275230e
@ -74,16 +74,34 @@ extraEnvs:
|
||||
|
||||
## SSL
|
||||
|
||||
### Kafka
|
||||
We are using the Spring Boot framework to start our apps, including setting up Kafka. You can
|
||||
[use environment variables to set system properties](https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-external-config-relaxed-binding-from-environment-variables),
|
||||
including [Kafka properties](https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html#integration-properties).
|
||||
From there you can set your SSL configuration for Kafka.
|
||||
|
||||
If Schema Registry is configured to use security (SSL), then you also need to set
|
||||
[this config](https://docs.confluent.io/current/kafka/encryption.html#encryption-ssl-schema-registry).
|
||||
### Schema Registry
|
||||
If Schema Registry is configured to use security (SSL), then you also need to set additional values.
|
||||
|
||||
The [MCE](../../metadata-jobs/mce-consumer-job) and [MAE](../../metadata-jobs/mae-consumer-job) consumers can set
|
||||
default Spring Kafka environment values, for example:
|
||||
- `SPRING_KAFKA_PROPERTIES_SCHEMA_REGISTRY_SECURITY_PROTOCOL`
|
||||
- `SPRING_KAFKA_PROPERTIES_SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION`
|
||||
- `SPRING_KAFKA_PROPERTIES_SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD`
|
||||
- `SPRING_KAFKA_PROPERTIES_SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION`
|
||||
- `SPRING_KAFKA_PROPERTIES_SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD`
|
||||
|
||||
[GMS](../what/gms.md) can set the following environment variables that will be passed as properties when creating the Schema Registry
|
||||
Client.
|
||||
- `KAFKA_SCHEMA_REGISTRY_SECURITY_PROTOCOL`
|
||||
- `KAFKA_SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION`
|
||||
- `KAFKA_SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD`
|
||||
- `KAFKA_SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION`
|
||||
- `KAFKA_SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD`
|
||||
|
||||
> **Note** In the logs you might see something like
|
||||
> `The configuration 'kafkastore.ssl.truststore.password' was supplied but isn't a known config.` The configuration is
|
||||
> not a configuration required for the producer. These WARN message can be safely ignored. Each of Datahub services are
|
||||
> passed a full set of configuration but may not require all the configurations that are passed to them. These warn
|
||||
> messages indicate that the service was passed a configuration that is not relevant to it and can be safely ignored.
|
||||
|
||||
|
||||
@ -1,36 +1,70 @@
|
||||
package com.linkedin.gms.factory.kafka.schemaregistry;
|
||||
|
||||
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
|
||||
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
|
||||
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
|
||||
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
|
||||
import io.confluent.kafka.serializers.KafkaAvroSerializer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import javax.annotation.Nonnull;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class)
|
||||
public class KafkaSchemaRegistryFactory {
|
||||
|
||||
public static final String TYPE = "KAFKA";
|
||||
public static final String TYPE = "KAFKA";
|
||||
|
||||
@Value("${kafka.schemaRegistry.url}")
|
||||
private String kafkaSchemaRegistryUrl;
|
||||
@Value("${kafka.schemaRegistry.url}")
|
||||
private String kafkaSchemaRegistryUrl;
|
||||
|
||||
@Bean(name = "kafkaSchemaRegistry")
|
||||
@Nonnull
|
||||
protected SchemaRegistryConfig getInstance() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
|
||||
@Value("${kafka.schema.registry.ssl.truststore.location:}")
|
||||
private String sslTruststoreLocation;
|
||||
|
||||
log.info("Creating kafka registry");
|
||||
return new SchemaRegistryConfig(KafkaAvroSerializer.class, KafkaAvroDeserializer.class, props);
|
||||
}
|
||||
}
|
||||
@Value("${kafka.schema.registry.ssl.truststore.password:}")
|
||||
private String sslTruststorePassword;
|
||||
|
||||
@Value("${kafka.schema.registry.ssl.keystore.location:}")
|
||||
private String sslKeystoreLocation;
|
||||
|
||||
@Value("${kafka.schema.registry.ssl.keystore.password:}")
|
||||
private String sslKeystorePassword;
|
||||
|
||||
@Value("${kafka.schema.registry.security.protocol:}")
|
||||
private String securityProtocol;
|
||||
|
||||
@Bean(name = "kafkaSchemaRegistry")
|
||||
@Nonnull
|
||||
protected SchemaRegistryConfig getInstance() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
|
||||
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
|
||||
props.put(withNamespace(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), sslTruststoreLocation);
|
||||
props.put(withNamespace(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), sslTruststorePassword);
|
||||
props.put(withNamespace(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), sslKeystoreLocation);
|
||||
props.put(withNamespace(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), sslKeystorePassword);
|
||||
props.put(withNamespace(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG), securityProtocol);
|
||||
|
||||
if (sslKeystoreLocation.isEmpty()) {
|
||||
log.info("creating schema registry config using url: {}", kafkaSchemaRegistryUrl);
|
||||
} else {
|
||||
log.info("creating schema registry config using url: {}, keystore location: {} and truststore location: {}",
|
||||
kafkaSchemaRegistryUrl, sslTruststoreLocation, sslKeystoreLocation);
|
||||
}
|
||||
|
||||
return new SchemaRegistryConfig(KafkaAvroSerializer.class, KafkaAvroDeserializer.class, props);
|
||||
}
|
||||
|
||||
private String withNamespace(String configKey) {
|
||||
return SchemaRegistryClientConfig.CLIENT_NAMESPACE + configKey;
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user