一文搞懂C++消息队列
2019-08-08 本文已影响0人
晴空一垩
消息队列
消息队列在工作过程中,是个非常常用的基础知识点。
用于将一些任务放置在同一个线程中执行。一来不会对主线程产生影响,二来针对有先后关系的任务可以更好的维护。
下面,我们开始简单写个消息队列。希望大家都能理解。
单任务处理队列
这里采用qml进行验证,有一个按钮用于创建消息。对于这里讲述的qml与c++之间的通讯不熟悉的可以看我之前写的文章。
首先我们创建两个文件 ua4qml2.h
,ua4qml2.cpp
,内容分别是:
#ifndef UA4QML2_H
#define UA4QML2_H
#include <QObject>
#include <deque>
#include <string>
#include <mutex>
#include <thread>
#include <semaphore.h>
//消息队列中存放的消息结构体
struct Message{
uint32_t type;
std::string cmd;
};
class UA4Qml2 : public QObject
{
Q_OBJECT
public:
UA4Qml2(QObject *parent = 0);
void sendMsg(const Message& msg);
signals:
public slots:
void msgQueueSend(const QString str);
private:
//处理消息队列
void startProcMsg();
static void* proc(void*);
private :
std::deque<Message*> m_dequeMsgs;
std::mutex m_mutex;
sem_t m_sem;
pthread_t m_thread;
};
#endif // UA4QML2_H
#include "ua4qml2.h"
#include <iostream>
UA4Qml2::UA4Qml2(QObject *parent) : QObject(parent)
{
//创建消息队列
startProcMsg();
}
void UA4Qml2::sendMsg(const Message& msg)
{
//往消息队列中,增加事件
m_mutex.lock();
m_dequeMsgs.emplace_back(new Message(msg));
m_mutex.unlock();
//通知分发事件
sem_post(&m_sem);
}
//通过qml的按钮来进行操作
void UA4Qml2::msgQueueSend(const QString str)
{
Message msg;
msg.cmd = str.toStdString();
msg.type = 1000;
sendMsg(msg);
}
void UA4Qml2::startProcMsg()
{
std::cout<<"进入消息队列"<<std::endl;
std::thread t([this](){
while(true){
// 如果没有事件通知过来,则持续等待,这里使用sem_wait,可以使用别的方式来卡住线程,否则对cpu是种很大的消耗
sem_wait(&m_sem);
//如果有消息到来就不会开启等待
std::cout<<"开始"<<std::endl;
std::deque<Message*> dequeMsgs;
//对于双端队列的操作需要加锁
m_mutex.lock();
// 这里进行内存的交换
dequeMsgs.swap(m_dequeMsgs);
m_mutex.unlock();
for(auto it=dequeMsgs.begin();it!=dequeMsgs.end();it++){
//处理任务,可以根据不同的type来选择不同的处理方式
std::cout<<(*it)->type<<":::"<<(*it)->cmd<<std::endl;
delete *it;
}
//这里注意下,我手动停止了2s,来模拟有个任务消耗过多的时间
sleep(2);
std::cout<<"结束"<<std::endl;
}
});
t.detach();
}
接下来是qml的点击,对于怎么和c++通讯还是看 上篇文章
QQmlApplicationEngine *engine = new QQmlApplicationEngine();
engine->rootContext()->setContextProperty("$SigDispatcher", new UA4Qml2);
engine->load(QUrl(QStringLiteral("qrc:/main.qml")));
import QtQuick.Controls 1.4
ApplicationWindow {
id: applicationWindow
visible: true
width: 640
height: 480
Button{
id:msgQueue
text: "消息队列"
anchors.top: parent.top
anchors.topMargin: 10
onClicked: {
//调用C++ 的方法
$SigDispatcher.msgQueueSend("hello world");
}
}
}
接下来看输出
开始
1000:::hello world
结束
开始
1000:::hello world
1000:::hello world
1000:::hello world
结束
开始
结束
开始
结束
解析
-
使用linux中的信号量来进行等待操作,每调用
sem_post
一次表示信号量+1,没调用sem_wait
一次表示信号量-1 直到为0 则处于等待。 -
使用双端队列进行消息的存储。使用swap进行内存的交换,减少内存的拷贝。当然这里你们也可以使用别的数据结构来进行消息的存储。
-
有个死循环,一直持续处理消息,注意这里需要卡住线程
sem_wait
-
注意对存储的消息体操作的时候,需要加锁
看到这里你是否有发现存在的问题呢?
如果采用单线程的消息队列,则会有个问题,就是处理的消息不能有延时。一旦一个消息处理过慢,则对之后所有的消息都会产生影响。
多任务处理队列
#include "ua4qml2.h"
#include <iostream>
UA4Qml2::UA4Qml2(QObject *parent) : QObject(parent)
{
startProcMsg();
}
void UA4Qml2::sendMsg(const Message& msg)
{
//往消息队列中,增加事件,注意加锁
m_mutex.lock();
m_dequeMsgs.emplace_back(new Message(msg));
m_mutex.unlock();
sem_post(&m_sem);
}
void UA4Qml2::msgQueueSend(const QString str)
{
Message msg;
msg.cmd = str.toStdString();
msg.type = 1000;
sendMsg(msg);
}
void UA4Qml2::startProcMsg()
{
std::cout<<"进入消息队列"<<std::endl;
//c++ 中采用#if 的方式来进行代码块的注释,这是java所没有的
//这里采用了两种不同的线程方式,c开线程和c++开线程的两种方式
#if 1
for(size_t i=0;i<4;++i)
{
pthread_create(&m_thread,NULL,proc,this);
}
#else
for(size_t i=0;i<4;++i)
{
std::thread t([this](){
UA4Qml2::proc(this);
});
t.detach();
}
#endif
}
void* UA4Qml2::proc(void* data)
{
UA4Qml2* uA4Qml2 = (UA4Qml2*)data;
while(true){
sem_wait(&uA4Qml2->m_sem);
//如果有消息到来就会执行
uA4Qml2->m_mutex.lock();
//这里是一个一个的获取,而非一次性获取所有的消息
Message* it = *(uA4Qml2->m_dequeMsgs.begin());
uA4Qml2->m_dequeMsgs.pop_front();
uA4Qml2->m_mutex.unlock();
std::cout<<(*it).type<<":::"<<(*it).cmd<<std::this_thread::get_id()<<std::endl;
std::cout<<"卡住结束"<<std::endl;
sleep(2);
delete it;
}
return nullptr;
}
看下结果打印:
进入消息队列
1000:::hello world140150117496576 #【】
卡住结束
1000:::hello world140150100711168
卡住结束
1000:::hello world140150109103872
卡住结束
1000:::hello world140150017357568
卡住结束
1000:::hello world140150117496576 #【】
卡住结束
解析
- 这里使用了4个线程,来做多线程消息队列。
- 这里要注意因为使用了
Message*
所以要注意内存的释放与申请 - c++ 可以用宏定义 #if #error #define 来完成很多事情,c++的优点哦
欢迎关注我的微信公众号:
程序员学习分享录