Spring Boot开发 之 Kafka

2018-11-05  本文已影响16人  诺之林

本文的示例代码参考KafkaBasic

目录

Kafka

搭建

docker run --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime -d zookeeper

# 注意: 192.168.1.76 为宿主机的IP, 请替换为自己机器的实际IP
docker run --name kafka -p 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=192.168.1.76:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.76 --env KAFKA_ADVERTISED_PORT=9092 -v /etc/localtime:/etc/localtime -d wurstmeister/kafka

测试

docker exec -it kafka /bin/bash

cd /opt/kafka_2.11-2.0.0/
# 创建topic
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test

bin/kafka-topics.sh --list --zookeeper zookeeper:2181
# 消息生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 在生产者终端中输入: hello kafka
# 另开一个终端, 再次进入kafka容器
docker exec -it kafka /bin/bash

cd /opt/kafka_2.11-2.0.0/
# 消息消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# hello kafka

Spring

spring init -b 1.5.6.RELEASE -dweb,kafka --build gradle KafkaBasic && cd KafkaBasic
vim ./src/main/resources/application.properties
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=test

Model

vim ./src/main/java/com/example/KafkaBasic/Message.java
package com.example.KafkaBasic;

import java.util.Date;

/**
 * Payload bean used by the Kafka example: a numeric id, a text body and a
 * send timestamp, each exposed through a getter/setter pair.
 *
 * <p>{@link Date} is mutable, so the timestamp is defensively copied on the
 * way in and on the way out. Callers therefore cannot change the stored
 * value by mutating a {@code Date} instance they passed in or received.
 */
public class Message {
    private Long id;

    private String msg;

    private Date sendTime;

    /**
     * @return the message id, or {@code null} if it has not been set
     */
    public Long getId() {
        return id;
    }

    /**
     * @param id the message id; may be {@code null}
     */
    public void setId(Long id) {
        this.id = id;
    }

    /**
     * @return the message text, or {@code null} if it has not been set
     */
    public String getMsg() {
        return msg;
    }

    /**
     * @param msg the message text; may be {@code null}
     */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /**
     * @return a copy of the send timestamp, or {@code null} if it has not been set
     */
    public Date getSendTime() {
        return copyOf(sendTime);
    }

    /**
     * @param sendTime the send timestamp; may be {@code null}. A copy is stored,
     *                 so later changes to the argument do not affect this bean.
     */
    public void setSendTime(Date sendTime) {
        this.sendTime = copyOf(sendTime);
    }

    /** Returns an independent copy of {@code source}, passing {@code null} through. */
    private static Date copyOf(Date source) {
        return source == null ? null : new Date(source.getTime());
    }
}

Sender

vim ./src/main/java/com/example/KafkaBasic/Sender.java
package com.example.KafkaBasic;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

/**
 * Publishes example {@link Message} payloads to Kafka as JSON strings.
 *
 * <p>This class is intentionally not annotated with {@code @Component}:
 * {@code SenderConfig#sender()} already registers it as the bean named
 * {@code sender}. Declaring it a second time through component scanning
 * produces two definitions with the same bean name, which fails at startup
 * when bean-definition overriding is disabled (the default since
 * Spring Boot 2.1).
 */
public class Sender {

    /** Topic the example publishes to. */
    private static final String TOPIC = "test";

    /**
     * Shared JSON mapper. It is never reconfigured after construction, so one
     * instance is reused instead of building a new mapper on every call.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Builds a new {@link Message}, serializes it to JSON and sends it to the
     * {@code test} topic.
     *
     * <p>The message id is the current epoch time in milliseconds, the text is
     * a random UUID, and the send time is the current date. The result of the
     * send call is not inspected, so this method does not report whether the
     * broker accepted the record.
     *
     * @return the JSON string that was handed to the Kafka template
     * @throws JsonProcessingException if the message cannot be serialized
     */
    public String send() throws JsonProcessingException {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());

        String json = OBJECT_MAPPER.writeValueAsString(message);

        kafkaTemplate.send(TOPIC, json);
        return json;
    }
}
vim ./src/main/java/com/example/KafkaBasic/SenderConfig.java
package com.example.KafkaBasic;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration for the producer side of the Kafka example: producer
 * settings, the producer factory, the {@link KafkaTemplate} and the
 * {@code Sender} bean.
 */
@Configuration
public class SenderConfig {

    // Broker address list. Note that it is read from the consumer-scoped
    // property key, which is the only broker setting this example defines.
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Assembles the raw Kafka producer settings.
     *
     * <p>Keys and values are serialized as strings. {@code acks} is set to
     * {@code "0"}, which per the Kafka producer documentation means the
     * producer does not wait for any acknowledgement from the broker.
     *
     * @return a new mutable map of producer configuration entries
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.ACKS_CONFIG, "0");
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        return settings;
    }

    /**
     * @return a producer factory built from {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * @return a string-keyed, string-valued template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * @return a new {@code Sender}; its {@code @Autowired} template field is
     *         left for the container to populate
     */
    @Bean
    public Sender sender() {
        final Sender instance = new Sender();
        return instance;
    }
}

Receiver

vim ./src/main/java/com/example/KafkaBasic/Receiver.java
package com.example.KafkaBasic;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka listener for the example: prints every non-null record value it
 * receives from the {@code test} topic to standard output.
 */
@Component
public class Receiver {

    /**
     * Handles one record from the {@code test} topic.
     *
     * @param record the consumed record; a record whose value is {@code null}
     *               is ignored
     */
    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        // Print the payload only when one is present.
        Optional.ofNullable(record.value()).ifPresent(System.out::println);
    }
}

Controller

vim ./src/main/java/com/example/KafkaBasic/TestsController.java
package com.example.KafkaBasic;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint mapped to {@code /tests} that triggers one Kafka send per
 * GET request.
 */
@RestController
@RequestMapping("/tests")
public class TestsController {

    @Autowired
    private Sender sender;

    /**
     * Delegates to {@code Sender#send()} and returns its result as the
     * response body.
     *
     * @return the string returned by the sender
     * @throws JsonProcessingException if the sender fails to serialize the message
     */
    @GetMapping
    public String index() throws JsonProcessingException {
        final String sentJson = sender.send();
        return sentJson;
    }
}
./gradlew bootRun
curl localhost:8080/tests | json
{
  "id": 1541415087205,
  "msg": "3068cc02-8122-459d-b8cf-02ac240ae573",
  "sendTime": 1541415087205
}
终端打印:
{"id":1541415087205,"msg":"3068cc02-8122-459d-b8cf-02ac240ae573","sendTime":1541415087205}

参考

上一篇下一篇

猜你喜欢

热点阅读