kafka 2021-11-23

2021-11-23  本文已影响0人  9_SooHyun

Kafka 基础

Golang sarama使用kafka案例

基本概念

// consumer.go
// consumer is the concrete type that implements the Consumer interface.
type consumer struct {
    // conf is the configuration this consumer was built with.
    conf            *Config
    // PartitionConsumer processes Kafka messages from a given topic and partition.
    // children is presumably keyed by topic name, then by partition ID — confirm against the constructor.
    children        map[string]map[int32]*partitionConsumer 
    // brokerConsumers holds one brokerConsumer per *Broker key.
    brokerConsumers map[*Broker]*brokerConsumer
    // client is the underlying Client; Topics() delegates to it.
    client          Client
    // lock is presumably the guard for the maps above — TODO confirm which fields it protects.
    lock            sync.Mutex
}

// 而consumer的Close()、Topics()、Partitions()实际都是调用client的同名方法。例如:
// Topics returns whatever the underlying client reports for Topics(),
// passing both the topic list and the error through untouched.
func (c *consumer) Topics() ([]string, error) {
    names, err := c.client.Topics()
    return names, err
}

// sarama提供了两个构造函数来返回一个consumer实例:
// NewConsumer 和 NewConsumerFromClient
// consumer_group.go
// ConsumerGroup is the consumer-group interface.
type ConsumerGroup interface {
 // Consume joins a cluster of consumers for the given list of topics and
 // starts a blocking ConsumerGroupSession through the supplied handler.
 Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
 // Errors returns a receive-only channel of errors.
 Errors() <-chan error
 // Close shuts the group down and reports any error encountered.
 Close() error
}
// Consume是消费者组进行消息消费的方法
// Consume joins a cluster of consumers for a given list of topics and 
// starts a 【blocking ConsumerGroupSession through the ConsumerGroupHandler】.

// consumerGroup is the concrete implementation of the ConsumerGroup interface.
type consumerGroup struct {
    // A consumerGroup holds one Client and one Consumer. Both are named
    // fields (not embedded), so their methods are NOT promoted onto
    // consumerGroup; presumably the group calls through to them explicitly.
    client Client
    config   *Config
    consumer Consumer
    // groupID and memberID are string identifiers of the group and of this
    // member (meaning inferred from the names — confirm against usage).
    groupID  string
    memberID string
    // errors is a channel of errors; presumably what Errors() exposes — TODO confirm.
    errors   chan error

    lock      sync.Mutex
    closed    chan none  // type none struct{}
    closeOnce sync.Once

    userData []byte
}

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allows you to
// trigger logic before or after the consume loop(s).
// In other words, you supply your own Setup(ConsumerGroupSession) and
// Cleanup(ConsumerGroupSession) implementations.
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently,
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
    // Setup is run at the beginning of a new session, before ConsumeClaim.
    // That is, Setup is invoked before the session starts consuming.
    Setup(ConsumerGroupSession) error

    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    // but before the offsets are committed for the very last time.
    // That is, Cleanup is invoked once the session has finished.
    Cleanup(ConsumerGroupSession) error

    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    // Once the Messages() channel is closed, the Handler must finish its processing
    // loop and exit.
    ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

一个consumerGroup会在一个consumerGroupSession内进行消息的消费

// consumerGroupSession holds the state of one consumer-group session:
// the owning group, the handler, the claims, and lifecycle plumbing.
type consumerGroupSession struct {
    // parent is the consumerGroup this session belongs to.
    parent       *consumerGroup
    memberID     string
    generationID int32
    handler      ConsumerGroupHandler // the handler; in effect, the individual consumers

    // claims is presumably topic name -> partition IDs claimed in this
    // session — confirm against the code that fills it.
    claims  map[string][]int32
    offsets *offsetManager
    // ctx and cancel are a context and a cancel function; presumably
    // cancel cancels ctx — TODO confirm where they are created.
    ctx     context.Context
    cancel  func()

    waitGroup       sync.WaitGroup
    releaseOnce     sync.Once
    // hbDying and hbDead: judging by the names, heartbeat shutdown signals — TODO confirm.
    hbDying, hbDead chan none
}

consumerGroup的消费动作调用链如下:
*consumerGroup.Consume()
-> *consumerGroup.newSession()
-> *consumerGroupSession.consume(topic, partition)
-> ConsumerGroupHandler.ConsumeClaim(*consumerGroupSession, claim)

consumerGroup使用样例
// t is the topic name used by this sample: the consumer group subscribes
// to it and ConsumeClaim compares each message's Topic against it.
t := "test_topic"

// #### 实现sarama.ConsumerGroupHandler ####
// Consumer is the sample's sarama.ConsumerGroupHandler implementation.
// It carries no state; all behavior lives in its Setup, Cleanup and
// ConsumeClaim methods.
type Consumer struct{}

// Setup is the session-start hook: it runs at the beginning of a new
// session, before ConsumeClaim. This sample only announces the call on
// stdout and always succeeds.
func (*Consumer) Setup(sarama.ConsumerGroupSession) error {
    const announcement = "consumer Setup"
    fmt.Println(announcement)
    return nil
}

// Cleanup is the session-end hook: it runs at the end of a session, once
// all ConsumeClaim goroutines have exited. This sample only announces the
// call on stdout and always succeeds.
func (*Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    const announcement = "consumer Cleanup"
    fmt.Println(announcement)
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// It drains the claim's message channel until that channel is closed, prints
// the payload of every message belonging to topic t, and marks each received
// message as consumed so that its offset can be committed for the group.
// It always returns nil.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
    for message := range claim.Messages() {
        if message.Topic == t {
            // ProcessMessage: Value is a []byte, so convert it to a string;
            // printing the raw slice would show a list of byte values.
            fmt.Println(string(message.Value))
        }
        // Without marking, the offset is never committed and the group
        // re-reads the same messages after a restart or rebalance.
        session.MarkMessage(message, "")
    }
    return nil
}
// #### 实现sarama.ConsumerGroupHandler ####


// Build a default sarama configuration.
config := sarama.NewConfig()
// Create the consumer group.
brokers := []string{"127.0.0.1:9092"}
groupname := "my_group"
consumerG, err := sarama.NewConsumerGroup(brokers, groupname, config)
if err != nil {
    // Without this check a failed connect would leave consumerG nil and
    // the Consume call below would panic.
    return fmt.Errorf("creating consumer group %q: %w", groupname, err)
}
// Release the group's resources when the surrounding function returns.
defer func() {
    if cerr := consumerG.Close(); cerr != nil {
        fmt.Println(cerr.Error())
    }
}()

// Iterate over consumer sessions.
ctx := context.Background()
topics := []string{t}
for {
    handler := Consumer{}
    // `Consume` should be called inside an infinite loop, when a
    // server-side rebalance happens, the consumer session will need to be
    // recreated to get the new claims.
    // Consume starts a consumerGroupSession internally; calling it in a
    // loop yields a fresh session each time, which makes it easy to react
    // to rebalances.
    err := consumerG.Consume(ctx, topics, &handler)
    if err != nil {
        fmt.Println(err.Error())
        return err
    }
    // Stop looping once the context has been cancelled; otherwise the loop
    // would keep re-entering Consume with a dead context.
    if ctx.Err() != nil {
        return ctx.Err()
    }
}
上一篇 下一篇

猜你喜欢

热点阅读