test(KafkaEmitter): Enable ability to run test locally (#6123)

This commit is contained in:
david-leifker 2022-10-06 15:34:11 -05:00 committed by GitHub
parent 3f84a2050a
commit 3dc25c14e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 58 additions and 28 deletions

View File

@ -4,6 +4,8 @@ import static datahub.client.kafka.KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -12,10 +14,11 @@ import org.apache.avro.Schema;
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.junit.Before; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.testcontainers.containers.Network; import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startables;
import org.testng.Assert; import org.testng.Assert;
import com.linkedin.dataset.DatasetProperties; import com.linkedin.dataset.DatasetProperties;
@ -37,27 +40,30 @@ public class KafkaEmitterTest {
private static ZookeeperContainer zookeeperContainer; private static ZookeeperContainer zookeeperContainer;
private static KafkaContainer kafkaContainer; private static KafkaContainer kafkaContainer;
private static SchemaRegistryContainer schemaRegistryContainer; private static SchemaRegistryContainer schemaRegistryContainer;
private KafkaEmitterConfig config; private static KafkaEmitterConfig config;
private KafkaEmitter emitter; private static KafkaEmitter emitter;
@SuppressWarnings("resource") @SuppressWarnings("resource")
@Before @BeforeClass
public void confluentSetup() throws Exception { public static void confluentSetup() throws Exception {
network = Network.newNetwork(); network = Network.newNetwork();
zookeeperContainer = new ZookeeperContainer().withNetwork(network); zookeeperContainer = new ZookeeperContainer().withNetwork(network);
kafkaContainer = new KafkaContainer(zookeeperContainer.getInternalUrl()).withNetwork(network); kafkaContainer = new KafkaContainer(zookeeperContainer.getInternalUrl())
schemaRegistryContainer = new SchemaRegistryContainer(zookeeperContainer.getInternalUrl()).withNetwork(network); .withNetwork(network)
Startables.deepStart(Stream.of(zookeeperContainer, kafkaContainer, schemaRegistryContainer)).join(); .dependsOn(zookeeperContainer);
schemaRegistryContainer = new SchemaRegistryContainer(zookeeperContainer.getInternalUrl())
.withNetwork(network)
.dependsOn(zookeeperContainer, kafkaContainer);
schemaRegistryContainer.start();
createKafkaEmitter(); String bootstrap = createTopics(kafkaContainer.getBootstrapServers());
createTopics(); createKafkaEmitter(bootstrap);
registerSchemaRegistryTypes(); registerSchemaRegistryTypes();
} }
public void createKafkaEmitter() throws IOException { public static void createKafkaEmitter(String bootstrap) throws IOException {
KafkaEmitterConfig.KafkaEmitterConfigBuilder builder = KafkaEmitterConfig.builder(); KafkaEmitterConfig.KafkaEmitterConfigBuilder builder = KafkaEmitterConfig.builder();
builder.bootstrap(kafkaContainer.getBootstrapServers()); builder.bootstrap(bootstrap);
builder.schemaRegistryUrl(schemaRegistryContainer.getUrl()); builder.schemaRegistryUrl(schemaRegistryContainer.getUrl());
config = builder.build(); config = builder.build();
emitter = new KafkaEmitter(config); emitter = new KafkaEmitter(config);
@ -80,8 +86,11 @@ public class KafkaEmitterTest {
Assert.assertTrue(response.isSuccess()); Assert.assertTrue(response.isSuccess());
} }
private AdminClient createAdminClient() { private static AdminClient createAdminClient(String bootstrap) {
return KafkaAdminClient.create(emitter.getKafkaConfgiProperties()); // Fail fast
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
return KafkaAdminClient.create(props);
} }
private static void registerSchemaRegistryTypes() throws IOException, RestClientException { private static void registerSchemaRegistryTypes() throws IOException, RestClientException {
@ -90,11 +99,17 @@ public class KafkaEmitterTest {
schemaRegistryClient.register(mcpSchema.getFullName(), mcpSchema); schemaRegistryClient.register(mcpSchema.getFullName(), mcpSchema);
} }
private void createTopics() throws InterruptedException, ExecutionException { private static String createTopics(Stream<String> bootstraps) {
AdminClient adminClient = createAdminClient();
short replicationFactor = 1; short replicationFactor = 1;
int partitions = 1; int partitions = 1;
adminClient.createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get(); return bootstraps.parallel().map(bootstrap -> {
try {
createAdminClient(bootstrap).createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get();
return bootstrap;
} catch (TimeoutException | InterruptedException | ExecutionException ex) {
return null;
}
}).filter(Objects::nonNull).findFirst().get();
} }
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")

View File

@ -2,6 +2,7 @@ package datahub.client.kafka.containers;
import com.github.dockerjava.api.command.InspectContainerResponse; import com.github.dockerjava.api.command.InspectContainerResponse;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.images.builder.Transferable; import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.TestcontainersConfiguration; import org.testcontainers.utility.TestcontainersConfiguration;
@ -20,6 +21,7 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
private static final String STARTER_SCRIPT = "/testcontainers_start.sh"; private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
private static final int KAFKA_INTERNAL_PORT = 9092; private static final int KAFKA_INTERNAL_PORT = 9092;
private static final int KAFKA_LOCAL_PORT = 9093;
public static final int KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT = 29092; public static final int KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT = 29092;
@ -39,15 +41,17 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
super(getKafkaContainerImage(confluentPlatformVersion)); super(getKafkaContainerImage(confluentPlatformVersion));
this.zookeeperConnect = zookeeperConnect; this.zookeeperConnect = zookeeperConnect;
withExposedPorts(KAFKA_INTERNAL_PORT); withExposedPorts(KAFKA_INTERNAL_PORT, KAFKA_LOCAL_PORT);
// Use two listeners with different names, it will force Kafka to communicate // Use two listeners with different names, it will force Kafka to communicate
// with itself via internal // with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will
// try to use the advertised listener // try to use the advertised listener
withEnv("KAFKA_LISTENERS", withEnv("KAFKA_LISTENERS",
"PLAINTEXT://0.0.0.0:" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT + ",BROKER://0.0.0.0:" + KAFKA_INTERNAL_PORT); "PLAINTEXT://0.0.0.0:" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); + ",BROKER://0.0.0.0:" + KAFKA_INTERNAL_PORT
+ ",BROKER_LOCAL://0.0.0.0:" + KAFKA_LOCAL_PORT);
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,BROKER_LOCAL:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
withEnv("KAFKA_BROKER_ID", "1"); withEnv("KAFKA_BROKER_ID", "1");
@ -57,14 +61,15 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
withNetworkAliases(networkAlias); withNetworkAliases(networkAlias);
waitingFor(new HostPortWaitStrategy());
} }
public String getBootstrapServers() { public Stream<String> getBootstrapServers() {
if (port == PORT_NOT_ASSIGNED) { if (port == PORT_NOT_ASSIGNED) {
throw new IllegalStateException("You should start Kafka container first"); throw new IllegalStateException("You should start Kafka container first");
} }
return String.format("PLAINTEXT://%s:%s", getHost(), port); return Stream.of(String.format("PLAINTEXT://%s:%s", getHost(), port),
String.format("PLAINTEXT://localhost:%s", getMappedPort(KAFKA_LOCAL_PORT)));
} }
@Override @Override
@ -84,10 +89,16 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
return; return;
} }
// Use two listeners with different names, it will force Kafka to communicate
// with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will
// try to use the advertised listener
String command = "#!/bin/bash \n"; String command = "#!/bin/bash \n";
command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n"; command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n";
command += "export KAFKA_ADVERTISED_LISTENERS='" + Stream command += "export KAFKA_ADVERTISED_LISTENERS='" + Stream
.concat(Stream.of("PLAINTEXT://" + networkAlias + ":" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT), .concat(Stream.of("PLAINTEXT://" + networkAlias + ":" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT,
"BROKER_LOCAL://localhost:" + getMappedPort(KAFKA_LOCAL_PORT)),
containerInfo.getNetworkSettings().getNetworks().values().stream() containerInfo.getNetworkSettings().getNetworks().values().stream()
.map(it -> "BROKER://" + it.getIpAddress() + ":" + KAFKA_INTERNAL_PORT)) .map(it -> "BROKER://" + it.getIpAddress() + ":" + KAFKA_INTERNAL_PORT))
.collect(Collectors.joining(",")) + "'\n"; .collect(Collectors.joining(",")) + "'\n";

