Fetching data from Kafka and writing it to a Redis cluster

2017-05-06  pingforever

1. Environment

2. Problem description

The task is to receive RADIUS packets sent by GW gateway devices at roughly 2000 TPS. The original plan was to use Flume to receive the UDP packets, but testing showed that only the syslogudp source worked, and the parsed packets were missing many fields; after some digging it turned out the packet payload was being truncated when Flume wrote it out. I found a few UDP plugins on GitHub, but I no longer work in Java and they would still have to be built into jars, so I decided to write my own. So far, a single machine fetching from Kafka and parsing runs at about 30K packets/s, dropping to about 15K/s once the Redis writes are added.

3. Kafka connection

I originally wanted to use travisjeffery/jocko, but it turned out not to support consumer groups, so I went with confluentinc/confluent-kafka-go instead. It doesn't have all that many stars, but it is billed as Confluent's Apache Kafka Golang client, which says enough.

Before using this module you need to install librdkafka. During the build I ran into:

make[1]: Leaving directory '/slview/librdkafka-master/src-cpp'
make -C examples
make[1]: Entering directory '/slview/librdkafka-master/examples'
gcc -g -O2 -fPIC -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align -I../src rdkafka_example.c -o rdkafka_example  \
        ../src/librdkafka.a -lpthread -lz -lssl -lrt
/bin/ld: ../src/librdkafka.a(rdkafka_sasl_scram.o): undefined reference to symbol 'BIO_read'
/bin/ld: note: 'BIO_read' is defined in DSO /usr/local/ssl/lib/libcrypto.so.1.0.0 so try adding it to the linker command line
/usr/local/ssl/lib/libcrypto.so.1.0.0: could not read symbols: Invalid operation
collect2: error: ld returned 1 exit status
make[1]: *** [rdkafka_example] Error 1
make[1]: Leaving directory '/slview/librdkafka-master/examples'

After a good while of googling I couldn't find a fix, so for now I simply sidestepped it: building without SSL succeeds.

./configure --disable-ssl

Connecting to Kafka:

c, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers":    broker,
	"group.id":             group,
	"session.timeout.ms":   6000,
	"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "smallest"},
})

4. Redis connection

The Redis 3.2 cluster was created using internal (private) addresses; connecting through the corresponding external addresses fails, and I'm not sure whether this is a configuration issue.

client := redis.NewClusterClient(&redis.ClusterOptions{
	Addrs:        []string{"172.17.50.73:6379", "172.17.50.74:6379", "172.17.50.75:6379", "172.17.50.76:6379", "172.17.50.77:6379", "172.17.50.78:6379"},
	ReadOnly:     false,
	DialTimeout:  10 * time.Second,
	ReadTimeout:  30 * time.Second,
	WriteTimeout: 30 * time.Second,
	PoolSize:     6000,
	PoolTimeout:  30 * time.Second,
})

if client == nil {
	return client, ConnectError
}
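Note that go-redis's NewClusterClient does not return nil on failure, so the nil check above never actually fires; pinging the cluster is a more reliable way to verify the connection. A small sketch, assuming the context-free Ping of the go-redis versions current at the time:

// Sketch: verify cluster connectivity up front instead of the nil check.
// Assumes a go-redis version whose Ping takes no context argument.
if err := client.Ping().Err(); err != nil {
	return nil, err
}
return client, nil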

5. RADIUS packet parsing

Packets are decoded with the bronze1man/radius module; the code is as follows:

radinfo := make(map[string]interface{})

pac, err := radius.DecodePacket(secret, p)
if err != nil {
	log.Println("[pac.Decode]", err)
	return
}

for i := range pac.AVPs {
	Type := pac.AVPs[i].Type
	if Type == radius.EventTimestamp {
		// Event-Timestamp is a 32-bit big-endian Unix time; format it as a string.
		Value := pac.AVPs[i].Decode(pac).([]uint8)
		timestamp := binary.BigEndian.Uint32(Value)
		timestr := time.Unix(int64(timestamp), 0).Format("2006-01-02 15:04:05")
		Typestr := fmt.Sprintf("%s", Type)
		radinfo[Typestr] = timestr
		//fmt.Printf("Type: %s    Value: %s\n", Type, timestr)
	} else {
		Typestr := fmt.Sprintf("%s", Type)

		// Item (defined elsewhere) holds the attribute names we want to keep.
		if _, ok := Item[Typestr]; ok {
			Value := pac.AVPs[i].GetValue()
			radinfo[Typestr] = Value
		}
	}
}
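The write step itself is not shown above; roughly, the collected fields end up as one Redis hash per subscriber, which is what the HGETALL in the next section returns. A sketch, assuming a go-redis version whose HMSet accepts a map[string]interface{} and using a placeholder hash key:

// Sketch: store the parsed attributes as a hash, one per subscriber.
// The key is a placeholder; the real code derives it from the packet.
key := "86xxxxxxxxxx"
if err := client.HMSet(key, radinfo).Err(); err != nil {
	log.Println("[redis.HMSet]", err)
}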

6. Result after writing to Redis

172.17.50.76:6379> hgetall 86xxxxxxxxxx
 1) "SessionID"
 2) "73aa0e5b1fca2b97"
 3) "APN"
 4) "ctnet"
 5) "Status"
 6) "Stop"
 7) "StartTime"
 8) ""
 9) "MDN"
10) "xxxxxxxxx"
11) "IPAddr"
12) "x.x.x.x"
13) "StopTime"
14) "2017-05-06 14:45:44"
15) "TerminateCause"
16) "UserRequest"
17) "Duration"
18) "2846"
19) "OutputOctets"
20) "0"
21) "InputOctets"
22) "0"