kafka搭建和使用
2019-04-30 本文已影响0人
yushu_bd
一、安装启动
下载地址:https://www-us.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
解压后,放到/home/work/kafka目录下,大致目录如下:
bin config libs LICENSE NOTICE site-docs
启动zk
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
zk启动后,默认会监听2181端口
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
kakfa启动后,默认监听9092端口(很多中间件都有自己的端口)
image.png
二、参数详解
2.1 server参数
- advertised.listeners:发布到ZooKeeper以供客户端使用,如果未设置,则将使用“listeners”的值。
- listeners: 监听器列表 ,将主机名指定为0.0.0.0以绑定到所有接口。将hostname保留为空以绑定到默认接口。如:PLAINTEXT:// myhost:9092,TRACE://:9091 PLAINTEXT://0.0.0.0:9092,TRACE:// localhost:9093
- num.network.threads: 处理client请求的线程数
- num.io.threads:处理请求中io的线程数
- socket.request.max.bytes:请求数据长度最大值,默认是104857600(100M)
- log.retention.hours:未处理数据最长保留时间,默认168h(7天)
2.2 producer参数
- bootstrap.servers: kafka的server
- retries:提交参数
3.compression.type: data的压缩方式包括:none, gzip, snappy, lz4, zstd
2.3 consumer参数
- group.id: 消费分组的唯一标识字符串
- auto.offset.reset: 当无初始化offset或者offset不存在时,该怎么做?有三个值 [latest, earliest, none, anything else ],latest表示同步到最新的offset,earliest同步到最早的offset,none 如果之前的offset找不到,则抛异常,anything else 则直接抛异常
- enable.auto.commit: 是否自动提交offset
- auto.commit.interval.ms:自动提交offset的间隔(毫秒),设置间隔是个学问,设置不好会造成频繁rebalance
三、基础命令
创建topic
#创建test topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTopic
查看topic 列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
删除topics
bin/kafka-topics.sh --delete --topic myTopic --zookeeper localhost:2181
查看topic的消费延时情况
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTopic
生成者
bin/kafka-console-producer.sh --topic myTopics --broker-list localhost:9092
消费者
#--from-beginning 是消息offset=0开始,默认则是从当前offset开始;
#group 是用来标识消息消费分组的,当一个消息多个消费者消费时,每个消费者需要制定消费分组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --from-beginning --group default_group