SPRING BOOT结合kafka

2019-03-22  本文已影响0人  蜡笔广志

pom配置

```

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

</dependency>

```

代码设置

生产者 

```

@RestController

public class KafkaController extends BaseController{

// 服务端接口日志

protected static final Logger logger = LoggerFactory.getLogger(KafkaController.class);

@Autowired

    private KafkaTemplate<String, String> kafkaTemplate;

@CrossOrigin(origins = "*", maxAge = 3600)

@RequestMapping(value = "/send", method = RequestMethod.GET)

@ResponseBody

public JSONObject send(HttpServletRequest request, HttpServletResponse response) throws Exception {

logger.info(String.valueOf(RequestHolder.getId()));

String msg = request.getParameter("msg");

try {

kafkaTemplate.send("topic", msg);

JSONObject jsonObject = new JSONObject();

jsonObject.put("return_code", "0");

return jsonObject;

} catch (Exception e) {

logger.error("ExChangeController.commandDetail error.", e);

return getNetJson();

}

}

}

```

消费者

```

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

@Component

public class KafkaReceiver {

protected static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);

    @KafkaListener(topics = {"topic"})

    public void listen(ConsumerRecord<?, ?> record) {

    logger.debug("消费接收:  topic = "+record.topic() +" , value = "+record.value());

    }

}

```

application文件配置

```

#============== kafka ===================

# 指定kafka 代理地址,可以多个

spring.kafka.bootstrap-servers=10.241.95.105:9092,10.241.95.106:9092,10.241.95.107:9092

#=============== provider  =======================

spring.kafka.producer.retries=0

# 每次批量发送消息的数量

spring.kafka.producer.batch-size=16384

spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================

# 指定默认消费者group id    同一个groupid只会有一个客户端收到消息

spring.kafka.consumer.group-id=kafka-group1

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

```

PS

卡夫卡消息消费完之后并不会直接把这个消息从队列中移出,因此当有新的groupid加入进来的话,已发送的消息将会再次发送一遍,卡夫卡的消息是保存在磁盘中的,直到用户配置的过期时间到了才会删除

默认

# 消息失效期,7天log.retention.hours=168

上一篇 下一篇

猜你喜欢

热点阅读