Using Kafka with Spring Boot
2018-05-25
wayne_wzy
This article records some notes from my own experience using Kafka with Spring Boot.
1. Introduction to Kafka
Kafka is a high-throughput distributed publish-subscribe messaging system that can handle all the activity stream data of a consumer-scale website. This kind of activity (page views, searches, and other user actions) is a key ingredient of many social features on the modern web. Because of the throughput requirements, this data is typically handled through log processing and log aggregation.
2. Integration steps
2.1 Maven dependency (adding this single dependency is enough when integrating with Spring Boot)
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2.2 Kafka configuration
When working with Kafka, try to avoid the ZooKeeper instance bundled with it, because it may not be reachable. The project uses a YAML configuration file; the relevant configuration is as follows:
# Kafka configuration (under the spring: prefix so Spring Boot auto-configuration picks it up)
spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: consumer-group
      # key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
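The configuration above assumes a broker is already listening on 127.0.0.1:9092. With a standard Kafka distribution, an external ZooKeeper and the broker can be started roughly like this (script paths and the topic name here are only examples and depend on your installation):

```shell
# start ZooKeeper (or point the broker at an already-running external ZooKeeper)
bin/zookeeper-server-start.sh config/zookeeper.properties

# start the Kafka broker in another terminal
bin/kafka-server-start.sh config/server.properties

# optionally create the topic up front (topic name is illustrative)
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 \
    --replication-factor 1 --partitions 1 --topic test-topic
```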
2.3 Using Kafka in detail
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * Kafka configuration class
 */
@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
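The YAML in section 2.2 already configures the consumer side; if you prefer to mirror the producer setup with Java config instead, a sketch using the same spring-kafka classes could look like this (the group id matches the YAML above; this is an alternative, not something the original project requires):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

/**
 * Consumer-side Java config (sketch; equivalent to the YAML configuration)
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // container factory used by @KafkaListener-annotated methods
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```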
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @author Wayne
 * @date 2018/5/25
 * <p>
 * desc: Kafka message producer
 */
@Component
public class KafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        // KafkaTemplate provides asynchronous send methods that return a Future
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        // register a callback on the future to receive the send result
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                ProducerRecord<String, String> producerRecord = result.getProducerRecord();
                RecordMetadata recordMetadata = result.getRecordMetadata();
                logger.info("send kafka message='{}' with offset={}", message, recordMetadata.offset());
                logger.info("key = '{}'", producerRecord.key());
                logger.info("value = '{}'", producerRecord.value());
                logger.info("topic = '{}'", recordMetadata.topic());
            }

            @Override
            public void onFailure(Throwable ex) {
                // log failures at error level so they are not silently dropped
                logger.error("unable to send kafka message='{}'", message, ex);
            }
        });
    }
}
Usage in a test endpoint:
@Autowired
private KafkaProducer kafkaProducer;

@GetMapping(value = "/kafka")
public Response kafkaTest() {
    KafkaMessage kafkaMessage = new KafkaMessage(KafkaMessageType.KAFKA_MESSAGE_TEST, "kafka test");
    kafkaProducer.sendMessage(KafkaTopicConstant.USER_KAFKA_TOPIC, new Gson().toJson(kafkaMessage));
    return HttpUtil.ResponseSuccess();
}
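For completeness, the article configures the consumer but never shows one. On the consumer side a single @KafkaListener method is enough; a minimal sketch, assuming the same project-specific KafkaTopicConstant used above and the group id from the YAML config:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka message consumer (sketch; KafkaTopicConstant is the project's own constant class)
 */
@Component
public class KafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // group id matches the consumer group configured in the YAML above
    @KafkaListener(topics = KafkaTopicConstant.USER_KAFKA_TOPIC, groupId = "consumer-group")
    public void onMessage(String message) {
        // the payload arrives as the JSON string produced by the test endpoint
        logger.info("received kafka message='{}'", message);
    }
}
```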