Docker 安装kafka 并集成到spring boot

2020-04-11  本文已影响0人  倾国倾城林二狗

#1. 安装kafka前需要安装zookeeper,kafka是需要zookeeper管理的

docker pull wurstmeister/zookeeper

提示是否安装最后版本,Y

#2. 启动zookeeper容器

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

#3.安装kafka

docker pull wurstmeister/kafka

#4.启动kafka

docker run -d --name kafka -p 9092:9092  --env KAFKA_ADVERTISED_HOST_NAME=localhost -e KAFKA_ZOOKEEPER_CONNECT=218.25.54.37:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://218.25.54.37:9092 -e KAFKA_LISTENERS=PLAINTEXT://218.25.54.37:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" --net=host wurstmeister/kafka 

#5 进入到容器中

docker exec -it kafka bash

#6 进入bin文件夹

#7 创建topic 生产者

./kafka-topics.sh --create --zookeeper 218.25.54.37:2181 --replication-factor 1 --partitions 8 --topic test

./kafka-console-producer.sh --broker-list 218.25.54.37:9092 --topic test

#8 打开新的窗口,进入容器,进入bin文件夹 创建消费者

./kafka-console-consumer.sh --bootstrap-server 218.25.54.37:9092 --topic test --from-beginning

在生产者窗口输入消息,在消费者窗口就可以查看了。

生产者截图:

消费者截图:

kafka集成到spring boot

#1 pom 文件

<!--kafka-->

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

</dependency>

<dependency>

<groupId>com.google.code.gson</groupId>

<artifactId>gson</artifactId>

</dependency>

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import java.util.Optional;

@Component

public class KafkaReceiver {

private final org.slf4j.Loggerlog = LoggerFactory.getLogger(getClass());

@KafkaListener(topics = {"linziheng"})

public void listen(ConsumerRecord record) {

Optional kafkaMessage = Optional.ofNullable(record.value());

if (kafkaMessage.isPresent()) {

Object message = kafkaMessage.get();

log.info("----------------- record =" + record);

log.info("------------------ message =" + message);

}

}

}

测试接口:

@Autowired

private KafkaTemplatekafkaTemplate;

private Gsongson =new GsonBuilder().create();

@ApiOperation("kafka测试接口")

@PostMapping("/testKafka")

public void getRsJournalByUserId() {

Message message =new Message();

message.setId(System.currentTimeMillis());

message.setMsg(UUID.randomUUID().toString());

message.setSendTime(new Date());

kafkaTemplate.send("linziheng",gson.toJson(message));

}

Message  类

@Data

public class Message {

private Longid;//id

    private Stringmsg;//消息

    private DatesendTime;//时间戳

}

请求接口后,控制台打印信息

上一篇下一篇

猜你喜欢

热点阅读