Java技术升华计算机Kafka

kafka——生产者原理解析

2020-11-24  本文已影响0人  小波同学

一、为什么分区

kafka有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,也就是说kafka的消息组织方式实际上是三级结构: 主题---分区---消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中保存多份。官网上的这张图非常清晰地展示了kafka的三级结构,如下:

其实分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够放置到不同节点机器上,而数据库的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立的执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点记起来增加整体系统的吞吐量。

二、分区策略

分区的原因

分区的原则

我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
    this(topic, partition, timestamp, key, value, (Iterable)null);
}

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, (Long)null, key, value, headers);
}

public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, (Long)null, key, value, (Iterable)null);
}

public ProducerRecord(String topic, K key, V value) {
    this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}

public ProducerRecord(String topic, V value) {
    this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
}

RoundRobinPartitioner(轮询策略)源码:

public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public RoundRobinPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {
    }
}

三、数据可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到
producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

3.1、副本数据同步策略

方案 优点 缺点
半数以上完成同步,就发送 ack 延迟低 选举新的 leader 时,容忍 n 台节点的故障,需要 2n+1 个副本
全部完成同步,才发送ack 选举新的 leader 时,容忍 n 台节点的故障,需要 n+1 个副本 延迟高

Kafka 选择了第二种方案,原因如下:

3.2、ISR

采用第二种方案之后,设想以下情景:leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack。这个问题怎么解决呢?

Leader 维护了一个动态的in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间 未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。

3.3、ack 应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,
所以没必要等 ISR 中的 follower 全部接收成功。

所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,
选择以下的配置。

3.4、故障处理细节

LEO——Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset。
HW——High Watermark::指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

四、Exactly Once 语义

在分布式消息传递一致性语义上有下面三种:

在kafka0.11版本之前是无法在kafka内实现exactly once 精确一次性保证的语义的,在0.11之后的版本,我们可以结合新特性 幂等性以及acks=-1 来实现kafka生产者的exactly once。

在0.11版本之前要实现exactly once语义只能通过外部系统如hbase的rowkey实现基于主键的去重。

五、幂等性解读:

在生产者配置文件 producer.properties 设置参数 enable.idompotence = true 即可启用幂等性。

Kafka的幂等性其实就是将原来需要在下游进行的去重操作放在了数据上游。开启幂等性的生产者在初始化时会被分配一个PID(producer ID),该生产者发往同一个分区(Partition)的消息会附带一个序列号(Sequence Number),Broker 端会对<PID, Partition, SeqNumber>作为该消息的主键进行缓存,当有相同主键的消息提交时,Broker 只会持久化一条。但是生产者重启时PID 就会发生变化,同时不同的 分区(Partition)也具有不同的编号,所以生产者幂等性无法保证跨分区和跨会话的 Exactly Once。

事务:kafka在0.11 版本引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

六、生产者事务:

Kafka引入了一个新的组件Transaction Coordinator,它管理了一个全局唯一的事务ID(Transaction ID),并将生产者的PID和事务ID进行绑定,当生产者重启时虽然PID会变,但仍然可以和Transaction Coordinator交互,通过事务ID可以找回原来的PID,这样就保证了重启后的生产者也能保证Exactly Once 了。

同时,Transaction Coordinator 将事务信息写入 Kafka 的一个内部 Topic,即使整个kafka服务重启,由于事务状态已持久化到topic,进行中的事务状态也可以得到恢复,然后继续进行。

七、生产者客户端的基本架构图

由上图可以看出:KafkaProducer有两个基本线程:

写入流程

producer 写入消息序列图如下所示:


流程说明:

参考:
http://kafka.apache.org/0110/documentation.html

https://www.cnblogs.com/tugeboke/p/11760387.html

https://blog.csdn.net/wsdc0521/article/details/108604420

https://www.cnblogs.com/kcxg/p/12009524.html

https://blog.csdn.net/qq_37502106/article/details/80271800

https://www.cnblogs.com/sodawoods-blogs/p/8969513.html

上一篇下一篇

猜你喜欢

热点阅读