kafka基础知识

2019-07-24  本文已影响0人  西葫芦炒胖子

github地址:https://github.com/douzixiansheng/MQ/blob/master/kafka_basic.md

Kafka

Kafka是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力.(pub-sub模型)

维基百科

基于kafka-zookeeper 的分布式消息队列系统总体架构如下:

Kafka 架构说明

Kafka 四大核心

Kafka 基础概念

参数 描述
bootstrap.servers 该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查找到其他的broker的信息。(建议提供两个broker信息,一旦其中一个宕机,生产者仍然能够连接到集群上)
key.serializer broker 希望接受到的消息的键和值都是字节数组.生产者接口允许使用参数化类型,因此可以把java对象作为键和值发送给broker。
value.serializer value.serializer 指定的类会将值序列化。如果键和值都是字符串,可以使用key.serializer一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器.

生产者的配置

参数 描述
acks acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消费写入是成功的。acks = 0 生产者在成功写入消息之前不会等待任何来自服务器的响应。(缺点:无法确认消费是否成功;优点:高吞吐量);acks = 1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消费无法到达首领节点(比如首领节点奔溃,新的首领还没有被选举处理),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。acks = all 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
buffer.memory 该参数用来设置生产者缓冲区的大小,生产者用它缓冲要发送到服务器的消息。0.9.0.0 版本被替换成了 max.block.ms,表示在抛出异常之前可以阻塞一段时间
compression.type 默认情况下为none,消费发送时不会被压缩。该参数可以设置为snappy、gzip或lz4,它指定了消息被发送给broker之前使用哪一种压缩算法进行压缩。1. snappy 压缩算法有Google发明,它占用较少的CPU,却能提供较好的性能和相当可观的压缩比(比较关注性能和网路带宽) 2. gzip 压缩算法一般会占用较多的CPU,但会提供更高的压缩比(网络带宽有限次采用)
retries 生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这中情况下,retries参数是值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,可以通过retry.backoff.ms参数来改变这个时间间隔.
batch.size 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该采纳数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。1. 批次设置很大 不会造成延迟,只会占用更多的内存 2. 批次设置很小 因为生产者需要更频繁地发送消息,会增加一些额外的开销
linger.ms 该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。
client.id 该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里
request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间
max.block.ms 该参数指定了在调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常
max.request.size 该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,可以指单个请求里所有消息总的大小。

Consumer

Broker(代理)

分区(Partitions)

偏移量(offset)

副本

日志



进入到日志目录查看

kafka_tree.png



在kafka的文件存储中,同一个topic下有多个不同的Partition,每个partition都为一个目录,而每一个目录又被平均分配成多个大小相等的Segment File中,Segment File 包括一个日志数据文件和两个索引文件(偏移量索引文件和消息时间戳索引文件)。

Starting offset: 12
baseOffset: 12 lastOffset: 12 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1560145317620 size: 136 magic: 2 compresscodec: NONE crc: 692425981 isvalid: true
| offset: 12 CreateTime: 1560145317620 keysize: -1 valuesize: 66 sequence: -1 headerKeys: [] payload: {"path":"/matchingHandler.userCancel","query":{"uid":"200208305"}}
baseOffset: 13 lastOffset: 13 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 136 CreateTime: 1560237884593 size: 136 magic: 2 compresscodec: NONE crc: 2666499378 isvalid: true
| offset: 13 CreateTime: 1560237884593 keysize: -1 valuesize: 66 sequence: -1 headerKeys: [] payload: {"path":"/matchingHandler.userCancel","query":{"uid":"200208631"}}
baseOffset: 14 lastOffset: 14 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 272 CreateTime: 1560323094331 size: 136 magic: 2 compresscodec: NONE crc: 926590729 isvalid: true
| offset: 14 CreateTime: 1560323094331 keysize: -1 valuesize: 66 sequence: -1 headerKeys: [] payload: {"path":"/matchingHandler.userCancel","query":{"uid":"200208305"}}
baseOffset: 15 lastOffset: 15 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 408 CreateTime: 1560421275824 size: 136 magic: 2 compresscodec: NONE crc: 3081157511 isvalid: true
| offset: 15 CreateTime: 1560421275824 keysize: -1 valuesize: 66 sequence: -1 headerKeys: [] payload: {"path":"/matchingHandler.userCancel","query":{"uid":"200208300"}}
baseOffset: 16 lastOffset: 16 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 544 CreateTime: 1560426875868 size: 136 magic: 2 compresscodec: NONE crc: 101509911 isvalid: true

Segment文件命名规则:Partition全局的第一个Segment从0开始,后续每个Segment文件名为上一个Segment文件最后一条消息的offset值。数值最大为64位long型,19位数字字符长度,没有数字用0填充。

关键字 解释
offset 消息在partition中的绝对offset。能表示这是partition的第多少条消息
message message大小
CRC32 用crc32校验message
magic 表示本次发布kafka服务程序协议版本号
attributes 表示为独立版本、或标识压缩类型、或编码类型
key length 表示key的长度,当key为-1时,K byte key字段不填
key 可选
value bytes payload 实际消息数据

index 文件的存储方式

分段策略

属性名 含义 默认值
log.roll.{hours,ms} 日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment 168(7 day)
log.segment.bytes 每个segment的最大容量。到达指定容量时,将强制生成一个新的segment 1G(-1 为不限制)
log.retention.check.interval.ms 日志片段文件检查的周期时间 60000

日志刷新策略
Kafka的日志实际上是开始是在缓存中的,然后根据一定策略定期一批一批写入到日志文件中去,以提高吞吐量.

属性名 含义 默认值
log.flush.interval.messages 消息达到多少条时将数据写入到日志文件 10000
log.flush.interval.ms 当达到该时间时,强制执行一次flush null
log.flush.shceduler.interval.ms 周期性检查,是否需要将信息flush 暂时没有找到

集群成员关系

处理请求

参数 描述
Request type API key
Request version broker可以处理不同版本的客户端请求,并根据客户端版本做出不同的响应
Correlation ID 一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志里(用于诊断问题)
Client ID 用于标识发送请求的客户端

控制器

Kafka 消费过程分析

kafka复制原理

ISR(副本同步队列)

疑问

持续更新...

上一篇下一篇

猜你喜欢

热点阅读