眼君的大数据之路

Kafka使用笔记(一、概述)

2020-10-09  本文已影响0人  眼君

概述

Apache Kafka是一个分布式的发布-订阅消息系统,能够支撑海量数据的数据传递。在离线和实时的消息处理业务系统中,Kafka都有广泛的应用。Kafka将消息持久化到磁盘中,并对消息创建了备份保证了数据的安全。Kafka在保证了较高的处理速度的同时,又能保证处理的低延迟和数据的零丢失。

特性

Kafka有如下特性:

  1. 高吞吐量、低延迟:Kafka每秒可以处理几十万条消息,其延迟最低只有几毫秒,每个主题可以分多个分区,消费组对分区进行消费操作;
  2. 可扩展性:Kafka集群支持热扩展;
  3. 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
  4. 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
  5. 高并发:支持数千个客户端同时读写;

技术优势

可伸缩性

Kafka的两个重要特性造就了它的可伸缩性。

  1. Kafka集群在运行期间可以轻松地扩展或收缩(可以添加或删除代理),而不会宕机。
  2. 可以扩展一个Kafka主题来包含更多的分区,由于一个分区无法扩展到多个代理,所以它的容量受到代理磁盘空间的限制。能够增加分区和代理的数量意味着单个主题可以存储的数据量是没有限制的。
容错性和可靠性

Kafka的设计方式使某个代理的故障能够被集群中其他代理检测到。由于每个主题都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此类故障中恢复并继续运行。

吞吐量

Kafka的代理能够以超快的速度有效地存储和检索数据。

基本概念

Producer

Producer即生产者,是数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

Consumer

消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

Topic

在Kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为topic。如果Kafka看作一个谁库,topic可以理解为数据库中的一张表,topic的名字即为表名。

Partition

topic中的数据分割为一个或多个partition,每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消费顺序的场景下,需要将partition数目设为1。

Partition offset

每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。

Replicas of partition

副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为follower的partition中消费数据,而是从为leader的partition中读取数据。副本之间是一主多从的关系。

Broker

Kafka集群包含一个或多个服务器,服务器节点称为broker,broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition;如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个brocker不存储该topic的partition数据;如果brocker数量少于某topic的partition数量,那么一个broker可能存储多个partition。在实际生产环境中,尽量避免这种情况发生,因为这种情况容易导致Kafka集群数据不均衡。

Leader和Follower

每个partition有多个副本,其中仅有一个作为Leader,Leader是当前负责数据读写的partition。Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,使其与Leader保持数据同步。当Leader失效时,会从Follower中选举出一个新的Leader。而当Follower挂掉、卡住或者同步太慢,Leader会把这个Follower从ISR列表中删除,重新创建一个Follower。

Zookeeper

Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由Zookeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。

AR(Assigned Replicas)

分区中所有副本统称AR。

ISR(In-Sync Replicas)

所有与Leader部分保持一定程度的副本组成ISR。

OSR(Out-Sync Replicas)

与Leader副本同步滞后过多的副本。

HW(High Watermark)

高水位,标识一个特定的offset,消费者只能拉到这个offset之前的消息。

LEO(Log End Offset)

日志末端位移,记录了该副本底层日志中下一条消息的位移值。

安装与配置

安装JDK

下载安装JDK,配置好环境变量(JAVA_HOME、JRE_HOME、CLASSPATH、PATH)。

安装配置Zookeeper

Zookeeper是安装Kafka集群的必要组件,Kafka通过Zookeeper来实施对元数据信息的管理,包括集群、主题、分区等内容。
在官网http://zookeeper.apache.org/下载安装包到指定目录,配置好环境变量ZOOKEEPER_HOME。

cd ${ZOOKEEPER_HOME}/conf
cp zoo_sample.cfg zoo.cfg

zoo.cfg配置文件如下:

# The number of milliseconds of each tick
# zk服务器心跳时间
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
# 投票选举新Leader的初始化时间
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
# 数据目录
dataDir=/tmp/zookeeper/data
# 日志目录
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connect
# zookeeper对外服务端口,保持默认
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

以下命令,启动zookeeper:

bin/zkServer.sh start

Kafka的安装和配置

Kafka是通过Scala语言编写的,所以安装Kafka前需要先安装配置Scala。
之后,下载Kafka安装包解压,配置环境变量。
在kafka目录下执行以下命令启动:

bin/kafka-server-start.sh config/server.properties

在配置文件config/server/properties中需要关注以下几个参数:

# The id of the broker. This must be set to a unique integer for each broker.
# 表示broker的编号,如果集群中有多个broker,则每个broker编号需要设置不同
broker.id=0

#broker对外提供的服务入口地址
listeners=PLAINTEXT://:9092

#设置存放消息日志文件的地址
log.dirs=/tmp/kafka/log 

#Kafka所需Zookeeper集群地址
zookeeper.connect=localhost:2181

消息的生产与消费

创建主题

以下命令用于创建一个主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topicname --partitions 2 --replication-factor 1

--zookeeper:指定了Kafka所连接的Zookeeper服务地址
--topic:指定了所要创建主题的名称
--partitions:指定了分区数
--replication-factor:指定副本数
--create:创建主题的动作命令

以下命令用于展示所有主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --list

以下命令用于查看主题详情:

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicname

以下命令用于启动消费端接收消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname

--bootstrap-server:指定了连接Kafka集群的地址
--topic 指定了消费端订阅的主题

以下命令用于启动生产端发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname

--broker-list:指定了连接Kafka集群的地址
--topic 指定了消费端订阅的主题

Java程序进行Kafka的收发消息

与Kafka的Java客户端相关的Maven依赖如下:

<properties>
    <scala.version>2.11</scala.version>
    <slf4j.version>1.7.21</slf4j.version>
    <kafka.version>2.4.0</kafka.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
  </dependencies>

创建生产者:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerFastStart {
    //Kafka集群地址
    private static final String brokerList = "localhost:9092";
    //主题名称
    private static final String topic = "topicname";

    public static void main(String[] args) {
        Properties properties = new Properties();
        //设置Key序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,10);
        //设置value序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //设置集群地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        //KafkaProducer线程安全
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(properties);
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(topic,"Kafka-demo-01","hello,kafka!");
        try{
            producer.send(record);
        }catch (Exception e){
            e.printStackTrace();
        }
        producer.close();
    }
}

创建消费者:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerFastStart {
    //Kafka集群地址
    private static final String brokerList = "localhost:9092";
    //主题名称
    private static final String topic = "topicname";
    //消费组
    private static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        //设置Key序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //设置value序列化器
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //设置集群地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        //设置消费组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String,String> record:records){
                System.out.println(record.value());
            }
        }
    }
}

注意,需要验证Linux的防火墙是否关闭:

firewall-cmd --state

如果防火墙是开启的,可以启动以下命令关闭:

systemctl stop firewalld.service
systemctl disable firewalld.service
上一篇 下一篇

猜你喜欢

热点阅读