kafka札記

2018-07-31  本文已影响27人  文子轩

一.基本概念

二.Kafka架构及组件原理

image.png

如上图所示,一个典型的kafka集群中包含若干producer,若干broker,若干consumer group,以及一个Zookeeper)集群。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。

Topic在逻辑上可以被认为是一个queue。每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。


image.png
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class HashPartitioner implements Partitioner {

public HashPartitioner(VerifiableProperties verifiableProperties) {}

@Override
public int partition(Object key, int numPartitions) {
  if ((key instanceof Integer)) {
      return Math.abs(Integer.parseInt(key.toString())) % numPartitions;
  }
return Math.abs(key.hashCode() % numPartitions);
  }
}

定义Partition分区,利用哈希算法把key相同的数据分到同一个分区中

三.Kafka的HA

在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作则会造成整个系统的可用性降低。随着集群规模的增加

Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。
  Kafka分配Replica的算法如下:

将所有Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上
将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker

image.png

Kafka在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

四.Kafka的Consumer

Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中开始同时支持将offset存于Zookeeper,这个offset基于客户程序提供给Kafka的名字来保存,这个名字被称为Consumer Group。Consumer Group是整个Kafka集群全局的,每一个 Consumer实例都属于一个Consumer Group,若不指定则属于默认的Group

image.png

1.消息被消费后,并不会被删除,知识相应的offset加一

  1. 对于每条消息,在同一个Consumer Group里只会被一个Consumer消费

3.不通Consumer Group可消费同一条消息

五.KFKKA操作

step1:下載Kafka

tar -xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1

step2:啓動服務

Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。

bin/zookeeper-server-start.sh config/zookeeper.properties &

step3:創建topic

创建一个叫做“test”的topic,它只有一个分区,一个副本。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

step4:發送消息

Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。

运行producer并在控制台中输一些消息,这些消息将被发送到服务端:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

step5啓動consumer

Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一个命令行consumer可以读取消息并输出到标准输出:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

step6:搭建一個或者多個broker集羣

刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点也都是在本机上的:
首先为每个节点编写配置文件:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

在拷貝出的新文件中添加以下參數

config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

把這兩個boker啓動

bin/kafka-server-start.sh config/server-1.properties &
...
bin/kafka-server-start.sh config/server-2.properties &

创建一个拥有3个副本的topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

现在我们搭建了一个集群,怎么知道每个节点的信息呢?运行“"describe topics”命令就可以了:
顯示分區和Topic情況

三.基於spark搭建的kakfa開發環境

root@master:~# nohup/usr/local/kafka_2.11-0.8.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.11-

0.8.2.1/config/server.properties &

[1] 3359



root@worker1:~# nohup/usr/local/kafka_2.11-0.8.2.1/bin/kafka-server-start.sh/usr/local/kafka_2.11-0.8.2.1/config/server.properties &

[2] 2861



root@worker2:~# nohup/usr/local/kafka_2.11-0.8.2.1/bin/kafka-server-start.sh/usr/local/kafka_2.11-0.8.2.1/config/server.properties &

[1] 2820



使用Jps命令查看。

root@master:~# jps

3280 QuorumPeerMain

3412 Jps

3359 Kafka

root@worker1:~# jps

2861 Kafka
 
2910 Jps

2799 QuorumPeerMain

root@worker2:~# jps

2757 QuorumPeerMain

2853 Jps

root@master:/usr/local/kafka_2.11-0.8.2.1/bin#kafka-topics.sh --create --zookeeper

192.168.189.1:2181,192.168.189.2:2181,192.168.189.3:2181 --replication-factor 2 --partitions 4 --topickafka_test

查看topic

root@master:/usr/local/kafka_2.11-0.8.2.1/bin#kafka-topics.sh --list --zookeeper

/usr/local/spark-2.2.1-bin-hadoop2.6/bin/spark-submit  --master spark://192.168.189.1:7077 \

               --deploy-mode client \

               --driver-memory 1g \

                --driver-cores 1 \

               --total-executor-cores 3 \

               --executor-memory 1g \

               --jars /usr/local/kafka_2.11-0.8.2.1/libs/kafka-clients-0.8.2.1.jar \

              --class org.apache.spark.examples.streaming.KafkaWordCountProducer \
               /usr/local/streaming-examples-test/spark-examples_2.11-2.2.1.jar192.168.189.1:9092,192.168.189.2:9092,192.168.189.3:9092  \
                 kafka_test 20 10

在脚本start-producer.sh中需加上kafka-clients-0.8.2.1.jar的Jar包,否则会提示以下异常,找不到类KafkaProducer。

root@master:/usr/local/streaming-examples-test#start-producer.sh

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in[jar:file:/usr/local/alluxio-1.7.0-hadoop-2.6/client/alluxio-1.7.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/usr/local/spark-2.2.1-bin-hadoop2.6/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Seehttp://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type[org.slf4j.impl.Log4jLoggerFactory]

INFO producer.ProducerConfig:ProducerConfig values:

   compression.type = none

   metric.reporters = []

   metadata.max.age.ms = 300000

   metadata.fetch.timeout.ms = 60000

    acks = 1

    batch.size= 16384

   reconnect.backoff.ms = 10

   bootstrap.servers = [192.168.189.1:9092, 192.168.189.2:9092,192.168.189.3:9092]

   receive.buffer.bytes = 32768

   retry.backoff.ms = 100

   buffer.memory = 33554432

    timeout.ms = 30000

   key.serializer = classorg.apache.kafka.common.serialization.StringSerializer

    retries = 0

   max.request.size = 1048576

   block.on.buffer.full = true

   value.serializer = class org.apache.kafka.common.serialization.StringSerializer

   metrics.sample.window.ms = 30000

   send.buffer.bytes = 131072

   max.in.flight.requests.per.connection = 5

   metrics.num.samples = 2

    linger.ms =0

    client.id =

赋予start-consumer.sh脚本执行权限。

  root@master:/usr/local/streaming-examples-test#chmod u+x start-consumer.sh

该脚本对应的KafkaWordCountProducer类的使用方法:

“Usage: KafkaWordCount<zkQuorum> <group> <topics> <numThreads>”

INFO scheduler.DAGScheduler:ResultStage 440 (print at

KafkaWordCount.scala:61) finished in 0.046 s
 
INFO scheduler.DAGScheduler: Job 117finished: print at KafkaWordCount.scala:61, took 0.067648 s

-------------------------------------------

Time: 1519454092000 ms

-------------------------------------------

(4,1352)

(8,1327)

(6,1461)

(0,1451)

(2,1493)

(7,1365)

(5,1405)

(9,1398)

(3,1428)

(1,1520)

INFO scheduler.JobScheduler: Finishedjob streaming job 1519454092000 ms.0 from job set of time 1519454092000 ms

NFOscheduler.JobScheduler: Total delay: 7.238 s for time 1519454092000 ms(execution: 0.290 s)

上一篇下一篇

猜你喜欢

热点阅读