diff --git a/build.gradle b/build.gradle
index 9396ea658f..822e28affb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -45,7 +45,7 @@ buildscript {
   ext.neo4jVersion = '5.20.0'
   ext.neo4jTestVersion = '5.20.0'
   ext.neo4jApocVersion = '5.20.0'
-  ext.testContainersVersion = '1.17.4'
+  ext.testContainersVersion = '1.21.1'
   ext.elasticsearchVersion = '2.11.1' // ES 7.10, Opensearch 1.x, 2.x
   ext.jacksonVersion = '2.18.4'
   ext.jettyVersion = '12.0.21'
@@ -57,7 +57,7 @@ buildscript {
   ext.slf4jVersion = '1.7.36'
   ext.logbackClassic = '1.5.18'
   ext.hadoop3Version = '3.3.6'
-  ext.kafkaVersion = '7.9.1'
+  ext.kafkaVersion = '8.0.0'
   ext.hazelcastVersion = '5.3.6'
   ext.ebeanVersion = '15.5.2'
   ext.googleJavaFormatVersion = '1.18.1'
@@ -295,7 +295,7 @@ project.ext.externalDependency = [
    'testContainersElasticsearch': 'org.testcontainers:elasticsearch:' + testContainersVersion,
    'testContainersCassandra': 'org.testcontainers:cassandra:' + testContainersVersion,
    'testContainersKafka': 'org.testcontainers:kafka:' + testContainersVersion,
-   'testContainersOpenSearch': 'org.opensearch:opensearch-testcontainers:2.0.0',
+   'testContainersOpenSearch': 'org.opensearch:opensearch-testcontainers:2.1.3',
    'typesafeConfig':'com.typesafe:config:1.4.1',
    'wiremock':'com.github.tomakehurst:wiremock:2.10.0',
    'zookeeper': 'org.apache.zookeeper:zookeeper:3.8.4',
diff --git a/datahub-frontend/app/client/KafkaTrackingProducer.java b/datahub-frontend/app/client/KafkaTrackingProducer.java
index 76980e75c3..359e0db38e 100644
--- a/datahub-frontend/app/client/KafkaTrackingProducer.java
+++ b/datahub-frontend/app/client/KafkaTrackingProducer.java
@@ -78,6 +78,9 @@ public class KafkaTrackingProducer {
     props.put(
         ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
         config.getString("analytics.kafka.delivery.timeout.ms"));
+    props.put(
+        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
+        config.getString("analytics.kafka.request.timeout.ms"));
     props.put(
         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
         config.getString("analytics.kafka.bootstrap.server"));
@@ -151,6 +154,11 @@ public class KafkaTrackingProducer {
         props,
         SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS,
         "analytics.kafka.sasl.client.callback.handler.class");
+    setConfig(
+        config,
+        props,
+        "sasl.oauthbearer.token.endpoint.url",
+        "analytics.kafka.sasl.oauthbearer.token.endpoint.url");
   }
 }
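Reviewer note on the new `request.timeout.ms` setting: the Kafka producer enforces `delivery.timeout.ms >= linger.ms + request.timeout.ms`, so the 3000 ms per-request timeout introduced here sits well inside the existing 30000 ms delivery budget and only bounds how long each individual broker RPC may block before a retry. A minimal standalone sketch (not DataHub code; the broker address and serializers are placeholders) of how the two settings combine:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TimeoutConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    // Per-request timeout: each broker request may wait up to 3 s (the new default above)...
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");
    // ...while the overall send, including retries, may take up to 30 s.
    // Kafka validates delivery.timeout.ms >= linger.ms + request.timeout.ms at startup.
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "30000");
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // send records here
    }
  }
}
```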
diff --git a/datahub-frontend/conf/application.conf b/datahub-frontend/conf/application.conf
index 356a48e11d..f17e6db691 100644
--- a/datahub-frontend/conf/application.conf
+++ b/datahub-frontend/conf/application.conf
@@ -237,6 +237,8 @@ analytics.tracking.topic = DataHubUsageEvent_v1
 analytics.tracking.topic = ${?DATAHUB_TRACKING_TOPIC}
 analytics.kafka.delivery.timeout.ms = 30000
 analytics.kafka.delivery.timeout.ms = ${?KAFKA_PROPERTIES_DELIVERY_TIMEOUT_MS}
+analytics.kafka.request.timeout.ms = 3000
+analytics.kafka.request.timeout.ms = ${?KAFKA_PROPERTIES_REQUEST_TIMEOUT_MS}

 # Kafka Producer SSL Configs. All must be provided to enable SSL.
 analytics.kafka.security.protocol = ${?KAFKA_PROPERTIES_SECURITY_PROTOCOL}
@@ -255,6 +257,7 @@ analytics.kafka.sasl.jaas.config = ${?KAFKA_PROPERTIES_SASL_JAAS_CONFIG}
 analytics.kafka.sasl.kerberos.service.name = ${?KAFKA_PROPERTIES_SASL_KERBEROS_SERVICE_NAME}
 analytics.kafka.sasl.login.callback.handler.class = ${?KAFKA_PROPERTIES_SASL_LOGIN_CALLBACK_HANDLER_CLASS}
 analytics.kafka.sasl.client.callback.handler.class = ${?KAFKA_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS}
+analytics.kafka.sasl.oauthbearer.token.endpoint.url = ${?KAFKA_PROPERTIES_SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL}

 # Required Elastic Client Configuration
 analytics.elastic.host = ${?ELASTIC_CLIENT_HOST}
diff --git a/docker/docker-compose-with-cassandra.yml b/docker/docker-compose-with-cassandra.yml
index 0db086a6b0..0063e60755 100644
--- a/docker/docker-compose-with-cassandra.yml
+++ b/docker/docker-compose-with-cassandra.yml
@@ -148,7 +148,7 @@ services:
       - neo4jdata:/data
   schema-registry:
     hostname: schema-registry
-    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
     env_file: schema-registry/env/docker.env
@@ -163,7 +163,7 @@ services:
         condition: service_healthy
   broker:
     hostname: broker
-    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - 29092:29092
       - 9092:9092
@@ -181,7 +181,7 @@ services:
       - broker:/var/lib/kafka/data/
   zookeeper:
     hostname: zookeeper
-    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - 2181:2181
     env_file: zookeeper/env/docker.env
diff --git a/docker/docker-compose-without-neo4j.yml b/docker/docker-compose-without-neo4j.yml
index 3d232b70f1..1fc195e2a7 100644
--- a/docker/docker-compose-without-neo4j.yml
+++ b/docker/docker-compose-without-neo4j.yml
@@ -126,7 +126,7 @@ services:
       - esdata:/usr/share/elasticsearch/data
   schema-registry:
     hostname: schema-registry
-    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
     env_file: schema-registry/env/docker.env
@@ -141,7 +141,7 @@ services:
         condition: service_healthy
   broker:
     hostname: broker
-    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
     env_file: broker/env/docker.env
@@ -158,7 +158,7 @@ services:
       - broker:/var/lib/kafka/data/
   zookeeper:
     hostname: zookeeper
-    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
     env_file: zookeeper/env/docker.env
diff --git a/docker/docker-compose.tools.yml b/docker/docker-compose.tools.yml
index 10d97cf9ff..849d363bcd 100644
--- a/docker/docker-compose.tools.yml
+++ b/docker/docker-compose.tools.yml
@@ -2,7 +2,7 @@
 ---
 services:
   kafka-rest-proxy:
-    image: confluentinc/cp-kafka-rest:7.9.1
+    image: confluentinc/cp-kafka-rest:8.0.0
    env_file: kafka-rest-proxy/env/docker.env
    hostname: kafka-rest-proxy
    ports:
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 47377843f9..68a9ce707b 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -146,7 +146,7 @@ services:
       - neo4jdata:/data
   schema-registry:
     hostname: schema-registry
-    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
     env_file: schema-registry/env/docker.env
@@ -161,7 +161,7 @@ services:
         condition: service_healthy
   broker:
     hostname: broker
-    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
     env_file: broker/env/docker.env
@@ -178,7 +178,7 @@ services:
       - broker:/var/lib/kafka/data/
   zookeeper:
     hostname: zookeeper
-    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
     env_file: zookeeper/env/docker.env
diff --git a/docker/profiles/docker-compose.prerequisites.yml b/docker/profiles/docker-compose.prerequisites.yml
index 871c1d818b..026308cd3a 100644
--- a/docker/profiles/docker-compose.prerequisites.yml
+++ b/docker/profiles/docker-compose.prerequisites.yml
@@ -231,7 +231,7 @@ services:
       - neo4jdata:/data
   kafka-broker:
     hostname: broker
-    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     command:
       - /bin/bash
       - -c
diff --git a/docker/quickstart/docker-compose-m1.quickstart.yml b/docker/quickstart/docker-compose-m1.quickstart.yml
index eef84da243..bb48c645f9 100644
--- a/docker/quickstart/docker-compose-m1.quickstart.yml
+++ b/docker/quickstart/docker-compose-m1.quickstart.yml
@@ -24,7 +24,7 @@ services:
       test: nc -z broker $${DATAHUB_KAFKA_BROKER_PORT:-9092}
       timeout: 5s
     hostname: broker
-    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
     volumes:
@@ -274,7 +274,7 @@ services:
       test: nc -z schema-registry ${DATAHUB_SCHEMA_REGISTRY_PORT:-8081}
       timeout: 5s
     hostname: schema-registry
-    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
   zookeeper:
@@ -288,7 +288,7 @@ services:
       test: echo srvr | nc zookeeper $${DATAHUB_ZK_PORT:-2181}
       timeout: 5s
     hostname: zookeeper
-    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
     volumes:
diff --git a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml
index b439fc2501..ab5fe31504 100644
--- a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml
+++ b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml
@@ -24,7 +24,7 @@ services:
       test: nc -z broker $${DATAHUB_KAFKA_BROKER_PORT:-9092}
       timeout: 5s
     hostname: broker
-    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
     volumes:
@@ -248,7 +248,7 @@ services:
       test: nc -z schema-registry ${DATAHUB_SCHEMA_REGISTRY_PORT:-8081}
       timeout: 5s
     hostname: schema-registry
-    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
   zookeeper:
@@ -262,7 +262,7 @@ services:
       test: echo srvr | nc zookeeper $${DATAHUB_ZK_PORT:-2181}
       timeout: 5s
     hostname: zookeeper
-    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
     volumes:
diff --git a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml
index bb14009ba3..2a097048e9 100644
--- a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml
+++ b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml
@@ -24,7 +24,7 @@ services:
       test: nc -z broker $${DATAHUB_KAFKA_BROKER_PORT:-9092}
       timeout: 5s
     hostname: broker
-    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
     volumes:
@@ -248,7 +248,7 @@ services:
       test: nc -z schema-registry ${DATAHUB_SCHEMA_REGISTRY_PORT:-8081}
       timeout: 5s
     hostname: schema-registry
-    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
   zookeeper:
@@ -262,7 +262,7 @@ services:
       test: echo srvr | nc zookeeper $${DATAHUB_ZK_PORT:-2181}
       timeout: 5s
     hostname: zookeeper
-    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
     volumes:
diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml
index dbf512b379..a07feb981a 100644
--- a/docker/quickstart/docker-compose.quickstart.yml
+++ b/docker/quickstart/docker-compose.quickstart.yml
@@ -24,7 +24,7 @@ services:
       test: nc -z broker $${DATAHUB_KAFKA_BROKER_PORT:-9092}
       timeout: 5s
     hostname: broker
-    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_KAFKA_IMAGE:-confluentinc/cp-kafka}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092
     volumes:
@@ -274,7 +274,7 @@ services:
       test: nc -z schema-registry ${DATAHUB_SCHEMA_REGISTRY_PORT:-8081}
       timeout: 5s
     hostname: schema-registry
-    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_SCHEMA_REGISTRY_IMAGE:-confluentinc/cp-schema-registry}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081
   zookeeper:
@@ -288,7 +288,7 @@ services:
       test: echo srvr | nc zookeeper $${DATAHUB_ZK_PORT:-2181}
       timeout: 5s
     hostname: zookeeper
-    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-7.9.1}
+    image: ${DATAHUB_CONFLUENT_ZOOKEEPER_IMAGE:-confluentinc/cp-zookeeper}:${DATAHUB_CONFLUENT_VERSION:-8.0.0}
     ports:
       - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181
     volumes:
diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java
index 5161e6460b..34fa251e75 100644
--- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java
+++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java
@@ -7,16 +7,13 @@ import com.linkedin.dataset.DatasetProperties;
 import datahub.client.MetadataWriteResponse;
 import datahub.client.kafka.containers.KafkaContainer;
 import datahub.client.kafka.containers.SchemaRegistryContainer;
-import datahub.client.kafka.containers.ZookeeperContainer;
 import datahub.event.MetadataChangeProposalWrapper;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import java.io.IOException;
-import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.stream.Stream;
 import org.apache.avro.Schema;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
@@ -33,7 +30,6 @@ public class KafkaEmitterTest {

   private static Network network;
-  private static ZookeeperContainer zookeeperContainer;
   private static KafkaContainer kafkaContainer;
   private static SchemaRegistryContainer schemaRegistryContainer;
   private static KafkaEmitterConfig config;
@@ -43,20 +39,20 @@ public class KafkaEmitterTest {
   @BeforeClass
   public static void confluentSetup() throws Exception {
     network = Network.newNetwork();
-    zookeeperContainer = new ZookeeperContainer().withNetwork(network);
-    kafkaContainer =
-        new KafkaContainer(zookeeperContainer.getInternalUrl())
-            .withNetwork(network)
-            .dependsOn(zookeeperContainer);
+
+    // Start Kafka with KRaft (no Zookeeper needed)
+    kafkaContainer = new KafkaContainer().withNetwork(network);
+    kafkaContainer.start();
+
+    // Schema Registry now only depends on Kafka
     schemaRegistryContainer =
-        new SchemaRegistryContainer(
-                zookeeperContainer.getInternalUrl(), kafkaContainer.getInternalBootstrapServers())
+        new SchemaRegistryContainer(kafkaContainer.getInternalBootstrapServers())
            .withNetwork(network)
-            .dependsOn(zookeeperContainer, kafkaContainer);
+            .dependsOn(kafkaContainer);
     schemaRegistryContainer.start();

-    String bootstrap = createTopics(kafkaContainer.getBootstrapServers());
-    createKafkaEmitter(bootstrap);
+    createTopics(kafkaContainer.getBootstrapServers());
+    createKafkaEmitter(kafkaContainer.getBootstrapServers());
     registerSchemaRegistryTypes();
   }
@@ -100,25 +96,18 @@ public class KafkaEmitterTest {
     schemaRegistryClient.register(mcpSchema.getFullName(), mcpSchema);
   }

-  private static String createTopics(Stream<String> bootstraps) {
+  private static void createTopics(String bootstrap)
+      throws ExecutionException, InterruptedException {
     short replicationFactor = 1;
     int partitions = 1;
-    return bootstraps
-        .parallel()
-        .map(
-            bootstrap -> {
-              try {
-                createAdminClient(bootstrap)
-                    .createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor)))
-                    .all()
-                    .get();
-                return bootstrap;
-              } catch (RuntimeException | InterruptedException | ExecutionException ex) {
-                return null;
-              }
-            })
-        .filter(Objects::nonNull)
-        .findFirst()
+
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
+    AdminClient adminClient = KafkaAdminClient.create(props);
+
+    adminClient
+        .createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor)))
+        .all()
         .get();
   }
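Note on the simplified `createTopics`: the old stream-based version probed several bootstrap URLs in parallel and swallowed failures, while the new one targets the single KRaft bootstrap address and propagates `ExecutionException`/`InterruptedException`, so a broken broker container now fails test setup immediately. A hedged sketch of the same call using `AdminClientConfig` rather than `ProducerConfig` (both constants resolve to the literal `"bootstrap.servers"`, so behavior is identical; class and method names here are illustrative):

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicSetupSketch {
  static void createTopic(String bootstrap, String topic)
      throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
    try (AdminClient admin = AdminClient.create(props)) {
      // 1 partition, replication factor 1 -- matches the single-broker test container
      admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1)))
          .all()
          .get(); // fail fast instead of silently trying alternate bootstrap URLs
    }
  }
}
```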
diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/KafkaContainer.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/KafkaContainer.java
index 86ced5ce62..debc028877 100644
--- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/KafkaContainer.java
+++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/KafkaContainer.java
@@ -2,130 +2,92 @@ package datahub.client.kafka.containers;

 import static datahub.client.kafka.containers.Utils.CONFLUENT_PLATFORM_VERSION;

-import com.github.dockerjava.api.command.InspectContainerResponse;
-import java.nio.charset.StandardCharsets;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.TestcontainersConfiguration;

-/** This container wraps Confluent Kafka. */
+/** This container wraps Confluent Kafka using KRaft mode (no Zookeeper required). */
 public class KafkaContainer extends GenericContainer<KafkaContainer> {

-  private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
-
   private static final int KAFKA_INTERNAL_PORT = 9092;
-  private static final int KAFKA_LOCAL_PORT = 9093;
-
+  private static final int KAFKA_EXTERNAL_PORT = 9093;
+  private static final int KAFKA_CONTROLLER_PORT = 29093;
   public static final int KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT = 29092;

-  private static final int PORT_NOT_ASSIGNED = -1;
-
-  private String zookeeperConnect = null;
-
-  private int port = PORT_NOT_ASSIGNED;
-
   private final String networkAlias = "kafka";

-  public KafkaContainer(String zookeeperConnect) {
-    this(CONFLUENT_PLATFORM_VERSION, zookeeperConnect);
+  public KafkaContainer() {
+    this(CONFLUENT_PLATFORM_VERSION);
   }

-  public KafkaContainer(String confluentPlatformVersion, String zookeeperConnect) {
+  public KafkaContainer(String confluentPlatformVersion) {
     super(getKafkaContainerImage(confluentPlatformVersion));

-    this.zookeeperConnect = zookeeperConnect;
-    withExposedPorts(KAFKA_INTERNAL_PORT, KAFKA_LOCAL_PORT);
+    // Fix the external port to 9093 to match the advertised listener
+    // Only expose the external port, not the internal port to avoid conflicts
+    addFixedExposedPort(KAFKA_EXTERNAL_PORT, KAFKA_EXTERNAL_PORT);

-    // Use two listeners with different names, it will force Kafka to communicate
-    // with itself via internal
-    // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will
-    // try to use the advertised listener
+    // KRaft configuration
+    String clusterId = java.util.UUID.randomUUID().toString();
+    withEnv("KAFKA_NODE_ID", "1");
+    withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
+    withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@" + networkAlias + ":" + KAFKA_CONTROLLER_PORT);
+    withEnv("CLUSTER_ID", clusterId);
+    withEnv("KAFKA_CLUSTER_ID", clusterId);
+
+    // Configure listeners
     withEnv(
         "KAFKA_LISTENERS",
         "PLAINTEXT://0.0.0.0:"
             + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT
+            + ",CONTROLLER://0.0.0.0:"
+            + KAFKA_CONTROLLER_PORT
             + ",BROKER://0.0.0.0:"
             + KAFKA_INTERNAL_PORT
-            + ",BROKER_LOCAL://0.0.0.0:"
-            + KAFKA_LOCAL_PORT);
+            + ",BROKER_EXTERNAL://0.0.0.0:"
+            + KAFKA_EXTERNAL_PORT);
+
     withEnv(
         "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
-        "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,BROKER_LOCAL:PLAINTEXT");
+        "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT,BROKER_EXTERNAL:PLAINTEXT");
+
+    // Advertised listeners - external port must match the container port
+    withEnv(
+        "KAFKA_ADVERTISED_LISTENERS",
+        String.format(
+            "PLAINTEXT://%s:%d,BROKER://%s:%d,BROKER_EXTERNAL://localhost:%d",
+            networkAlias,
+            KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT,
+            networkAlias,
+            KAFKA_INTERNAL_PORT,
+            KAFKA_EXTERNAL_PORT // This must be 9093, not a mapped port
+            ));
+
+    withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
     withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

+    // Common Kafka settings
     withEnv("KAFKA_BROKER_ID", "1");
     withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
     withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
+    withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
+    withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
     withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
     withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
+    withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");

     withNetworkAliases(networkAlias);
-    waitingFor(new HostPortWaitStrategy());
+
+    // Wait for Kafka to be ready by checking the external port
+    waitingFor(Wait.forListeningPort());
   }

-  public Stream<String> getBootstrapServers() {
-    if (port == PORT_NOT_ASSIGNED) {
-      throw new IllegalStateException("You should start Kafka container first");
-    }
-    return Stream.of(
-        String.format("PLAINTEXT://%s:%s", getHost(), port),
-        String.format("PLAINTEXT://localhost:%s", getMappedPort(KAFKA_LOCAL_PORT)));
+  public String getBootstrapServers() {
+    return "localhost:" + KAFKA_EXTERNAL_PORT;
   }

   public String getInternalBootstrapServers() {
-    return String.format("PLAINTEXT://%s:%s", networkAlias, KAFKA_INTERNAL_PORT);
-  }
-
-  @Override
-  protected void doStart() {
-    withCommand(
-        "sh",
-        "-c",
-        "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + "sh " + STARTER_SCRIPT);
-
-    super.doStart();
-  }
-
-  @Override
-  protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
-    super.containerIsStarting(containerInfo, reused);
-
-    port = getMappedPort(KAFKA_INTERNAL_PORT);
-
-    if (reused) {
-      return;
-    }
-
-    // Use two listeners with different names, it will force Kafka to communicate
-    // with itself via internal
-    // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will
-    // try to use the advertised listener
-
-    String command = "#!/bin/bash \n";
-    command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n";
-    command +=
-        "export KAFKA_ADVERTISED_LISTENERS='"
-            + Stream.concat(
-                    Stream.of(
-                        "PLAINTEXT://"
-                            + networkAlias
-                            + ":"
-                            + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT,
-                        "BROKER_LOCAL://localhost:" + getMappedPort(KAFKA_LOCAL_PORT)),
-                    containerInfo.getNetworkSettings().getNetworks().values().stream()
-                        .map(it -> "BROKER://" + it.getIpAddress() + ":" + KAFKA_INTERNAL_PORT))
-                .collect(Collectors.joining(","))
-            + "'\n";
-
-    command += ". /etc/confluent/docker/bash-config \n";
-    command += "/etc/confluent/docker/configure \n";
-    command += "/etc/confluent/docker/launch \n";
-
-    copyFileToContainer(
-        Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), STARTER_SCRIPT);
+    return networkAlias + ":" + KAFKA_INTERNAL_PORT;
   }

   private static String getKafkaContainerImage(String confluentPlatformVersion) {
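One tradeoff worth flagging in the KRaft rewrite above: `addFixedExposedPort(9093, 9093)` pins a host port because the advertised `BROKER_EXTERNAL` listener hard-codes `localhost:9093`, so two of these containers cannot run in parallel on one machine (the previous script-based container advertised the dynamically mapped port instead). A usage sketch under the new API, assuming the `KafkaContainer` class from this diff is on the classpath:

```java
import datahub.client.kafka.containers.KafkaContainer; // the class from this diff
import org.testcontainers.containers.Network;

public class KafkaContainerUsageSketch {
  public static void main(String[] args) {
    Network network = Network.newNetwork();
    try (KafkaContainer kafka = new KafkaContainer().withNetwork(network)) {
      kafka.start();
      // Host-side clients connect via the fixed external listener:
      String external = kafka.getBootstrapServers();         // "localhost:9093"
      // Containers on the same Docker network use the internal alias:
      String internal = kafka.getInternalBootstrapServers(); // "kafka:9092"
      System.out.println(external + " / " + internal);
    }
  }
}
```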
diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/SchemaRegistryContainer.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/SchemaRegistryContainer.java
index 5c0223e580..65f70c715c 100644
--- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/SchemaRegistryContainer.java
+++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/SchemaRegistryContainer.java
@@ -1,44 +1,37 @@
 package datahub.client.kafka.containers;

 import static datahub.client.kafka.containers.Utils.CONFLUENT_PLATFORM_VERSION;
-import static java.lang.String.format;

-import java.io.IOException;
-import java.time.Duration;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.TestcontainersConfiguration;

+/** This container wraps Confluent Schema Registry. */
 public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
+
   private static final int SCHEMA_REGISTRY_INTERNAL_PORT = 8081;

   private final String networkAlias = "schema-registry";

-  public SchemaRegistryContainer(String zookeeperConnect, String kafkaBootstrap)
-      throws IOException {
-    this(CONFLUENT_PLATFORM_VERSION, zookeeperConnect, kafkaBootstrap);
+  public SchemaRegistryContainer(String kafkaBootstrapServers) {
+    this(CONFLUENT_PLATFORM_VERSION, kafkaBootstrapServers);
   }

-  public SchemaRegistryContainer(
-      String confluentPlatformVersion, String zookeeperConnect, String kafkaBootstrap)
-      throws IOException {
+  public SchemaRegistryContainer(String confluentPlatformVersion, String kafkaBootstrapServers) {
     super(getSchemaRegistryContainerImage(confluentPlatformVersion));

-    addEnv("SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL", zookeeperConnect);
-    addEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost");
-    addEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", kafkaBootstrap);
-
     withExposedPorts(SCHEMA_REGISTRY_INTERNAL_PORT);
     withNetworkAliases(networkAlias);

-    waitingFor(
-        new HttpWaitStrategy().forPath("/subjects").withStartupTimeout(Duration.ofMinutes(2)));
+    withEnv("SCHEMA_REGISTRY_HOST_NAME", networkAlias);
+    withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + SCHEMA_REGISTRY_INTERNAL_PORT);
+    withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", kafkaBootstrapServers);
+
+    waitingFor(Wait.forHttp("/subjects").forPort(SCHEMA_REGISTRY_INTERNAL_PORT));
   }

   public String getUrl() {
-    return format(
-        "http://%s:%d",
-        this.getContainerIpAddress(), this.getMappedPort(SCHEMA_REGISTRY_INTERNAL_PORT));
+    return String.format("http://%s:%d", getHost(), getMappedPort(SCHEMA_REGISTRY_INTERNAL_PORT));
   }

   private static String getSchemaRegistryContainerImage(String confluentPlatformVersion) {
diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/Utils.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/Utils.java
index 885eb59a92..7f8bb4c074 100644
--- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/Utils.java
+++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/Utils.java
@@ -4,7 +4,7 @@ import java.io.IOException;
 import java.net.ServerSocket;

 final class Utils {
-  public static final String CONFLUENT_PLATFORM_VERSION = "7.9.1";
+  public static final String CONFLUENT_PLATFORM_VERSION = "8.0.0";

   private Utils() {}
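With Zookeeper gone, `SchemaRegistryContainer` needs only the broker's internal bootstrap address, and readiness is signaled by `GET /subjects` on port 8081 while `getUrl()` resolves the dynamically mapped host port. A small sketch (illustrative names; assumes the container class from this diff) of wiring a schema-registry client to it:

```java
import datahub.client.kafka.containers.SchemaRegistryContainer; // the class from this diff
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class SchemaRegistrySketch {
  static CachedSchemaRegistryClient clientFor(SchemaRegistryContainer registry) {
    // getUrl() returns e.g. http://localhost:<mapped-port> once the container is started
    return new CachedSchemaRegistryClient(registry.getUrl(), 1000);
  }
}
```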
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/trace/KafkaTraceReader.java b/metadata-io/src/main/java/com/linkedin/metadata/trace/KafkaTraceReader.java
index 92c5c0db10..07c4b65c9e 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/trace/KafkaTraceReader.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/trace/KafkaTraceReader.java
@@ -10,6 +10,7 @@ import com.linkedin.mxe.SystemMetadata;
 import com.linkedin.util.Pair;
 import io.datahubproject.openapi.v1.models.TraceStorageStatus;
 import io.datahubproject.openapi.v1.models.TraceWriteStatus;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -23,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
@@ -37,12 +39,13 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Utils;

 @Slf4j
 @SuperBuilder
@@ -242,11 +245,8 @@ public abstract class KafkaTraceReader {
     try {
       // Get all topic partitions first
-      Map<String, TopicDescription> topicInfo =
-          adminClient
-              .describeTopics(Collections.singletonList(getTopicName()))
-              .all()
-              .get(timeoutSeconds, TimeUnit.SECONDS);
+      Map<String, KafkaFuture<TopicDescription>> topicInfo =
+          adminClient.describeTopics(Collections.singletonList(getTopicName())).topicNameValues();

       if (topicInfo == null || !topicInfo.containsKey(getTopicName())) {
         log.error("Failed to get topic information for topic: {}", getTopicName());
@@ -255,7 +255,7 @@ public abstract class KafkaTraceReader {

       // Create a list of all TopicPartitions
       List<TopicPartition> allPartitions =
-          topicInfo.get(getTopicName()).partitions().stream()
+          topicInfo.get(getTopicName()).get(timeoutSeconds, TimeUnit.SECONDS).partitions().stream()
              .map(partitionInfo -> new TopicPartition(getTopicName(), partitionInfo.partition()))
              .collect(Collectors.toList());
@@ -320,11 +320,8 @@ public abstract class KafkaTraceReader {
     try {
       // Get all topic partitions first (reuse the same approach as in getAllPartitionOffsets)
-      Map<String, TopicDescription> topicInfo =
-          adminClient
-              .describeTopics(Collections.singletonList(getTopicName()))
-              .all()
-              .get(timeoutSeconds, TimeUnit.SECONDS);
+      Map<String, KafkaFuture<TopicDescription>> topicInfo =
+          adminClient.describeTopics(Collections.singletonList(getTopicName())).topicNameValues();

       if (topicInfo == null || !topicInfo.containsKey(getTopicName())) {
         log.error("Failed to get topic information for topic: {}", getTopicName());
@@ -333,7 +330,7 @@ public abstract class KafkaTraceReader {

       // Create a list of all TopicPartitions
       List<TopicPartition> allPartitions =
-          topicInfo.get(getTopicName()).partitions().stream()
+          topicInfo.get(getTopicName()).get(timeoutSeconds, TimeUnit.SECONDS).partitions().stream()
              .map(partitionInfo -> new TopicPartition(getTopicName(), partitionInfo.partition()))
              .collect(Collectors.toList());
@@ -628,14 +625,13 @@ public abstract class KafkaTraceReader {
             urn.toString(),
             key -> {
               try {
-                DefaultPartitioner partitioner = new DefaultPartitioner();
                 TopicDescription topicDescription =
                     adminClient
                         .describeTopics(Collections.singletonList(getTopicName()))
-                        .all()
-                        .get()
-                        .get(getTopicName());
+                        .topicNameValues()
+                        .get(getTopicName())
+                        .get(timeoutSeconds, TimeUnit.SECONDS);

                 if (topicDescription == null) {
                   throw new IllegalStateException("Topic " + getTopicName() + " not found");
@@ -664,11 +660,13 @@ public abstract class KafkaTraceReader {
                     new Cluster(
                         null, nodes, partitions, Collections.emptySet(), Collections.emptySet());

-                int partition =
-                    partitioner.partition(getTopicName(), key, key.getBytes(), null, null, cluster);
+                int partition = getPartitionForKey(key, cluster.partitionCountForTopic(getTopicName()));

                 return new TopicPartition(getTopicName(), partition);
-              } catch (InterruptedException | ExecutionException e) {
+              } catch (InterruptedException
+                  | ExecutionException
+                  | RuntimeException
+                  | TimeoutException e) {
                 throw new RuntimeException("Failed to get topic partition for " + key, e);
               }
             });
@@ -684,4 +682,19 @@ public abstract class KafkaTraceReader {
     return consumer.offsetsForTimes(timestampsToSearch).get(topicPartition);
   }
+
+  /**
+   * Calculate which partition a key would be assigned to. This replicates Kafka's default
+   * partitioning behavior.
+   */
+  private static int getPartitionForKey(String key, int numPartitions) {
+    if (key == null) {
+      throw new IllegalArgumentException("Key cannot be null");
+    }
+
+    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+
+    // Use murmur2 hash (same as Kafka default)
+    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+  }
 }
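The `getPartitionForKey` helper above replaces `DefaultPartitioner`, which is no longer available in the Kafka 4.x client line that Confluent Platform 8.0 tracks. For records with a non-null key, the built-in partitioning is murmur2 over the serialized key bytes, mapped to a non-negative value modulo the partition count, so the helper stays consistent with partitions chosen by existing producers. A self-contained sketch (topic and key values are made up):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionForKeySketch {
  static int partitionForKey(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    // murmur2 + toPositive is what Kafka applies to keyed records by default
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }

  public static void main(String[] args) {
    // e.g. a 3-partition topic: the same key always lands on the same partition
    System.out.println(partitionForKey("urn:li:dataset:example", 3));
    System.out.println(partitionForKey("urn:li:dataset:example", 3)); // identical output
  }
}
```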
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/trace/BaseKafkaTraceReaderTest.java b/metadata-io/src/test/java/com/linkedin/metadata/trace/BaseKafkaTraceReaderTest.java
index 28a40a3071..05b4caa81f 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/trace/BaseKafkaTraceReaderTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/trace/BaseKafkaTraceReaderTest.java
@@ -2,6 +2,7 @@ package com.linkedin.metadata.trace;

 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -27,8 +28,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -87,9 +91,8 @@ public abstract class BaseKafkaTraceReaderTest {
         new TopicDescription(TOPIC_NAME, false, Collections.singletonList(partitionInfo));

     DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
-    when(mockDescribeTopicsResult.all())
-        .thenReturn(
-            KafkaFuture.completedFuture(Collections.singletonMap(TOPIC_NAME, topicDescription)));
+    when(mockDescribeTopicsResult.topicNameValues())
+        .thenReturn(Map.of(TOPIC_NAME, KafkaFuture.completedFuture(topicDescription)));
     when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);

     // Mock consumer group offset lookup
@@ -257,9 +260,8 @@ public abstract class BaseKafkaTraceReaderTest {
     TopicDescription topicDescription = new TopicDescription(TOPIC_NAME, false, partitionInfos);

     DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
-    when(mockDescribeTopicsResult.all())
-        .thenReturn(
-            KafkaFuture.completedFuture(Collections.singletonMap(TOPIC_NAME, topicDescription)));
+    when(mockDescribeTopicsResult.topicNameValues())
+        .thenReturn(Map.of(TOPIC_NAME, KafkaFuture.completedFuture(topicDescription)));
     when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);

     // Setup consumer group offsets for multiple partitions
@@ -309,9 +311,8 @@ public abstract class BaseKafkaTraceReaderTest {
         new TopicDescription(TOPIC_NAME, false, Collections.singletonList(partitionInfo));

     DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
-    when(mockDescribeTopicsResult.all())
-        .thenReturn(
-            KafkaFuture.completedFuture(Collections.singletonMap(TOPIC_NAME, topicDescription)));
+    when(mockDescribeTopicsResult.topicNameValues())
+        .thenReturn(Map.of(TOPIC_NAME, KafkaFuture.completedFuture(topicDescription)));
     when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);

     TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
@@ -368,9 +369,8 @@ public abstract class BaseKafkaTraceReaderTest {
         new TopicDescription(TOPIC_NAME, false, Collections.singletonList(partitionInfo));

     DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
-    when(mockDescribeTopicsResult.all())
-        .thenReturn(
-            KafkaFuture.completedFuture(Collections.singletonMap(TOPIC_NAME, topicDescription)));
+    when(mockDescribeTopicsResult.topicNameValues())
+        .thenReturn(Map.of(TOPIC_NAME, KafkaFuture.completedFuture(topicDescription)));
     when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);

     // Setup consumer to return end offsets
@@ -408,9 +408,8 @@ public abstract class BaseKafkaTraceReaderTest {
         new TopicDescription(TOPIC_NAME, false, Collections.singletonList(partitionInfo));

     DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
-    when(mockDescribeTopicsResult.all())
-        .thenReturn(
-            KafkaFuture.completedFuture(Collections.singletonMap(TOPIC_NAME, topicDescription)));
+    when(mockDescribeTopicsResult.topicNameValues())
+        .thenReturn(Map.of(TOPIC_NAME, KafkaFuture.completedFuture(topicDescription)));
     when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);

     // Setup consumer to return end offsets
@@ -539,4 +538,126 @@ public abstract class BaseKafkaTraceReaderTest {
     // Verify that endOffsets was called twice
     verify(consumer, times(2)).endOffsets(anyCollection());
   }
+
+  @Test
+  public void testGetAllPartitionOffsets_FailedToGetTopicInfo() {
+    // Arrange
+    DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
+    when(mockDescribeTopicsResult.topicNameValues())
+        .thenReturn(Collections.emptyMap()); // Return empty map to simulate topic not found
+    when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
+
+    // Act
+    Map<TopicPartition, OffsetAndMetadata> result = traceReader.getAllPartitionOffsets(false);
+
+    // Assert
+    assertTrue(result.isEmpty());
+    // The error should be logged but method returns empty map
+  }
+
+  @Test
+  public void testGetEndOffsets_FailedToGetTopicInfo() {
+    // Arrange
+    DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
+    when(mockDescribeTopicsResult.topicNameValues())
+        .thenReturn(Collections.emptyMap()); // Return empty map to simulate topic not found
+    when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
+
+    // Act
+    Map<TopicPartition, Long> result = traceReader.getEndOffsets(false);
+
+    // Assert
+    assertTrue(result.isEmpty());
+    // The error should be logged but method returns empty map
+  }
+
+  @Test
+  public void testGetTopicPartition_TopicNotFound() throws Exception {
+    // Arrange
+    DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
+    KafkaFuture<TopicDescription> nullFuture = KafkaFuture.completedFuture(null);
+
+    when(mockDescribeTopicsResult.topicNameValues()).thenReturn(Map.of(TOPIC_NAME, nullFuture));
+    when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
+
+    // Act - findMessages catches the exception and returns empty map
+    Map<Urn, Map<String, Pair<ConsumerRecord<String, GenericRecord>, SystemMetadata>>> result =
+        traceReader.findMessages(
+            Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME)),
+            TRACE_ID,
+            System.currentTimeMillis());
+
+    // Assert - the method should return an empty map for the URN that had an error
+    assertTrue(result.containsKey(TEST_URN));
+    assertTrue(result.get(TEST_URN).isEmpty());
+  }
+
+  @Test
+  public void testGetTopicPartition_ExecutionException() throws Exception {
+    // Arrange
+    DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
+    KafkaFuture<TopicDescription> failedFuture = mock(KafkaFuture.class);
+    when(failedFuture.get(anyLong(), any(TimeUnit.class)))
+        .thenThrow(new ExecutionException("Test exception", new RuntimeException()));
+
+    when(mockDescribeTopicsResult.topicNameValues()).thenReturn(Map.of(TOPIC_NAME, failedFuture));
+    when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
+
+    // Act - findMessages catches the exception and returns empty map
+    Map<Urn, Map<String, Pair<ConsumerRecord<String, GenericRecord>, SystemMetadata>>> result =
+        traceReader.findMessages(
+            Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME)),
+            TRACE_ID,
+            System.currentTimeMillis());
+
+    // Assert - the method should return an empty map for the URN that had an error
+    assertTrue(result.containsKey(TEST_URN));
+    assertTrue(result.get(TEST_URN).isEmpty());
+  }
+
+  @Test
+  public void testGetTopicPartition_InterruptedException() throws Exception {
+    // Arrange
+    DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
+    KafkaFuture<TopicDescription> failedFuture = mock(KafkaFuture.class);
+    when(failedFuture.get(anyLong(), any(TimeUnit.class)))
+        .thenThrow(new InterruptedException("Test interruption"));
+
+    when(mockDescribeTopicsResult.topicNameValues()).thenReturn(Map.of(TOPIC_NAME, failedFuture));
+    when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
+
+    // Act - findMessages catches the exception and returns empty map
+    Map<Urn, Map<String, Pair<ConsumerRecord<String, GenericRecord>, SystemMetadata>>> result =
+        traceReader.findMessages(
+            Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME)),
+            TRACE_ID,
+            System.currentTimeMillis());
+
+    // Assert - the method should return an empty map for the URN that had an error
+    assertTrue(result.containsKey(TEST_URN));
+    assertTrue(result.get(TEST_URN).isEmpty());
+  }
+
+  @Test
+  public void testGetTopicPartition_TimeoutException() throws Exception {
+    // Arrange
+    DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
+    KafkaFuture<TopicDescription> failedFuture = mock(KafkaFuture.class);
+    when(failedFuture.get(anyLong(), any(TimeUnit.class)))
+        .thenThrow(new TimeoutException("Test timeout"));
+
+    when(mockDescribeTopicsResult.topicNameValues()).thenReturn(Map.of(TOPIC_NAME, failedFuture));
+    when(adminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
+
+    // Act - findMessages catches the exception and returns empty map
+    Map<Urn, Map<String, Pair<ConsumerRecord<String, GenericRecord>, SystemMetadata>>> result =
+        traceReader.findMessages(
+            Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME)),
+            TRACE_ID,
+            System.currentTimeMillis());
+
+    // Assert - the method should return an empty map for the URN that had an error
+    assertTrue(result.containsKey(TEST_URN));
+    assertTrue(result.get(TEST_URN).isEmpty());
+  }
 }
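The logback changes that follow add MDC fields (`%X{entityUrn:-none}`, `%X{aspectName:-none}`, `%X{entityType:-none}`, `%X{changeType:-none}`) to the metadata-io test log pattern; these print `none` unless code populates the mapped diagnostic context. A hedged sketch of populating those keys (the key names come from the pattern; the surrounding method is illustrative only):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MdcSketch {
  private static final Logger log = LoggerFactory.getLogger(MdcSketch.class);

  static void process(String entityUrn, String aspectName) {
    MDC.put("entityUrn", entityUrn);   // rendered by urn=%X{entityUrn:-none}
    MDC.put("aspectName", aspectName); // rendered by aspect=%X{aspectName:-none}
    try {
      log.info("processing aspect"); // the pattern appends urn=... aspect=... automatically
    } finally {
      MDC.clear(); // avoid leaking context across threads/records
    }
  }
}
```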
diff --git a/metadata-io/src/test/resources/logback-test.xml b/metadata-io/src/test/resources/logback-test.xml
index b474d56dbf..215cb0b8d2 100644
--- a/metadata-io/src/test/resources/logback-test.xml
+++ b/metadata-io/src/test/resources/logback-test.xml
@@ -1,19 +1,35 @@
[The XML element markup of this hunk was lost in extraction and cannot be reconstructed. Recoverable content: the console log pattern changes from "%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n" to "%date{ISO8601} [%thread] %-5level %logger{36} urn=%X{entityUrn:-none} aspect=%X{aspectName:-none} entityType=%X{entityType:-none} changeType=%X{changeType:-none} - %msg%n", a WARN threshold is retained, and filters are added for the messages "scanned from multiple locations" and "[ignore_throttled] parameter is deprecated because frozen indices have been deprecated", along with further per-logger configuration.]
\ No newline at end of file
diff --git a/metadata-jobs/mae-consumer-job/src/test/resources/logback-test.xml b/metadata-jobs/mae-consumer-job/src/test/resources/logback-test.xml
new file mode 100644
index 0000000000..4083c67be1
--- /dev/null
+++ b/metadata-jobs/mae-consumer-job/src/test/resources/logback-test.xml
@@ -0,0 +1,26 @@
[XML element markup lost in extraction. Recoverable content: the new file defines a console appender with pattern "%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n", a WARN threshold, and a set of logger-level overrides.]
\ No newline at end of file
diff --git a/metadata-jobs/mce-consumer-job/src/test/resources/logback-test.xml b/metadata-jobs/mce-consumer-job/src/test/resources/logback-test.xml
new file mode 100644
index 0000000000..4083c67be1
--- /dev/null
+++ b/metadata-jobs/mce-consumer-job/src/test/resources/logback-test.xml
@@ -0,0 +1,26 @@
[Identical new file to the mae-consumer-job logback-test.xml above (same blob 4083c67be1); XML element markup likewise lost in extraction.]
\ No newline at end of file
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
index be9fbf2d92..aa93b2d15c 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
@@ -52,6 +52,9 @@ public class DataHubKafkaProducerFactory {
     props.put(
         ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
         kafkaConfiguration.getProducer().getDeliveryTimeout());
+    props.put(
+        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
+        kafkaConfiguration.getProducer().getRequestTimeout());
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBootstrapServers());
     // key: Actor urn.
     // value: JSON object.