分布式消息队列kafka

2019-05-25  本文已影响0人  机灵鬼鬼

kafka三大特性

第一、发布和订阅

第二、实时的流处理

第三、安全地存储流数据在集群节点上

kafka的架构

producers、consumers、brocker

First a few concepts:

Kafka is run as a cluster on one or more servers that can span multiple datacenters.

解析:kafka是一个以集群形式运行在一个或多个机器上,跨多数据中心的服务。

The Kafka cluster stores streams of records in categories called topics.

解析:kafa集群使用topics把流数据记录存储时做分类。

Each record consists of a key, a value, and a timestamp.

解析:每一条记录包含一个key,一个value,和一个时间戳

查看官方文档的QUICKSTART模块

由于kafka依赖zookeeper,所以我们安装kafka之前要先安装zookeeper

Zookeeper的下载地址,统一从cdh5那个地址(http://archive.cloudera.com/cdh5/cdh/5/)下载。

解压zk

配置环境变量~/.bash_profile

配置zk的数据存储目录

下载kafka

1、单节点单broker的部署及使用

解压kafka

tar -zxvf kafka_2.11-2.2.0.tgz

设置kafka的根目录和启动bin目录配置在环境变量里面

PATH=$PATH:$HOME/bin

export KAFKA_HOME=/usr/local/kafka_2.11-2.1.1

export PATH=$KAFKA_HOME/bin:$PATH

配置环境变量当前登录用户的私有环境变量~/.bash_profile

配置kafka的配置文件server.properties

$KAFKA_HOME/config/server.properties

broker.id=0  //这个配置是broker的编号,且该编号唯一对应一个broker不能重复。

# The address the socket server listens on. It will get the value returned from

# java.net.InetAddress.getCanonicalHostName() if not configured

listeners=PLAINTEXT://10.101.3.3:9092,如果不设置他会默认本机hostname

log.dirs //存储kafka的日志

zookeeper.connect //zk的地址

启动zookeeper

如果你已经有了zookeeper,那就跳过该步骤,如果没有那就直接使用kafka自己安装包里的zookeeper,启动命令

bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka

后台方式: ./kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties &

会话方式:./kafka-server-start.sh ${KAFKA_HOME}/config/server.properties

查看是否启动 jps 命令

查看kafka、zookeeper使用的配置文件   jps -m

创建一个topic:指定ZK

./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 1 --partitions 1 --topic mytopic

备注:1、创建topic的时候,需要使用--create命令 2、需要指定zk的地址--zookeeper 10.101.3.3:2181 3、指定副本系数--replication-factor 1 4、指定分区 --partitions 1  5、指定topic的名字--topic mytopic

查看所有已经创建的topic

bin/kafka-topics.sh --list --zookeeper 10.101.3.3:2181

生产消息:指定Broker(localhost最好是ip或者域名,不要使用localhost)

./kafka-console-producer.sh --broker-list 10.101.3.3:9092 --topic mytopic

备注:启动消息生产者,也就是启动一个broker,需要使用kafka-console-producer.sh命令指定broker的服务暴漏端口和对应的topic。如果你出现以下错误,就是因为ip或域名没有设置正确导致的

消费消息:指定生产者关联的Broker

./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9092 --topic mytopic --from-beginning

--from-beginning参数的解析

这个参数就是告诉消费者服务,要全部监听所有生产者的消息。如果不带有此参数,那么该消费服务只会接受启动时间之后监听的消息,之前的消息是不会监听的。

查看所有topic的设置详情

bin/kafka-topics.sh --describe --zookeeper localhost:2181 

查看某一个topic的 设置详情

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic这里只查看名叫test的topic的设置信息

关闭kafka的服务

bin/kafka-server-stop.sh

关闭前

关闭后

kafka服务消失

2、单节点多Broker部署及应用

首先看下图,是官网原图:意思就是把kafka的server.properties配置文件复制多份。

我们就按照官网说的来,我部署三个节点

三个broker的配置文件

需要把server-1.properties、server-1.properties、server-1.properties的broker.id和日志目录和listeners配置分别改一下

config/server-1.properties:

    broker.id=1

    listeners=PLAINTEXT://:9093

    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:

    broker.id=2

    listeners=PLAINTEXT://:9094

    log.dirs=/tmp/kafka-logs-2

config/server-3.properties:

    broker.id=3

listeners=PLAINTEXT://:9095

    log.dirs=/tmp/kafka-logs-3

其他的两个同理设置

启动多broker(后台方式启动)

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

启动后,可以看到3各kafka的服务。

kafka服务对应的配置文件:jps -m可以查看到

创建一个topic

./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 3 --partitions 1 --topic my_m_topic

备注:这时候创建topic时,需要根据broker的节点数量,指定副本系数--replication-factor,这时候就应该为3了。

查看所有已创建的topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

查看我们多副本topic的详情

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_m_topic

其中Leader:代表主副本。Replicas:代表副本所在的broker机器编号1、2、3  Isr:代表存活的副本。

生产消息:指定Broker(localhost最好是ip或者域名,不要使用localhost),这里的--broker-list后面需要多个broker节点ip

./kafka-console-producer.sh --broker-list 10.101.3.3:9093,10.101.3.3:9094,10.101.3.3:9095 --topic my_m_topic


消费消息:指定生产者关联的zk

./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9093,10.101.3.3:9094,10.101.3.3:9095 --from-beginning --topic my_m_topic 

删除topic

server.properties 设置 delete.topic.enable=true

如果没有设置 delete.topic.enable=true,则调用kafka 的delete命令无法真正将topic删除,而是显示(marked for deletion)

调用命令删除topic:

./kafka-topics.sh --delete --zookeeper localhost:2181 --topic my_m3_topic

删除kafka存储目录(server.properties文件log.dirs配置,默认为"/data/kafka-logs")相关topic的数据目录。

recovery-point-offset-checkpoint和replication-offset-checkpoint中的1变为0,删除topic对应的名字

kafka的容错性

如果kafka的多个节点中,我们杀掉任何一个节点,都不会影响消息的传输。如果我们干掉leader节点,他内部会把其他副节点转正,成为leader,也不会影响消息传输。

上一篇下一篇

猜你喜欢

热点阅读