kafka-node usage
KafkaClient:
Concept: a client that connects directly to the Kafka brokers.
Initialization: const client = new kafka.KafkaClient({kafkaHost: '10.3.100.196:9092'}); if kafkaHost is not provided, it defaults to localhost:9092.
Producer
Initialization: Producer(KafkaClient, [options], [customPartitioner])
Example:
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.KafkaClient(),
    producer = new Producer(client);
Send messages:
send(payloads, cb)
payloads: an array; each item is an object of the form:
{
    topic: 'topicName',
    messages: ['message body'], // multiple messages should be an array; a single message can be just a string or a KeyedMessage instance
    key: 'theKey', // string or buffer, only needed when using keyed partitioner
    partition: 0, // default 0
    attributes: 2, // compression attribute, default: 0 (0 = none, 1 = gzip, 2 = snappy)
    timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10+)
}
cb: callback invoked with the result, either an error or the produce response.
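A minimal send sketch, assuming the broker is reachable at the default localhost:9092 and a hypothetical topic named 'ExampleTopic':
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.KafkaClient(),
    producer = new Producer(client);

producer.on('ready', function () {
  // send one payload; the callback receives either an error or the produce result
  producer.send([{ topic: 'ExampleTopic', messages: 'hello kafka-node', partition: 0 }], function (err, data) {
    if (err) console.error(err);
    else console.log(data);
  });
});

producer.on('error', function (err) {
  console.error(err);
});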
Create topics
createTopics(topics, cb)
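A short sketch of createTopics on an existing producer, using hypothetical topic names; it only takes effect when auto.create.topics.enable is true on the broker:
producer.createTopics(['topic1', 'topic2'], function (err, data) {
  // topics are only actually created if auto.create.topics.enable is true on the broker
  if (err) console.error(err);
  else console.log(data);
});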
HighLevelProducer
HighLevelProducer(KafkaClient, [options], [customPartitioner])
send(payloads, cb)
createTopics(topics, async, cb)
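A minimal HighLevelProducer sketch, assuming a hypothetical topic 'ExampleTopic'; unlike Producer, no partition is specified in the payload and one is chosen for you:
var kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    client = new kafka.KafkaClient(),
    producer = new HighLevelProducer(client);

producer.on('ready', function () {
  // no partition field: the HighLevelProducer selects the partition itself
  producer.send([{ topic: 'ExampleTopic', messages: 'hello' }], function (err, data) {
    if (err) console.error(err);
    else console.log(data);
  });
});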
ProducerStream
ProducerStream (options)
Options:
highWaterMark: size of the write buffer (default: 100)
kafkaClient: options for the underlying KafkaClient (see KafkaClient)
producer: options for the underlying producer (see HighLevelProducer)
Example:
Use a Transform stream to modify the data before producing it, as sketched below.
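A sketch of that pattern, assuming hypothetical topic names 'SourceTopic' and 'TargetTopic' and a broker on the default localhost:9092: each consumed message is rewritten in a Transform stream and piped into a ProducerStream.
var Transform = require('stream').Transform;
var kafka = require('kafka-node');

var client = new kafka.KafkaClient();
var consumerStream = new kafka.ConsumerStream(client, [{ topic: 'SourceTopic', partition: 0 }], { autoCommit: true });
var producerStream = new kafka.ProducerStream();

// rewrite each consumed message and forward it to the target topic
var messageTransform = new Transform({
  objectMode: true,
  decodeStrings: true,
  transform: function (message, encoding, callback) {
    callback(null, {
      topic: 'TargetTopic',
      messages: 'transformed: ' + message.value
    });
  }
});

consumerStream.pipe(messageTransform).pipe(producerStream);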
Consumer
Consumer(client, payloads, options)
on('error', function (err) {})
on('offsetOutOfRange', function (err) {})
addTopics(topics, cb, fromOffset)
removeTopics(topics, cb)
commit(cb)
setOffset(topic, partition, offset)
pause()
resume()
pauseTopics(topics)
resumeTopics(topics)
close(force, cb)
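A minimal Consumer sketch, assuming a hypothetical topic 'ExampleTopic' on partition 0; besides the events listed above, messages arrive via the 'message' event:
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.KafkaClient(),
    consumer = new Consumer(
      client,
      [{ topic: 'ExampleTopic', partition: 0 }],
      { autoCommit: true }
    );

consumer.on('message', function (message) {
  console.log(message);
});

consumer.on('error', function (err) {
  console.error(err);
});

consumer.on('offsetOutOfRange', function (err) {
  // the requested offset is outside the range stored on the broker; reset it with setOffset if needed
  console.error(err);
});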
ConsumerStream
ConsumerStream(client, payloads, options)
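A minimal ConsumerStream sketch, assuming a hypothetical topic 'ExampleTopic'; the stream runs in object mode, so each 'data' chunk is one Kafka message:
var kafka = require('kafka-node'),
    client = new kafka.KafkaClient(),
    consumerStream = new kafka.ConsumerStream(
      client,
      [{ topic: 'ExampleTopic', partition: 0 }],
      { autoCommit: true }
    );

consumerStream.on('data', function (message) {
  console.log(message.value);
});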
ConsumerGroup
ConsumerGroup(options, topics)
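A minimal ConsumerGroup sketch, assuming a hypothetical group id 'ExampleGroup' and topic 'ExampleTopic'; unlike Consumer, it takes kafkaHost directly in its options instead of a KafkaClient instance:
var kafka = require('kafka-node');

var options = {
  kafkaHost: 'localhost:9092',
  groupId: 'ExampleGroup',
  fromOffset: 'latest' // 'earliest', 'latest' or 'none'
};

var consumerGroup = new kafka.ConsumerGroup(options, ['ExampleTopic']);

consumerGroup.on('message', function (message) {
  console.log(message);
});

consumerGroup.on('error', function (err) {
  console.error(err);
});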