IT@程序员猿媛SpringBoot精选

SpringBoot2.1 Redis 发布/订阅模式

2019-12-19  本文已影响0人  程就人生

Redis支持发布/订阅模式,在发布订阅模式中,主要涉及到三个角色:

在SpringBoot2.1.4版本,先来一个简单的发布订阅demo,这里有一个发布者,发布了两个通道;有三个订阅者,第一个订阅者订阅第一个通道,第二个订阅者订阅第二个通道,第三个订阅者同时订阅两个通道。

首先,需要在pom文件中引入必须的架包;

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.1.0</version><!-- 加版本号,是预防在使用@Test注解测试时,找不到JedisPubSub类 -->
        </dependency>       
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

第二步,上测试demo;

import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
/**
 * Jedis发布/订阅者测试
 * @author 程就人生
 * @date 2019年12月19日
 */
public class JedisTest {

    //发布者
    @Test
    public void pub(){      
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        //发布者1
        jedis.publish("publish", "我是发布者,我发布了一些内容,aass");
        jedis.publish("publish2", "我是发布者2,我发布了一些内容,bbbbb");
    }
    
    //第一个订阅者    
    //@Test
    public void sub1(){
        Jedis jedis = new Jedis("127.0.0.1", 6379); 
        jedis.subscribe(new JedisPubSub(){
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("第一个订阅者");
                System.out.println("订阅的通道为:" + channel);
                System.out.println("接收到的内容为:" + message);
            }
        }, "publish");
    }
    
    //第二个订阅者    
    //@Test
    public void sub2(){
        Jedis jedis = new Jedis("127.0.0.1", 6379); 
        jedis.subscribe(new JedisPubSub(){
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("第二个订阅者");
                System.out.println("订阅的通道为:" + channel);
                System.out.println("接收到的内容为:" + message);
            }
        }, "publish2");
    }
        
    //第三个订阅者    
    //@Test
    public void sub3(){
        Jedis jedis = new Jedis("127.0.0.1", 6379); 
        jedis.subscribe(new JedisPubSub(){
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("第三个订阅者");
                System.out.println("订阅的通道为:" + channel);
                System.out.println("接收到的内容为:" + message);
            }
        }, "publish", "publish2");
    }
}

最后,测试;先把订阅者一个一个启动起来,最后再启动发布者,这时可以看到每个订阅者都收到了自己该收到的消息;

第一个订阅者
订阅的通道为:publish
接收到的内容为:我是发布者,我发布了一些内容,aass

第二个订阅者
订阅的通道为:publish2
接收到的内容为:我是发布者2,我发布了一些内容,bbbbb

第三个订阅者
订阅的通道为:publish
接收到的内容为:我是发布者,我发布了一些内容,aass
第三个订阅者
订阅的通道为:publish2
接收到的内容为:我是发布者2,我发布了一些内容,bbbbb

在这个demo中,使用的是Jedis框架,使用极其简单;但是,在实际的开发中,肯定没有这么简单,如何才能和SpringBoot更好的地结合呢,这就需要单独地写Redis的配置文件;

首先,还是pom文件里引入必须的架包;

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

第二步,redis配置文件,这里使用了Lettuce框架,也可以使用Jedis框架,Jedis框架配置参数设置比较麻烦;

package com.example.demo.config;

import java.io.Serializable;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * redis配置
 * @author 程就人生
 * @date 2019年12月19日
 */
@Configuration
@EnableCaching//开启注解式缓存
public class SRedisConfig extends CachingConfigurerSupport {

    /**
     * 配置lettuce连接池(多数据源的公共参数)
     * GenericObjectPoolConfig不是线程安全的
     * @return
     */
    @SuppressWarnings("rawtypes")
    @Bean
    @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
    public GenericObjectPoolConfig redisPool() {
        return new GenericObjectPoolConfig<>();
    }

    /**
     * 配置数据源
     *
     * @return
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.redis")
    public RedisStandaloneConfiguration redisConfig() {
        return new RedisStandaloneConfiguration();
    }

    /**
     *
     * @param config
     * @param redisConfig
     * @return
     */
    @SuppressWarnings("rawtypes")
    @Bean("factory")
    @Primary
    public LettuceConnectionFactory factory(GenericObjectPoolConfig config, RedisStandaloneConfiguration redisConfig) {
      LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder().poolConfig(config).build();
      return new LettuceConnectionFactory(redisConfig, clientConfiguration);
    }
    
    /**
     *
     * @param factory
     * @return
     */
    @Bean("redisTemplate")
    public RedisTemplate<Serializable, Serializable> redisTemplate(@Qualifier("factory") RedisConnectionFactory factory) {
        
        RedisTemplate<Serializable, Serializable> template = new RedisTemplate<>();
      
        template.setConnectionFactory(factory);
    
        //使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());
          
        template.setValueSerializer(new JdkSerializationRedisSerializer());
          
        template.setHashKeySerializer(new StringRedisSerializer());
          
        template.setHashValueSerializer(new JdkSerializationRedisSerializer());
        
        //开启事务
        template.setEnableTransactionSupport(true);
          
        template.afterPropertiesSet();
        
        return template;
    }
}

第三步,测试代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 发布订阅测试
 * @author 程就人生
 * @date 2019年12月19日
 */
@RestController
public class Index1Controller {

    @Autowired
    LettuceConnectionFactory factory;
    
    //发布次数
    public static int i=0;
    
    //订阅
    @GetMapping("/sub")
    public void sub(){
        factory.getConnection().subscribe(new MessageListener(){
            @Override
            public void onMessage(Message message, byte[] pattern) {
                System.out.println(message);
            }
            
        }, "Lettuce通道".getBytes());
    }
    
    //发布
    @GetMapping("/pub")
    public void pub(){
        factory.getConnection().publish("Lettuce通道".getBytes(), ("我是Lettuce"+(i++)).getBytes());
    }
    
}

最后,测试;启动入口程序,先订阅再发布,这样订阅者才能收到发布者发布的消息,先发布再订阅,那么订阅事件之前发布的信息将不能收到。

测试结果图
上一篇下一篇

猜你喜欢

热点阅读