diff --git a/docker/datahub-gms/start.sh b/docker/datahub-gms/start.sh index 51cee07907..1956f6fade 100755 --- a/docker/datahub-gms/start.sh +++ b/docker/datahub-gms/start.sh @@ -7,7 +7,7 @@ fi dockerize \ -wait tcp://$EBEAN_DATASOURCE_HOST \ - -wait tcp://$KAFKA_BOOTSTRAP_SERVER \ + -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') \ -wait http://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT \ -wait $NEO4J_HOST \ -timeout 240s \ diff --git a/docker/datahub-mae-consumer/start.sh b/docker/datahub-mae-consumer/start.sh index bc2e08a891..99ebd5b64a 100755 --- a/docker/datahub-mae-consumer/start.sh +++ b/docker/datahub-mae-consumer/start.sh @@ -6,7 +6,7 @@ if ! echo $NEO4J_HOST | grep -q "://" ; then fi dockerize \ - -wait tcp://$KAFKA_BOOTSTRAP_SERVER \ + -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') \ -wait http://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT \ -wait $NEO4J_HOST \ -timeout 240s \ diff --git a/docker/datahub-mce-consumer/start.sh b/docker/datahub-mce-consumer/start.sh index 302df31d10..f8ed658751 100755 --- a/docker/datahub-mce-consumer/start.sh +++ b/docker/datahub-mce-consumer/start.sh @@ -2,6 +2,6 @@ # -wait tcp://GMS_HOST:$GMS_PORT \ dockerize \ - -wait tcp://$KAFKA_BOOTSTRAP_SERVER \ + -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') \ -timeout 240s \ java -jar /datahub/datahub-mce-consumer/bin/mce-consumer-job.jar \ No newline at end of file diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java index 2b6e6e17b4..40e86655b6 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java +++ b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java @@ -3,6 +3,7 @@ package com.linkedin.metadata.kafka.config; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.time.Duration; +import java.util.Arrays; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; @@ -40,7 +41,7 @@ public class KafkaConfig { // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS if (kafkaBootstrapServer != null && kafkaBootstrapServer.length() > 0) { - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(kafkaBootstrapServer.split(","))); } // else we rely on KafkaProperties which defaults to localhost:9092 props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);