Go 项目 Kafka 示例

2023-08-19  本文已影响0人  yichen_china

消费者

KafkaConsumer.go

package cws
import (
    "github.com/IBM/sarama"
    "log"
    "os"
    "os/signal"
)

// KafkaConsumer subscribes to every partition of "my_topic", starting at the
// newest offset, and logs each received message until the process receives
// os.Interrupt.
//
// It panics if the client, the consumer, the partition list or any partition
// consumer cannot be created.
func KafkaConsumer() {
    config := sarama.NewConfig()
    // With Return.Errors enabled, consume errors are delivered on each
    // partition consumer's Errors() channel and have to be read (see below).
    config.Consumer.Return.Errors = true

    client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
    if err != nil {
        panic(err)
    }
    // Registered only after the error check: on failure client is nil and
    // calling Close on it would panic and hide the original error.
    defer client.Close()

    consumer, err := sarama.NewConsumerFromClient(client)
    if err != nil {
        panic(err)
    }
    defer consumer.Close()

    // Get the partition ID list of the topic.
    partitions, err := consumer.Partitions("my_topic")
    if err != nil {
        panic(err)
    }

    for _, partitionID := range partitions {
        // Create one partition consumer per partition ID.
        pc, err := consumer.ConsumePartition("my_topic", partitionID, sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }
        // Deliberately deferred until function return rather than iteration
        // end: deferred calls run last-in-first-out, so every partition
        // consumer is closed before consumer.Close() and client.Close() run.
        defer pc.Close()

        // Read the error channel so reported consume errors are logged
        // instead of piling up unread.
        go func(pc sarama.PartitionConsumer) {
            for consumeErr := range pc.Errors() {
                log.Println(consumeErr)
            }
        }(pc)

        // Blocks on the message channel; the loop ends once the channel is
        // closed.
        go func(pc sarama.PartitionConsumer) {
            for message := range pc.Messages() {
                value := string(message.Value)
                log.Printf("Partitionid: %d; offset:%d, value: %s\n", message.Partition, message.Offset, value)
            }
        }(pc)
    }

    // Block until an interrupt signal arrives; the deferred Close calls then
    // shut everything down.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    <-signals
}

消费组

KafkaConsumerGroup.go

package cws

import (
    "context"
    "fmt"
    "github.com/IBM/sarama"
    "os"
    "os/signal"
    "sync"
)

// consumerGroupHandler provides the Setup, Cleanup and ConsumeClaim methods
// that are passed to ConsumerGroup.Consume. It prints every message it
// receives.
type consumerGroupHandler struct {
    // name is printed as a prefix of every message line.
    name string
}

// Setup does nothing and always returns nil.
func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup does nothing and always returns nil.
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim prints each message of the claim, marks it as consumed and
// commits the offset. It returns nil when the claim's message channel is
// closed or when the session context is done.
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case msg, ok := <-claim.Messages():
            if !ok {
                // Channel closed: the claim is finished.
                return nil
            }
            fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
            // Manually acknowledge the message.
            sess.MarkMessage(msg, "")
            // Manual acknowledgement still needs an explicit commit.
            sess.Commit()
        case <-sess.Context().Done():
            // Leave promptly once the session ends (for example on a
            // rebalance) instead of waiting for the message channel to close.
            return nil
        }
    }
}

// handleErrors signals wg once on entry and then prints every error received
// from the group's error channel, prefixed with "ERROR", until that channel
// is closed.
func handleErrors(group *sarama.ConsumerGroup, wg *sync.WaitGroup) {
    wg.Done()
    errCh := (*group).Errors()
    for {
        groupErr, open := <-errCh
        if !open {
            return
        }
        fmt.Println("ERROR", groupErr)
    }
}

