golang kafka小试消息队列

2018-06-23  本文已影响670人  Jancd

Kafka 安装配置、更多资料请参考其官网。

启动 kafka server

在这之前需要启动 zookeeper 做服务治理(单机)。

$ bin/zkServer.sh status conf/zoo_sample.cfg

如提示权限限制加上 sudo

启动 kafka server

$ bin/kafka-server-start.sh config/server.properties

启动消息队列(本部分仅为测试 server)

新建 Topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

$ bin/kafka-topics.sh --list --zookeeper localhost:2181 (test)

1. 启动 Producer

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

2. 启动 Consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

此时在 Producer 端发送消息,在 Consumer 就会显示,如下图所示。

(上图中 Consumer 多出了好几个消息是我截图之前测试发出的)


Action

本文使用 sarama 库作为 kafka 的 go API。sarama 库没有给出很具体的文档,可以参考其源码。

Producer

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    addr := []string{"localhost:9092"}

    producer, err := sarama.NewSyncProducer(addr, config)
    if err != nil {
        panic(err)
    }

    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic:     "hello",
        Partition: int32(-1),
        Key:       sarama.StringEncoder("key"),
    }

    var value string
    for {
        _, err := fmt.Scanf("%s", &value)
        if err != nil {
            break
        }
        msg.Value = sarama.ByteEncoder(value)
        fmt.Println(value)

        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
            fmt.Println("Send message Fail")
        }
        fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
    }
}

Consumer

package main

import (
    "fmt"
    "sync"
    "github.com/Shopify/sarama"
)

var (
    wg  sync.WaitGroup
)

func main() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        panic(err)
    }

    partitionList, err := consumer.Partitions("hello")
    if err != nil {
        panic(err)
    }

    for partition := range partitionList {
        pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }

        defer pc.AsyncClose()

        wg.Add(1)

        go func(sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }

        }(pc)
    }
    wg.Wait()
    consumer.Close()
}

结果如下:

上一篇下一篇

猜你喜欢

热点阅读