kafka

2024-08-08  本文已影响0人  温岭夹糕

写在开头

仅用于自己学习记录,本章节学习目的是快速上手kafka和体验go操作kafka

目录

文章目录

1.消息队列

消息队列MQ(Message Queue)也被称为中间件,不存储消息内容,只是消息的搬运工,具体表现在:

消息队列的基本形态,就是有N个生产者,N个消费者 image.png

在该模式下,生产者只需要向消息队列投递消息,生产者只需要等消息队列搬运消息,此时,生产者和消费者就解耦了

2.Kafka

kafka是一个分布式,支持多分区,基于zookeeper的分布式消息流平台(元数据都保存在zookeeper中,因此3.7版本之前都需要先安装zookeeper)它同时也是一款开源的基于发布订阅模式的消息引擎系统

为什么要学习kafka,对于数据密集型应用来说,kafka能很好帮助我们应对数据量的激增,举个例子,上游比如是300个示例的大型数据中心,下游是一个搜索和查询的引擎,中间件使用kafka隔离上下游业务,将上游激增的流量缓存起来,以平滑的方式传到到下游子系统中,避免了流量的不规则冲击。

2.1消息引擎系统

看名字就知道,它比MQ逼格更高,wiki上的介绍是

消息引擎是一组规范,企业利用这组规范在不同系统之间传递准确的消息,实现松耦合的异步式数据传递

即:

  1. 用于不同系统之间
  2. 传输的对象是消息

这么一看是不是和MQ大差不差,但是之所以把他叫做引擎,是它能把消息转换成一定的格式,即如何传输消息,如何设计待传输消息的格式都属于消息引擎设计的一部分(摩托车引擎把燃油转为动能,消息引擎也是如此,所以才叫引擎)。
实际上kafka在传输时使用的是纯二进制的字节序列

2.2为什么使用kafka

在这章开头举了如何对抗峰值流量例子,就是削峰填谷,缓冲上下游突发的流量,使其平滑,来保护下游服务

2.3Kafka术语

极客时间中有趣的解释

image.png

2.4集群配置参数

2.4.1Broker端参数

Broker需要配置存储信息,即Broker使用哪些磁盘,针对存储信息的重要参数有以下几个:

与zooKeeper相关设置:

Broker连接相关(客户端连接或与其他broker连接)

Topic相关

数据保留

2.4.1 Topic级别参数

Kafka支持为不同的topic设置不同的参数值,Topic级别参数会覆盖全局broker参数

如何设置topic级别参数?

  1. 创建时设置( Kafka 开放了kafka-topics命令供我们来创建 Topic,--config用于设置topic级别参数 )
  2. 修改时设置(更推荐使用该种)

3.快速上手kafka

我用的3.7.0 不需要额外安装zookeeper
参考Docker---apache/kafka

sudo docker pull apache/kafka:3.7.0

sudo docker run -d --name kafka -p 9092:9092 apache/kafka:3.7.0

但是这种方式有一个弊端,我的kafka是安装在云服务器上的,本地的windows上无法访问!!这时我们想到可能是上面的参数在作祟

advertised.listeners

我们进入容器查看

sudo docker  exec -it kafka /bin/bash

cd opt/kafka/config

cat server.properties | grep listeners

发现advertised.listeners的值为localhost:9092,只允许本地访问,我们需要将他修改成以下形式

//我在云服务器上,这个ip就是我云服务器的弹性公网ip
PLAINTEXT://ip:9092

但是很遗憾,在docker里该文件是只读,我们也没root权限,那么是否启动时修改配置参数就行了,可以,但很麻烦,根据kafka的docker介绍

Apache Kafka 支持多种代理配置,您可以通过环境变量覆盖这些配置。环境变量必须以 开头KAFKA_,代理配置中的任何点都应在相应的环境变量中指定为下划线。
需要注意的是,如果您要覆盖任何配置,则不会使用任何默认配置。

没错你不能光写一个

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localxxxxxx:9092 

还要连其他的一起补充

docker run -d  \
  --name broker \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_NUM_PARTITIONS=3 \
  apache/kafka:latest

学习成本蹭蹭上去了,所以我选择直接复制一个配置文件给他启动
将kafka里的配置拷贝出一份

mkdir -p config
sudo docker  kafka:/opt/kafka/config/server.properties ./config

cd config
vim server.properties //修改成你的弹性公网ip

vim Dockerfile //写入下面两行
FROM apache/kafka:3.7.0
COPY server.properties /etc/kafka/docker

//构建kafka
sudo docker build -t="mykafka:1.0.0" .

//停止并删除之前的容器
sudo docker stop kafka
sudo docker rm kafka

//启动自己封装的镜像
sudo docker run -d -p 9092:9092 --name kafka mykafka:1.0.0

添加topic

//进入容器
sudo docker exec -it kafka /bin/bash
cd opt/kafka/bin

./kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic tests 

检查topic

./kafka-topics.sh --list --bootstrap-server localhost:9092 

3.1go连接kafka

选用sarama 因为用户多,注意现在文件移动到了IBM

go get -u github.com/IBM/sarama

下载消费者模拟工具模拟消费者消费消息

go install github.com/IBM/sarama/tools/kafka-console-consumer@latest

启动成功表示已经能成功连接远程kafka

 kafka-console-consumer -topic tests -brokers ip地址:9092

编写生产者

func TestProducer(t *testing.T) {
    cfg := sarama.NewConfig()
    cfg.Producer.Return.Successes = true
    cfg.Producer.Return.Errors = true
    cfg.Version = sarama.MaxVersion

    borkers := []string{"xxxxxxxx:9092"}

    producer, err := sarama.NewAsyncProducer(borkers, cfg)
    assert.NoError(t, err)
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "tests",
        Value: sarama.StringEncoder("hello"),
    }

    producer.Input() <- msg

    select {
    case success := <-producer.Successes():
        t.Log(success.Partition, success.Offset)
                return
    case err := <-producer.Errors():
        t.Log("发送失败", err)
    }

}

消费端输出如下

Partition:      0
Offset: 2    
Key:
Value:  hello

生产者输出如下:

0,2
上一篇下一篇

猜你喜欢

热点阅读