feat(kafka): bump confluent kafka (#13767)

david-leifker 2025-06-15 10:52:16 -05:00 committed by GitHub
parent 4924602773
commit cbd97186e2
22 changed files with 365 additions and 205 deletions

View File

@ -45,7 +45,7 @@ buildscript {
ext.neo4jVersion = '5.20.0'
ext.neo4jTestVersion = '5.20.0'
ext.neo4jApocVersion = '5.20.0'
ext.testContainersVersion = '1.17.4'
ext.testContainersVersion = '1.21.1'
ext.elasticsearchVersion = '2.11.1' // ES 7.10, Opensearch 1.x, 2.x
ext.jacksonVersion = '2.18.4'
ext.jettyVersion = '12.0.21'
@ -57,7 +57,7 @@ buildscript {
ext.slf4jVersion = '1.7.36'
ext.logbackClassic = '1.5.18'
ext.hadoop3Version = '3.3.6'
ext.kafkaVersion = '7.9.1'
ext.kafkaVersion = '8.0.0'
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '15.5.2'
ext.googleJavaFormatVersion = '1.18.1'
@ -295,7 +295,7 @@ project.ext.externalDependency = [
'testContainersElasticsearch': 'org.testcontainers:elasticsearch:' + testContainersVersion,
'testContainersCassandra': 'org.testcontainers:cassandra:' + testContainersVersion,
'testContainersKafka': 'org.testcontainers:kafka:' + testContainersVersion,
'testContainersOpenSearch': 'org.opensearch:opensearch-testcontainers:2.0.0',
'testContainersOpenSearch': 'org.opensearch:opensearch-testcontainers:2.1.3',
'typesafeConfig':'com.typesafe:config:1.4.1',
'wiremock':'com.github.tomakehurst:wiremock:2.10.0',
'zookeeper': 'org.apache.zookeeper:zookeeper:3.8.4',

View File

@ -78,6 +78,9 @@ public class KafkaTrackingProducer {
props.put(
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
config.getString("analytics.kafka.delivery.timeout.ms"));
props.put(
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
config.getString("analytics.kafka.request.timeout.ms"));
props.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
config.getString("analytics.kafka.bootstrap.server"));
@ -151,6 +154,11 @@ public class KafkaTrackingProducer {
props,
SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS,
"analytics.kafka.sasl.client.callback.handler.class");
setConfig(
config,
props,
"sasl.oauthbearer.token.endpoint.url",
"analytics.kafka.sasl.oauthbearer.token.endpoint.url");
}
}
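
For context, a minimal sketch of the producer properties this tracking-producer change enables when the new keys are configured. The literal values and the class name are placeholders, and the security protocol and mechanism shown are typical companions for an OAUTHBEARER setup rather than DataHub defaults; the real values are read from the analytics.kafka.* config keys above.

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;

public class OAuthBearerProducerPropsSketch {
  public static Properties build() {
    Properties props = new Properties();
    // Placeholder values; in the real code these come from analytics.kafka.* config keys.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:29092");
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "30000");
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");
    // Assumed companions for an OAUTHBEARER deployment; DataHub only forwards them when set.
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
    props.put("sasl.oauthbearer.token.endpoint.url", "https://idp.example.com/oauth2/token");
    return props;
  }
}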

View File

@ -237,6 +237,8 @@ analytics.tracking.topic = DataHubUsageEvent_v1
analytics.tracking.topic = ${?DATAHUB_TRACKING_TOPIC}
analytics.kafka.delivery.timeout.ms = 30000
analytics.kafka.delivery.timeout.ms = ${?KAFKA_PROPERTIES_DELIVERY_TIMEOUT_MS}
analytics.kafka.request.timeout.ms = 3000
analytics.kafka.request.timeout.ms = ${?KAFKA_PROPERTIES_REQUEST_TIMEOUT_MS}
# Kafka Producer SSL Configs. All must be provided to enable SSL.
analytics.kafka.security.protocol = ${?KAFKA_PROPERTIES_SECURITY_PROTOCOL}
@ -255,6 +257,7 @@ analytics.kafka.sasl.jaas.config = ${?KAFKA_PROPERTIES_SASL_JAAS_CONFIG}
analytics.kafka.sasl.kerberos.service.name = ${?KAFKA_PROPERTIES_SASL_KERBEROS_SERVICE_NAME}
analytics.kafka.sasl.login.callback.handler.class = ${?KAFKA_PROPERTIES_SASL_LOGIN_CALLBACK_HANDLER_CLASS}
analytics.kafka.sasl.client.callback.handler.class = ${?KAFKA_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS}
analytics.kafka.sasl.oauthbearer.token.endpoint.url = ${?KAFKA_PROPERTIES_SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL}
# Required Elastic Client Configuration
analytics.elastic.host = ${?ELASTIC_CLIENT_HOST}
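
The ${?VAR} syntax is Typesafe Config's optional substitution: the second assignment only overrides the default when the corresponding environment variable is set, and keys with no default simply stay absent. A minimal sketch of how application code might read the new keys; the class name is illustrative and this is not the helper used in KafkaTrackingProducer.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class AnalyticsKafkaConfigSketch {
  public static void main(String[] args) {
    Config config = ConfigFactory.load(); // resolves ${?ENV_VAR} substitutions against the environment

    // Always present: a default is declared in application.conf.
    String requestTimeout = config.getString("analytics.kafka.request.timeout.ms");

    // Optional: only present when KAFKA_PROPERTIES_SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL is exported.
    if (config.hasPath("analytics.kafka.sasl.oauthbearer.token.endpoint.url")) {
      String tokenEndpoint = config.getString("analytics.kafka.sasl.oauthbearer.token.endpoint.url");
      System.out.println("token endpoint: " + tokenEndpoint);
    }
    System.out.println("request timeout: " + requestTimeout);
  }
}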

View File

@ -148,7 +148,7 @@ services:
- neo4jdata:/data
schema-registry:
hostname: schema-registry
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
env_file: schema-registry/env/docker.env
@ -163,7 +163,7 @@ services:
condition: service_healthy
broker:
hostname: broker
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- 29092:29092
- 9092:9092
@ -181,7 +181,7 @@ services:
- broker:/var/lib/kafka/data/
zookeeper:
hostname: zookeeper
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- 2181:2181
env_file: zookeeper/env/docker.env

View File

@ -126,7 +126,7 @@ services:
- esdata:/usr/share/elasticsearch/data
schema-registry:
hostname: schema-registry
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
env_file: schema-registry/env/docker.env
@ -141,7 +141,7 @@ services:
condition: service_healthy
broker:
hostname: broker
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
env_file: broker/env/docker.env
@ -158,7 +158,7 @@ services:
- broker:/var/lib/kafka/data/
zookeeper:
hostname: zookeeper
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
env_file: zookeeper/env/docker.env

View File

@ -2,7 +2,7 @@
---
services:
kafka-rest-proxy:
image: confluentinc/cp-kafka-rest:7.9.1
image: confluentinc/cp-kafka-rest:8.0.0
env_file: kafka-rest-proxy/env/docker.env
hostname: kafka-rest-proxy
ports:

View File

@ -146,7 +146,7 @@ services:
- neo4jdata:/data
schema-registry:
hostname: schema-registry
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
env_file: schema-registry/env/docker.env
@ -161,7 +161,7 @@ services:
condition: service_healthy
broker:
hostname: broker
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
env_file: broker/env/docker.env
@ -178,7 +178,7 @@ services:
- broker:/var/lib/kafka/data/
zookeeper:
hostname: zookeeper
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
env_file: zookeeper/env/docker.env

View File

@ -231,7 +231,7 @@ services:
- neo4jdata:/data
kafka-broker:
hostname: broker
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
command:
- /bin/bash
- -c

View File

@ -24,7 +24,7 @@ services:
test: nc -z broker $${DATAHUB_KAFKA_BROKER_PORT:-9092}
timeout: 5s
hostname: broker
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
volumes:
@ -274,7 +274,7 @@ services:
test: nc -z schema-registry ${DATAHUB_SCHEMA_REGISTRY_PORT:-8081}
timeout: 5s
hostname: schema-registry
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
zookeeper:
@ -288,7 +288,7 @@ services:
test: echo srvr | nc zookeeper $${DATAHUB_ZK_PORT:-2181}
timeout: 5s
hostname: zookeeper
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
volumes:

View File

@ -24,7 +24,7 @@ services:
test: nc -z broker $${DATAHUB_KAFKA_BROKER_PORT:-9092}
timeout: 5s
hostname: broker
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
volumes:
@ -248,7 +248,7 @@ services:
test: nc -z schema-registry ${DATAHUB_SCHEMA_REGISTRY_PORT:-8081}
timeout: 5s
hostname: schema-registry
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
zookeeper:
@ -262,7 +262,7 @@ services:
test: echo srvr | nc zookeeper $${DATAHUB_ZK_PORT:-2181}
timeout: 5s
hostname: zookeeper
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
volumes:

View File

@ -24,7 +24,7 @@ services:
test: nc -z broker $${DATAHUB_KAFKA_BROKER_PORT:-9092}
timeout: 5s
hostname: broker
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
volumes:
@ -248,7 +248,7 @@ services:
test: nc -z schema-registry ${DATAHUB_SCHEMA_REGISTRY_PORT:-8081}
timeout: 5s
hostname: schema-registry
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
zookeeper:
@ -262,7 +262,7 @@ services:
test: echo srvr | nc zookeeper $${DATAHUB_ZK_PORT:-2181}
timeout: 5s
hostname: zookeeper
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
volumes:

View File

@ -24,7 +24,7 @@ services:
test: nc -z broker $${DATAHUB_KAFKA_BROKER_PORT:-9092}
timeout: 5s
hostname: broker
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
volumes:
@ -274,7 +274,7 @@ services:
test: nc -z schema-registry ${DATAHUB_SCHEMA_REGISTRY_PORT:-8081}
timeout: 5s
hostname: schema-registry
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
zookeeper:
@ -288,7 +288,7 @@ services:
test: echo srvr | nc zookeeper $${DATAHUB_ZK_PORT:-2181}
timeout: 5s
hostname: zookeeper
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
ports:
- ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
volumes:

View File

@ -7,16 +7,13 @@ import com.linkedin.dataset.DatasetProperties;
import datahub.client.MetadataWriteResponse;
import datahub.client.kafka.containers.KafkaContainer;
import datahub.client.kafka.containers.SchemaRegistryContainer;
import datahub.client.kafka.containers.ZookeeperContainer;
import datahub.event.MetadataChangeProposalWrapper;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
@ -33,7 +30,6 @@ public class KafkaEmitterTest {
private static Network network;
private static ZookeeperContainer zookeeperContainer;
private static KafkaContainer kafkaContainer;
private static SchemaRegistryContainer schemaRegistryContainer;
private static KafkaEmitterConfig config;
@ -43,20 +39,20 @@ public class KafkaEmitterTest {
@BeforeClass
public static void confluentSetup() throws Exception {
network = Network.newNetwork();
zookeeperContainer = new ZookeeperContainer().withNetwork(network);
kafkaContainer =
new KafkaContainer(zookeeperContainer.getInternalUrl())
.withNetwork(network)
.dependsOn(zookeeperContainer);
// Start Kafka with KRaft (no Zookeeper needed)
kafkaContainer = new KafkaContainer().withNetwork(network);
kafkaContainer.start();
// Schema Registry now only depends on Kafka
schemaRegistryContainer =
new SchemaRegistryContainer(
zookeeperContainer.getInternalUrl(), kafkaContainer.getInternalBootstrapServers())
new SchemaRegistryContainer(kafkaContainer.getInternalBootstrapServers())
.withNetwork(network)
.dependsOn(zookeeperContainer, kafkaContainer);
.dependsOn(kafkaContainer);
schemaRegistryContainer.start();
String bootstrap = createTopics(kafkaContainer.getBootstrapServers());
createKafkaEmitter(bootstrap);
createTopics(kafkaContainer.getBootstrapServers());
createKafkaEmitter(kafkaContainer.getBootstrapServers());
registerSchemaRegistryTypes();
}
@ -100,25 +96,18 @@ public class KafkaEmitterTest {
schemaRegistryClient.register(mcpSchema.getFullName(), mcpSchema);
}
private static String createTopics(Stream<String> bootstraps) {
private static void createTopics(String bootstrap)
throws ExecutionException, InterruptedException {
short replicationFactor = 1;
int partitions = 1;
return bootstraps
.parallel()
.map(
bootstrap -> {
try {
createAdminClient(bootstrap)
.createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor)))
.all()
.get();
return bootstrap;
} catch (RuntimeException | InterruptedException | ExecutionException ex) {
return null;
}
})
.filter(Objects::nonNull)
.findFirst()
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
AdminClient adminClient = KafkaAdminClient.create(props);
adminClient
.createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor)))
.all()
.get();
}
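
For reference, the same topic-creation step written as a standalone sketch. It varies slightly from the test code above: AdminClientConfig supplies the bootstrap key instead of ProducerConfig, and the admin client is closed with try-with-resources. This is an illustration, not what the commit ships.

import static java.util.Collections.singletonList;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
  static void createTopic(String bootstrap, String topic)
      throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
    try (AdminClient adminClient = AdminClient.create(props)) {
      // 1 partition with replication factor 1 is enough for a single-broker test container.
      adminClient
          .createTopics(singletonList(new NewTopic(topic, 1, (short) 1)))
          .all()
          .get();
    }
  }
}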

View File

@ -2,130 +2,92 @@ package datahub.client.kafka.containers;
import static datahub.client.kafka.containers.Utils.CONFLUENT_PLATFORM_VERSION;
import com.github.dockerjava.api.command.InspectContainerResponse;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.TestcontainersConfiguration;
/** This container wraps Confluent Kafka. */
/** This container wraps Confluent Kafka using KRaft mode (no Zookeeper required). */
public class KafkaContainer extends GenericContainer<KafkaContainer> {
private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
private static final int KAFKA_INTERNAL_PORT = 9092;
private static final int KAFKA_LOCAL_PORT = 9093;
private static final int KAFKA_EXTERNAL_PORT = 9093;
private static final int KAFKA_CONTROLLER_PORT = 29093;
public static final int KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT = 29092;
private static final int PORT_NOT_ASSIGNED = -1;
private String zookeeperConnect = null;
private int port = PORT_NOT_ASSIGNED;
private final String networkAlias = "kafka";
public KafkaContainer(String zookeeperConnect) {
this(CONFLUENT_PLATFORM_VERSION, zookeeperConnect);
public KafkaContainer() {
this(CONFLUENT_PLATFORM_VERSION);
}
public KafkaContainer(String confluentPlatformVersion, String zookeeperConnect) {
public KafkaContainer(String confluentPlatformVersion) {
super(getKafkaContainerImage(confluentPlatformVersion));
this.zookeeperConnect = zookeeperConnect;
withExposedPorts(KAFKA_INTERNAL_PORT, KAFKA_LOCAL_PORT);
// Fix the external port to 9093 to match the advertised listener
// Only expose the external port, not the internal port to avoid conflicts
addFixedExposedPort(KAFKA_EXTERNAL_PORT, KAFKA_EXTERNAL_PORT);
// Use two listeners with different names, it will force Kafka to communicate
// with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will
// try to use the advertised listener
// KRaft configuration
String clusterId = java.util.UUID.randomUUID().toString();
withEnv("KAFKA_NODE_ID", "1");
withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@" + networkAlias + ":" + KAFKA_CONTROLLER_PORT);
withEnv("CLUSTER_ID", clusterId);
withEnv("KAFKA_CLUSTER_ID", clusterId);
// Configure listeners
withEnv(
"KAFKA_LISTENERS",
"PLAINTEXT://0.0.0.0:"
+ KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT
+ ",CONTROLLER://0.0.0.0:"
+ KAFKA_CONTROLLER_PORT
+ ",BROKER://0.0.0.0:"
+ KAFKA_INTERNAL_PORT
+ ",BROKER_LOCAL://0.0.0.0:"
+ KAFKA_LOCAL_PORT);
+ ",BROKER_EXTERNAL://0.0.0.0:"
+ KAFKA_EXTERNAL_PORT);
withEnv(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,BROKER_LOCAL:PLAINTEXT");
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT,BROKER_EXTERNAL:PLAINTEXT");
// Advertised listeners - external port must match the container port
withEnv(
"KAFKA_ADVERTISED_LISTENERS",
String.format(
"PLAINTEXT://%s:%d,BROKER://%s:%d,BROKER_EXTERNAL://localhost:%d",
networkAlias,
KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT,
networkAlias,
KAFKA_INTERNAL_PORT,
KAFKA_EXTERNAL_PORT // This must be 9093, not a mapped port
));
withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
// Common Kafka settings
withEnv("KAFKA_BROKER_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
withNetworkAliases(networkAlias);
waitingFor(new HostPortWaitStrategy());
// Wait for Kafka to be ready by checking the external port
waitingFor(Wait.forListeningPort());
}
public Stream<String> getBootstrapServers() {
if (port == PORT_NOT_ASSIGNED) {
throw new IllegalStateException("You should start Kafka container first");
}
return Stream.of(
String.format("PLAINTEXT://%s:%s", getHost(), port),
String.format("PLAINTEXT://localhost:%s", getMappedPort(KAFKA_LOCAL_PORT)));
public String getBootstrapServers() {
return "localhost:" + KAFKA_EXTERNAL_PORT;
}
public String getInternalBootstrapServers() {
return String.format("PLAINTEXT://%s:%s", networkAlias, KAFKA_INTERNAL_PORT);
}
@Override
protected void doStart() {
withCommand(
"sh",
"-c",
"while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + "sh " + STARTER_SCRIPT);
super.doStart();
}
@Override
protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
super.containerIsStarting(containerInfo, reused);
port = getMappedPort(KAFKA_INTERNAL_PORT);
if (reused) {
return;
}
// Use two listeners with different names, it will force Kafka to communicate
// with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will
// try to use the advertised listener
String command = "#!/bin/bash \n";
command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n";
command +=
"export KAFKA_ADVERTISED_LISTENERS='"
+ Stream.concat(
Stream.of(
"PLAINTEXT://"
+ networkAlias
+ ":"
+ KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT,
"BROKER_LOCAL://localhost:" + getMappedPort(KAFKA_LOCAL_PORT)),
containerInfo.getNetworkSettings().getNetworks().values().stream()
.map(it -> "BROKER://" + it.getIpAddress() + ":" + KAFKA_INTERNAL_PORT))
.collect(Collectors.joining(","))
+ "'\n";
command += ". /etc/confluent/docker/bash-config \n";
command += "/etc/confluent/docker/configure \n";
command += "/etc/confluent/docker/launch \n";
copyFileToContainer(
Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), STARTER_SCRIPT);
return networkAlias + ":" + KAFKA_INTERNAL_PORT;
}
private static String getKafkaContainerImage(String confluentPlatformVersion) {

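A minimal usage sketch of the reworked test containers under KRaft mode, mirroring the emitter test setup above: no ZookeeperContainer, Schema Registry pointed at the broker's internal listener, and getBootstrapServers() now returning a plain String (fixed at localhost:9093) instead of a Stream of candidate URLs. The wrapper class is illustrative.

import org.testcontainers.containers.Network;

import datahub.client.kafka.containers.KafkaContainer;
import datahub.client.kafka.containers.SchemaRegistryContainer;

public class KRaftContainersSketch {
  public static void main(String[] args) {
    Network network = Network.newNetwork();

    // The broker runs its own KRaft controller, so no Zookeeper container is started.
    KafkaContainer kafka = new KafkaContainer().withNetwork(network);
    kafka.start();

    SchemaRegistryContainer schemaRegistry =
        new SchemaRegistryContainer(kafka.getInternalBootstrapServers())
            .withNetwork(network)
            .dependsOn(kafka);
    schemaRegistry.start();

    String bootstrap = kafka.getBootstrapServers();   // "localhost:9093" (fixed external port)
    String registryUrl = schemaRegistry.getUrl();     // http://<host>:<mapped port>
    System.out.println(bootstrap + " / " + registryUrl);

    schemaRegistry.stop();
    kafka.stop();
  }
}
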
View File

@ -1,44 +1,37 @@
package datahub.client.kafka.containers;
import static datahub.client.kafka.containers.Utils.CONFLUENT_PLATFORM_VERSION;
import static java.lang.String.format;
import java.io.IOException;
import java.time.Duration;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.TestcontainersConfiguration;
/** This container wraps Confluent Schema Registry. */
public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
private static final int SCHEMA_REGISTRY_INTERNAL_PORT = 8081;
private final String networkAlias = "schema-registry";
public SchemaRegistryContainer(String zookeeperConnect, String kafkaBootstrap)
throws IOException {
this(CONFLUENT_PLATFORM_VERSION, zookeeperConnect, kafkaBootstrap);
public SchemaRegistryContainer(String kafkaBootstrapServers) {
this(CONFLUENT_PLATFORM_VERSION, kafkaBootstrapServers);
}
public SchemaRegistryContainer(
String confluentPlatformVersion, String zookeeperConnect, String kafkaBootstrap)
throws IOException {
public SchemaRegistryContainer(String confluentPlatformVersion, String kafkaBootstrapServers) {
super(getSchemaRegistryContainerImage(confluentPlatformVersion));
addEnv("SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL", zookeeperConnect);
addEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost");
addEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", kafkaBootstrap);
withExposedPorts(SCHEMA_REGISTRY_INTERNAL_PORT);
withNetworkAliases(networkAlias);
waitingFor(
new HttpWaitStrategy().forPath("/subjects").withStartupTimeout(Duration.ofMinutes(2)));
withEnv("SCHEMA_REGISTRY_HOST_NAME", networkAlias);
withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + SCHEMA_REGISTRY_INTERNAL_PORT);
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", kafkaBootstrapServers);
waitingFor(Wait.forHttp("/subjects").forPort(SCHEMA_REGISTRY_INTERNAL_PORT));
}
public String getUrl() {
return format(
"http://%s:%d",
this.getContainerIpAddress(), this.getMappedPort(SCHEMA_REGISTRY_INTERNAL_PORT));
return String.format("http://%s:%d", getHost(), getMappedPort(SCHEMA_REGISTRY_INTERNAL_PORT));
}
private static String getSchemaRegistryContainerImage(String confluentPlatformVersion) {

View File

@ -4,7 +4,7 @@ import java.io.IOException;
import java.net.ServerSocket;
final class Utils {
public static final String CONFLUENT_PLATFORM_VERSION = "7.9.1";
public static final String CONFLUENT_PLATFORM_VERSION = "8.0.0";
private Utils() {}

View File

@ -10,6 +10,7 @@ import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import io.datahubproject.openapi.v1.models.TraceStorageStatus;
import io.datahubproject.openapi.v1.models.TraceWriteStatus;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@ -23,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@ -37,12 +39,13 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.Utils;
@Slf4j
@SuperBuilder
@ -242,11 +245,8 @@ public abstract class KafkaTraceReader<T extends RecordTemplate> {
try {
// Get all topic partitions first
Map<String, TopicDescription> topicInfo =
adminClient
.describeTopics(Collections.singletonList(getTopicName()))
.all()
.get(timeoutSeconds, TimeUnit.SECONDS);
Map<String, KafkaFuture<TopicDescription>> topicInfo =
adminClient.describeTopics(Collections.singletonList(getTopicName())).topicNameValues();
if (topicInfo == null || !topicInfo.containsKey(getTopicName())) {
log.error("Failed to get topic information for topic: {}", getTopicName());
@ -255,7 +255,7 @@ public abstract class KafkaTraceReader<T extends RecordTemplate> {
// Create a list of all TopicPartitions
List<TopicPartition> allPartitions =
topicInfo.get(getTopicName()).partitions().stream()
topicInfo.get(getTopicName()).get(timeoutSeconds, TimeUnit.SECONDS).partitions().stream()
.map(partitionInfo -> new TopicPartition(getTopicName(), partitionInfo.partition()))
.collect(Collectors.toList());
@ -320,11 +320,8 @@ public abstract class KafkaTraceReader<T extends RecordTemplate> {
public Map<TopicPartition, Long> getEndOffsets(boolean skipCache) {
try {
// Get all topic partitions first (reuse the same approach as in getAllPartitionOffsets)
Map<String, TopicDescription> topicInfo =
adminClient
.describeTopics(Collections.singletonList(getTopicName()))
.all()
.get(timeoutSeconds, TimeUnit.SECONDS);
Map<String, KafkaFuture<TopicDescription>> topicInfo =
adminClient.describeTopics(Collections.singletonList(getTopicName())).topicNameValues();
if (topicInfo == null || !topicInfo.containsKey(getTopicName())) {
log.error("Failed to get topic information for topic: {}", getTopicName());
@ -333,7 +330,7 @@ public abstract class KafkaTraceReader<T extends RecordTemplate> {
// Create a list of all TopicPartitions
List<TopicPartition> allPartitions =
topicInfo.get(getTopicName()).partitions().stream()
topicInfo.get(getTopicName()).get(timeoutSeconds, TimeUnit.SECONDS).partitions().stream()
.map(partitionInfo -> new TopicPartition(getTopicName(), partitionInfo.partition()))
.collect(Collectors.toList());
@ -628,14 +625,13 @@ public abstract class KafkaTraceReader<T extends RecordTemplate> {
urn.toString(),
key -> {
try {
DefaultPartitioner partitioner = new DefaultPartitioner();
TopicDescription topicDescription =
adminClient
.describeTopics(Collections.singletonList(getTopicName()))
.all()
.get()
.get(getTopicName());
.topicNameValues()
.get(getTopicName())
.get(timeoutSeconds, TimeUnit.SECONDS);
if (topicDescription == null) {
throw new IllegalStateException("Topic " + getTopicName() + " not found");
@ -664,11 +660,13 @@ public abstract class KafkaTraceReader<T extends RecordTemplate> {
new Cluster(
null, nodes, partitions, Collections.emptySet(), Collections.emptySet());
int partition =
partitioner.partition(getTopicName(), key, key.getBytes(), null, null, cluster);
int partition = getPartitionForKey(key, cluster.partitionCountForTopic(getTopicName()));
return new TopicPartition(getTopicName(), partition);
} catch (InterruptedException | ExecutionException e) {
} catch (InterruptedException
| ExecutionException
| RuntimeException
| TimeoutException e) {
throw new RuntimeException("Failed to get topic partition for " + key, e);
}
});
@ -684,4 +682,19 @@ public abstract class KafkaTraceReader<T extends RecordTemplate> {
return consumer.offsetsForTimes(timestampsToSearch).get(topicPartition);
}
/**
* Calculate which partition a key would be assigned to. This replicates Kafka's default
* partitioning behavior.
*/
private static int getPartitionForKey(String key, int numPartitions) {
if (key == null) {
throw new IllegalArgumentException("Key cannot be null");
}
byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
// Use murmur2 hash (same as Kafka default)
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
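
Two client-API points sit behind this rewrite: DescribeTopicsResult.topicNameValues() returns per-topic KafkaFutures, so the timeout is applied on each future's get() rather than on a combined all().get(); and the internal DefaultPartitioner class used previously no longer ships with newer Kafka clients, so the key-to-partition mapping is reproduced with the same murmur2 hash Kafka's default partitioning applies to serialized keys (here assuming UTF-8 String keys). A condensed sketch under those assumptions; the class and method names are illustrative.

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.utils.Utils;

public class TraceReaderApiSketch {
  // Per-topic future: the timeout is enforced on this get(), not on a combined all().get().
  static TopicDescription describe(AdminClient adminClient, String topic, long timeoutSeconds)
      throws Exception {
    Map<String, KafkaFuture<TopicDescription>> byName =
        adminClient.describeTopics(Collections.singletonList(topic)).topicNameValues();
    return byName.get(topic).get(timeoutSeconds, TimeUnit.SECONDS);
  }

  // murmur2 over the key bytes, matching Kafka's partitioning for non-null String keys.
  static int partitionForKey(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }
}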

View File

@ -2,6 +2,7 @@ package com.linkedin.metadata.trace;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -27,8 +28,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.admin.AdminClient;
@ -87,9 +91,8 @@ public abstract class BaseKafkaTraceReaderTest<M extends RecordTemplate> {
new TopicDescription(TOPIC_NAME, false, Collections.singletonList(partitionInfo));
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
when(mockDescribeTopicsResult.all())
.thenReturn(
KafkaFuture.completedFuture(Collections.singletonMap(TOPIC_NAME, topicDescription)));
when(mockDescribeTopicsResult.topicNameValues())
.thenReturn(Map.of(TOPIC_NAME, KafkaFuture.completedFuture(topicDescription)));
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
// Mock consumer group offset lookup
@ -257,9 +260,8 @@ public abstract class BaseKafkaTraceReaderTest<M extends RecordTemplate> {
TopicDescription topicDescription = new TopicDescription(TOPIC_NAME, false, partitionInfos);
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
when(mockDescribeTopicsResult.all())
.thenReturn(
KafkaFuture.completedFuture(Collections.singletonMap(TOPIC_NAME, topicDescription)));
when(mockDescribeTopicsResult.topicNameValues())
.thenReturn(Map.of(TOPIC_NAME, KafkaFuture.completedFuture(topicDescription)));
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
// Setup consumer group offsets for multiple partitions
@ -309,9 +311,8 @@ public abstract class BaseKafkaTraceReaderTest<M extends RecordTemplate> {
new TopicDescription(TOPIC_NAME, false, Collections.singletonList(partitionInfo));
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
when(mockDescribeTopicsResult.all())
.thenReturn(
KafkaFuture.completedFuture(Collections.singletonMap(TOPIC_NAME, topicDescription)));
when(mockDescribeTopicsResult.topicNameValues())
.thenReturn(Map.of(TOPIC_NAME, KafkaFuture.completedFuture(topicDescription)));
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
@ -368,9 +369,8 @@ public abstract class BaseKafkaTraceReaderTest<M extends RecordTemplate> {
new TopicDescription(TOPIC_NAME, false, Collections.singletonList(partitionInfo));
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
when(mockDescribeTopicsResult.all())
.thenReturn(
KafkaFuture.completedFuture(Collections.singletonMap(TOPIC_NAME, topicDescription)));
when(mockDescribeTopicsResult.topicNameValues())
.thenReturn(Map.of(TOPIC_NAME, KafkaFuture.completedFuture(topicDescription)));
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
// Setup consumer to return end offsets
@ -408,9 +408,8 @@ public abstract class BaseKafkaTraceReaderTest<M extends RecordTemplate> {
new TopicDescription(TOPIC_NAME, false, Collections.singletonList(partitionInfo));
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
when(mockDescribeTopicsResult.all())
.thenReturn(
KafkaFuture.completedFuture(Collections.singletonMap(TOPIC_NAME, topicDescription)));
when(mockDescribeTopicsResult.topicNameValues())
.thenReturn(Map.of(TOPIC_NAME, KafkaFuture.completedFuture(topicDescription)));
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
// Setup consumer to return end offsets
@ -539,4 +538,126 @@ public abstract class BaseKafkaTraceReaderTest<M extends RecordTemplate> {
// Verify that endOffsets was called twice
verify(consumer, times(2)).endOffsets(anyCollection());
}
@Test
public void testGetAllPartitionOffsets_FailedToGetTopicInfo() {
// Arrange
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
when(mockDescribeTopicsResult.topicNameValues())
.thenReturn(Collections.emptyMap()); // Return empty map to simulate topic not found
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
// Act
Map<TopicPartition, OffsetAndMetadata> result = traceReader.getAllPartitionOffsets(false);
// Assert
assertTrue(result.isEmpty());
// The error should be logged but method returns empty map
}
@Test
public void testGetEndOffsets_FailedToGetTopicInfo() {
// Arrange
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
when(mockDescribeTopicsResult.topicNameValues())
.thenReturn(Collections.emptyMap()); // Return empty map to simulate topic not found
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
// Act
Map<TopicPartition, Long> result = traceReader.getEndOffsets(false);
// Assert
assertTrue(result.isEmpty());
// The error should be logged but method returns empty map
}
@Test
public void testGetTopicPartition_TopicNotFound() throws Exception {
// Arrange
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
KafkaFuture<TopicDescription> nullFuture = KafkaFuture.completedFuture(null);
when(mockDescribeTopicsResult.topicNameValues()).thenReturn(Map.of(TOPIC_NAME, nullFuture));
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
// Act - findMessages catches the exception and returns empty map
Map<Urn, Map<String, Pair<ConsumerRecord<String, GenericRecord>, SystemMetadata>>> result =
traceReader.findMessages(
Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME)),
TRACE_ID,
System.currentTimeMillis());
// Assert - the method should return an empty map for the URN that had an error
assertTrue(result.containsKey(TEST_URN));
assertTrue(result.get(TEST_URN).isEmpty());
}
@Test
public void testGetTopicPartition_ExecutionException() throws Exception {
// Arrange
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
KafkaFuture<TopicDescription> failedFuture = mock(KafkaFuture.class);
when(failedFuture.get(anyLong(), any(TimeUnit.class)))
.thenThrow(new ExecutionException("Test exception", new RuntimeException()));
when(mockDescribeTopicsResult.topicNameValues()).thenReturn(Map.of(TOPIC_NAME, failedFuture));
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
// Act - findMessages catches the exception and returns empty map
Map<Urn, Map<String, Pair<ConsumerRecord<String, GenericRecord>, SystemMetadata>>> result =
traceReader.findMessages(
Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME)),
TRACE_ID,
System.currentTimeMillis());
// Assert - the method should return an empty map for the URN that had an error
assertTrue(result.containsKey(TEST_URN));
assertTrue(result.get(TEST_URN).isEmpty());
}
@Test
public void testGetTopicPartition_InterruptedException() throws Exception {
// Arrange
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
KafkaFuture<TopicDescription> failedFuture = mock(KafkaFuture.class);
when(failedFuture.get(anyLong(), any(TimeUnit.class)))
.thenThrow(new InterruptedException("Test interruption"));
when(mockDescribeTopicsResult.topicNameValues()).thenReturn(Map.of(TOPIC_NAME, failedFuture));
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
// Act - findMessages catches the exception and returns empty map
Map<Urn, Map<String, Pair<ConsumerRecord<String, GenericRecord>, SystemMetadata>>> result =
traceReader.findMessages(
Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME)),
TRACE_ID,
System.currentTimeMillis());
// Assert - the method should return an empty map for the URN that had an error
assertTrue(result.containsKey(TEST_URN));
assertTrue(result.get(TEST_URN).isEmpty());
}
@Test
public void testGetTopicPartition_TimeoutException() throws Exception {
// Arrange
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
KafkaFuture<TopicDescription> failedFuture = mock(KafkaFuture.class);
when(failedFuture.get(anyLong(), any(TimeUnit.class)))
.thenThrow(new TimeoutException("Test timeout"));
when(mockDescribeTopicsResult.topicNameValues()).thenReturn(Map.of(TOPIC_NAME, failedFuture));
when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
// Act - findMessages catches the exception and returns empty map
Map<Urn, Map<String, Pair<ConsumerRecord<String, GenericRecord>, SystemMetadata>>> result =
traceReader.findMessages(
Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME)),
TRACE_ID,
System.currentTimeMillis());
// Assert - the method should return an empty map for the URN that had an error
assertTrue(result.containsKey(TEST_URN));
assertTrue(result.get(TEST_URN).isEmpty());
}
}

View File

@ -1,19 +1,35 @@
<configuration debug="false" scan="true">
<appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
<pattern>%date{ISO8601} [%thread] %-5level %logger{36} urn=%X{entityUrn:-none} aspect=%X{aspectName:-none} entityType=%X{entityType:-none} changeType=%X{changeType:-none} - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
<filter class="com.linkedin.metadata.utils.log.LogMessageFilter">
<excluded>scanned from multiple locations</excluded>
<excluded>[ignore_throttled] parameter is deprecated because frozen indices have been deprecated</excluded>
</filter>
</appender>
<logger name="com.datastax.oss" level="ERROR" />
<logger name="com.datastax.driver" level="FATAL" />
<!-- Silence noisy third-party libraries -->
<logger name="com.datastax.oss" level="ERROR"/>
<logger name="com.datastax.driver" level="FATAL"/>
<logger name="org.apache.kafka" level="ERROR"/>
<logger name="org.springframework" level="WARN"/>
<logger name="org.eclipse.jetty" level="WARN"/>
<logger name="com.linkedin.restli" level="WARN"/>
<logger name="org.apache.http" level="WARN"/>
<logger name="io.confluent" level="WARN"/>
<logger name="nl.basjes.parse" level="WARN"/>
<logger name="org.hibernate" level="WARN"/>
<logger name="com.linkedin.parseq" level="WARN"/>
<!-- Your application packages can stay at INFO level -->
<logger name="com.linkedin.metadata" level="INFO"/>
<logger name="io.datahubproject" level="INFO"/>
<root level="WARN">
<appender-ref ref="STDOUT" />
<appender-ref ref="STDOUT"/>
</root>
</configuration>
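
The %X{entityUrn:-none} tokens in the new pattern read from SLF4J's MDC and fall back to "none" when a key is absent. A hedged sketch of how code could populate those keys so the pattern renders something useful; the surrounding method is illustrative and not part of this commit, only the MDC key names are taken from the pattern above.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MdcLoggingSketch {
  private static final Logger log = LoggerFactory.getLogger(MdcLoggingSketch.class);

  void processAspect(String entityUrn, String aspectName, String entityType, String changeType) {
    MDC.put("entityUrn", entityUrn);
    MDC.put("aspectName", aspectName);
    MDC.put("entityType", entityType);
    MDC.put("changeType", changeType);
    try {
      log.warn("processing change"); // rendered with urn=... aspect=... entityType=... changeType=...
    } finally {
      MDC.clear(); // avoid leaking context across pooled threads
    }
  }
}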

View File

@ -0,0 +1,26 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
</appender>
<!-- Set root to WARN to eliminate most third-party logs -->
<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
<!-- Only enable INFO for your application's packages -->
<logger name="com.linkedin.metadata" level="INFO"/>
<logger name="io.datahubproject" level="INFO"/>
<!-- Silence particularly noisy packages -->
<logger name="org.apache.kafka" level="ERROR"/>
<logger name="org.springframework" level="WARN"/>
<logger name="org.eclipse.jetty" level="WARN"/>
<logger name="com.linkedin.restli" level="WARN"/>
<logger name="org.apache.http" level="WARN"/>
</configuration>

View File

@ -0,0 +1,26 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
</appender>
<!-- Set root to WARN to eliminate most third-party logs -->
<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
<!-- Only enable INFO for your application's packages -->
<logger name="com.linkedin.metadata" level="INFO"/>
<logger name="io.datahubproject" level="INFO"/>
<!-- Silence particularly noisy packages -->
<logger name="org.apache.kafka" level="ERROR"/>
<logger name="org.springframework" level="WARN"/>
<logger name="org.eclipse.jetty" level="WARN"/>
<logger name="com.linkedin.restli" level="WARN"/>
<logger name="org.apache.http" level="WARN"/>
</configuration>

View File

@ -52,6 +52,9 @@ public class DataHubKafkaProducerFactory {
props.put(
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
kafkaConfiguration.getProducer().getDeliveryTimeout());
props.put(
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
kafkaConfiguration.getProducer().getRequestTimeout());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBootstrapServers());
// key: Actor urn.
// value: JSON object.
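
One constraint worth keeping in mind when tuning these two knobs together: the producer requires delivery.timeout.ms to be at least linger.ms + request.timeout.ms and rejects the configuration at construction time otherwise. A minimal sketch with illustrative values, not DataHub's shipped defaults.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTimeoutSketch {
  static Properties timeouts() {
    Properties props = new Properties();
    props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3_000);
    // Must satisfy delivery.timeout.ms >= linger.ms + request.timeout.ms,
    // or KafkaProducer refuses to start.
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30_000);
    return props;
  }
}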