go kafka 的使用(修正消费未消费的数据)

2019-07-26  本文已影响0人  MorningandSun

首先去下载sarama

1.生产者

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func produce(value, msgType string) bool {
    config := sarama.NewConfig()
    // 等待服务器所有副本都保存成功后的响应
    config.Producer.RequiredAcks = sarama.WaitForAll
    // 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    // 是否等待成功和失败后的响应
    config.Producer.Return.Successes = true

    // 使用给定代理地址和配置创建一个同步生产者
    producer, err := sarama.NewSyncProducer([]string{"192.168.0.121:9092"}, config)
    if err != nil {
        panic(err)
    }

    defer producer.Close()

    //构建发送的消息,
    msg := &sarama.ProducerMessage{
        Topic:     "test",                      //包含了消息的主题
        Partition: int32(10),                   //
        Key:       sarama.StringEncoder("key"), //
    }

    for {
        // _, err := fmt.Scanf("%s", &value)
        // if err != nil {
        //  a := err.Error
        //  fmt.Println(a)
        // }
        // fmt.Scanf("%s", &msgType)
        fmt.Println("msgType = ", msgType, ",value = ", value)
        msg.Topic = msgType
        //将字符串转换为字节数组
        msg.Value = sarama.ByteEncoder(value)
        //fmt.Println(value)
        //SendMessage:该方法是生产者生产给定的消息
        //生产成功的时候返回该消息的分区和所在的偏移量
        //生产失败的时候返回error
        partition, offset, err := producer.SendMessage(msg)

        if err != nil {
            fmt.Println("Send message Fail")
            return false
        }
        fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
        return true
    }
}

2.消费者

package main

import (
    "encoding/json"
    "fmt"
    "sync"

    "github.com/Shopify/sarama"
)

var (
    wg sync.WaitGroup
)

func Consumer(topics []string, ip []string, grouid string) {
    defer wg.Done()
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true
    config.Consumer.Offsets.Initial = sarama.OffsetNewest

    // init consumer
    consumer, err := cluster.NewConsumer(ip, grouid, topics, config)
    if err != nil {
        log.Printf("%s: sarama.NewSyncProducer err, message=%s \n", grouid, err)
        return
    }
    defer consumer.Close()

    // trap SIGINT to trigger a shutdown
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // consume errors
    go func() {
        for err := range consumer.Errors() {
            log.Printf("%s:Error: %s\n", grouid, err.Error())
        }
    }()

    // consume notifications
    go func() {
        for ntf := range consumer.Notifications() {
            log.Printf("%s:Rebalanced: %+v \n", grouid, ntf)
        }
    }()

    // consume messages, watch signals
    var successes int
Loop:
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if ok {
fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", grouid, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
    consumer.MarkOffset(msg, "") //这里是给消费过的offset 打上标记 下次启动从这里进行消费
            }
        case <-signals:
            break Loop
        }
}

3.以上经过测试可以使用 来源 (稍微有一点点改动)https://studygolang.com/articles/17912

上一篇下一篇

猜你喜欢

热点阅读