java操作kafka生产消费

2017-10-15  本文已影响71人  先生_吕

前言

Kafka 版本更新之后,其 Java 客户端 API 也发生了变化:从 kafka_2.11-0.9.0 之后(不包括 0.9.0)的版本开始,新的 API 使用起来更为简洁方便。下面是新版本的生产者与消费者实现方式,旧版本的写法请浏览:
http://www.jianshu.com/p/d30419c8ffd4

生产者实例
import kafka.serializer.StringEncoder;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.Producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

/**
 * @author lvfang
 * @create 2017-10-15 11:17
 * @desc
 **/
public class KafkaProduce extends Thread {
    private String topic;//主题

    private String src;//数据源

    public KafkaProduce(String topic,String src){
        super();
        this.topic = topic;
        this.src = src;
    }

    //创建生产者
    private Producer<Integer, String> createProducer(){
        Properties properties = new Properties();

        //zookeeper单节点
        properties.put("zookeeper.connect","192.168.90.240:2181");
        properties.put("serializer.class", StringEncoder.class.getName());
        // 声明kafka集群的 broker

        //kafka单节点
        properties.put("metadata.broker.list", "192.168.90.240:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.90.240:9092");

        return new KafkaProducer<Integer, String>(properties);
    }

    public void run() {
        BufferedReader br = null;
        try {

            br = new BufferedReader(new FileReader(src));
            // 创建生产者
            Producer producer = createProducer();

            String line = null;
            // 循环发送消息到kafka
            while ((line = br.readLine()) != null) {
                System.out.println("生产数据为:"+line);
                producer.send(new ProducerRecord(topic,line + "\n"));

                // 发送消息的时间间隔
                Thread.sleep(3000);//TimeUnit.SECONDS.sleep(333);
            }
        } catch (Exception e) {
        } finally {
            try {
                if (br != null) br.close();
            } catch (IOException e) {}
        }
    }

    public static void main(String[] args) {
        // 使用kafka集群中创建好的主题 test
        new KafkaProduce("htb_position_test","D:/testdata/htb_position_test_data.txt").start();
    }
}

2017-10-15_175407.png
消费者实例
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;


/**
 * Demo consumer thread: subscribes to a Kafka topic and prints every record
 * it receives to standard output.
 *
 * @author lvfang
 * @create 2017-10-15 11:17
 * @desc Kafka consumer example based on the new (Java) client API.
 **/
public class KafkaCusumer extends Thread {
    /**
     * Address of the Kafka broker. The new consumer connects to brokers, not to
     * ZooKeeper, so this is the broker port (9092) rather than 2181.
     */
    private static final String BOOTSTRAP_SERVERS = "192.168.90.240:9092";

    /** Maximum time a single poll waits for records, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /** Topic to consume from. */
    private final String topic;

    /** Sample data file path; not read by this class. */
    public final String SRC = "D:/testdata/testData.txt";

    /**
     * @param topic name of the Kafka topic to subscribe to
     */
    public KafkaCusumer(String topic) {
        super();
        this.topic = topic;
    }

    /**
     * Builds a consumer that deserializes both keys and values as strings.
     *
     * <p>Only settings understood by the new {@code KafkaConsumer} are supplied.
     * The zookeeper.* settings of the old client are not used by it and are
     * therefore omitted.
     *
     * @return a new consumer; the caller is responsible for closing it
     */
    private Consumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Consumer group id. Per Kafka's consumer-group semantics, consumers that
        // share a group id divide the topic's partitions among themselves; use a
        // different id for a consumer that must see every message on its own.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new KafkaConsumer<String, String>(properties);
    }

    /**
     * Polls the topic until the thread is interrupted and prints offset, key
     * and value of every record. The consumer is closed when the loop ends or
     * an exception escapes.
     */
    @Override
    public void run() {
        Consumer<String, String> consumer = createConsumer();
        try {
            consumer.subscribe(Arrays.asList(topic));
            System.out.println("消费者对象:" + consumer);

            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    // The format string needs specifiers, otherwise the arguments are dropped.
                    System.out.printf("接收到: offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        // Uses a topic that has already been created in the Kafka cluster.
        new KafkaCusumer("htb_position_test").start();
    }
}

2017-10-15_175519.png
上一篇下一篇

猜你喜欢

热点阅读