Kafka程序员Hadoop

分布式消息通信Kafka(一) - 基本操作

2019-04-23  本文已影响3人  JavaEdge

0 相关源码

1 Kafka的简介

1.1 什么是 Kafka

Kafka 是一款分布式消息发布和订阅系统,具有高性能、高吞吐量的特点而被 广泛应用与大数据传输场景。
它是由 LinkedIn 公司开发,使用 Scala 语言编 写,之后成为 Apache 基金会的一个顶级项目。
Kafka 提供了类似 JMS 的特 性,但是在设计和实现上是完全不同的,而且他也不是 JMS 规范的实现。

1.2 Kafka 产生的背景

Kafka 作为一个消息系统,早起设计的目的是用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)。

1.3 Kafka 的应用场景

由于具有更好的吞吐量、内置分区、冗余及容错性的优点(每秒可以处理几十万消息),让 Kafka 成为了一个很好的大规模消息处理应用的解决方案。所以在企业级应用长,主要会应用于如下几个方面

1.3.1 行为跟踪

Kafka可以用于跟踪用户浏览页面、搜索及其他行为。通过发布-订阅模式实时记录到对应的 topic 中,通过后端大数据平台接入处理分析,并做更进一步的实时处理和监控

1.3.2 日志收集

日志收集方面,有很多比较优秀的产品,比如ApacheFlume,很多公司使用 Kafka 代理日志聚合。

日志聚合表示从服务器上收集日志文件,然后放到一个集中的平台(文件服务器)进行处理。

在实际应用开发中,我们应用程序的 log 都会输出到本地磁盘上, 排查问题的话通过 Linux 命令搞定,如果应用程序组成了负载均衡集群,并且集群的机器有几十台以上,那么想通过日志快速定位到问题,就是很麻烦的事情了。
所以一般都会做一个日志统一收集平台管理 log 日志,来快速查询重要应用的问题。所以很多公司的套路都是 把应用日志集中到 Kafka 上,然后分别导入到 es 和 hdfs 上,用来做实时检索分析和离线统计数据备份等。而另一方面,Kafka 本身又提供了很好的 API 来集成日志并且做日志收集

主要设计目标

2 为何使用消息系统

3 常用Message Queue对比

4 Kafka 本身的架构

4.1 典型的 Kafka 集群包含

4.1.1 若干 Producer

可以是应用节点产生的消息,也可以是通过 Flume 收集日志产生的事件

Producer使用 push 模式将消息发布到 broker,

4.1.2 若干 Broker

kafka 支持水平扩展

多个 broker 协同工作

4.1.3 若干Consumer Group

consumer 通过监听使用 pull 模式从 broker 订阅并消费消息

4.1.4 一个 zookeeper 集群

Kafka 通过 zookeeper 管理集群配置及服务协同.

producer 和 consumer 部署在各个业务逻辑中。三者通过 zookeeper 管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统。

下图有一个细节是和其他 mq 中间件不同的点,producer 发送消息到 broker 的过程是 push,而 consumer 从 broker 消费消息的过程是 pull,主动去拉数据。而不是 broker 把数据主动发送给 consumer


Terminology

拓扑结构

kafka architecture 架构
  如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

Topic & Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。
为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。
若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1),如下图所示。
  

kafka topic partition
每个日志文件都是一个log entry序列,每个log entry包含一个4字节整型数值(值为N+5),1个字节的”magic value”,4个字节的CRC校验码,其后跟N个字节的消息体。每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:
  message length : 4 bytes (value: 1+4+n)
  “magic” value : 1 byte
  crc : 4 bytes
  payload : n bytes

这个log entry并非由一个文件构成,而是分成多个segment,每个segment以该segment第一条消息的offset命名并以“.kafka”为后缀。
另外会有一个索引文件,它标明了每个segment下包含的log entry的offset范围,如下图所示。

kafka partition event index

因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
  

kafka 顺序写磁盘
对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。
当然,因为磁盘限制,不可能永久保留所有数据(也没必要),因此Kafka提供两种策略删除旧数据。

例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示。

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。
另外,Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

Producer消息路由

Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。
如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。

在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。Paritition机制可以通过指定Producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JasonPartitioner<T> implements Partitioner {
    public JasonPartitioner(VerifiableProperties verifiableProperties) {}
    
    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(Integer.parseInt((String) key) % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}

如果将上例中的类作为partition.class,并通过如下代码发送20条消息(key分别为0,1,2,3)至topic3(包含4个Partition)。

public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
        List messageList = new ArrayList<KeyedMessage<String, String>>();
        for(int j = 0; j < 4; j++){
            messageList.add(new KeyedMessage<String, String>("topic2", String.valueOf(j), String.format("The %d message for key %d", i,  j));
        }
        producer.send(messageList);
    }
  producer.close();
}

则key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是通过Java程序调用Consumer后打印出的消息列表。
  

kafka consumer rebalance

Consumer Group

(本节所有描述都是基于Consumer hight level API而非low level API)。
  使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。
  

kafka consumer group
  这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
  实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。下图是Kafka在Linkedin的一种简化部署示意图。
   kafka sample deployment in linkedin

下面这个例子更清晰地展示了Kafka Consumer Group的特性。首先创建一个Topic (名为topic1,包含3个Partition),然后创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,最后通过Producer向topic1发送key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时group2中的3个Consumer分别收到了key为1,2,3的消息。如下图所示。
  

kafka consumer group

Push vs. Pull

作为一个消息系统,Kafka遵循了传统的方式,选择由Producer向broker push消息并由Consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。
  push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。
  对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

Kafka delivery guarantee

有这么几种可能的delivery guarantee:

5 Kafka 的安装部署

下载安装包

5.1 Kafka 目录介绍

5.2 启动/停止 Kafka

  1. 需要先启动 zookeeper,如果没有搭建 zookeeper 环境,可以直接运行Kafka 内嵌的 zookeeper
    启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties &
  1. 进入 kafka 目录,运行
bin/kafka-server-start.sh -daemon config/server.properties &
  1. 进入 kafka 目录,运行
bin/kafka-server-stop.sh config/server.properties

6 Kafka 的基本操作

6.1 创建 topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 - -partitions 1 --topic test

6.2 查看 topic

./kafka-topics.sh --list --zookeeper localhost:2181

6.3 查看 topic 属性

bin/kafka-topics.sh --list --zookeeper localhost:2181

6.4 消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic recommend1 --from-beginning

6.5 发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic recommend1

to have launchd start kafka now and restart at login:

brew services start kafka

Or, if you don't want/need a background service you can just run:

  zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

To have launchd start zookeeper now and restart at login:

brew services start zookeeper

Or, if you don't want/need a background service you can just run:

zkServer start

关注公众号:JavaEdge,掌握更多Java流行核心技术

上一篇 下一篇

猜你喜欢

热点阅读