Redis发布与订阅

2020-08-19  本文已影响0人  蓝色Hippie

一、发布与订阅

实际中,redis很少使用发布与订阅来代替MQ角色。

二、使用redis客户端实现

    Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。一个redis客户端可以订阅任意多大频道channel,一个频道也可以被多个客户端订阅。

    1.创建并监听频道

发布

    2.向创建的频道发送一条消息

订阅

三、demo

1.订阅者需要继承抽象类JedisPubSub;重写方法onMessage接收发布者发送的消息

Farmer Worker Programmer

2.定义发布者

3.测试类

public class Main {

    public static final String CHANNEL_NAME = "MyChannel";

    public static final String REDIS_HOST = "localhost";

    public static final int REDIS_PORT = 6379;

    private final static Logger logger = Logger.getLogger(Main.class);

    private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();

    private final static JedisPool JEDIS_POOL =

            new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0);

    public static void main(String[] args) throws Exception {

        PropertyConfigurator.configure("src/log4j.properties");

        /*订阅者redis客户端*/

        final Jedis farmerJedis = JEDIS_POOL.getResource();

        final Jedis workerJedis = JEDIS_POOL.getResource();

        final Jedis programmerJedis = JEDIS_POOL.getResource();

        /*发布者redis*/

        final Jedis publisherJedis = JEDIS_POOL.getResource();

        final Farmer farmer = new Farmer();

        final Worker worker = new Worker();

        final Programmer programmer  = new Programmer();

        //订阅线程:接收消息,因为Jedis是以阻塞的方式等待发布者消息的,所以每个Jedis客户端必须对应一个独立的线程。不然只会有第一个Jedis接受到消息。

        new Thread(new Runnable() {

            public void run() {

                try {

                    farmerJedis.subscribe(farmer,CHANNEL_NAME);

                    logger.info("Subscription ended.");

                } catch (Exception e) {

                    logger.error("Subscribing failed.", e);

                }

            }

        }).start();

        new Thread(new Runnable() {

            public void run() {

                try {

                    workerJedis.subscribe(worker,CHANNEL_NAME);

                    logger.info("Subscription ended.");

                } catch (Exception e) {

                    logger.error("Subscribing failed.", e);

                }

            }

        }).start();

        new Thread(new Runnable() {

            public void run() {

                try {

                    programmerJedis.subscribe(programmer,CHANNEL_NAME);

                    logger.info("Subscription ended.");

                } catch (Exception e) {

                    logger.error("Subscribing failed.", e);

                }

            }

        }).start();

        //主线程:发布消息到CHANNEL_NAME频道上

        WeatherServer weatherServer = new WeatherServer();

        weatherServer.publishMessage(publisherJedis,CHANNEL_NAME,"rain");

        farmerJedis.close();

        workerJedis.close();

        programmerJedis.close();

    }

}

上一篇 下一篇

猜你喜欢

热点阅读