Using kafka-node

2020-04-02  小旎子_8327

KafkaClient:

Concept: a client that connects directly to Kafka brokers.
Initialization: const client = new kafka.KafkaClient({kafkaHost: '10.3.100.196:9092'}); if kafkaHost is omitted, it defaults to localhost:9092.

Producer

Initialization: Producer(KafkaClient, [options], [customPartitioner])
Example:

var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.KafkaClient(),
    producer = new Producer(client);
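
The producer connects asynchronously, so messages are normally sent only after it emits 'ready'. Below is a minimal sketch of waiting for that event. The helper name whenReady is hypothetical and not part of kafka-node; it only assumes the producer is an EventEmitter that emits 'ready' and 'error'.

```javascript
// Hypothetical helper (not part of kafka-node): resolves once the producer
// emits 'ready', and rejects if it emits 'error' first.
function whenReady(producer) {
  return new Promise(function (resolve, reject) {
    function onReady() {
      producer.removeListener('error', onError);
      resolve(producer);
    }
    function onError(err) {
      producer.removeListener('ready', onReady);
      reject(err);
    }
    producer.once('ready', onReady);
    producer.once('error', onError);
  });
}
```

Usage would look like whenReady(producer).then(function (p) { p.send(payloads, cb); }). The helper removes its own 'error' listener once the producer is ready, so a long-lived producer.on('error', ...) handler should still be registered separately.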

Sending messages:
send(payloads, cb)

payloads: an array whose items are JSON-like objects of the form
{
   topic: 'topicName',
   messages: ['message body'], // multiple messages should be an array; a single message can be just a string or a KeyedMessage instance
   key: 'theKey', // string or buffer, only needed when using keyed partitioner
   partition: 0, // default 0
   attributes: 2, // default: 0
   timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10+)
}
cb: callback invoked when the send succeeds or fails
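
A minimal sketch of building one payload item and sending it follows. buildPayload and sendOne are hypothetical helper names, and the producer argument is assumed to be a ready kafka-node Producer (or any object exposing send(payloads, cb)).

```javascript
// Hypothetical helper: builds one payload item in the shape described above.
function buildPayload(topic, messages, partition) {
  return {
    topic: topic,
    messages: messages, // a single string or an array of strings
    partition: partition === undefined ? 0 : partition // 0 is the documented default
  };
}

// Hypothetical helper: sends one payload item and reports the outcome via cb.
function sendOne(producer, topic, messages, cb) {
  // send() takes an array of payload items, even when there is only one.
  producer.send([buildPayload(topic, messages)], function (err, result) {
    if (err) {
      return cb(err);
    }
    cb(null, result);
  });
}
```

Called as sendOne(producer, 'topicName', ['message body'], function (err, result) { ... }), this keeps the payload shape in one place and leaves error handling to the caller.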

Creating topics
createTopics(topics, cb)

HighLevelProducer

HighLevelProducer(KafkaClient, [options], [customPartitioner])
send(payloads, cb)
createTopics(topics, async, cb)

ProducerStream

ProducerStream (options)

Example:
Use a Transform stream to update the data
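
The idea can be sketched with Node's built-in stream module alone. The function name createMessageTransform, the targetTopic parameter, and the upper-casing step are illustrative assumptions; the incoming message is assumed to carry its body in a value field, as kafka-node consumer messages do.

```javascript
const { Transform } = require('stream');

// Builds an object-mode Transform that turns each consumed message
// ({ topic, value, ... }) into a payload item ({ topic, messages })
// suitable for writing to a producer stream.
function createMessageTransform(targetTopic) {
  return new Transform({
    objectMode: true,
    transform(message, encoding, callback) {
      // Illustrative update step: upper-case the message body.
      callback(null, {
        topic: targetTopic,
        messages: String(message.value).toUpperCase()
      });
    }
  });
}
```

With kafka-node this would typically sit between a consumer stream and a ProducerStream, along the lines of consumerStream.pipe(createMessageTransform('OutputTopic')).pipe(producerStream), where both stream variables are placeholders.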

Consumer

Consumer(client, payloads, options)
on('error', function (err) {})
on('offsetOutOfRange', function (err) {})

addTopics(topics, cb, fromOffset)
removeTopics(topics, cb)
commit(cb)
setOffset(topic, partition, offset)
pause()
resume()
pauseTopics(topics)
resumeTopics(topics)
close(force, cb)
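
A minimal sketch of wiring up the consumer events follows. attachConsumerHandlers and the handlers object are hypothetical; the 'message' event is not listed above and is included on the assumption that the consumer emits it for each fetched message, as kafka-node consumers do.

```javascript
// Hypothetical helper: forwards the common Consumer events to callbacks.
function attachConsumerHandlers(consumer, handlers) {
  consumer.on('message', function (message) {
    // message carries fields such as topic, partition, offset and value
    handlers.onMessage(message);
  });
  consumer.on('error', function (err) {
    handlers.onError(err);
  });
  consumer.on('offsetOutOfRange', function (err) {
    // The caller can react here, for example by choosing a new offset
    // with setOffset(topic, partition, offset).
    handlers.onOffsetOutOfRange(err);
  });
  return consumer;
}
```

Registering the 'error' handler matters in practice: an EventEmitter that emits 'error' with no listener attached throws.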

ConsumerStream

ConsumerStream(client, payloads, options)

ConsumerGroup

ConsumerGroup(options, topics)
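
A sketch of assembling the options object for a ConsumerGroup follows. buildGroupOptions is a hypothetical helper, and the default values are illustrative; the option names (kafkaHost, groupId, sessionTimeout, protocol, fromOffset) are assumed to match the ones kafka-node documents for ConsumerGroup.

```javascript
// Hypothetical helper: merges caller overrides into a small set of
// ConsumerGroup options. The default values are illustrative only.
function buildGroupOptions(overrides) {
  return Object.assign({
    kafkaHost: 'localhost:9092',
    groupId: 'example-group',
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'latest'
  }, overrides);
}
```

The result would be passed as the first argument, for example new kafka.ConsumerGroup(buildGroupOptions({ groupId: 'my-group' }), ['topicName']).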
