golang kafka 消费者

2023-07-25  本文已影响0人  東玖零

背景:应用需要在某个事件完成后写表记录,以前这个事件有kafka消息,于是监听即可。

自己也不是特别了解,经过摸索可以正常使用,并添加了一些注释,如有不还正确请指正,上代码吧!

//导入头文件
"github.com/Shopify/sarama"
"github.com/astaxie/beego"
cluster "github.com/bsm/sarama-cluster"

var kafkaTag = "kafka message"

func StartKafka() {
    // 配置
    config := cluster.NewConfig()
    // 设定是否需要返回错误信息
    config.Consumer.Return.Errors = true
    // 为成员分配主题分区的策略
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
    // 这里 sarama.OffsetNewest 就从最新的开始消费,即该 consumer 启动之前产生的消息都无法被消费
    // 如果改为 sarama.OffsetOldest 则会从最旧的消息开始消费,即每次重启 consumer 都会把该 topic 下的所有消息消费一次
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    // 每 1 秒钟提交一次 offset
    config.Consumer.Offsets.CommitInterval = 1 * time.Second
    //Notifications 返回消费者期间发生的通知的通道
    // 重新平衡。 如果您的配置是,通知只会通过此通道发出
    config.Group.Return.Notifications = true

    // 创建消费者
    brokers := strings.Split(beego.AppConfig.String("kafka_url"), ",")
    groupId := "kafka-groupId-01" // 消息者组,每个后台应用创建者独有,如果你使用其他后台代码中,可能会导致自己或其他人收不消息
    topics := []string{"kafka-topics-01"} // 订阅主题
    consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
    if err != nil {
        logs.Error(kafkaTag, "new consumer error: ", err.Error())
        return
    }
    defer consumer.Close()

    // trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // 接收错误
    go func() {
        for err := range consumer.Errors() {
            logs.Error(kafkaTag, "consumer error:", err.Error())
        }
    }()

    // 打印一些rebalance的信息
    go func() {
        for ntf := range consumer.Notifications() {
            logs.Info(kafkaTag, "consumer notification:", ntf)
        }
    }()

    // 循环从通道中获取消息
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if ok {
                logs.Info(kafkaTag, "Topic = ", msg.Topic, "Partition = ", msg.Partition, "Offset = ", msg.Offset, "Key = ", string(msg.Key), "Value", string(msg.Value))
                if msg.Topic == "kafka-topics-01" {
                    // 自己解析 使用json库解析msg.Value
                    // 然后写表
                }
                consumer.MarkOffset(msg, "") // 上报offset
            } else {
                logs.Error(kafkaTag, "监听服务失败")
            }
        case <-signals:
            return
        }
    }
}

配置文件中
kafka_url = kafka地址1,kafka地址2,kafka地址3

如果不使用strings.Split进行分个割,启动会报错,报错如下:

kafka message new consumer error: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

其实就是把多个地址当一个地址去启动了。

上一篇 下一篇

猜你喜欢

热点阅读