feat(kafka): use a thread pool executor for kafka for thread reuse (#5079)

This commit is contained in:
RyanHolstien 2022-06-06 10:10:07 -05:00 committed by GitHub
parent a1287a82e2
commit e06df1fcf3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 0 deletions

View File

@ -86,6 +86,7 @@ public class KafkaEventConsumerFactory {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
factory.setConcurrency(this.kafkaListenerConcurrency);
log.info("Event-based KafkaListenerContainerFactory built successfully");

View File

@ -42,6 +42,7 @@ public class SimpleKafkaConsumerFactory {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties()));
log.info("Simple KafkaListenerContainerFactory built successfully");

View File

@ -0,0 +1,20 @@
package com.linkedin.gms.factory.kafka;
import org.apache.avro.generic.GenericRecord;
import org.springframework.kafka.config.ContainerCustomizer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class ThreadPoolContainerCustomizer
implements ContainerCustomizer<String, GenericRecord, ConcurrentMessageListenerContainer<String, GenericRecord>> {
@Override
public void configure(ConcurrentMessageListenerContainer<String, GenericRecord> container) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// Default Queue Capacity is set to max, so we want to allow the thread pool to add concurrent threads up to configured value
threadPoolTaskExecutor.setCorePoolSize(container.getConcurrency());
threadPoolTaskExecutor.setMaxPoolSize(container.getConcurrency());
threadPoolTaskExecutor.initialize();
container.getContainerProperties().setConsumerTaskExecutor(threadPoolTaskExecutor);
}
}