【8】安装kafka

2018-11-12  本文已影响2人  07b287742148

安装

  1. 官网下载安装包
    zookeeper
kafka
  1. zookeeper安装
解压修改配置项

vi zk/conf/zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/apps/tmp/zk/zkdata
dataLogDir=/root/apps/tmp/zk/zkdatalog
clientPort=2181
server.1=master:2888:3888
server.2=slave01:2888:3888
server.3=slave02:2888:3888

发送zookeeper文件夹到各个集群节点

每个集群节点的dataDir目录下创建myid文件,
内容和上面的server.id对应

启动
zk/bin/zkServer.sh start

  1. kafka安装
解压修改配置项
vim kafka/config/server.properties

broker.id=0 //集群各个节点的id要不等且唯一
host.name=master
log.dirs=/root/apps/tmp/kafka-logs
zookeeper.connect=master:2181,slave01:2181,slave02:2181

发送kafka文件夹到各个集群节点

关闭防火墙
service iptables stop
chkconfig iptables off

测试:
    bin目录下启动
    ./kafka-server-start.sh -daemon ../config/server.properties
    
    bin目录下查看topic,
    kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
    
    创建topic
    ./kafka-topics.sh --create --zookeeper master:12181 --replication-factor 2 --partitions 1 --topic test_topic
    
    发布消息到topic
    ./kafka-console-producer.sh --broker-list H30:9092 --topic test_topic
    
    消费消息
    ./kafka-console-consumer.sh --zookeeper localhost:12181 --topic test_topic --from-beginning

概念

  1. 高吞吐量,分布式,发布订阅消息系统;
  2. 具有高性能,持久化,副本备份,横向扩展能力;
  3. 起到解耦,削峰,异步处理作用。

基本组成

  1. producer
  2. consumer
  3. topic
  4. broker
  5. message

特点

  1. 发布订阅模型

消息模型可以分为两种:队列和发布-订阅式。

    队列:处理方式是一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。

    发布-订阅模型:消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。


Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组(consumer group)。

    假如所有的消费者都在一个组中,那么这就变成了queue模型。
    假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。

    更通用的, 可以创建一些消费者组作为逻辑上的订阅者。每个组包含数目不等的消费者,一个组内多个消费者可以用来扩展性能和容错。
  1. 分区
创建一个topic时,同时可以指定分区数目,分区是针对topic不是message;

kafka在接收到生产者发送的消息之后,会根据不同策略将消息存储到不同的分区中。

    策略:随机策略,负载均衡,自定义指定分区
 
好处:
    并发读写,速度快
    
    分布式存储,利于扩展
    
    加快数据恢复,某台机器挂掉,每个topic仅需恢复一部分数据
    
坏处:
    分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性;
    

分区结构:
    1个分区分为多个segment;
    
    每个segment包含两个文件log和index;
    
    log是一个segment的消息元数据
    index是log的稀疏分布索引
    
    
每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费。

分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。类似主键ID。
    
一个消费组消费partition,需要保存offset记录消费到哪,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,而且很容易出现重复消费。
在0.10版本后,kafka把这个offset的保存,从zk总剥离,保存在一个名叫__consumeroffsets topic的topic中。



topic配置的清理策略是compact。总是保留最新的key,其余删掉。
一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。




  1. 数据可靠性和一致性
Kafka集群保持所有的消息,无论消息是否被消费了,直到它们过期,。
Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。server.properties配置文件配置

Partition的多个replica中一个为Leader,其余为follower
Producer只与Leader交互,把数据写入到Leader中
Followers从Leader中拉取数据进行数据同步
Consumer只从Leader拉取数据

ISR:所有不落后的replica集合, 不落后有两层含义:距离上次FetchRequest的时间不大于某一个值或落后的消息数不大于某一个值,Leader失败后会从ISR中选取一个Follower做Leader

数据可靠性保证
当Producer向Leader发送数据时,可以通过acks参数设置数据可靠性的级别
0: 不论写入是否成功,server不需要给Producer发送Response,如果发生异常,server会终止连接,触发Producer更新meta数据;
1: Leader写入成功后即发送Response,此种情况如果Leader fail,会丢失数据
-1: 等待所有ISR接收到消息后再给Producer发送Response,这是最强保证


仅设置acks=-1也不能保证数据不丢失,当Isr列表中只有Leader时,同样有可能造成数据丢失。要保证数据不丢除了设置acks=-1, 还要保证ISR的大小大于等于2,具体参数设置:
request.required.acks:设置为-1 等待所有ISR列表中的Replica接收到消息后采算写成功;
min.insync.replicas: 设置为大于等于2,保证ISR中至少有两个Replica


数据一致性保证
一致性定义:若某条消息对client可见,那么即使Leader挂了,在新Leader上数据依然可以被读到.
HW-HighWaterMark: client可以从Leader读到的最大msg offset,即对外可见的最大offset, HW=max(replica.offset)
对于Leader新收到的msg,client不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被client消费,这样就保证了如果Leader fail,该消息仍然可以从新选举的Leader中获取。

对于来自内部Broker的读取请求,没有HW的限制。同时,Follower也会维护一份自己的HW,Folloer.HW = min(Leader.HW, Follower.offset)

上一篇 下一篇

猜你喜欢

热点阅读