kakfa 入门

2019-05-23  本文已影响0人  清雨季

一 安装及简单使用

kafka可以安装在windos mac linux上,以mac为例
mac 安装步骤:

$ brew install kafka
$ zkServer start
$ brew services start kafka

创建主题:

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

使用产生者发送消息:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
TEST
TEST2
TEST3
TEST4

使用消息者接收消息:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
TEST
TEST2
TEST3
TEST4

二 使用Java客户端

首先需要引入kafka的包:

    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.6.RELEASE</version>
    </dependency>

2.1 生产者

简单示例
        Properties properties = new Properties();
        properties.put("bootstrap.servers", url);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, key, value);
        kafkaProducer.send(producerRecord);

kafkaProducer.send(producerRecord); 这个方法会同步发送消息,如果需要异步,可以使用下面的方法:

kafkaProducer.send(producerRecord, new Callback() {
       @Override
       public void onCompletion(RecordMetadata recordMetadata, Exception e) {
             //handler result......
       }
});

2.2 消费者

public class MKafkaConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "group.default");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties,
                new StringDeserializer(), new StringDeserializer());
        kafkaConsumer.subscribe(Collections.singletonList("test"));  // A
        try {
            while (true) { // B
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); // C
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.value());
                }
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}

A处代码订阅主题,B处轮询,C处获取消息,最多阻塞100ms,100ms后没有消息会返回空。

三 生产者详解

相关配置

控制消息发送的顺序:需要把max.in.flight.requests.per.connection设置为1,即串行发送。

分区控制

kafka的消息分为健和值,值就是真正的消息内容。健可以作为消息的附加体,也会被用来分区,kafka的默认分区规则如下:

当健值为null时,会随机分配可用分区

当健值为不为null时,会对健进行散列,然后根据散列值来映射到所有分区上

由于是根据散列值来映射的,所以只有分区数量不变的情况下才能保证散列值相同的消息被分配到同一分区上。

以上说的是默认的分区器,我们可以自定义分区器,只需要实现Partitioner即可

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //计算分区并反回
        return 0;
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> map) {
    }
}

四 消费者详解

4.1 消费者和消费群组

假如我们有一个kafka主题,生产者生产消息的速度是1000QPS,这时候我们只需要使用一台机器消费消息即可,这一台机器就是一个消费者。

但是如果生产者生产消息的速度是10 0000QPS,用一台机器肯定是不够的,我们可以用10台,每台消费其中1/10的消息,那这十台就组成了一个消费群组。上面的例子中我们指定了消费者所属的群组:

properties.put("group.id", "group.default");

这里举的是机器维度的例子,实际上,我们可以在同一个机器的同一个服务中启动多个消费者。

同一个消费群组的机器会分别消费主题中的不同分区,例如我们的主题有10个分区:

同一个主题可以有多个消费者群组,那么每一个消费者群组会消费主题中的所有消息。

4.2 分区分配策略

上面说过,群组中每一个消费者会消费特定的分区,分区与消费者的对应关系就由分区的分配策略决定。

在 Kafka 内部存在两种默认的分区分配策略:Range 和 RoundRobin。当以下事件发生时,Kafka 将会进行一次分区分配:

关于这两种分配策略,这里有篇文章写得不错:Kafka分区分配策略 这里简单总结一下:

4.4 消费者群组是如何协调的

上面说了分区的分配策略,那么这个策略由谁来执行,由谁来触发更新呢,这个就是消费组协调策略:

4.3 消费者相关配置

4.4 偏移量提交

偏移量的作用:用于记录消费者消费到的位置,用于避免重复消费和消息丢失。
消费者提交偏移量的途径:向_consumer_offset的特殊主题发送消息。

自动提交

如果enable.auto.commit被设置为true,则每过一定时间(通过auto.commit.interval.ms来设置,默认5s),会把poll()接收到的最大偏移量提交上去。

缺点:这种方式不是实时的,所有会出现重复消费的情况,减小auto.commit.interval.ms可以降低出现的概率,但是不能完全避免。

手动同步提交

可以手动调用commitSync()方法来手动提交,这个方法会把poll()方法返回的最新偏移量提交。
如果提交失败,则会一直重试,直到成功为止。

缺点:由于是同步的,所以会阻塞,降低吞吐量。

手动异步提交

可以手动调用commitAsync()方法来手动异步提交,同样会把最新的偏移量提交,但是不会失败重试,因为会有并发问题。

缺点:存在并发问题。

手动提交的偏移量默认是poll方法返回的新大值,但是也可以手动指定,加个参数即可。

4.5 消息回溯

seek(TopicPartition, Long)方法可以用于回溯消息,该方法指定从哪开始读取kafka的消费,下次poll时会从这个点开始读取。

4.6 再均衡监听器

kafka提供了再均衡监听器的机制,可以在rebalance时获取到通知,进行一些如关闭连接等操作。
再均衡监听器只需要实现ConsumerRebalanceListener。然后要subscribe时传入参数即可

        kafkaConsumer.subscribe(Collections.singletonList("test"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                //此方法会在再均衡开始之前调用
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                //此方法会在分区重新分配之后调用
            }
        });
上一篇下一篇

猜你喜欢

热点阅读