kafka搭建和使用

2019-04-30  本文已影响0人  yushu_bd

一、安装启动

下载地址:https://www-us.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
解压后,放到/home/work/kafka目录下,大致目录如下:

bin  config  libs  LICENSE  NOTICE  site-docs

启动zk

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

zk启动后,默认会监听2181端口

image.png
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties

kakfa启动后,默认监听9092端口(很多中间件都有自己的端口)


image.png

二、参数详解

2.1 server参数

  1. advertised.listeners:发布到ZooKeeper以供客户端使用,如果未设置,则将使用“listeners”的值。
  2. listeners: 监听器列表 ,将主机名指定为0.0.0.0以绑定到所有接口。将hostname保留为空以绑定到默认接口。如:PLAINTEXT:// myhost:9092,TRACE://:9091 PLAINTEXT://0.0.0.0:9092,TRACE:// localhost:9093
  3. num.network.threads: 处理client请求的线程数
  4. num.io.threads:处理请求中io的线程数
  5. socket.request.max.bytes:请求数据长度最大值,默认是104857600(100M)
  6. log.retention.hours:未处理数据最长保留时间,默认168h(7天)

2.2 producer参数

  1. bootstrap.servers: kafka的server
  2. retries:提交参数
    3.compression.type: data的压缩方式包括:none, gzip, snappy, lz4, zstd

2.3 consumer参数

  1. group.id: 消费分组的唯一标识字符串
  2. auto.offset.reset: 当无初始化offset或者offset不存在时,该怎么做?有三个值 [latest, earliest, none, anything else ],latest表示同步到最新的offset,earliest同步到最早的offset,none 如果之前的offset找不到,则抛异常,anything else 则直接抛异常
  3. enable.auto.commit: 是否自动提交offset
  4. auto.commit.interval.ms:自动提交offset的间隔(毫秒),设置间隔是个学问,设置不好会造成频繁rebalance

三、基础命令

创建topic

#创建test topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTopic

查看topic 列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

删除topics

bin/kafka-topics.sh --delete --topic myTopic --zookeeper localhost:2181

查看topic的消费延时情况

bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic myTopic

生成者

bin/kafka-console-producer.sh --topic myTopics --broker-list localhost:9092

消费者

#--from-beginning 是消息offset=0开始,默认则是从当前offset开始;
#group 是用来标识消息消费分组的,当一个消息多个消费者消费时,每个消费者需要制定消费分组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --from-beginning --group default_group 
上一篇下一篇

猜你喜欢

热点阅读