Spring-Kafka(九)—— 配置消息过滤器
2018-09-13 本文已影响1人
海苔胖胖
消息过滤器
消息过滤器可以在消息抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据再交由KafkaListener处理。
配置消息其实是非常简单的额,只需要为监听容器工厂配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
这里我们将消息转换为long类型,判断该消息为基数还是偶数,把所有基数过滤,监听容器只接收偶数。
@Component
public class FilterListener {
private static final Logger log= LoggerFactory.getLogger(TaskListener.class);
@Autowired
private ConsumerFactory consumerFactory;
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
//配合RecordFilterStrategy使用,被过滤的信息将被丢弃
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy() {
@Override
public boolean filter(ConsumerRecord consumerRecord) {
long data = Long.parseLong((String) consumerRecord.value());
log.info("filterContainerFactory filter : "+data);
if (data % 2 == 0) {
return false;
}
//返回true将会被丢弃
return true;
}
});
return factory;
}
@KafkaListener(id = "filterCons", topics = "topic.quick.filter",containerFactory = "filterContainerFactory")
public void filterListener(String data) {
//这里做数据持久化的操作
log.error("topic.quick.filter receive : " + data);
}
}
测试方法中,我们将当前时间戳发送到Kafka中。
@Test
public void testFilter() throws InterruptedException {
kafkaTemplate.send("topic.quick.filter", System.currentTimeMillis()+"");
}
额,这篇有点短,实在是不知道要讲什么了,Kafka官方文档也是一段描述,连代码都没有,后期想到了再补充吧。。。