单机测试kafka

2020-09-30  本文已影响0人  chen_666

1.安装zookeeper

安装完成后,修改配置文件名

mv zoo_sample.cfg zoo.cfg

2.启动zookeeper

zkServer.sh start

2.安装kafka

安装完成后,修改/config/server.properties

# 改成IP地址,用于intelij访问
host.name=192.168.242.204

3.启动kafka

./kafka-server-start.sh ../config/server.properties

4.创建topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic

./kafka-topics.sh --list --zookeeper localhost:2181

5.创建消费者

./kafka-console-consumer.sh --zookeeper 192.168.242.204:2181 --topic my_topic --from-beginning

6.创建生产者

./kafka-console-producer.sh --broker-list 192.168.242.204:9092 --topic my_topic

7.编写java代码测试消息发送,注意一定要是同步的方式

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaTest {

    public  Properties props;
    @Before
    public void init()  {
        props = new Properties();
        props.put("bootstrap.servers", "192.168.242.204:9092");//kafka集群,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重试次数
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待时间
        props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }


    @Test
    public void sendMsg() throws ExecutionException, InterruptedException{

        KafkaProducer producer = new KafkaProducer(props);
        for (int i = 0;i<100;i++){
            producer.send(new ProducerRecord<String, String>("my_topic","hello kafka"+i)).get();
        }
    }
}

上一篇 下一篇

猜你喜欢

热点阅读