From 28976583d5040ce2de93594fd55e22f280f73607 Mon Sep 17 00:00:00 2001 From: Parth Panchal <83201188+parthp2107@users.noreply.github.com> Date: Wed, 17 Aug 2022 11:28:55 +0530 Subject: [PATCH] Added SSL config for Kafka publisher (#6752) * Added ssl config for kafka publisher * Added ssl config for kafka publisher * Added ssl config for kafka publisher --- .../kafka/KafkaWebhookEventPublisher.java | 25 ++++++++++++ .../kafkaEventConfiguration.json | 39 +++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/kafka/KafkaWebhookEventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/kafka/KafkaWebhookEventPublisher.java index d8f6c33327c..8c29495bd96 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/kafka/KafkaWebhookEventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/kafka/KafkaWebhookEventPublisher.java @@ -3,9 +3,11 @@ package org.openmetadata.catalog.kafka; import com.fasterxml.jackson.core.JsonProcessingException; import java.util.Properties; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.SslConfigs; import org.openmetadata.catalog.events.WebhookPublisher; import org.openmetadata.catalog.jdbi3.CollectionDAO; import org.openmetadata.catalog.resources.events.EventResource; @@ -23,6 +25,29 @@ public class KafkaWebhookEventPublisher extends WebhookPublisher { public KafkaWebhookEventPublisher(Webhook webhook, CollectionDAO dao) { super(webhook, dao); this.webhook = webhook; + if (webhook.getKafkaProperties().getSecurityProtocol().equals(KafkaEventConfiguration.SecurityProtocol.SSL)) { + // configuration for SSL Encryption + if (webhook.getKafkaProperties().getSSLProtocol() != null + && webhook.getKafkaProperties().getSSLTrustStoreLocation() != null + && webhook.getKafkaProperties().getSSLTrustStorePassword() != null + && webhook.getKafkaProperties().getSSLKeystoreLocation() != null + && webhook.getKafkaProperties().getSSLKeystorePassword() != null + && webhook.getKafkaProperties().getSSLKeyPassword() != null) { + properties.put( + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, webhook.getKafkaProperties().getSecurityProtocol()); + properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, webhook.getKafkaProperties().getSSLProtocol()); + properties.put( + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, webhook.getKafkaProperties().getSSLTrustStoreLocation()); + properties.put( + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, webhook.getKafkaProperties().getSSLTrustStorePassword()); + // configuration for SSL Authentication + properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, webhook.getKafkaProperties().getSSLKeystoreLocation()); + properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, webhook.getKafkaProperties().getSSLKeystorePassword()); + properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, webhook.getKafkaProperties().getSSLKeyPassword()); + } else { + LOG.info("The SSL could not be configured as the required properties are not defined!"); + } + } properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, webhook.getEndpoint().toString()); properties.put(ProducerConfig.ACKS_CONFIG, webhook.getKafkaProperties().getAcks()); properties.put(ProducerConfig.RETRIES_CONFIG, webhook.getKafkaProperties().getRetries()); diff --git a/catalog-rest-service/src/main/resources/json/schema/configuration/kafkaEventConfiguration.json b/catalog-rest-service/src/main/resources/json/schema/configuration/kafkaEventConfiguration.json index f16e564ab20..f65eddff575 100644 --- a/catalog-rest-service/src/main/resources/json/schema/configuration/kafkaEventConfiguration.json +++ b/catalog-rest-service/src/main/resources/json/schema/configuration/kafkaEventConfiguration.json @@ -42,6 +42,45 @@ "description": "Serializer class for value", "default": "org.apache.kafka.common.serialization.StringSerializer", "type": "string" + }, + "securityProtocol": { + "description": "Kafka security protocol config", + "default": "PLAINTEXT", + "type": "string", + "enum": ["PLAINTEXT", "SSL"], + "javaEnums": [ + { + "name": "PLAINTEXT" + }, + { + "name": "SSL" + } + ] + }, + "SSLProtocol": { + "description": "Kafka SSL protocol config", + "default": "TLSv1.2", + "type": "string" + }, + "SSLTrustStoreLocation": { + "description": "Kafka SSL truststore location", + "type": "string" + }, + "SSLTrustStorePassword": { + "description": "Kafka SSL truststore password", + "type": "string" + }, + "SSLKeystoreLocation": { + "description": "Kafka SSL keystore location", + "type": "string" + }, + "SSLKeystorePassword": { + "description": "Kafka SSL keystore password", + "type": "string" + }, + "SSLKeyPassword": { + "description": "Kafka SSL key password", + "type": "string" } }, "required": ["topics"],