工作学习笔记

Kafka学习笔记

2019-06-14  本文已影响0人  Megahorn
转自:https://www.cnblogs.com/qingyunzong/category/1212387.html
按照自己理解加粗了重点

一、前言,所谓消息队列

一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。
有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。

点对点消息传递模式

在点对点消息系统中,消息持久化到一个队列中
此时,将有一个或多个消费者消费队列中的数据。
但是一条消息只能被消费一次
当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除
生产者发送一条消息到queue,只有一个消费者能收到
该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:

点对点消息传递模式

发布-订阅消息传递模式

在发布-订阅消息系统中,消息被持久化到一个topic中
与点对点消息系统不同的是,消费者可以订阅一个或多个topic,
消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费
数据被消费后不会立马删除
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息
在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:

发布-订阅消息传递模式

二、消息队列的优点


三、常用Message Queue对比

RabbitMQ

Redis

Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。
虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。
对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。
测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。
实验表明:
入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;
出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

ZeroMQ

ActiveMQ

ActiveMQ是Apache下的一个子项目。
类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。
同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。

Kafka/Jafka

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。
具有以下特性:

Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。


四、Kafka中的术语解释

在深入理解Kafka之前,先介绍一下Kafka中的术语。下图展示了Kafka的相关术语以及之间的关系:

Kafka的相关术语以及之间的关系
上图中一个topic配置了3个partition
Partition1有两个offset:0和1。
Partition2有4个offset。
Partition3有1个offset。
副本的id和副本所在的机器的id恰好相同。
如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。
集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。

Broker

Kafka 集群包含一个或多个服务器,服务器节点称为broker
broker存储topic的数据。

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,
但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
类似于数据库的表名

Partition

topic中的数据分割为一个或多个partition。
每个topic至少有一个partition。
(多个partition的情况下,一条消息存储在topic下的其中一个partition中)
每个partition中的数据使用多个segment文件存储。
partition中的数据是有序的,不同partition间的数据丢失了数据的顺序
如果topic有多个partition,消费数据时就不能保证数据的顺序。
在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1

Producer

生产者即数据的发布者,该角色将消息发布到Kafka的topic中。
broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。
生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition

Consumer

消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

Consumer Group

每个Consumer属于一个特定的Consumer Group
(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

Leader

\color{red}{多个partition的情况下,一条消息存储在topic下的其中一个partition中。}
\color{red}{每个partition有多个副本},其中有且仅有一个作为Leader,
Leader是当前负责数据的读写的partition。

Follower

Follower跟随Leader,所有写请求都通过Leader路由,
数据变更会广播给所有Follower,Follower与Leader保持数据同步
如果Leader失效,则从Follower中选举出一个新的Leader
当Follower与Leader挂掉、卡住或者同步太慢,
leader会把这个follower从“in sync replicas”(ISR)列表中删除,
重新创建一个Follower。

总结

总结

五、Kafka的架构

Kafka的架构

如上图所示,
一个典型的Kafka集群中包含:

Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。
Producer使用push模式将消息发布到broker,
Consumer使用pull模式从broker订阅并消费消息。


六、Topics和Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,
可以简单理解为必须指明把这条消息放进哪个queue里。
为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,
每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。
创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,
但是需要的资源也越多,同时也会导致更高的不可用性,
kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。
因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高
(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。


解剖一个Topic

当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),
因此Kafka提供两种策略删除旧数据。

配置如下所示:

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,
所以这里删除过期文件与提高Kafka性能无关。
选择怎样的删除策略只与磁盘以及具体的需求有关。
另外,
Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset
这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。
当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。

因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过
也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,
因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。


七、Producer消息路由

Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition
如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。
如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,
而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。
可以通过以下途径来指定Topic的默认Partition数量:

在发送一条消息时,可以指定这条消息的key,
Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。
Paritition机制可以通过指定Producer的paritition.class这一参数来指定,
该class必须实现kafka.producer.Partitioner接口。


八、Consumer Group

使用Consumer high level API时,
同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费
但多个Consumer Group可同时消费这一消息

Consumer Group
这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。
一个Topic可以对应多个Consumer Group。
如果需要实现广播,只要每个Consumer有一个独立的Group就可以了
要实现单播只要所有的Consumer在同一个Group里
用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。

实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理
根据这一特性,

只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。


九、Push vs. Pull

作为一个消息系统,Kafka遵循了传统的方式,
选择由Producer向broker push消息,
由Consumer从broker pull消息。
一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。
事实上,push模式和pull模式各有优劣。

对于Kafka而言,pull模式更合适。
pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,
同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,
同时还能选择不同的提交方式从而实现不同的传输语义。


十、Kafka delivery guarantee

有这么几种可能的delivery guarantee:

At most once   消息可能会丢,但绝不会重复传输
At least one   消息绝不会丢,但可能会重复传输
Exactly once    每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。

当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。
但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,
那Producer就无法判断该条消息是否已经commit。
虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。

接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,可以选择commit,
该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。
该Consumer下一次再读该Partition时会从下一条开始读取。
如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。
当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。
如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。
但实际使用中应用程序并非在Consumer读取完数据就结束了,
而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。

Kafka默认保证At least once,并且
允许通过设置Producer异步提交来实现At most once。而
Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式


十一、下载、安装与配置

tar -zxvf kafka_2.11-0.8.2.0.tgz -C apps
cd apps/
ln -s kafka_2.11-0.8.2.0/ kafka
cd apps/kafka/config/
vi server.properties
//当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=0
//当前kafka对外提供服务的端口默认是9092
port=9092
//这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
host.name=hadoop1
//这个是borker进行网络处理的线程数
num.network.threads=3
//这个是borker进行I/O处理的线程数
num.io.threads=8
//发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400
//kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400
//这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=104857600
//消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,
//如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
log.dirs=/home/hadoop/log/kafka-logs
//默认的分区数,一个topic默认1个分区数
num.partitions=1
//每个数据目录用来日志恢复的线程数目
num.recovery.threads.per.data.dir=1
//默认消息的最大持久化时间,168小时,7天
log.retention.hours=168
//这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.segment.bytes=1073741824
//每隔300000毫秒去检查上面配置的log失效时间
log.retention.check.interval.ms=300000
//是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false
//设置zookeeper的连接端口
zookeeper.connect=192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181
//设置zookeeper的连接超时时间
zookeeper.connection.timeout.ms=6000
vi producer.properties
bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
#bootstrap.servers= 是0.9之后的配置参数,0.8及之前用metadata.broker.list=
vi consumer.properties
bootstrap.servers=192.168.1.4:9092,192.168.1.5:9092,192.168.1.6:9092
#bootstrap.servers= 是0.9之后的配置参数,0.8及之前用zookeeper.connect=
scp -r kafka_2.11-0.8.2.0/ hadoop2:$PWD
scp -r kafka_2.11-0.8.2.0/ hadoop3:$PWD
scp -r kafka_2.11-0.8.2.0/ hadoop4:$PWD
ln -s kafka_2.11-0.8.2.0/ kafka
vi .bashrc 
#Kafka
export KAFKA_HOME=/home/hadoop/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source ~/.bashrc

十二、启动

zkServer.sh start
bin/kafka-server-start.sh config/server.properties

十三、创建的topic

bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 3 --partitions 3 --topic topic-test1
bin/kafka-topics.sh --describe --zookeeper hadoop1:2181 --topic topic-test1
bin/kafka-topics.sh --list --zookeeper hadoop1:2181
bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic topic-test1

hadoop1显示接收到消息

bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --from-beginning --topic topic-test1
上一篇下一篇

猜你喜欢

热点阅读