一文搞懂C++消息队列

2019-08-08  本文已影响0人  晴空一垩

消息队列

消息队列在工作过程中,是个非常常用的基础知识点。

用于将一些任务放置在同一个线程中执行。一来不会对主线程产生影响,二来针对有先后关系的任务可以更好的维护。

下面,我们开始简单写个消息队列。希望大家都能理解。

单任务处理队列

这里采用qml进行验证,有一个按钮用于创建消息。对于这里讲述的qml与c++之间的通讯不熟悉的可以看我之前写的文章。

首先我们创建两个文件 ua4qml2.h,ua4qml2.cpp,内容分别是:

#ifndef UA4QML2_H
#define UA4QML2_H

#include <QObject>
#include <deque>
#include <string>
#include <mutex>
#include <thread>
#include <semaphore.h>

//消息队列中存放的消息结构体
struct Message{
    uint32_t type;
    std::string cmd;
};

class UA4Qml2 : public QObject
{
    Q_OBJECT
public:
    UA4Qml2(QObject *parent = 0);

    void sendMsg(const Message& msg);

signals:

public slots:
    void msgQueueSend(const QString str);
private:
    //处理消息队列
    void startProcMsg();
    static void* proc(void*);
private :
    std::deque<Message*> m_dequeMsgs;
    std::mutex m_mutex;
    sem_t m_sem;
    pthread_t m_thread;
};

#endif // UA4QML2_H

#include "ua4qml2.h"
#include <iostream>

UA4Qml2::UA4Qml2(QObject *parent) : QObject(parent)
{
    //创建消息队列
    startProcMsg();
}

void UA4Qml2::sendMsg(const Message&  msg)
{
    //往消息队列中,增加事件
    m_mutex.lock();
    m_dequeMsgs.emplace_back(new Message(msg));
    m_mutex.unlock();
    //通知分发事件
    sem_post(&m_sem);
}

//通过qml的按钮来进行操作
void UA4Qml2::msgQueueSend(const QString str)
{
    Message msg;
    msg.cmd = str.toStdString();
    msg.type = 1000;
    sendMsg(msg);
}

void UA4Qml2::startProcMsg()
{
    std::cout<<"进入消息队列"<<std::endl;
    std::thread t([this](){
        while(true){
            // 如果没有事件通知过来,则持续等待,这里使用sem_wait,可以使用别的方式来卡住线程,否则对cpu是种很大的消耗
            sem_wait(&m_sem);
            //如果有消息到来就不会开启等待
            std::cout<<"开始"<<std::endl;
            std::deque<Message*> dequeMsgs;
            //对于双端队列的操作需要加锁
            m_mutex.lock();
            // 这里进行内存的交换
            dequeMsgs.swap(m_dequeMsgs);
            m_mutex.unlock();
            for(auto it=dequeMsgs.begin();it!=dequeMsgs.end();it++){
                //处理任务,可以根据不同的type来选择不同的处理方式
                std::cout<<(*it)->type<<":::"<<(*it)->cmd<<std::endl;
                delete *it;
            }
            //这里注意下,我手动停止了2s,来模拟有个任务消耗过多的时间
            sleep(2);
            std::cout<<"结束"<<std::endl;
        }
    });
    t.detach();
}

接下来是qml的点击,对于怎么和c++通讯还是看 上篇文章

QQmlApplicationEngine *engine = new QQmlApplicationEngine();
engine->rootContext()->setContextProperty("$SigDispatcher", new UA4Qml2);
engine->load(QUrl(QStringLiteral("qrc:/main.qml")));
import QtQuick.Controls 1.4

ApplicationWindow {
    id: applicationWindow
    visible: true
    width: 640
    height: 480
    Button{
        id:msgQueue
        text: "消息队列"
        anchors.top: parent.top
        anchors.topMargin: 10
        onClicked: {
            //调用C++ 的方法
            $SigDispatcher.msgQueueSend("hello world");
        }
    }
}

接下来看输出

开始
1000:::hello world
结束
开始
1000:::hello world
1000:::hello world
1000:::hello world
结束
开始
结束
开始
结束

解析

  1. 使用linux中的信号量来进行等待操作,每调用sem_post 一次表示信号量+1,没调用 sem_wait 一次表示信号量-1 直到为0 则处于等待。

  2. 使用双端队列进行消息的存储。使用swap进行内存的交换,减少内存的拷贝。当然这里你们也可以使用别的数据结构来进行消息的存储。

  3. 有个死循环,一直持续处理消息,注意这里需要卡住线程 sem_wait

  4. 注意对存储的消息体操作的时候,需要加锁

看到这里你是否有发现存在的问题呢?

如果采用单线程的消息队列,则会有个问题,就是处理的消息不能有延时。一旦一个消息处理过慢,则对之后所有的消息都会产生影响。

多任务处理队列

#include "ua4qml2.h"
#include <iostream>

UA4Qml2::UA4Qml2(QObject *parent) : QObject(parent)
{
    startProcMsg();
}

void UA4Qml2::sendMsg(const Message&  msg)
{
    //往消息队列中,增加事件,注意加锁
    m_mutex.lock();
    m_dequeMsgs.emplace_back(new Message(msg));
    m_mutex.unlock();
    sem_post(&m_sem);
}

void UA4Qml2::msgQueueSend(const QString str)
{
    Message msg;
    msg.cmd = str.toStdString();
    msg.type = 1000;
    sendMsg(msg);
}

void UA4Qml2::startProcMsg()
{
    std::cout<<"进入消息队列"<<std::endl;
    //c++ 中采用#if 的方式来进行代码块的注释,这是java所没有的
    //这里采用了两种不同的线程方式,c开线程和c++开线程的两种方式
#if 1
    for(size_t i=0;i<4;++i)
    {
        pthread_create(&m_thread,NULL,proc,this);
    }
#else
    for(size_t i=0;i<4;++i)
    {
        std::thread t([this](){
            UA4Qml2::proc(this);
        });
        t.detach();
    }
#endif
}

void* UA4Qml2::proc(void* data)
{
    UA4Qml2* uA4Qml2 = (UA4Qml2*)data;
    while(true){
        sem_wait(&uA4Qml2->m_sem);
        //如果有消息到来就会执行
        uA4Qml2->m_mutex.lock();
        //这里是一个一个的获取,而非一次性获取所有的消息
        Message* it = *(uA4Qml2->m_dequeMsgs.begin());
        uA4Qml2->m_dequeMsgs.pop_front();
        uA4Qml2->m_mutex.unlock();
        std::cout<<(*it).type<<":::"<<(*it).cmd<<std::this_thread::get_id()<<std::endl;
        std::cout<<"卡住结束"<<std::endl;
        sleep(2);
        delete it;
    }
    return nullptr;
}

看下结果打印:

进入消息队列
1000:::hello world140150117496576 #【】
卡住结束
1000:::hello world140150100711168
卡住结束
1000:::hello world140150109103872
卡住结束
1000:::hello world140150017357568
卡住结束
1000:::hello world140150117496576 #【】
卡住结束

解析

  1. 这里使用了4个线程,来做多线程消息队列。
  2. 这里要注意因为使用了 Message* 所以要注意内存的释放与申请
  3. c++ 可以用宏定义 #if #error #define 来完成很多事情,c++的优点哦

欢迎关注我的微信公众号:
程序员学习分享录

上一篇下一篇

猜你喜欢

热点阅读