Go语言用例

golang如何使用sarama访问kafka

2018-12-15  本文已影响2人  CodingCode

golang如何使用sarama访问kafka

下面一个客户端代码例子访问kafka服务器,来发送和接受消息。

使用方式

  1. 命令行参数
$ ./kafkaclient -h
Usage of ./client:
  -ca string
        CA Certificate (default "ca.pem")
  -cert string
        Client Certificate (default "cert.pem")
  -command string
        consumer|producer (default "consumer")
  -host string
        Common separated kafka hosts (default "localhost:9093")
  -key string
        Client Key (default "key.pem")
  -partition int
        Kafka topic partition
  -tls
        TLS enable
  -topic string
        Kafka topic (default "test--topic")
  1. 作为producer启动
$ ./kafkaclient -command producer \
  -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command producer \
  -tls -cert client.pem -key client.key -ca ca.pem \
  -host kafka1:9093,kafka2:9093

producer发送消息给kafka:

> aaa
2018/12/15 07:11:21 Produced message: [aaa]
> bbb
2018/12/15 07:11:30 Produced message: [bbb]
> quit
  1. 作为consumer启动
$ ./kafkaclient -command consumer \
  -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command consumer \
  -tls -cert client.pem -key client.key -ca ca.pem \
  -host kafka1:9093,kafka2:9093

consumer从kafka接受消息:

2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]

完整源代码如下

这个代码使用到了Shopify/sarama库,请自行下载使用。

$ cat kafkaclient.go
package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "io/ioutil"
    "bufio"
    "strings"

    "crypto/tls"
    "crypto/x509"

    "github.com/Shopify/sarama"
)

var (
    command     string
    tlsEnable   bool
    hosts       string
    topic       string
    partition   int
    clientcert  string
    clientkey   string
    cacert      string
)

func main() {
    flag.StringVar(&command,    "command",      "consumer",         "consumer|producer")
    flag.BoolVar(&tlsEnable,    "tls",          false,              "TLS enable")
    flag.StringVar(&hosts,      "host",         "localhost:9093",   "Common separated kafka hosts")
    flag.StringVar(&topic,      "topic",        "test--topic",      "Kafka topic")
    flag.IntVar(&partition,     "partition",    0,                  "Kafka topic partition")
    flag.StringVar(&clientcert, "cert",         "cert.pem",         "Client Certificate")
    flag.StringVar(&clientkey,  "key",          "key.pem",          "Client Key")
    flag.StringVar(&cacert,     "ca",           "ca.pem",           "CA Certificate")
    flag.Parse()

    config := sarama.NewConfig()
    if tlsEnable {
        //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
        tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
        if err != nil {
            log.Fatal(err)
        }

        config.Net.TLS.Enable = true
        config.Net.TLS.Config = tlsConfig
    }
    client, err := sarama.NewClient(strings.Split(hosts, ","), config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    if command == "consumer" {
        consumer, err := sarama.NewConsumerFromClient(client)
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()
        loopConsumer(consumer, topic, partition)
    } else {
        producer, err := sarama.NewAsyncProducerFromClient(client)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()
        loopProducer(producer, topic, partition)
    }
}

func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
    // load client cert
    clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
    if err != nil {
        return nil, err
    }

    // load ca cert pool
    cacert, err := ioutil.ReadFile(cacertfile)
    if err != nil {
        return nil, err
    }
    cacertpool := x509.NewCertPool()
    cacertpool.AppendCertsFromPEM(cacert)

    // generate tlcconfig
    tlsConfig := tls.Config{}
    tlsConfig.RootCAs = cacertpool
    tlsConfig.Certificates = []tls.Certificate{clientcert}
    tlsConfig.BuildNameToCertificate()
 // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
    return &tlsConfig, err
}

func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
    scanner := bufio.NewScanner(os.Stdin)
    fmt.Print("> ")
    for scanner.Scan() {
        text := scanner.Text()
        if text == "" {
        } else if text == "exit" || text == "quit" {
            break
        } else {
            producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
            log.Printf("Produced message: [%s]\n",text)
        }
        fmt.Print("> ")
    }
}

func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
    partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
    if err != nil {
        log.Println(err)
        return
    }
    defer partitionConsumer.Close()

    for {
        msg := <-partitionConsumer.Messages()
        log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
    }
}

编译:

$ go build kafkaclient.go
上一篇下一篇

猜你喜欢

热点阅读