rabbitMQ+redis+socket.io实现消息推送

2020-03-06  本文已影响0人  web_柚子小记

用pm2起一个nodejs服务,安装amqp-connection-manager,ioredis:
1)建立amqp连接,创建channels,绑定queue,并消费:

const amqp = require('amqp-connection-manager');
const amqpConfig = {
  "amqp": {
    "cluster": [
      "amqp://用户名:密码@服务器ip:5672"
    ],
    "exchange": "****"
  }
}
amqp.connect(amqpConfig.amqp.cluster, { json: true })
        .createChannel({ setup: channelSetup })
        .waitForConnect().then(() => {
            console.log("MQ client is listening for messages.");
        }).catch(err => {
            console.log("MQ client setup failed.", err);
        });

function channelSetup(ch) {
        return ch.assertExchange(Config.amqp.exchange, 'direct', { durable: true })
            .then(function (exchg) {
                //消息类型
                var qp_new_task = `push.queue.task.new`;
                var qks = [
                    { "queue": qp_new_task, "key": qp_new_task, "durable": true }
                ];

                _.each(qks, function (qk) {
                    ch.assertQueue(qk.queue, { durable: qk.durable }).then(function (qok) {
                        return qok.queue;
                    }).then(function (queue) {
                        return ch.bindQueue(queue, exchg.exchange, qk.key).then(function () {
                            return queue;
                        });
                    }).then(function (queue) {
                        console.log("*** Message Queue [" + queue + "] ready");

                        return ch.consume(queue, function (msg) {
                            console.log('[receive message from amqp]')
                        }, { noAck: true });
                    });
                });
            });
    }
如果amqp连接成功,会在amqp的管理界面看到如下节点信息: image.png

2)建立redis连接:

const Redis = require('ioredis');
function createRedisClient(){
const config = {
"redis": {
   "password": "***",
   "cluster": [
     {
       "host": "192.168.0.0",
       "port": 1234
     },
     {
       "host": "192.168.0.1",
       "port": 1234
     }
   ]
 }}
//建立单个redis service连接:
return new Redis({
     host: config.redis.cluster[0].host,
     port: config.redis.cluster[0].port,
     password: config.redis.password
});
//建立多个redis service连接:
//new Redis.Cluster(config.redis.cluster, {
//    redisOptions: {
//      password: config.redis.password
//    }
//});
}
//向redis注册用户信息:
createRedisClient().sadd(userId, userToken);

3)socket.io的存在是桥梁作用,socket.io分为socket.io和socket.io-client。在web端实例化socket.io-client,用户登录后,获取到用户信息和token,推送给nodejs服务中的socket.io。socket.io接收到用户信息后在redis中注册。当amqp推送消息过来时,nodejs服务会消费到消息队列中的这条消息,然后找到对应的userId,利用socket.io推送给socket.io-client,从而实现聊天系统中的消息推送。

实现效果: image.png image.png

采坑记录:
首先检查服务器上rabbitMQ和redis的端口是否正常访问
-- 检查方法:linux和centos:telnet 10.20.66.37 5672
macos: nc -vz -w 2 10.20.66.37 5672

-- 如果rabbitMQ和redis使用了docker容器,需配置rabbitMQ和redis端口的对外映射。打开docker_compose,配置对外映射: image.png

补充说明:

exchange和消息类型的定义可以在amqp的管理界面中添加: image.png image.png

--------补充更新-------
当我们在聊天系统中发送消息时,首先会调用后台的api,后台api会将消息放到amqp的消息队列中,其次,每个连接amqp的nodejs服务相当于一个消费者,消息队列中一旦有消息会被消费者消费。假设当前有两个nodejs服务同时在消费,当amqp消息队列中有消息产生时,如果被其中一个nodejs服务消费掉,则另外一个nodejs将消费不到这条消息。换句话说,当有多个nodejs存在时,只会有一个nodejs接收到amqp发来的消息

后续会整理个demo出来,更不动了。。

上一篇下一篇

猜你喜欢

热点阅读