消息队列MQ(Kafka&RabbitMQ)

2. kafka安装

2018-06-23  本文已影响94人  阿飞的博客

这篇文章主要讲解kafka单机安装,以及集群安装。kafka的单机安装非常简单,只需按照下一段落提到的几步操作即可。

单机安装

  1. 下载
    首先从官方下载安装包,官方地址:http://kafka.apache.org/downloads。kafka安装包和一般安装包的命名方式不一样,我们看一个kafka包命名:kafka_2.11-1.1.0.tgz,其中2.11是scala的版本,1.1.0才是kafka的版本。官方强烈建议scala版本和服务器上的scala版本保持一致,避免引发一些不可预知的问题。官方原文如下:
    We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.11 is recommended).

  2. 解压
    将安装包放到服务器上某个位置,执行如下命令:

[afei@kafka opt]$ tar -xzf kafka_2.11-1.1.0.tgz
[afei@kafka opt]$ cd kafka_2.11-1.1.0
  1. 启动zk
    kafka依赖zookeeper,zookeeper集群可以自己搭建,也可以用kafka安装包中内置的shell脚本启动zookeeper。
    如果已经安装了zk单机或者zk集群(单机或者集群都可以,取决于你对高可用的要求),那么可以跳过这一步:
[afei@kafka kafka_2.11-1.1.0]$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-06-21 15:10:14,449] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2018-06-21 15:10:14,466] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-06-21 15:10:14,466] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-06-21 15:10:14,467] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
... ...

这种启动方式如果执行ctrl+z后会退出,启动的进程也会退出,所以建议增加-daemon参数启动:

bin/zookeeper-server-start.sh -daemon  config/zookeeper.properties

说明:笔者后面的测试,假设在10.0.55.209上部署了独立zk,而不是通过kafka安装包中的脚本部署zk,且端口为默认端口2181。

  1. 配置
    启动kafka之前,需要确认一些配置信息,kafka的配置信息在config/server.properties文件中,笔者配置kafka直接显示指定绑定的ip为10.0.55.229,所以需要增加如下一下配置:
host.name=10.0.55.229
port=9092
listeners=PLAINTEXT://10.0.55.229:9092
advertised.listeners=PLAINTEXT://10.0.55.229:9092
  1. 启动
    执行如下脚本即可:
[afei@izwz91rhzhed2c54e6yx87z kafka_2.11-1.1.0]$ bin/kafka-server-start.sh  config/server.properties 
[2018-06-21 15:15:41,830] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-06-21 15:15:42,373] INFO starting (kafka.server.KafkaServer)
[2018-06-21 15:15:42,374] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
... ...

这种启动方式如果执行ctrl+z后会退出,启动的进程也会退出,所以建议增加-daemon参数启动:

bin/kafka-server-start.sh -daemon  config/server.properties

while [ $# -gt 0 ]; do
  COMMAND=$1
  case $COMMAND in
    -name)
      DAEMON_NAME=$2
      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
      shift 2
      ;;
    -loggc)
      if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
        GC_LOG_ENABLED="true"
      fi
      shift
      ;;
    -daemon)
      DAEMON_MODE="true"
      shift
      ;;
    *)
      break
      ;;
  esac
done

# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
  1. 发送消息
    发送消息之前,先创建topic(kafka能自动创建topic,通过配置auto.create.topics.enable决定,但是一般生产环境建议关闭该特性):
-- 创建topic
[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-topics.sh --create --zookeeper 10.0.55.209:2181 --replication-factor 1 --partitions 1 --topic TOPIC-TEST-AFEI
Created topic "TOPIC-TEST-AFEI".

-- 查看所有创建的topic集合(结果中包含了刚刚创建的名为TOPIC-TEST-AFEI的topic)
[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-topics.sh --list --zookeeper 10.0.55.209:2181
TOPIC-TEST-AFEI

使用kafka提供的kafka-console-producer.sh发送消息:

[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-console-producer.sh --broker-list 10.0.55.229:9092 --topic TOPIC-TEST-AFEI
>hello+world
>my+name+is+afei
  1. 接收消息
    使用kafka提供的kafka-console-consumer.sh接收消息:
[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server 10.0.55.229:9092 --topic TOPIC-TEST-AFEI --from-beginning
firt
hello+world
my+name+is+afei

集群安装

单机版kafka安装并可以成功发送&接收消息后,搭建kafka集群就比较简单了。

  1. 配置文件
    首先基于conf目录下的server.propreties文件,复制两个文件server-1.propreties,server-2.propreties
    server-1.propreties做如下变更:
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1

server-2.properties做类似的变更:

broker.id=2
port=9094
log.dirs=/tmp/kafka-logs-2

由于笔者将三个kafka broker放在一台linux服务器上,所以port不能一样(如果每个broker一台服务器,那么port可以一致,生产环节肯定是每个broker部署在一台独立的服务器上)。但是无论三个broker是否在一台服务器上,broker.id一定是整个集群下全局唯一。否则无法正常启动broker。其原理是在zk上以broker.id的值写入对应的节点,如果写入失败,broker启动就失败。所以整个集群的broker.id要保证全局唯一。

  1. 启动kafka
    执行如下两个命令,分别再启动两个kafka broker:
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
  1. 集群验证
    由于总计部署了3个broker,所以创建topic时能指定--replication-factor 3,如果不能指定,说明集群部署有问题:
[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-topics.sh --create --zookeeper 10.0.55.209:2181 --replication-factor 3 --partitions 5 --topic TOPIC-TEST-CLUSTER
Created topic "TOPIC-TEST-CLUSTER".

查看刚才创建的topic详情,可以看到总计5个分区(PartitionCount),且副本数为3(ReplicationFactor),且每个分区上有3个副本(通过Replicas的值可以得出),另外最后一列Isr(In-Sync Replicas)即表示处理同步状态的副本集合,这些副本与leader副本保持同步,没有任何同步延迟,另外Leader,Replicas Isr中的0,1,2就是broker id,对应配置文件conf/server.properties中的broker.id

[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-topics.sh --zookeeper 10.0.55.209:2181 --topic TOPIC-TEST-CLUSTER --describe
Topic:TOPIC-TEST-CLUSTER    PartitionCount:5    ReplicationFactor:3 Configs:
    Topic: TOPIC-TEST-CLUSTER   Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
    Topic: TOPIC-TEST-CLUSTER   Partition: 1    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    Topic: TOPIC-TEST-CLUSTER   Partition: 2    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: TOPIC-TEST-CLUSTER   Partition: 3    Leader: 0   Replicas: 0,2,1 Isr: 0,2,1
    Topic: TOPIC-TEST-CLUSTER   Partition: 4    Leader: 1   Replicas: 1,0,2 Isr: 1,0,2
上一篇下一篇

猜你喜欢

热点阅读