java操作kafka生产消费
2017-10-15 本文已影响71人
先生_吕
前言
kafka版本更新之后,其java调用的API也发生了变化:从 2.11-0.9.0(即 Scala 2.11、Kafka 0.9.0)之后的版本(不包括 0.9.0 本身)开始使用新的客户端 API。新版本的 API 操作更为简洁方便,下面是新版本的生产消费实现方式,旧版本方式请浏览
http://www.jianshu.com/p/d30419c8ffd4
生产者实例
import kafka.serializer.StringEncoder;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.Producer;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
/**
* @author lvfang
* @create 2017-10-15 11:17
* @desc
**/
/**
 * Thread that reads a text file line by line and publishes each line as a
 * message to a Kafka topic, using the new (0.9+) producer client API.
 */
public class KafkaProduce extends Thread {

    private String topic; // target Kafka topic
    private String src;   // path of the source data file

    /**
     * @param topic Kafka topic to produce to
     * @param src   path of the text file whose lines are sent as messages
     */
    public KafkaProduce(String topic, String src) {
        super();
        this.topic = topic;
        this.src = src;
    }

    /**
     * Creates a producer with the new client API.
     *
     * Fix: the legacy high-level-API properties the original set
     * ({@code zookeeper.connect}, {@code serializer.class},
     * {@code metadata.broker.list}) are ignored by the new client and have
     * been removed — only {@code bootstrap.servers} plus the serializers are
     * needed. The generic types are now {@code <String, String>} to match the
     * configured {@code StringSerializer} (the original declared an Integer
     * key with a String serializer).
     */
    private Producer<String, String> createProducer() {
        Properties properties = new Properties();
        // Kafka broker list (single node here).
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.90.240:9092");
        // Serializers must match the producer's generic type parameters.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        // try-with-resources closes both the reader and the producer; the
        // original leaked the producer and silently swallowed all exceptions.
        try (BufferedReader br = new BufferedReader(new FileReader(src));
             Producer<String, String> producer = createProducer()) {
            String line;
            // Send each line of the source file as one message.
            while ((line = br.readLine()) != null) {
                System.out.println("生产数据为:" + line);
                producer.send(new ProducerRecord<String, String>(topic, line + "\n"));
                // Interval between messages.
                Thread.sleep(3000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of dropping it.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            // Report the failure instead of swallowing it.
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        // Produce into the pre-created topic.
        new KafkaProduce("htb_position_test", "D:/testdata/htb_position_test_data.txt").start();
    }
}
2017-10-15_175407.png
消费者实例
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
/**
* @author lvfang
* @create 2017-10-15 11:17
* @desc
**/
/**
 * Thread that consumes messages from a Kafka topic using the new (0.9+)
 * consumer client API and prints each record to stdout.
 */
public class KafkaCusumer extends Thread {

    private String topic; // topic to subscribe to
    public final String SRC = "D:/testdata/testData.txt";

    /**
     * @param topic Kafka topic to consume from
     */
    public KafkaCusumer(String topic) {
        super();
        this.topic = topic;
    }

    /**
     * Creates a consumer with the new client API.
     *
     * Fix: {@code bootstrap.servers} must point at a Kafka broker
     * ({@code 9092}), not at ZooKeeper ({@code 2181}) as the original did —
     * the new consumer talks to brokers directly, so the legacy
     * {@code zookeeper.*} properties were also removed (they are ignored by
     * the new client).
     */
    private Consumer<String, String> createConsumer() {
        Properties properties = new Properties();
        // Consumers in the same group share partitions between them; use a
        // distinct group.id to receive every message independently.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.90.240:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new KafkaConsumer<String, String>(properties);
    }

    @Override
    public void run() {
        Consumer<String, String> consumer = createConsumer();
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("消费者对象:" + consumer);
        // Poll forever; each poll returns the records received since the
        // previous one.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Fix: the original format string had no conversion
                // specifiers, so offset/key/value were never printed.
                System.out.printf("接收到: offset=%d, key=%s, value=%s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }

    public static void main(String[] args) {
        // Consume from the pre-created topic.
        new KafkaCusumer("htb_position_test").start();
    }
}
2017-10-15_175519.png