kafka消费者

2017-05-22  本文已影响429人  七海的游风

1.kafka消费组基本概念

kafka消费topic是以group为单位来的,一个group消费一个topic。一个group能容纳多个consumer。consumer消费是以分区(partition)来的,一个consumer可以消费一个或多个partition,一个partition只能被一个consumer消费。(如果一个consumer group中的consumer个数多于topic中的partition的个数,多出来的consumer会闲置(idle),所以如果为了增加消费者能力,只简单增加消费者数量不一定会有用).

消费与分区对应关系

消费者数量小于partition的数量
消费者数量小于partition的数量
消费者数量小于partition的数量

2. consumer group的分区再平衡

每个consumer负责自己对应的分区,但是当group中有consumer退出或者新加入consumer,再或者topic中新增partition,group中的消费者负责的partition都得重新计算,Rebalance 期间consumer不能再消费消息,做rebalance的时候是会影响整个consumer group。

consumer获知自己消费的分区以及group内其他成员信息都是通过向一个叫做Group Coordinator的broker发送心跳来的,不同的group的broker可能不同。只要consumer再给Coordinator发送心跳,就被认为是正常的。触发心跳是通过consumer客户端轮询处理消息来的。如果consumer长时间没有心跳group coordinator就会认为consumer已经挂了,触发rebalance,新版本的java api(kafka_2.11的0.10.2.0已经支持了)支持显示的关闭客户端,这样可以避免有group coordinator因为超时来触发rebalance有此导致消息积压。

  分区分配流程:
  1.第一个加入group的consumer是consumer的leader(这个consumer奔溃之后会怎么样暂时不清楚)
  2.新加入的consumer向group coordinator发送加入请求
  3.leader从group coordinator接收消费者列表,然后给每个consumer分配分区
  4.leader将重新分配的信息发送给group coordinator,group coordinator再将信息发送给所有的consumer

3.启动一个consumer

使用java api只需要配置 bootstrap.servers, key.deserializer, value.deserializer三个配置就可以。一般还要带上group.id,指定所属的消费组。

  Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "testTopicGroup1");
        new KafkaConsumer<String, String>(properties);

4.订阅topic

consumer.subscribe(Collections.singletonList("testTopic"));
可以指定多个topic,可以使用正则表达式:
consumer.subscribe("test.*");
demo:

 private volatile boolean shutdown = false;
    public void poll(){

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "120.27.8.221:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "testGroup");
        KafkaConsumer consumer = new KafkaConsumer<String, String>(properties);

        //关闭轮询
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){
            @Override
            public void run() {
                shutdown = true;
            }
        }));

        try{
            while (!shutdown){
                //开始轮询消息,poll会找到group coordinator,加入consumer group,确认消费的分区,获取消息
                //poll会获取本地最大的offset之后的消息,而不是commit到kafka中的offset
                ConsumerRecords<String, String> records = consumer.poll(100);
                for(ConsumerRecord record:records){
                    System.out.println("topic = " + record.topic()
                      + ", partition = " + record.partition()
                      + ", offset = " + record.offset()
                      + ", customer = " + record.key()
                      + ", country = " + record.value());
                }
            }
        }finally {
            //及时关闭消费者
            consumer.close();
        }

    }

5.commit offset

不管什么时候调用poll方法都会获取到还未被消费过的消息,这个实现通过消息的offset来实现的,每个分区的offset的管理是通过consumer自己向一个特殊的topic(__consumer_offsets)提交消息来实现的.

1.自动提交

开启自动提交之后,在每次调用poll获取消息的时候会检查时间查看是否需要提交offset,如果已经到时间之后会提交offset,自动提交的好处是方便,劣势是不能灵活控制,如果间隔期间consumer奔溃,已经处理且未提交的消息会被处理两遍。
自动提交配置:
enable.auto.commit=true ##开启自动提交,默认5s提交一次
auto.commit.interval.ms=1000 ##设置自动提交时间间隔

上一篇下一篇

猜你喜欢

热点阅读