分布式消息队列kafka


kafka三大特性
第一、发布和订阅
第二、实时的流处理
第三、安全地存储流数据在集群节点上
kafka的架构

First a few concepts:
Kafka is run as a cluster on one or more servers that can span multiple datacenters.
解析:kafka是一个以集群形式运行在一个或多个机器上,跨多数据中心的服务。
The Kafka cluster stores streams of records in categories called topics.
解析:kafa集群使用topics把流数据记录存储时做分类。
Each record consists of a key, a value, and a timestamp.
解析:每一条记录包含一个key,一个value,和一个时间戳

查看官方文档的QUICKSTART模块

Zookeeper的下载地址,统一从cdh5那个地址(http://archive.cloudera.com/cdh5/cdh/5/)下载。
解压zk
配置环境变量~/.bash_profile
配置zk的数据存储目录
下载kafka

1、单节点单broker的部署及使用
解压kafka
tar -zxvf kafka_2.11-2.2.0.tgz
设置kafka的根目录和启动bin目录配置在环境变量里面
PATH=$PATH:$HOME/bin
export KAFKA_HOME=/usr/local/kafka_2.11-2.1.1
export PATH=$KAFKA_HOME/bin:$PATH
配置环境变量当前登录用户的私有环境变量~/.bash_profile
配置kafka的配置文件server.properties
$KAFKA_HOME/config/server.properties
broker.id=0 //这个配置是broker的编号,且该编号唯一对应一个broker不能重复。
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured
listeners=PLAINTEXT://10.101.3.3:9092,如果不设置他会默认本机hostname
log.dirs //存储kafka的日志
zookeeper.connect //zk的地址
启动zookeeper
如果你已经有了zookeeper,那就跳过该步骤,如果没有那就直接使用kafka自己安装包里的zookeeper,启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka
后台方式: ./kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties &
会话方式:./kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
查看是否启动 jps 命令

查看kafka、zookeeper使用的配置文件 jps -m

创建一个topic:指定ZK
./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 1 --partitions 1 --topic mytopic
备注:1、创建topic的时候,需要使用--create命令 2、需要指定zk的地址--zookeeper 10.101.3.3:2181 3、指定副本系数--replication-factor 1 4、指定分区 --partitions 1 5、指定topic的名字--topic mytopic

查看所有已经创建的topic
bin/kafka-topics.sh --list --zookeeper 10.101.3.3:2181

生产消息:指定Broker(localhost最好是ip或者域名,不要使用localhost)
./kafka-console-producer.sh --broker-list 10.101.3.3:9092 --topic mytopic
备注:启动消息生产者,也就是启动一个broker,需要使用kafka-console-producer.sh命令指定broker的服务暴漏端口和对应的topic。如果你出现以下错误,就是因为ip或域名没有设置正确导致的

消费消息:指定生产者关联的Broker
./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9092 --topic mytopic --from-beginning

--from-beginning参数的解析
这个参数就是告诉消费者服务,要全部监听所有生产者的消息。如果不带有此参数,那么该消费服务只会接受启动时间之后监听的消息,之前的消息是不会监听的。
查看所有topic的设置详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181

查看某一个topic的 设置详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic这里只查看名叫test的topic的设置信息

关闭kafka的服务
bin/kafka-server-stop.sh
关闭前

关闭后

2、单节点多Broker部署及应用
首先看下图,是官网原图:意思就是把kafka的server.properties配置文件复制多份。

我们就按照官网说的来,我部署三个节点

需要把server-1.properties、server-1.properties、server-1.properties的broker.id和日志目录和listeners配置分别改一下
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
config/server-3.properties:
broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/tmp/kafka-logs-3


启动多broker(后台方式启动)
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

启动后,可以看到3各kafka的服务。

kafka服务对应的配置文件:jps -m可以查看到

创建一个topic
./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 3 --partitions 1 --topic my_m_topic
备注:这时候创建topic时,需要根据broker的节点数量,指定副本系数--replication-factor,这时候就应该为3了。
查看所有已创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

查看我们多副本topic的详情
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_m_topic

其中Leader:代表主副本。Replicas:代表副本所在的broker机器编号1、2、3 Isr:代表存活的副本。
生产消息:指定Broker(localhost最好是ip或者域名,不要使用localhost),这里的--broker-list后面需要多个broker节点ip
./kafka-console-producer.sh --broker-list 10.101.3.3:9093,10.101.3.3:9094,10.101.3.3:9095 --topic my_m_topic

消费消息:指定生产者关联的zk
./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9093,10.101.3.3:9094,10.101.3.3:9095 --from-beginning --topic my_m_topic
删除topic
server.properties 设置 delete.topic.enable=true
如果没有设置 delete.topic.enable=true,则调用kafka 的delete命令无法真正将topic删除,而是显示(marked for deletion)
调用命令删除topic:
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic my_m3_topic
删除kafka存储目录(server.properties文件log.dirs配置,默认为"/data/kafka-logs")相关topic的数据目录。
recovery-point-offset-checkpoint和replication-offset-checkpoint中的1变为0,删除topic对应的名字
kafka的容错性
如果kafka的多个节点中,我们杀掉任何一个节点,都不会影响消息的传输。如果我们干掉leader节点,他内部会把其他副节点转正,成为leader,也不会影响消息传输。
