Qt多线程中基于队列实现网络通信

2018-11-20  本文已影响0人  Qt工坊

基本思想

  1. 在线程中创建QTcpSocket对象实例;
  2. 在线程类中采用队列的方式动态添加需要执行的网络命令类实例;
  3. 为命令执行线程定义空闲、数据发送和数据读取三个状态;
  4. 将网络反馈数据以信号的方式发送给命令类进行解析;

命令执行线程类

tcpcmdexecthread.h

#ifndef CMDEXECTHREADABSTRACT_H
#define CMDEXECTHREADABSTRACT_H

#include <QThread>
#include <QTcpSocket>
#include <QMutex>
class CommandAbstract;
class TcpCmdExecThread : public QThread
{
    Q_OBJECT
public:
    enum State{    //命令执行状态
        E_Idle = 0,//线程空闲
        E_Sending, //数据发送
        E_Reading  //数据读取
    };
    TcpCmdExecThread(QObject *parent = 0);
    ~TcpCmdExecThread();
    void run();
    //追加要执行的网络命令到线程的执行队列中
    void appendCmd(CommandAbstract* cmd);
signals:
    //对外通知网络反馈数据
    void response(const QByteArray& data);
public slots:
    void onSocketError(QAbstractSocket::SocketError error);
private:
    //准备下一个命令,如果成功则返回true,失败则返回false
    bool prepareNextCmd();
private:
    QList<CommandAbstract*> m_cmdQueue;//命令执行队列
    CommandAbstract* m_currentCmd;//当前命令
    QTcpSocket* m_tcpSocket;
    State m_state;//命令执行线程的状态
    QMutex m_mutex;
};
#endif // CMDEXECTHREADABSTRACT_H

tcpcmdexecthread.cpp

#include "tcpcmdexecthread.h"
#include "commandabstract.h"
#include <QDebug>
#include <QMutexLocker>
TcpCmdExecThread::TcpCmdExecThread(QObject *parent):
    QThread(parent),
    m_currentCmd(NULL),
    m_tcpSocket(NULL),
    m_state(E_Idle)
{

}

TcpCmdExecThread::~TcpCmdExecThread()
{

}

void TcpCmdExecThread::appendCmd(CommandAbstract *cmd)
{
    QMutexLocker locker(&m_mutex);
    m_cmdQueue.append(cmd);
}

void TcpCmdExecThread::run()
{
    if(m_tcpSocket)
        m_tcpSocket->deleteLater();

    m_tcpSocket = new QTcpSocket;
    connect(m_tcpSocket,SIGNAL(error(QAbstractSocket::SocketError)),
            this,SLOT(onSocketError(QAbstractSocket::SocketError)));
    QString host = QString("127.0.0.1");
    int port = 9090;
    m_tcpSocket->connectToHost(host,port);
    if(!m_tcpSocket->waitForConnected())
    {
        return ;
    }
    qDebug()<<__LINE__<<__FUNCTION__<<"Tcp connected.";
    while(!isInterruptionRequested())//调用requestInterruption()可退出线程
    {
        QThread::msleep(25);
        switch(m_state)
        {
        case E_Idle:
        {
            QThread::msleep(10);
            if(m_cmdQueue.length() > 0)
            {
                bool success = prepareNextCmd();
                if(success)
                    m_state =  E_Sending;
            }
        }break;
        case E_Sending:
        {
            qDebug()<<__LINE__<<__FUNCTION__<<"m_state =  E_Sending";
            QByteArray sendingData = m_currentCmd->getSendingData();
            m_tcpSocket->write(sendingData);
            qDebug()<<__LINE__<<__FUNCTION__<<"sendingData = "<<sendingData;
            m_tcpSocket->waitForReadyRead();
            m_state =  E_Reading;

        }break;
        case E_Reading:
        {
            qDebug()<<__LINE__<<__FUNCTION__<<"m_state =  E_Reading";
            QByteArray readData = m_tcpSocket->readAll();
            //信号response采用BlockingQueuedConnection方式与命令槽函数parseResponseData连接,
            //则当parseResponseData函数执行完毕之后,线程才会继续往后执行
            emit response(readData);
            m_currentCmd->deleteLater();
            m_state =  E_Idle;
        }break;
        default:{

        }
        }
    }

}

void TcpCmdExecThread::onSocketError(QAbstractSocket::SocketError error)
{
    qDebug()<<__LINE__<<__FUNCTION__<<"error:"<<(int)error;
}

bool TcpCmdExecThread::prepareNextCmd()
{
    if(m_cmdQueue.length() > 0){
        m_currentCmd = m_cmdQueue.takeFirst();
        connect(this,SIGNAL(response(QByteArray)),
                m_currentCmd,SLOT(parseResponseData(QByteArray)),Qt::BlockingQueuedConnection);
        return true;
    }
    return false;
}

