Kafka demo

2021-05-08  小玉1991

I needed Kafka at work, with a producer supplying the data, so I put together a small demo myself. It can send and receive data normally, and the data can be inspected with the visualization tool Offset Explorer 2.
Code first:

pom.xml

    <properties>
        <java.version>1.8</java.version>
        <kafka-clients.version>0.9.0.1</kafka-clients.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- kafka begin -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka-clients.version}</version>
        </dependency>
        <!-- kafka end -->
    </dependencies>

Producer. Note: the message must be sent synchronously with send().get(). Without get(), send() is asynchronous, and because the main program exits almost immediately, the message never actually goes out (an asynchronous variant with a callback is sketched after the class).

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Producer {

    private final KafkaProducer<String, String> producer;
//    public final static String TOPIC = "op_move_error_event";
    public final static String TOPIC = "op_move_event_dev";

    private Producer() {

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01.dev.in.songguo7.com:9092,kafka02.dev.in.songguo7.com:9092,kafka03.dev.in.songguo7.com:9092\n");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
    }

    void produce() {
        try {
            producer.send(new ProducerRecord<>("op_move_event_dev", "1111", "{move_type:123}")).get();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("error--->"+e);
        }
        System.out.println("ok");
    }

    public static void main(String[] args) {
        new Producer().produce();
    }
}
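For contrast with the synchronous send above, here is a minimal sketch of an asynchronous send with a callback. The class name AsyncProducer is made up for this example; the brokers, topic, key, and payload reuse the ones from the Producer class, and it assumes the same package. The important part is calling close() (or flush()) before main exits; otherwise the buffered record is lost, which is exactly the problem the get() call works around.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class AsyncProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01.dev.in.songguo7.com:9092,kafka02.dev.in.songguo7.com:9092,kafka03.dev.in.songguo7.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // send() returns immediately; the callback runs when the broker acknowledges (or the send fails)
        producer.send(new ProducerRecord<>(Producer.TOPIC, "1111", "{move_type:123}"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("sent to partition " + metadata.partition() + " offset " + metadata.offset());
                }
            }
        });

        // without this, main would exit before the background I/O thread delivers the record
        producer.close();
    }
}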

Consumer:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {

    private final KafkaConsumer<String, String> consumer;

    private Consumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01.dev.in.songguo7.com:9092,kafka02.dev.in.songguo7.com:9092,kafka03.dev.in.songguo7.com:9092\n");
        //group 代表一个消费组
        props.put("group.id", "7_bike_geo_new");

        props.put("enable.auto.commit", true);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(Producer.TOPIC));
    }

    void consume() {
        while (true) {
            // poll() blocks for up to 100 ms waiting for new records
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            if (!consumerRecords.isEmpty()) {
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("TopicName: " + consumerRecord.topic() + " Partition: " +
                            consumerRecord.partition() + " Offset: " + consumerRecord.offset() +
                            " Msg: " + consumerRecord.value());
                    // business logic goes here
                }
            }
        }
    }
    public static void main(String[] args) {
        new Consumer().consume();
    }
}
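The consume() loop above never exits. If you want to stop it cleanly (say, on Ctrl+C), KafkaConsumer.wakeup() is the usual escape hatch: it is safe to call from another thread and makes a blocked poll() throw WakeupException. Below is a minimal sketch, assuming the same brokers, group, and topic as above; the class name ShutdownAwareConsumer is made up for this example.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.Properties;

public class ShutdownAwareConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01.dev.in.songguo7.com:9092,kafka02.dev.in.songguo7.com:9092,kafka03.dev.in.songguo7.com:9092");
        props.put("group.id", "7_bike_geo_new");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(Producer.TOPIC));

        final Thread mainThread = Thread.currentThread();
        // wakeup() is the one KafkaConsumer method that may be called from another thread;
        // it causes the blocked poll() below to throw a WakeupException
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                consumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException ignored) {
                }
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Offset: " + record.offset() + " Msg: " + record.value());
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown, nothing to do
        } finally {
            consumer.close();
        }
    }
}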

Result:

[Screenshot: visualization tool (Offset Explorer 2)]
[Screenshot: consumer output]