kafka札記
一.基本概念
- Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker - Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处) - Partition
parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件 - Porducer
负责发布消息到Kafka broker - Consumer
消费消息。每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
二.Kafka架构及组件原理
image.png如上图所示,一个典型的kafka集群中包含若干producer,若干broker,若干consumer group,以及一个Zookeeper)集群。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。
-
Push && Pull
Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息, -
Topic && Partition
image.png
Topic在逻辑上可以被认为是一个queue。每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。
image.png
- 自定义Partition
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class HashPartitioner implements Partitioner {
public HashPartitioner(VerifiableProperties verifiableProperties) {}
@Override
public int partition(Object key, int numPartitions) {
if ((key instanceof Integer)) {
return Math.abs(Integer.parseInt(key.toString())) % numPartitions;
}
return Math.abs(key.hashCode() % numPartitions);
}
}
定义Partition分区,利用哈希算法把key相同的数据分到同一个分区中
三.Kafka的HA
- 背景
在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作则会造成整个系统的可用性降低。随着集群规模的增加
- Kafka HA设计解析
Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。
Kafka分配Replica的算法如下:
将所有Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上
将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker
- Propagate消息
- 选举Leader
Kafka在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
四.Kafka的Consumer
- Consumer Group
Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中开始同时支持将offset存于Zookeeper,这个offset基于客户程序提供给Kafka的名字来保存,这个名字被称为Consumer Group。Consumer Group是整个Kafka集群全局的,每一个 Consumer实例都属于一个Consumer Group,若不指定则属于默认的Group
image.png1.消息被消费后,并不会被删除,知识相应的offset加一
- 对于每条消息,在同一个Consumer Group里只会被一个Consumer消费
3.不通Consumer Group可消费同一条消息
五.KFKKA操作
step1:下載Kafka
tar -xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1
step2:啓動服務
Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。
bin/zookeeper-server-start.sh config/zookeeper.properties &
step3:創建topic
创建一个叫做“test”的topic,它只有一个分区,一个副本。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
step4:發送消息
Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。
运行producer并在控制台中输一些消息,这些消息将被发送到服务端:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
step5啓動consumer
Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一个命令行consumer可以读取消息并输出到标准输出:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
step6:搭建一個或者多個broker集羣
刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点也都是在本机上的:
首先为每个节点编写配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
在拷貝出的新文件中添加以下參數
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
把這兩個boker啓動
bin/kafka-server-start.sh config/server-1.properties &
...
bin/kafka-server-start.sh config/server-2.properties &
创建一个拥有3个副本的topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
现在我们搭建了一个集群,怎么知道每个节点的信息呢?运行“"describe topics”命令就可以了:
顯示分區和Topic情況
三.基於spark搭建的kakfa開發環境
-
版本信息
Spark Streaming 2.2.1与 Kafka 0.8.2.1或更高版本兼容。 -
操作sparkstreming
分别在Master、Worker1、Worker2节点启动Kafka集群。
root@master:~# nohup/usr/local/kafka_2.11-0.8.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.11-
0.8.2.1/config/server.properties &
[1] 3359
root@worker1:~# nohup/usr/local/kafka_2.11-0.8.2.1/bin/kafka-server-start.sh/usr/local/kafka_2.11-0.8.2.1/config/server.properties &
[2] 2861
root@worker2:~# nohup/usr/local/kafka_2.11-0.8.2.1/bin/kafka-server-start.sh/usr/local/kafka_2.11-0.8.2.1/config/server.properties &
[1] 2820
使用Jps命令查看。
root@master:~# jps
3280 QuorumPeerMain
3412 Jps
3359 Kafka
root@worker1:~# jps
2861 Kafka
2910 Jps
2799 QuorumPeerMain
root@worker2:~# jps
2757 QuorumPeerMain
2853 Jps
- 創建topic
root@master:/usr/local/kafka_2.11-0.8.2.1/bin#kafka-topics.sh --create --zookeeper
192.168.189.1:2181,192.168.189.2:2181,192.168.189.3:2181 --replication-factor 2 --partitions 4 --topickafka_test
查看topic
root@master:/usr/local/kafka_2.11-0.8.2.1/bin#kafka-topics.sh --list --zookeeper
- 操作sparksteaming
/usr/local/spark-2.2.1-bin-hadoop2.6/bin/spark-submit --master spark://192.168.189.1:7077 \
--deploy-mode client \
--driver-memory 1g \
--driver-cores 1 \
--total-executor-cores 3 \
--executor-memory 1g \
--jars /usr/local/kafka_2.11-0.8.2.1/libs/kafka-clients-0.8.2.1.jar \
--class org.apache.spark.examples.streaming.KafkaWordCountProducer \
/usr/local/streaming-examples-test/spark-examples_2.11-2.2.1.jar192.168.189.1:9092,192.168.189.2:9092,192.168.189.3:9092 \
kafka_test 20 10
在脚本start-producer.sh中需加上kafka-clients-0.8.2.1.jar的Jar包,否则会提示以下异常,找不到类KafkaProducer。
- 启动start-producer.sh脚本,生产者向Kafka集群发送消息
root@master:/usr/local/streaming-examples-test#start-producer.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in[jar:file:/usr/local/alluxio-1.7.0-hadoop-2.6/client/alluxio-1.7.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/spark-2.2.1-bin-hadoop2.6/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Seehttp://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type[org.slf4j.impl.Log4jLoggerFactory]
INFO producer.ProducerConfig:ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
acks = 1
batch.size= 16384
reconnect.backoff.ms = 10
bootstrap.servers = [192.168.189.1:9092, 192.168.189.2:9092,192.168.189.3:9092]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = classorg.apache.kafka.common.serialization.StringSerializer
retries = 0
max.request.size = 1048576
block.on.buffer.full = true
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms =0
client.id =
赋予start-consumer.sh脚本执行权限。
root@master:/usr/local/streaming-examples-test#chmod u+x start-consumer.sh
该脚本对应的KafkaWordCountProducer类的使用方法:
“Usage: KafkaWordCount<zkQuorum> <group> <topics> <numThreads>”
INFO scheduler.DAGScheduler:ResultStage 440 (print at
KafkaWordCount.scala:61) finished in 0.046 s
INFO scheduler.DAGScheduler: Job 117finished: print at KafkaWordCount.scala:61, took 0.067648 s
-------------------------------------------
Time: 1519454092000 ms
-------------------------------------------
(4,1352)
(8,1327)
(6,1461)
(0,1451)
(2,1493)
(7,1365)
(5,1405)
(9,1398)
(3,1428)
(1,1520)
INFO scheduler.JobScheduler: Finishedjob streaming job 1519454092000 ms.0 from job set of time 1519454092000 ms
NFOscheduler.JobScheduler: Total delay: 7.238 s for time 1519454092000 ms(execution: 0.290 s)