Kafka学习笔记
转自:https://www.cnblogs.com/qingyunzong/category/1212387.html
按照自己理解加粗了重点
一、前言,所谓消息队列
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。
有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。
点对点消息传递模式
在点对点消息系统中,消息持久化到一个队列中。
此时,将有一个或多个消费者消费队列中的数据。
但是一条消息只能被消费一次。
当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。
生产者发送一条消息到queue,只有一个消费者能收到。
该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:
发布-订阅消息传递模式
在发布-订阅消息系统中,消息被持久化到一个topic中。
与点对点消息系统不同的是,消费者可以订阅一个或多个topic,
消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,
数据被消费后不会立马删除。
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
二、消息队列的优点
-
解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。
消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。
这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 -
冗余(副本)
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。
许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,
需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。 -
扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,
只要另外增加处理过程即可。
不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。 -
灵活性&峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;
如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。
使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 -
可恢复性
系统的一部分组件失效时,不会影响到整个系统。
消息队列降低了进程间的耦合度,
所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 -
顺序保证
在大多使用场景下,数据处理的顺序都很重要。
大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
Kafka保证一个Partition内的消息的有序性。 -
缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。
例如,加载一张图片比应用过滤器花费更少的时间。
消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。
该缓冲有助于控制和优化数据流经过系统的速度。 -
异步通信
很多时候,用户不想也不需要立即处理消息。
消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。
想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
三、常用Message Queue对比
RabbitMQ
略
Redis
Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。
虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。
对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。
测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。
实验表明:
入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;
出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
ZeroMQ
略
ActiveMQ
ActiveMQ是Apache下的一个子项目。
类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。
同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。
Kafka/Jafka
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。
具有以下特性:
- 快速持久化,可以在O(1)的系统开销下进行消息持久化;
- 高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;
- 完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;
- 支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。
Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
四、Kafka中的术语解释
在深入理解Kafka之前,先介绍一下Kafka中的术语。下图展示了Kafka的相关术语以及之间的关系:
上图中一个topic配置了3个partition。
Partition1有两个offset:0和1。
Partition2有4个offset。
Partition3有1个offset。
副本的id和副本所在的机器的id恰好相同。
如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。
集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。
Broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。
broker存储topic的数据。
- 如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
- 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
- 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,
但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
类似于数据库的表名
Partition
topic中的数据分割为一个或多个partition。
每个topic至少有一个partition。
(多个partition的情况下,一条消息存储在topic下的其中一个partition中)
每个partition中的数据使用多个segment文件存储。
partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。
如果topic有多个partition,消费数据时就不能保证数据的顺序。
在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
Producer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。
broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。
生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
Consumer Group
每个Consumer属于一个特定的Consumer Group
(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
Leader
,其中有且仅有一个作为Leader,
Leader是当前负责数据的读写的partition。
Follower
Follower跟随Leader,所有写请求都通过Leader路由,
数据变更会广播给所有Follower,Follower与Leader保持数据同步。
如果Leader失效,则从Follower中选举出一个新的Leader。
当Follower与Leader挂掉、卡住或者同步太慢,
leader会把这个follower从“in sync replicas”(ISR)列表中删除,
重新创建一个Follower。
总结
总结五、Kafka的架构
Kafka的架构如上图所示,
一个典型的Kafka集群中包含:
- 若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等)
- 若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高)
- 若干Consumer Group
- 一个Zookeeper集群
Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。
Producer使用push模式将消息发布到broker,
Consumer使用pull模式从broker订阅并消费消息。
六、Topics和Partition
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,
可以简单理解为必须指明把这条消息放进哪个queue里。
为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,
每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。
创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,
但是需要的资源也越多,同时也会导致更高的不可用性,
kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。
因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高
(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
解剖一个Topic
- 对于传统的message queue而言,一般会删除已经被消费的消息,
- 而Kafka集群会保留所有的消息,无论其被消费与否。
当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),
因此Kafka提供两种策略删除旧数据。
- 基于时间:例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据
- 基于Partition文件大小:在Partition文件超过1GB时删除旧数据
配置如下所示:
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,
所以这里删除过期文件与提高Kafka性能无关。
选择怎样的删除策略只与磁盘以及具体的需求有关。
另外,
Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。
这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。
当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。
因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,
也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,
因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
七、Producer消息路由
Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。
如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。
如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,
而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。
可以通过以下途径来指定Topic的默认Partition数量:
- (新建)在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定
- (新建)在创建Topic时通过参数指定
- (创建之后)同时也可以在Topic创建之后通过Kafka提供的工具修改
在发送一条消息时,可以指定这条消息的key,
Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。
Paritition机制可以通过指定Producer的paritition.class这一参数来指定,
该class必须实现kafka.producer.Partitioner接口。
八、Consumer Group
使用Consumer high level API时,
同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,
但多个Consumer Group可同时消费这一消息。
这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。
一个Topic可以对应多个Consumer Group。
如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。
要实现单播只要所有的Consumer在同一个Group里。
用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。
根据这一特性,
- 可以使用Storm这种实时流处理系统对消息进行实时在线处理,
- 同时使用Hadoop这种批处理系统进行离线处理,
- 还可以同时将数据实时备份到另一个数据中心,
只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。
九、Push vs. Pull
作为一个消息系统,Kafka遵循了传统的方式,
选择由Producer向broker push消息,
由Consumer从broker pull消息。
一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。
事实上,push模式和pull模式各有优劣。
- push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。
push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,
典型的表现就是拒绝服务以及网络拥塞。 - 而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适。
pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,
同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,
同时还能选择不同的提交方式从而实现不同的传输语义。
十、Kafka delivery guarantee
有这么几种可能的delivery guarantee:
At most once 消息可能会丢,但绝不会重复传输
At least one 消息绝不会丢,但可能会重复传输
Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。
但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,
那Producer就无法判断该条消息是否已经commit。
虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。
接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,可以选择commit,
该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。
该Consumer下一次再读该Partition时会从下一条开始读取。
如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。
当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。
如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。
但实际使用中应用程序并非在Consumer读取完数据就结束了,
而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
Kafka默认保证At least once,并且
允许通过设置Producer异步提交来实现At most once。而
Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。
十一、下载、安装与配置
- 下载地址:
http://kafka.apache.org/downloads.html
http://mirrors.hust.edu.cn/apache/ - 安装前提(安装ZK):
参考http://www.cnblogs.com/qingyunzong/p/8634335.html#_label4_0 - 安装(以版本kafka_2.11-0.8.2.0.tgz为例)
tar -zxvf kafka_2.11-0.8.2.0.tgz -C apps
cd apps/
ln -s kafka_2.11-0.8.2.0/ kafka
- 配置(部分配置为0.8之前的配置)
cd apps/kafka/config/
vi server.properties
//当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=0
//当前kafka对外提供服务的端口默认是9092
port=9092
//这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
host.name=hadoop1
//这个是borker进行网络处理的线程数
num.network.threads=3
//这个是borker进行I/O处理的线程数
num.io.threads=8
//发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400
//kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400
//这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=104857600
//消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,
//如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
log.dirs=/home/hadoop/log/kafka-logs
//默认的分区数,一个topic默认1个分区数
num.partitions=1
//每个数据目录用来日志恢复的线程数目
num.recovery.threads.per.data.dir=1
//默认消息的最大持久化时间,168小时,7天
log.retention.hours=168
//这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.segment.bytes=1073741824
//每隔300000毫秒去检查上面配置的log失效时间
log.retention.check.interval.ms=300000
//是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false
//设置zookeeper的连接端口
zookeeper.connect=192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181
//设置zookeeper的连接超时时间
zookeeper.connection.timeout.ms=6000
vi producer.properties
bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
#bootstrap.servers= 是0.9之后的配置参数,0.8及之前用metadata.broker.list=
vi consumer.properties
bootstrap.servers=192.168.1.4:9092,192.168.1.5:9092,192.168.1.6:9092
#bootstrap.servers= 是0.9之后的配置参数,0.8及之前用zookeeper.connect=
- 将kafka的安装包分发到其他节点
scp -r kafka_2.11-0.8.2.0/ hadoop2:$PWD
scp -r kafka_2.11-0.8.2.0/ hadoop3:$PWD
scp -r kafka_2.11-0.8.2.0/ hadoop4:$PWD
- 建软联
ln -s kafka_2.11-0.8.2.0/ kafka
- 修改环境变量
vi .bashrc
#Kafka
export KAFKA_HOME=/home/hadoop/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source ~/.bashrc
十二、启动
- 首先启动zookeeper集群
所有zookeeper节点都需要执行
zkServer.sh start
- 启动Kafka集群服务(所有,分别到各个集群(hadoop1~4)启动)
bin/kafka-server-start.sh config/server.properties
十三、创建的topic
- 创建
例:
通过kafka-topics.sh脚本来创建一个名为topic-test1并且副本数为3、分区数为3的topic
bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 3 --partitions 3 --topic topic-test1
- 查看topic副本信息
bin/kafka-topics.sh --describe --zookeeper hadoop1:2181 --topic topic-test1
- 查看已经创建的topic信息
bin/kafka-topics.sh --list --zookeeper hadoop1:2181
- 生产者发送消息
bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic topic-test1
hadoop1显示接收到消息
- 消费者消费消息
在hadoop2上消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --from-beginning --topic topic-test1