玩转大数据spark大数据

kafka集群搭建、避坑指南以及常用操作

2016-09-29  本文已影响4034人  东皇Amrzs

首先,kafka的搭建是建立在zookeeper的基础上的,不懂zookeeper怎么搭建的可以先看我上一篇文章
zookeeper环境搭建以及避坑指南

一、下载并解压kafka。

可以从 http://kafka.apache.org/downloads.html 这里下载最新的kafka镜像放到/usr/local/kafka目录里

mkdir /usr/local/kafka
cd /usr/local/kafka  
tar -zxvf kafka_2.10-0.8.2.2.tgz 

二、创建日志目录

kafka的日志目录正常情况下会很大(连续跑一周上百G很容易),可以专门挂载一个盘来存储kafka的目录,这里我挂载到了/data目,当然,如果你是用了我下面的配置参数,那每个日志文件最多也就1个G,除非你的业务需要日志文件保留很长时间,否则就按照第三步的配置来就行了

mkdir /data/kafka-logs   

三、修改配置文件(列出来的都是可避坑配置项,血与泪的教训)

kafka的配置文件为conf/server.properties修改此配置文件即可

注意,只有broker.idlisterners的要和具体的节点相对应,剩下的是直接相同的。下文中列出来的都是需要修改或者添加的

############################# Server Basics #############################
broker.id=0
############################# Socket Server Settings #############################
listeners=PLAINTEXT://192.168.1.150:9092 #如果不配置,默认取PLAINTEXT://your.host.name:9092
############################# Log Basics #############################
log.dirs=/home/xzp/kafka-logs
delete.topic.enable=true                 #通过配置此项使得删除topic的指令生效
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion
log.cleanup.policy=delete # 日志的清除策略:直接删除
log.retention.hours=72  # 日志保存时间为3天 
log.segment.bytes=1073741824 # 每个日志文件的最大的大小,这里为1GB

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#配置zookeeper集群url,这里很重要
zookeeper.connect=192.168.1.145:2181,192.168.1.150:2181,192.168.1.138:2181
  
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

尤其注意这里的 Log Retention Policy配置项,清除无用的日志文件很重要.

四、启动kafka:

这里注意,记得要先启动zookeeper。确保zookeeper启动以后再执行kafka的启动命令:

QuorumPeerMain就是zookeeper进程
bin/kafka-server-start.sh -daemon config/server.properties

七、检测是否成功启动:

有kafka进程存在即成功启动

八、kafka常用操作流程:

Step 1: Start the server
后台方式启动,推荐第一次配置的新手不要加入-daemon参数,看看控制台输出的是否有success.

bin/kafka-server-start.sh -daemon config/server.properties 

Step 2: Create a topic(replication-factor一定要大于1,否则kafka只有一份数据,leader一旦崩溃程序就没有输入源了,分区数目视输入源而定)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic nodeHlsTest

Step 3: Describe a topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic nodeHlsTest

step 3: list the topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

step 4: send some message

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nodeHlsTest

step 5: start a consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nodeHlsTest --from-beginning

step 6: delete a topic
要事先在 serve.properties 配置 delete.topic.enable=true

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic nodeHlsTest
# 如果仍然只是仅仅被标记了删除(zk中并没有被删除),那么启动zkCli.sh,输入如下指令
rmr /brokers/topics/nodeHlsTest
上一篇下一篇

猜你喜欢

热点阅读