SPRING BOOT结合kafka
pom配置
```
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
```
代码设置
生产者
```
@RestController
public class KafkaController extends BaseController{
// 服务端接口日志
protected static final Logger logger = LoggerFactory.getLogger(KafkaController.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@CrossOrigin(origins = "*", maxAge = 3600)
@RequestMapping(value = "/send", method = RequestMethod.GET)
@ResponseBody
public JSONObject send(HttpServletRequest request, HttpServletResponse response) throws Exception {
logger.info(String.valueOf(RequestHolder.getId()));
String msg = request.getParameter("msg");
try {
kafkaTemplate.send("topic", msg);
JSONObject jsonObject = new JSONObject();
jsonObject.put("return_code", "0");
return jsonObject;
} catch (Exception e) {
logger.error("ExChangeController.commandDetail error.", e);
return getNetJson();
}
}
}
```
消费者
```
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaReceiver {
protected static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
@KafkaListener(topics = {"topic"})
public void listen(ConsumerRecord<?, ?> record) {
logger.debug("消费接收: topic = "+record.topic() +" , value = "+record.value());
}
}
```
application文件配置
```
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=10.241.95.105:9092,10.241.95.106:9092,10.241.95.107:9092
#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默认消费者group id 同一个groupid只会有一个客户端收到消息
spring.kafka.consumer.group-id=kafka-group1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```
PS
卡夫卡消息消费完之后并不会直接把这个消息从队列中移出,因此当有新的groupid加入进来的话,已发送的消息将会再次发送一遍,卡夫卡的消息是保存在磁盘中的,直到用户配置的过期时间到了才会删除
默认
# 消息失效期,7天log.retention.hours=168