kafka c++客户端结合avro c++使用指南

2018-11-22  本文已影响0人  安安爸Chris

avro是什么这里不赘述了。不懂的同学请翻阅资料。

kafka c++客户端的安装请参考我的另外一篇《kafka c++客户端安装指南》

为了方便,kafka c++客户端直接用了cppkafka。

安装avro c++

下载地址 https://avro.apache.org/releases.html

我安装的版本是1.8.3,大家自行参考

安装目录在lang/c++下,使用cmake编译。编译完成后会生成
动态库和静态库
libavrocpp_s.a
libavrocpp.so.1.8.3-SNAPSHOT.0

avro c++使用

参考官方教程 http://avro.apache.org/docs/1.8.0/api/cpp/html/
主要的使用代码demo,在lang/c++/samples下都有。参考着看。

和cppkafka配合使用需要注意的地方

avro encode的时候,一定要注意编码的转换。下面是参考示例。

    // 自定义结构,对应的avro json未request.json。
    // 内容省略
    request req; 

    auto out = avro::memoryOutputStream();  // 获取输出流
    avro::EncoderPtr e = avro::binaryEncoder(); // 编码器
    e->init(*out); // 用输出流初始化编码器

    avro::encode(*e, req); // 将req内容编码

    // 编码后的输出流需要转输入流,再转给cppkafka的Buffer对象接收
    auto in = avro::memoryInputStream(*out);

    // 注意avro中底层储存字节都是unsigned char,而不是常见的string中的char!
    // 所以先用unsigned char的stringbuf接收流中的内容
    basic_stringbuf<unsigned char> strbuf;
    basic_ostream<unsigned char> os(&strbuf);

    const unsigned char *h = NULL;
    const unsigned char *p = NULL;
    size_t total=0;
    size_t n = 0;
    while (in->next(&p, &n)) {
        os.write(p, n);
        if(total == 0) {
            h = p;
        }
        total += n;
    }    

    // 初始化kafka broker list
    cppkafka::Configuration config = {
            { "metadata.broker.list", "192.168.2.90:9092" }
    };

    // 这个是个绕人的地方,初始化cppkafka buffer的时候不可以使用stringbuf的str函数。会导致编码错乱。
    cppkafka::Buffer buf(p, n);
    cppkafka::MessageBuilder builder("test");
    builder.payload(buf);
    cppkafka::Producer producer(config);
    producer.produce(builder);
    producer.flush();
上一篇下一篇

猜你喜欢

热点阅读