diff --git a/docs/how/kafka-config.md b/docs/how/kafka-config.md index cea92ce4ec..79144bc2a5 100644 --- a/docs/how/kafka-config.md +++ b/docs/how/kafka-config.md @@ -74,16 +74,34 @@ extraEnvs: ## SSL +### Kafka We are using the Spring Boot framework to start our apps, including setting up Kafka. You can [use environment variables to set system properties](https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-external-config-relaxed-binding-from-environment-variables), including [Kafka properties](https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html#integration-properties). From there you can set your SSL configuration for Kafka. -If Schema Registry is configured to use security (SSL), then you also need to set -[this config](https://docs.confluent.io/current/kafka/encryption.html#encryption-ssl-schema-registry). +### Schema Registry +If Schema Registry is configured to use security (SSL), then you also need to set additional values. + +The [MCE](../../metadata-jobs/mce-consumer-job) and [MAE](../../metadata-jobs/mae-consumer-job) consumers can set +default Spring Kafka environment values, for example: +- `SPRING_KAFKA_PROPERTIES_SCHEMA_REGISTRY_SECURITY_PROTOCOL` +- `SPRING_KAFKA_PROPERTIES_SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION` +- `SPRING_KAFKA_PROPERTIES_SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD` +- `SPRING_KAFKA_PROPERTIES_SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION` +- `SPRING_KAFKA_PROPERTIES_SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD` + +[GMS](../what/gms.md) can set the following environment variables that will be passed as properties when creating the Schema Registry +Client. +- `KAFKA_SCHEMA_REGISTRY_SECURITY_PROTOCOL` +- `KAFKA_SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION` +- `KAFKA_SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD` +- `KAFKA_SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION` +- `KAFKA_SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD` > **Note** In the logs you might see something like > `The configuration 'kafkastore.ssl.truststore.password' was supplied but isn't a known config.` The configuration is > not a configuration required for the producer. These WARN message can be safely ignored. Each of Datahub services are > passed a full set of configuration but may not require all the configurations that are passed to them. These warn > messages indicate that the service was passed a configuration that is not relevant to it and can be safely ignored. + diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java index 4bd38ecb33..91b54078f2 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java @@ -1,36 +1,70 @@ package com.linkedin.gms.factory.kafka.schemaregistry; import com.linkedin.gms.factory.spring.YamlPropertySourceFactory; -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; +import javax.annotation.Nonnull; +import java.util.HashMap; +import java.util.Map; @Slf4j @Configuration @PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class) public class KafkaSchemaRegistryFactory { - public static final String TYPE = "KAFKA"; + public static final String TYPE = "KAFKA"; - @Value("${kafka.schemaRegistry.url}") - private String kafkaSchemaRegistryUrl; + @Value("${kafka.schemaRegistry.url}") + private String kafkaSchemaRegistryUrl; - @Bean(name = "kafkaSchemaRegistry") - @Nonnull - protected SchemaRegistryConfig getInstance() { - Map props = new HashMap<>(); - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); + @Value("${kafka.schema.registry.ssl.truststore.location:}") + private String sslTruststoreLocation; - log.info("Creating kafka registry"); - return new SchemaRegistryConfig(KafkaAvroSerializer.class, KafkaAvroDeserializer.class, props); - } -} + @Value("${kafka.schema.registry.ssl.truststore.password:}") + private String sslTruststorePassword; + + @Value("${kafka.schema.registry.ssl.keystore.location:}") + private String sslKeystoreLocation; + + @Value("${kafka.schema.registry.ssl.keystore.password:}") + private String sslKeystorePassword; + + @Value("${kafka.schema.registry.security.protocol:}") + private String securityProtocol; + + @Bean(name = "kafkaSchemaRegistry") + @Nonnull + protected SchemaRegistryConfig getInstance() { + Map props = new HashMap<>(); + + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); + props.put(withNamespace(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), sslTruststoreLocation); + props.put(withNamespace(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), sslTruststorePassword); + props.put(withNamespace(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), sslKeystoreLocation); + props.put(withNamespace(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), sslKeystorePassword); + props.put(withNamespace(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG), securityProtocol); + + if (sslKeystoreLocation.isEmpty()) { + log.info("creating schema registry config using url: {}", kafkaSchemaRegistryUrl); + } else { + log.info("creating schema registry config using url: {}, keystore location: {} and truststore location: {}", + kafkaSchemaRegistryUrl, sslTruststoreLocation, sslKeystoreLocation); + } + + return new SchemaRegistryConfig(KafkaAvroSerializer.class, KafkaAvroDeserializer.class, props); + } + + private String withNamespace(String configKey) { + return SchemaRegistryClientConfig.CLIENT_NAMESPACE + configKey; + } +} \ No newline at end of file