Springboot整合kafka的使用

2018-05-25  本文已影响0人  wayne_wzy

本文章针对自己在使用kafka和springboot过程上做一些记录。

1 .kafka介绍

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)
是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

2 .整合过程

2.1 pom依赖(和SpringBoot整合过程中添加此依赖即可)
  <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
  </dependency>
2.2 kafka相关配置
 在使用kafka过程中,尽量不要使用它自带的zookeper,因为有可能访问不通。项目中我使用的是yml配置文件,其配置如下:
#kafka相关配置
  kafka:
    #kafka服务器地址
    bootstrap-servers: 127.0.0.1:9092  
    #key-value序列化
    consumer:
      group-id: consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2.3 kafka使用详解
/**
  * kafka配置类
  */
@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<String,String>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

}
/**
 * @author Wayne
 * @date 2018/5/25
 * <p>
 * desc: kafka消息生产端
 */
@Component
public class KafkaProducer  {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    public void sendMessage(String topic, String message) {
        // the KafkaTemplate provides asynchronous send methods returning a
        // Future
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);

        // you can register a callback with the listener to receive the result
        // success回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                ProducerRecord<String, String> producerRecord = result.getProducerRecord();
                String key = producerRecord.key();
                String value = producerRecord.value();
                RecordMetadata recordMetadata = result.getRecordMetadata();
                String topic = recordMetadata.topic();
                logger.info("send kafka message='{}' with offset={}", message, result.getRecordMetadata().offset());
                logger.info("key = '{}'" , key);
                logger.info("value = '{}'",value);
                logger.info("topic = '{}'",topic);
            }
            
            @Override
            public void onFailure(Throwable ex) {
                logger.debug("unable to send kafka message='{}'", message, ex);
            }
        });
    }
}

测试使用 :

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping(value = "/kafka")
    public Response kafkaTest() {
        KafkaMessage kafkaMessage = new KafkaMessage(KafkaMessageType.KAFKA_MESSAGE_TEST, "kafka测试");
        kafkaProducer.sendMessage(KafkaTopicConstant.USER_KAFKA_TOPIC, new Gson().toJson(kafkaMessage));
        return HttpUtil.ResponseSuccess();
    }

Created by wayne 2018/5/25

上一篇下一篇

猜你喜欢

热点阅读