网络命令基类

commandabstract.h

#ifndef COMMANDABSTRACT_H
#define COMMANDABSTRACT_H

#include <QObject>
#include <QVariantHash>

class CommandAbstract : public QObject
{
    Q_OBJECT
public:
    explicit CommandAbstract(QObject *parent = 0);
    //接口:把命令参数组织成要发送给服务器的数据m_sendingData,其中包含操作类型cmdOperType
    virtual void prepareSendingData(const QVariantHash& cmdArgs) = 0;
    //获取要发送给服务器的数据m_sendingData
    virtual QByteArray getSendingData() const;
    virtual void setSendingData(const QByteArray& data);
    void setCmdIndex(int index);
    void setOperType(int operType);
signals:
    //通过信号对外通知命令执行数据m_parsedResponseData
    void infoResultData(QVariantHash& parsedResponseData);
public slots:
    //接口:解析来自服务器的响应数据,保存到m_parsedResponseData中
    virtual void parseResponseData(const QByteArray& data) = 0;
protected:
    int getCmdIndex() const;
    int getCmdOperType() const;
private:
    QVariantHash m_cmdData;//命令参数
    QByteArray m_sendingData;//最终发送给服务器的命令数据
    QVariantHash m_parsedResponseData;//解析之后的服务器响应数据
    int m_cmdIndex;
    int m_cmdOperType;
};
Q_DECLARE_METATYPE(CommandAbstract*)
#endif // COMMANDABSTRACT_H

commandabstract.cpp

#include "commandabstract.h"
CommandAbstract::CommandAbstract(QObject *parent) :
    QObject(parent),
    m_cmdIndex(0),
    m_cmdOperType(0)
{

}

QByteArray CommandAbstract::getSendingData() const
{
    return m_sendingData;
}

void CommandAbstract::setSendingData(const QByteArray &data)
{
    m_sendingData = data;
}

void CommandAbstract::setCmdIndex(int index)
{
    m_cmdIndex = index;
}

void CommandAbstract::setOperType(int operType)
{
    m_cmdOperType = operType;
}

int CommandAbstract::getCmdIndex() const
{
    return m_cmdIndex;
}

int CommandAbstract::getCmdOperType() const
{
    return m_cmdOperType;
}

使用方法

void MainWindow::on_pushButtonSendCmd_clicked()
{
    m_tcpCmdExecThread->start();
    QVariantHash cmdArgs;
    cmdArgs.insert("key1","cmd1 data1");
    cmdArgs.insert("key2","cmd1 data2");//data1和data2是模拟要发送给服务器的命令数据

    CommandAbstract* cmd1 = new DemoCommand();//向服务程序发送的第一个命令
    //命令对象cmd解析完成之后,发送信号将数据显示到界面
    connect(cmd1,SIGNAL(infoResultData(QVariantHash&)),
            this,SLOT(onInfoResultData(QVariantHash&)));
    cmd1->setCmdIndex(1);
    cmd1->setOperType(1);
    //准备发送数据
    cmd1->prepareSendingData(cmdArgs);
    //追加命令到线程的命令队列中
    m_tcpCmdExecThread->appendCmd(cmd1);


    cmdArgs.insert("key1","cmd2 data1");
    cmdArgs.insert("key2","cmd2 data2");
    CommandAbstract* cmd2 = new DemoCommand();//向服务程序发送的第二个命令
    //命令对象cmd解析完成之后,发送信号将数据显示到界面
    connect(cmd2,SIGNAL(infoResultData(QVariantHash&)),
            this,SLOT(onInfoResultData(QVariantHash&)));
    cmd2->setCmdIndex(1);
    cmd2->setOperType(1);
    //准备发送数据
    cmd2->prepareSendingData(cmdArgs);
    //追加命令到线程的命令队列中
    m_tcpCmdExecThread->appendCmd(cmd2);
}

//显示解析后的网络数据到界面
void MainWindow::onInfoResultData(QVariantHash &parsedResponseData)
{
    QString dataFromServer = parsedResponseData.value("response").toString();
    ui->textEdit->append(dataFromServer);
}

运行效果

客户程序.png
服务程序.png

源代码下载

说明:源代码中的commandmodeserver.exe为本例中的服务端程序,本例中的客户程序会与它进行连接,提供服务端数据响应。
百度网盘分享地址:
链接:https://pan.baidu.com/s/1K3c_OJqfHsXpRBSiinMqkA
提取码:z8ix

上一篇 下一篇

猜你喜欢

热点阅读