// consume prints a start line for name, signals wg, and then repeatedly joins
// the consumer group session for "my_topic" using a consumerGroupHandler
// carrying name.
//
// It returns when the group has been closed and panics on any other error
// returned by Consume.
func consume(group *sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
    fmt.Println(name + "start")
    // Done is called before consuming starts, so the WaitGroup only reports
    // that this goroutine is running, not that it has finished.
    wg.Done()

    ctx := context.Background()
    topics := []string{"my_topic"}
    handler := consumerGroupHandler{name: name}
    for {
        // Consume returns whenever a session ends, so it is called again in a
        // loop to join the next session.
        err := (*group).Consume(ctx, topics, handler)
        if err == sarama.ErrClosedConsumerGroup {
            // The group was closed by its owner: a normal shutdown, not a
            // failure, so stop instead of panicking.
            return
        }
        if err != nil {
            panic(err)
        }
    }
}

// KafkaConsumerGroup creates three consumer groups ("c1", "c2", "c3") on one
// shared client, starts one consume goroutine per group and then blocks until
// the process receives os.Interrupt.
//
// It panics if the client or any of the consumer groups cannot be created.
func KafkaConsumerGroup() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = false
    config.Version = sarama.V0_10_2_0

    client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
    if err != nil {
        panic(err)
    }
    // Registered only after the error check: on failure client is nil and
    // calling Close on it would panic and hide the original error.
    defer client.Close()

    // Each group's Close is deferred as soon as the group exists, so groups
    // that were already created are still closed if a later one fails.
    group1, err := sarama.NewConsumerGroupFromClient("c1", client)
    if err != nil {
        panic(err)
    }
    defer group1.Close()

    group2, err := sarama.NewConsumerGroupFromClient("c2", client)
    if err != nil {
        panic(err)
    }
    defer group2.Close()

    group3, err := sarama.NewConsumerGroupFromClient("c3", client)
    if err != nil {
        panic(err)
    }
    defer group3.Close()

    var wg sync.WaitGroup
    wg.Add(3)
    go consume(&group1, &wg, "c1")
    go consume(&group2, &wg, "c2")
    go consume(&group3, &wg, "c3")
    // consume calls wg.Done as soon as it starts, so this only waits for the
    // three goroutines to be running.
    wg.Wait()

    // Block until an interrupt signal arrives; the deferred Close calls then
    // shut the groups and the client down.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    <-signals
}

生产者

KafkaProducer.go

package cws

import (
    "github.com/IBM/sarama"
    "log"
    "os"
    "os/signal"
    "sync"
)

// main sends the message "testing 123" to "my_topic" in a tight loop through
// an asynchronous producer until the process receives os.Interrupt, then
// waits for the producer to shut down and logs how many messages were
// enqueued, delivered and failed.
//
// It panics if the client or the producer cannot be created.
func main() {
    config := sarama.NewConfig()

    // Successes must be enabled for the Successes() channel to be populated.
    config.Producer.Return.Successes = true
    config.Producer.Partitioner = sarama.NewRandomPartitioner

    client, err := sarama.NewClient([]string{"192.168.0.104:9192", "localhost:9292", "localhost:9392"}, config)
    if err != nil {
        panic(err)
    }
    // Registered only after the error check: on failure client is nil and
    // calling Close on it would panic and hide the original error.
    defer client.Close()

    producer, err := sarama.NewAsyncProducerFromClient(client)
    if err != nil {
        panic(err)
    }

    // Trap SIGINT to trigger a graceful shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    var (
        wg                                  sync.WaitGroup
        enqueued, successes, producerErrors int
    )

    wg.Add(2)
    // Count successful deliveries; the loop ends when the channel is closed.
    go func() {
        defer wg.Done()
        for range producer.Successes() {
            successes++
        }
    }()

    // Log and count failed deliveries; the loop ends when the channel is
    // closed.
    go func() {
        defer wg.Done()
        for produceErr := range producer.Errors() {
            log.Println(produceErr)
            producerErrors++
        }
    }()

ProducerLoop:
    for {
        // A fresh message value is built on every iteration because its
        // pointer is handed over to the producer.
        message := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")}
        select {
        case producer.Input() <- message:
            enqueued++

        case <-signals:
            producer.AsyncClose() // Trigger a shutdown of the producer.
            break ProducerLoop
        }
    }

    // Wait for both counting goroutines to finish before reading the
    // counters.
    wg.Wait()

    log.Printf("Enqueued: %d\n", enqueued)
    log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
}

上一篇下一篇

猜你喜欢

热点阅读