Added SSL config for Kafka publisher (#6752)

* Added ssl config for kafka publisher

* Added ssl config for kafka publisher

* Added ssl config for kafka publisher
This commit is contained in:
Parth Panchal 2022-08-17 11:28:55 +05:30 committed by GitHub
parent 874a86944d
commit 28976583d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 0 deletions

View File

@ -3,9 +3,11 @@ package org.openmetadata.catalog.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.openmetadata.catalog.events.WebhookPublisher;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.resources.events.EventResource;
@ -23,6 +25,29 @@ public class KafkaWebhookEventPublisher extends WebhookPublisher {
public KafkaWebhookEventPublisher(Webhook webhook, CollectionDAO dao) {
super(webhook, dao);
this.webhook = webhook;
if (webhook.getKafkaProperties().getSecurityProtocol().equals(KafkaEventConfiguration.SecurityProtocol.SSL)) {
// configuration for SSL Encryption
if (webhook.getKafkaProperties().getSSLProtocol() != null
&& webhook.getKafkaProperties().getSSLTrustStoreLocation() != null
&& webhook.getKafkaProperties().getSSLTrustStorePassword() != null
&& webhook.getKafkaProperties().getSSLKeystoreLocation() != null
&& webhook.getKafkaProperties().getSSLKeystorePassword() != null
&& webhook.getKafkaProperties().getSSLKeyPassword() != null) {
properties.put(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, webhook.getKafkaProperties().getSecurityProtocol());
properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, webhook.getKafkaProperties().getSSLProtocol());
properties.put(
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, webhook.getKafkaProperties().getSSLTrustStoreLocation());
properties.put(
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, webhook.getKafkaProperties().getSSLTrustStorePassword());
// configuration for SSL Authentication
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, webhook.getKafkaProperties().getSSLKeystoreLocation());
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, webhook.getKafkaProperties().getSSLKeystorePassword());
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, webhook.getKafkaProperties().getSSLKeyPassword());
} else {
LOG.info("The SSL could not be configured as the required properties are not defined!");
}
}
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, webhook.getEndpoint().toString());
properties.put(ProducerConfig.ACKS_CONFIG, webhook.getKafkaProperties().getAcks());
properties.put(ProducerConfig.RETRIES_CONFIG, webhook.getKafkaProperties().getRetries());

View File

@ -42,6 +42,45 @@
"description": "Serializer class for value",
"default": "org.apache.kafka.common.serialization.StringSerializer",
"type": "string"
},
"securityProtocol": {
"description": "Kafka security protocol config",
"default": "PLAINTEXT",
"type": "string",
"enum": ["PLAINTEXT", "SSL"],
"javaEnums": [
{
"name": "PLAINTEXT"
},
{
"name": "SSL"
}
]
},
"SSLProtocol": {
"description": "Kafka SSL protocol config",
"default": "TLSv1.2",
"type": "string"
},
"SSLTrustStoreLocation": {
"description": "Kafka SSL truststore location",
"type": "string"
},
"SSLTrustStorePassword": {
"description": "Kafka SSL truststore password",
"type": "string"
},
"SSLKeystoreLocation": {
"description": "Kafka SSL keystore location",
"type": "string"
},
"SSLKeystorePassword": {
"description": "Kafka SSL keystore password",
"type": "string"
},
"SSLKeyPassword": {
"description": "Kafka SSL key password",
"type": "string"
}
},
"required": ["topics"],