C# MQTT 工具封装

2023-12-26  本文已影响0人  Messix_1102

需要安装MQTTnet nugut 包,二话不说,贴代码

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System.Text;

namespace Utility
{
    /// <summary>
    /// MQTT 服务代理
    /// </summary>
    public class MQTTProxy
    {
        /// <summary>
        /// MQTT 客户端
        /// </summary>
        private IMqttClient _client { get; set; }
        /// <summary>
        /// MQTT 连接选项
        /// </summary>
        private MqttClientOptions _mqttClientOptions { get; set; }
        /// <summary>
        /// 订阅主题
        /// </summary>
        private HashSet<string> _subscribeTopics { get; set; } = new HashSet<string>();
        /// <summary>
        /// 消息处理程序
        /// </summary>
        private Func<MqttApplicationMessageReceivedEventArgs, Task> _messageHandler { get; set; }
        /// <summary>
        /// 重新连接次数
        /// </summary>
        private int reconnectedTimes = 1;

        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="host"></param>
        /// <param name="port"></param>
        /// <param name="username"></param>
        /// <param name="password"></param>
        /// <param name="clientId"></param>
        public MQTTProxy(
            string host,
            int port,
            string username,
            string password,
            string clientId)
        {
            try
            {
                // 初始化连接参数
                _mqttClientOptions = new MqttClientOptionsBuilder()
                    .WithTcpServer(host, port)
                    .WithCredentials(username, password)
                    .WithClientId(clientId)
                    // 连接断开, 会话不删除
                    // 意味着会话队列里的数据还在其他节点上, 重新连接能接着消费
                    // 如果数据丢失无所谓就设置为true
                    .WithCleanSession(false) 
                    .Build();
                // 创建连接
                MqttFactory factory = new MqttFactory();
                _client = factory.CreateMqttClient();
                _client.ConnectAsync(_mqttClientOptions).Wait();
                // 断线自动重连
                _client.DisconnectedAsync += Reconnect;
            }
            catch
            {
                throw new Exception("MQTT 连接失败, 请检查连接信息");
            }
        }

        /// <summary>
        /// 发布消息
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="messageBody"></param>
        /// <returns></returns>
        public async Task<string> Publish(string topic, string messageBody)
        {
            try
            {
                MqttApplicationMessage message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    // AtLeastOnce 至少发成功一次
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                    .WithRetainFlag(false)
                    .WithPayload(messageBody)
                    .Build();
                await _client.PublishAsync(message);
                return string.Empty;
            }
            catch 
            {
                return "消息发送失败";
            }
        }

        /// <summary>
        /// 订阅主题
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public string Subscribe(string topic, Func<MqttApplicationMessageReceivedEventArgs, Task> callback)
        {
            try
            {
                _subscribeTopics.Add(topic);
                _messageHandler = callback;

                // 添加消息处理函数
                if (callback != null)
                {
                    _client.ApplicationMessageReceivedAsync += callback;
                }
                else
                {
                    _client.ApplicationMessageReceivedAsync += MessageHandler;
                }
                // AtLeastOnce 直到订阅成功一次
                _client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();
                return string.Empty;
            }
            catch
            {
                return "订阅失败";
            }
        }

        /// <summary>
        /// 订阅主题
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public string MultipSubscribe(List<string> topics, Func<MqttApplicationMessageReceivedEventArgs, Task> callback)
        {
            try
            {
                _messageHandler = callback;

                // 添加消息处理函数
                if (callback != null)
                {
                    _client.ApplicationMessageReceivedAsync += callback;
                }
                else
                {
                    _client.ApplicationMessageReceivedAsync += MessageHandler;
                }
                foreach(string topic in topics)
                {
                    _subscribeTopics.Add(topic);
                    _client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();
                }
                return string.Empty;
            }
            catch
            {
                return "订阅失败";
            }
        }

        /// <summary>
        /// 默认消息处理程序
        /// </summary>
        /// <param name="e"></param>
        /// <returns></returns>
        private async Task MessageHandler(MqttApplicationMessageReceivedEventArgs e)
        {
            Thread.Sleep(2000);
            await Task.Run(() =>
            {
                Console.WriteLine(e.ApplicationMessage.Topic);
                Console.WriteLine(Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment));
            });
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <param name="topic"></param>
        public async Task UnSubscribe(string topic)
        {
            await _client.UnsubscribeAsync(topic);
            _subscribeTopics.Remove(topic);
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <param name="topic"></param>
        public async Task UnSubscribeAll()
        {
            foreach(string topic in _subscribeTopics)
            {
                await _client.UnsubscribeAsync(topic);
            }
            _subscribeTopics.Clear();
        }

        /// <summary>
        /// 断线重连
        /// </summary>
        /// <param name="e"></param>
        /// <returns></returns>
        private async Task Reconnect(MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine("重新连接被调用");
            if (_mqttClientOptions == null || _client.IsConnected)
            {
                return;
            }
            await Task.Run(() =>
            {
                try
                {
                    _client.ConnectAsync(_mqttClientOptions).Wait();
                    if (_client.IsConnected)
                    {
                        // 连接成功重新订阅
                        if (_subscribeTopics.Count() > 0 && _messageHandler != null)
                        {
                            foreach (string topic in _subscribeTopics)
                            {
                                _client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();
                            }
                        }

                        reconnectedTimes = 1;
                        Console.WriteLine("重连成功");
                    }
                }
                catch 
                {
                    Console.WriteLine($"重连第{reconnectedTimes}次失败");
                    reconnectedTimes += 1;
                }
            });
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose() 
        {
            if (_client != null && _client.IsConnected)
            {
                _client.DisconnectedAsync -= Reconnect;
                _client.DisconnectAsync().Wait();
            }
            if (_client != null)
            {
                _client.Dispose();
            }
        }

        /// <summary>
        /// Finalize 函数释放资源
        /// 如果用户忘记手动调用Dispose方法, 那么在MQTTProxy被垃圾回收时会释放资源
        /// </summary>
        ~MQTTProxy()
        {
            Dispose();
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读