View File

@ -4,8 +4,10 @@ import static datahub.client.kafka.containers.Utils.CONFLUENT_PLATFORM_VERSION;
import static java.lang.String.format; import static java.lang.String.format;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.TestcontainersConfiguration; import org.testcontainers.utility.TestcontainersConfiguration;
@ -27,7 +29,7 @@ public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryCont
withExposedPorts(SCHEMA_REGISTRY_INTERNAL_PORT); withExposedPorts(SCHEMA_REGISTRY_INTERNAL_PORT);
withNetworkAliases(networkAlias); withNetworkAliases(networkAlias);
waitingFor(Wait.forHttp("/subjects")); waitingFor(new HttpWaitStrategy().forPath("/subjects").withStartupTimeout(Duration.ofMinutes(2)));
} }
public String getUrl() { public String getUrl() {

View File

@ -1,6 +1,7 @@
package datahub.client.kafka.containers; package datahub.client.kafka.containers;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.utility.TestcontainersConfiguration; import org.testcontainers.utility.TestcontainersConfiguration;
import java.io.IOException; import java.io.IOException;
@ -30,6 +31,7 @@ public class ZookeeperContainer extends GenericContainer<ZookeeperContainer> {
addExposedPort(ZOOKEEPER_INTERNAL_PORT); addExposedPort(ZOOKEEPER_INTERNAL_PORT);
withNetworkAliases(networkAlias); withNetworkAliases(networkAlias);
waitingFor(new HostPortWaitStrategy());
} }
public String getInternalUrl() { public String getInternalUrl() {