Springboot2.x 实现redis的 发布/订阅 模型

2020-08-27  本文已影响0人  骑蚂蚁上高速_jun

1 . redis的基本配置 参照上一篇文章

  1. configuration 配置redis
package cn.waimaolang.demo.configura;

import cn.waimaolang.demo.command.MyMessageListenCommand;
import cn.waimaolang.demo.service.MessageConsumerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.io.Serializable;


@Configuration
public class RedisConfigura {
    /**
     * 使用基本的redis
     * @param factory
     * @return
     */
    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(
            LettuceConnectionFactory factory
    ) {
        RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer()); // redis 序列化数据的方式
        /* 能与其他语言 相互编码缓存 value编码方式,如果与其他语言混合开发项目,需要获取相同的缓存,
        则使用此种方式编码 */
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);        

        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }

    /**
     * 发布/订阅
     * RedisMessageListenerContainer提供订阅消息的多路分发,这样多个订阅可以共享同一个Redis连接.
     * 需要 自行实现监听 类 MyMessageListenCommand
     * 使用 名为  tenmao.blog.channel  作为消息管道
     */
    @Bean
    public RedisMessageListenerContainer redisContainer(LettuceConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(new MyMessageListenCommand(), new ChannelTopic("tenmao.blog.channel"));
        return container;
    }

    /**
     * 发布/订阅 实现2 ,需要调用listenerAdapter 方法
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     * 需要 自行实现 MessageConsumerService 类receiveMessage() 方法
     */
    @Bean
    public RedisMessageListenerContainer container(
        RedisConnectionFactory connectionFactory,
        MessageListenerAdapter listenerAdapter
    ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("queue"));
        return container;
    }

    /**
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MessageConsumerService receiver) {
        return new MessageListenerAdapter(new MessageConsumerService(), "receiveMessage");
    }

}

  1. 实现监听类 MyMessageListenCommand 作为 消费端逻辑
package cn.waimaolang.demo.command;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;


@Slf4j
public class MyMessageListenCommand  implements MessageListener {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void onMessage(@NonNull Message message, byte[] pattern) {

        logger.error("message received: {}", message);
    }
}

4 . 实现生产者

@Autowired
RedisTemplate redisTemplate;
redisTemplate.convertAndSend("tenmao.blog.channel", "hello world");
上一篇下一篇

猜你喜欢

热点阅读