Setting Up a Kafka Cluster Environment

2018-10-10  孤尘F

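1. Install Scala

Method 1

1. Download the latest Scala release from the official Scala website.

2. Extract the archive. The extracted directory can be moved to /usr/local/share: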

mv /download/scalapath /usr/local/share/scala

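3. Update the environment variables. On macOS, run sudo su to get a root shell, then edit the profile configuration file:

vim /etc/profile

Append the following line to the end of the file: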

export PATH="$PATH:/usr/local/share/scala/bin"

Save and exit with :wq!, then restart the terminal. Scala is now installed and on the PATH.
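
The PATH edit above can also be scripted so that running it twice does not add a duplicate line. The following is a minimal sketch, not part of the original post: append_once is a hypothetical helper, and PROFILE defaults to a temporary file standing in for /etc/profile so the sketch is safe to try.

```shell
#!/usr/bin/env bash
set -eu

# Assumption: PROFILE stands in for /etc/profile; override it (as root) for the real file.
PROFILE="${PROFILE:-$(mktemp)}"
LINE='export PATH="$PATH:/usr/local/share/scala/bin"'

# append_once FILE TEXT (hypothetical helper): append TEXT to FILE
# only if FILE does not already contain that exact line.
append_once() {
  grep -qxF -- "$2" "$1" 2>/dev/null || printf '%s\n' "$2" >> "$1"
}

append_once "$PROFILE" "$LINE"
append_once "$PROFILE" "$LINE"   # second call is a no-op

# Count the matching lines; after two calls there is still only one.
grep -c 'scala/bin' "$PROFILE"
```

Matching the whole line with grep -xF keeps the check literal, so the $PATH text inside the line is not treated as a pattern.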

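Method 2

If Ruby is installed on the machine, installation is simpler with Homebrew. First install Homebrew. The command below uses the installer URL given in the original post, which has since been retired; check https://brew.sh for the current install command.

ruby -e "$(curl -fsSL https://raw.github.com/mxcl/homebrew/go)"

Then install Scala:

brew install scala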

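2. Install ZooKeeper

This section installs three ZooKeeper nodes.
Download ZooKeeper 3.4.13 from the Apache ZooKeeper website.
Then run the following commands to install it: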

tar -zxvf zookeeper-3.4.13.tar.gz
mv zookeeper-3.4.13 zookeeper
mkdir -p /Users/XXX/kafka/data_logs
# Create one data directory per zk node. Replace XXX with your user name
# (the configuration files below use the author's user name, fureitakara).
# Each data directory needs a myid file that contains only that node's id.
mkdir /Users/XXX/kafka/data_logs/zookeeper1
echo 1 > /Users/XXX/kafka/data_logs/zookeeper1/myid
mkdir /Users/XXX/kafka/data_logs/zookeeper2
echo 2 > /Users/XXX/kafka/data_logs/zookeeper2/myid
mkdir /Users/XXX/kafka/data_logs/zookeeper3
echo 3 > /Users/XXX/kafka/data_logs/zookeeper3/myid

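Create the zk configuration files:

# Change into the directory extracted from zookeeper-3.4.13.tar.gz
cd /Users/XXX/kafka/zookeeper
# In the conf directory, create three configuration files, zoo1.cfg, zoo2.cfg and zoo3.cfg, with the following contents: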

zoo1.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=5
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=2
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/Users/fureitakara/kafka/data_logs/zookeeper1
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

zoo2.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=5
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=2
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/Users/fureitakara/kafka/data_logs/zookeeper2
# the port at which the clients will connect
clientPort=2182
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

zoo3.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=5
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=2
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/Users/fureitakara/kafka/data_logs/zookeeper3
# the port at which the clients will connect
clientPort=2183
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

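Note: the three configuration files above are for running several ZooKeeper nodes on the same machine. They differ only in dataDir and clientPort. When the nodes run on separate machines, each machine needs only a single zoo.cfg.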
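
Because the three files differ only in dataDir and clientPort, the single-machine layout can be generated in a loop. The sketch below is an illustration under stated assumptions rather than the author's procedure: BASE defaults to a scratch directory (set it to /Users/XXX/kafka for the real layout), and only the uncommented settings from the listings above are written.

```shell
#!/usr/bin/env bash
set -eu

# Assumption: BASE is a scratch directory unless overridden.
BASE="${BASE:-$(mktemp -d)}"
DATA="$BASE/data_logs"
CONF="$BASE/zookeeper/conf"
mkdir -p "$CONF"

for id in 1 2 3; do
  # Data directory plus the myid file that holds only this node's id.
  mkdir -p "$DATA/zookeeper$id"
  echo "$id" > "$DATA/zookeeper$id/myid"

  # Per-node config: dataDir and clientPort vary, the server list is shared.
  cat > "$CONF/zoo$id.cfg" <<EOF
tickTime=2000
initLimit=5
syncLimit=2
dataDir=$DATA/zookeeper$id
clientPort=$((2180 + id))
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
EOF
done

# Show the client port chosen for each node.
grep -H '^clientPort=' "$CONF"/zoo*.cfg
```

The N in each server.N line has to match the number stored in that node's myid file, which is why both are derived from the same loop variable.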

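Start ZooKeeper:
To start several ZooKeeper nodes on one machine, run the following commands from the ZooKeeper directory. Each command runs in the foreground, so start each one in its own terminal: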

java -cp zookeeper-3.4.13.jar:lib/slf4j-api-1.7.25.jar:lib/slf4j-log4j12-1.7.25.jar:lib/log4j-1.2.17.jar:conf org.apache.zookeeper.server.quorum.QuorumPeerMain conf/zoo1.cfg
java -cp zookeeper-3.4.13.jar:lib/slf4j-api-1.7.25.jar:lib/slf4j-log4j12-1.7.25.jar:lib/log4j-1.2.17.jar:conf org.apache.zookeeper.server.quorum.QuorumPeerMain conf/zoo2.cfg
java -cp zookeeper-3.4.13.jar:lib/slf4j-api-1.7.25.jar:lib/slf4j-log4j12-1.7.25.jar:lib/log4j-1.2.17.jar:conf org.apache.zookeeper.server.quorum.QuorumPeerMain conf/zoo3.cfg

To start ZooKeeper nodes on separate machines, run the following command on each machine:

bin/zkServer.sh start conf/zoo.cfg

Check the status of every node in the cluster:

bin/zkServer.sh status conf/zoo1.cfg
bin/zkServer.sh status conf/zoo2.cfg
bin/zkServer.sh status conf/zoo3.cfg

The ZooKeeper cluster is now set up.
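
A healthy three-node ensemble reports exactly one leader and two followers. The sketch below shows one way to check that from the status output. It assumes zkServer.sh status prints a line of the form "Mode: leader" or "Mode: follower"; zk_mode and sample_status are hypothetical helpers, and the sample text stands in for a running cluster.

```shell
#!/usr/bin/env bash
set -u

# zk_mode (hypothetical helper): read status output on stdin, print the mode.
zk_mode() {
  awk -F': ' '/^Mode:/ { print $2 }'
}

# sample_status N MODE: assumed shape of `bin/zkServer.sh status conf/zooN.cfg` output.
sample_status() {
  printf 'ZooKeeper JMX enabled by default\nUsing config: conf/zoo%s.cfg\nMode: %s\n' "$1" "$2"
}

modes=$(
  sample_status 1 follower | zk_mode
  sample_status 2 leader   | zk_mode
  sample_status 3 follower | zk_mode
)

# Count the nodes in each role; grep -c exits non-zero on zero matches, hence || true.
leaders=$(printf '%s\n' "$modes" | grep -c '^leader$' || true)
followers=$(printf '%s\n' "$modes" | grep -c '^follower$' || true)

echo "leaders=$leaders followers=$followers"
```

Against a real cluster, pipe the output of each bin/zkServer.sh status command into zk_mode in place of sample_status.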

3. Install the Kafka cluster

Download Kafka (kafka_2.12-2.0.0.tgz) from the Apache Kafka website, then run:

tar -zxvf kafka_2.12-2.0.0.tgz
mv kafka_2.12-2.0.0 kafka
# Already created during the ZooKeeper installation; -p makes this safe to repeat
mkdir -p /Users/XXX/kafka/data_logs
# Create one Kafka log directory per broker
mkdir /Users/XXX/kafka/data_logs/kafka1
mkdir /Users/XXX/kafka/data_logs/kafka2
mkdir /Users/XXX/kafka/data_logs/kafka3

Change into the extracted Kafka directory:

cd /Users/XXX/kafka/kafka

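In the config directory, create three Kafka broker configuration files: server1.properties, server2.properties and server3.properties.
The contents of server1.properties:

# broker.id must be different for each broker (0, 1, 2)
broker.id=0
# Each broker must listen on its own port (9092, 9093, 9094)
listeners=PLAINTEXT://127.0.0.1:9092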
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Each broker must use its own log directory (kafka1, kafka2, kafka3)
log.dirs=/Users/fureitakara/kafka/data_logs/kafka1
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# Connection string for the zk cluster
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true

Create three copies of this file and change the commented settings above so that each broker has its own values.
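
The three copies can be derived from one template with sed. This is a minimal sketch, assuming broker ids 0 to 2 and ports 9092 to 9094 (the values that appear in the verification output and client commands later in this post). WORK is a scratch directory, and the template is shortened to the settings that change plus zookeeper.connect.

```shell
#!/usr/bin/env bash
set -eu

# Assumption: WORK is a scratch directory; use the Kafka config directory for real.
WORK="${WORK:-$(mktemp -d)}"
TEMPLATE="$WORK/server.properties"

# Shortened template; a real one would carry all the settings listed above.
cat > "$TEMPLATE" <<'EOF'
broker.id=0
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/Users/fureitakara/kafka/data_logs/kafka1
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
EOF

for n in 1 2 3; do
  # Rewrite only the three per-broker settings; every other line is copied as-is.
  sed -e "s|^broker\.id=.*|broker.id=$((n - 1))|" \
      -e "s|^listeners=.*|listeners=PLAINTEXT://127.0.0.1:$((9091 + n))|" \
      -e "s|^log\.dirs=.*|log.dirs=/Users/fureitakara/kafka/data_logs/kafka$n|" \
      "$TEMPLATE" > "$WORK/server$n.properties"
done

# Show the per-broker values that were written.
grep -H -e '^broker\.id=' -e '^listeners=' "$WORK"/server[123].properties
```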
Start the Kafka brokers:

bin/kafka-server-start.sh -daemon config/server1.properties
bin/kafka-server-start.sh -daemon config/server2.properties
bin/kafka-server-start.sh -daemon config/server3.properties

Use jps to check that the Kafka processes are running:

jps |grep Kafka

The Kafka cluster is now set up.

4. Verification

Create a topic:

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --create --topic test-topic --partitions 3 --replication-factor 3

Output like the following means the topic was created:

fulibaodeMacBook-Pro:kafka fureitakara$ bin/kafka-topics.sh --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --create --topic test-topic --partitions 3 --replication-factor 3
Created topic "test-topic".

Further checks:

# List all topics
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --list
test-topic
# Describe test-topic in detail
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --describe --topic test-topic
Topic:test-topic    PartitionCount:3    ReplicationFactor:3 Configs:
    Topic: test-topic   Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    Topic: test-topic   Partition: 1    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: test-topic   Partition: 2    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
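
In the output above, every partition lists three replicas and all three are in the in-sync replica set (Isr), which is what a healthy topic with replication factor 3 looks like. The sketch below automates that comparison with awk. It is only an illustration: the describe output is pasted in as sample text rather than read from a live cluster, and the field positions assume the layout shown above.

```shell
#!/usr/bin/env bash
set -eu

# Sample text copied from the describe output above.
describe_output=$(cat <<'EOF'
Topic:test-topic    PartitionCount:3    ReplicationFactor:3 Configs:
    Topic: test-topic   Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    Topic: test-topic   Partition: 1    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: test-topic   Partition: 2    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
EOF
)

# For each partition line, compare the replica count ($8) with the ISR count ($10).
report=$(printf '%s\n' "$describe_output" | awk '
  $3 == "Partition:" {
    replicas = split($8, r, ",")
    in_sync  = split($10, s, ",")
    status   = (replicas == in_sync) ? "ok" : "under-replicated"
    print "partition " $4 ": replicas=" replicas " isr=" in_sync " " status
  }')

printf '%s\n' "$report"
```

To run it against the cluster, replace the sample text with the output of the kafka-topics.sh --describe command.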

Sending and consuming messages:

# Producer test:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic test-topic
>this is a test message
>hello,kafka
>haha
# Consumer test:
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic test-topic --from-beginning
hello,kafka
this is a test message
haha

Throughput tests:

# Producer throughput test:
bin/kafka-producer-perf-test.sh --topic test-topic --num-records 500000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 acks=-1
433179 records sent, 86601.2 records/sec (16.52 MB/sec), 1043.5 ms avg latency, 1494.0 max latency.
500000 records sent, 90285.301553 records/sec (17.22 MB/sec), 1075.92 ms avg latency, 1494.00 ms max latency, 1271 ms 50th, 1441 ms 95th, 1473 ms 99th, 1489 ms 99.9th.
# Consumer throughput test:
bin/kafka-consumer-perf-test.sh --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --messages 500000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2018-10-10 19:45:56:721, 2018-10-10 19:45:57:436, 95.3667, 133.3800, 500000, 699300.6993, 45, 670, 142.3384, 746268.6567
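
The MB/sec figure in the producer summary follows from the record rate and the record size: records/sec times bytes per record, divided by 1024 * 1024. The short sketch below reproduces it from the numbers reported above; the variable names are illustrative only.

```shell
#!/usr/bin/env bash
set -eu

records_per_sec=90285.301553   # from the producer summary line above
record_size=200                # bytes, the --record-size argument

# records/sec * bytes/record gives bytes/sec; divide by 1024*1024 for MB/sec.
mb_per_sec=$(awk -v r="$records_per_sec" -v s="$record_size" \
  'BEGIN { printf "%.2f", r * s / (1024 * 1024) }')

echo "$mb_per_sec MB/sec"   # prints "17.22 MB/sec", matching the summary line
```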