Kafka 使用ExecutorService 进行消费
2019-07-26 本文已影响0人
ZhangDHing
前言:
Apache Kafka 作为当下最常用消息中间件之一。给到我的需求是需要我们处理大量的消息(如果单线程处理过多消息会出现性能瓶颈)。
如何使用Java的ExecutorService框架来创建线程池处理大量消息?
1.创建一个可以从topic中poll()消息后传递到线程池以进行进一步处理。
2.创建工作线程,以执行每条消息的进一步处理。
1.topic消息传递到ThreadPoolExecutorService
/** kafka 消息处理*/
public class KafkaProcessor {
private final KafkaConsumer<String, String> myConsumer;
private ExecutorService executor;
private static final Properties KAFKA_PROPERTIES = new Properties();
//基础的kafka配置~
static {
KAFKA_PROPERTIES.put("bootstrap.servers", "localhost:9092");
KAFKA_PROPERTIES.put("group.id", "test-consumer-group");
KAFKA_PROPERTIES.put("enable.auto.commit", "true");
KAFKA_PROPERTIES.put("auto.commit.interval.ms", "1000");
KAFKA_PROPERTIES.put("session.timeout.ms", "30000");
KAFKA_PROPERTIES.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KAFKA_PROPERTIES.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
public KafkaProcessor() {
this.myConsumer = new KafkaConsumer<>(KAFKA_PROPERTIES);//初始化配置
this.myConsumer.subscribe(Arrays.asList("test")); //订阅topic=test
}
public void init(int numberOfThreads) {
//创建一个线程池
/**
* public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
*corePoolSize : 核心线程数,一旦创建将不会再释放。如果创建的线程数还没有达到指定的核心线 程数量,将会继续创建新的核心线程,直到达到最大核心线程数后,核心线程数将不在增加;如果没有空闲的核心线程,同时又未达到最大线程数,则将继续创建非核心线程;如果核心线程数等于最大线程数,则当核心线程都处于激活状态时,任务将被挂起,等待空闲线程来执行。
*maximumPoolSize : 最大线程数,允许创建的最大线程数量。如果最大线程数等于核心线程数,则无法创建非核心线程;如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。
*keepAliveTime : 也就是当线程空闲时,所允许保存的最大时间,超过这个时间,线程将被释放销毁,但只针对于非核心线程。
*unit : 时间单位,TimeUnit.SECONDS等。
*workQueue : 任务队列,存储暂时无法执行的任务,等待空闲线程来执行任务。
*threadFactory : 线程工程,用于创建线程。
*handler : 当线程边界和队列容量已经达到最大时,用于处理阻塞时的程序
*/
executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,0L,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
while (true) {
ConsumerRecords<String, String> records = myConsumer.poll(100);//每隔时间段进行消息拉取
for (final ConsumerRecord<String, String> record : records) {
executor.submit(new KafkaRecordHandler(record));
}
}
}
//别忘记线程池的关闭!
public void shutdown() {
if (myConsumer != null) {
myConsumer.close();
}
if (executor != null) {
executor.shutdown();
}
try {
if (executor != null && !executor.awaitTermination(60, TimeUnit.MILLISECONDS)) {
executor.shutdownNow();
}
}catch (InterruptedException e) {
executor.shutdownNow();
}
}
}
image.gif
2.创建工作线程
// 创建消息线程进行处理
public class KafkaRecordHandler implements Runnable {
private ConsumerRecord<String, String> record;
public KafkaRecordHandler(ConsumerRecord<String, String> record) {
this.record = record;
}
@Override
public void run() {
//业务操作...
System.out.println("value = "+record.value());
System.out.println("Thread id = "+ Thread.currentThread().getId());
}
}
image.gif
3.Using ?
//消费测试
public class ConsumerTest {
public static void main(String[] args) {
KafkaProcessor processor = new KafkaProcessor();
try {
processor.init(5);//指定相应的线程数!
}catch (Exception exp) {
processor.shutdown();
}
}
}
image.gif
4.总结
可能并不适合所有方案,按需定制方案。