程序园技术初心简友广场

C# 简单操作 RabbitMQ 发送与接收队列消息

2019-11-22  本文已影响0人  triplestudio

一、前言

为了解藕下发指令功能,加入了 RabbitMQ 中间件,目的很简单,就是使用独立出来的消息中间件,使得两端的应用互不影响,你重启你的,我发送我的。

根据此应用场景,对组件功能的需求也比较简单。考虑到类似的简单场景也具备一定的通用性,于是进行简易封装。

期望的外部方法主要有:

这里我们使用默认的 exchange,也即不涉及这个概念。

引用程序集,如:rabbitmq-dotnet-client-3.6.6-dotnet-4.5.zip

https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/

二、先看使用方式

为了简洁,这里在同一个控制台上完成同一队列的发送与接收,手动输入发送内容,接收显示发送的内容。

(1)形成通话对象

参数如其名,将相应的信息作为参数传递即可。

RabbitMQQueueTalker rmt = new RabbitMQQueueTalker(
        host: "localhost",      
        username: "guest", 
        password: "guest", 
        sendQueueName: "QueueTalk", 
        receiveQueueName: "QueueTalk", 
        durable: false);

(2)绑定消息响应方法

// 消息接收响应
rmt.OnMessage(s => {
    Console.WriteLine(String.Format("receive message : {0}", s));
}); 

(3)发送消息

rmt.SendMessage(message);

完整调用过程

class Program
{
    static void Main(string[] args)
    {
        RabbitMQQueueTalker rmt = new RabbitMQQueueTalker(
            host: "localhost", 
            username: "guest", 
            password: "guest", 
            sendQueueName: "QueueTalk", 
            receiveQueueName: "QueueTalk", 
            durable: false);

        // 消息接收响应
        rmt.OnMessage(s => {
            Console.WriteLine(String.Format("receive message : {0}", s));
        }); 

        // 输入并发送消息
        while (true)
        {
            // input message
            string message = Console.ReadLine();
            if (rmt.SendMessage(message))
            {
                Console.WriteLine("send message :{0}", message);
            }
        }
    }
}
运行结果如下:

三、封装实现

3.1 启动连接

应用需要连接能够断线重连,这里定义 GetConnection 方法,实现在未建立时新建连接,建立之后则由断线重连属性 AutomaticRecoveryEnabled 完成断线之后的恢复。

private IConnection GetConnection()
{
    if (_Connection != null) return _Connection;

    var factory = new ConnectionFactory()
    {
        HostName = this.Host,
        UserName = this.UserName,
        Password = this.Password,
        RequestedHeartbeat = 10,
        AutomaticRecoveryEnabled = true
    };

    try
    {
        factory.RequestedConnectionTimeout = 6000;
        _Connection = factory.CreateConnection();

        // 阻塞解除之后检测接收通道是否还打开
        _Connection.ConnectionUnblocked += (o, e) => {
            BuildReceiveChannel();
        };

        return _Connection;
    }
    catch (Exception se)
    {
        Debug.WriteLine(se.Message);
        Debug.WriteLine(se.StackTrace);
        return null;
    }
}

3.2 发送通道与发送方法

默认队列,在通道建立的过程中进行队列声明 QueueDeclare

// 获取发送通道
private IModel GetSendChannel()
{
    if (_SendChannel != null && !_SendChannel.IsClosed) return _SendChannel;

    var conn = GetConnection();
    if (conn == null) return null;

    try
    {
        _SendChannel = conn.CreateModel();

        _SendChannel.QueueDeclare(queue: SendQueueName,
                            durable: Durable,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

        return _SendChannel;
    }
    catch (Exception se)
    {
        Debug.WriteLine(se.Message);
        Debug.WriteLine(se.StackTrace);
        return null;
    }
}

对于临时指定队列名称的,则在方法中动态进行队列声明 QueueDeclare。开放的方法如下。

/// <summary>
/// 向默认发送队列发送消息
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public bool SendMessage(string message)
{
    return SendMessage(SendQueueName, message);
}

/// <summary>
/// 向指定队列发送消息
/// </summary>
/// <param name="queueName"></param>
/// <param name="message"></param>
/// <returns></returns>
public bool SendMessage(string queueName, string message)
{
    var channel = GetSendChannel();
    if (channel == null) return false;

    try
    {
        if (SendQueueName != queueName)
        {
            channel.QueueDeclare(queue: queueName,
                            durable: Durable,   
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);
        }

        var body = Encoding.UTF8.GetBytes(message);

        // 发送消息
        channel.BasicPublish(exchange: "",
                                routingKey: queueName,
                                basicProperties: null,
                                body: body);
        return true;
    }
    catch (Exception se)
    {
        Debug.WriteLine(se.Message);
        Debug.WriteLine(se.StackTrace);
        return false;
    }
}

3.3 接收通道与接收方式

定义响应方法列表,用于在消息事件中逐个调用。

private List<Action<string>> ReceiveActionList = new List<Action<string>>();

建立接收通道,并绑定接收事件。

// 建立接收通道
private IModel BuildReceiveChannel()
{
    if (_ReceiveChannel != null && !_ReceiveChannel.IsClosed) return _ReceiveChannel;

    var conn = GetConnection();
    if (conn == null) return null;

    try
    {
        _ReceiveChannel = conn.CreateModel();

        _ReceiveChannel.QueueDeclare(queue: ReceiveQueueName,
                        durable: Durable,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

        var consumer = new EventingBasicConsumer(_ReceiveChannel);

        // 绑定消息事件
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            foreach (Action<string> action in ReceiveActionList)
            {
                action(message);
            }                    
        };
        // 启动消费者
        _ReceiveChannel.BasicConsume(queue: ReceiveQueueName,
                                noAck: true,
                                consumer: consumer);

        return _ReceiveChannel;
    }
    catch (Exception se)
    {
        Debug.WriteLine(se.Message);
        Debug.WriteLine(se.StackTrace);
        return null;
    }
}

向外开放添加响应方法的功能。

/// <summary>
/// 添加消息到达响应方法
/// </summary>
/// <param name="action"></param>
public void OnMessage(Action<string> action)
{
    lock (ReceiveActionList)
    {
        ReceiveActionList.Add(action);
    }

    BuildReceiveChannel();
}

详细源码

https://github.com/triplestudio/helloworld/blob/master/RabbitMQTest/RabbitMQQueueTalker.cs

上一篇 下一篇

猜你喜欢

热点阅读