用C++实现数据总线的方法系列(上):基本概念&同步队列

2020-03-11  本文已影响0人  JasonLiThirty

视频教程:https://www.bilibili.com/video/av94487439

本文主要介绍多线程中数据同步的方法,技术包括:线程锁,条件变量,原子变量,消息处理等;以及三种同步队列的实现方法。

std::unique_lock

条件变量

类型

成员函数

notify_one()和notify_all()都是std::condition_variable的成员函数,用于通知正在等待该条件变量的线程;区别在于notify_one只唤醒其中一个等待线程去获取锁,而notify_all会唤醒所有等待线程,由它们去竞争锁。

执行过程

简洁写法及wait机制

// Lock m_mtRun, then block until the predicate reports m_runDown as true.
// wait() releases the lock while blocked and re-acquires it before each
// predicate check and before returning.
std::unique_lock<std::mutex> lck(m_mtRun);
m_cvRun.wait(lck, [this]{ return m_runDown; });

基本示例-wait, wait_for和假唤醒

#include <iostream>                
#include <thread>               
#include <mutex>         
#include <list>
#include <condition_variable>   
#include <windows.h>

// Shared state for the wait/notify demo below.
bool                     completed;  // flag tested by the waiters; Completed()/FakeCompleted() write it under mtRun
std::mutex               mtRun;      // protects `completed`
std::condition_variable  cvRun;      // notified after `completed` has been written

// Blocks the calling thread until `completed` is true, logging before and after.
void Wait()
{
    std::unique_lock<std::mutex> guard(mtRun);
    const std::thread::id self = std::this_thread::get_id();

    std::cout << "Thread_" << self << " is waiting..." << std::endl;

    // Explicit form of the predicate overload: the flag is re-tested after every
    // wake-up, so a notification that did not set it simply puts us back to sleep.
    while (!completed)
    {
        cvRun.wait(guard);
    }

    std::cout << "Thread_" << self << " is completed" << std::endl;
}

// Waits up to four seconds for `completed` to become true and reports whether
// the flag was seen or the wait timed out.
void Wait_For()
{
    std::unique_lock<std::mutex> guard(mtRun);
    const std::thread::id self = std::this_thread::get_id();

    std::cout << "Thread_" << self << " is waiting..." << std::endl;

    // wait_for returns the final value of the predicate: true if the flag was
    // set in time, false on time-out. (Switch to std::chrono::seconds(2) to see
    // the time-out branch with the 3-second delay used in main.)
    const bool finished = cvRun.wait_for(guard, std::chrono::seconds(4), []() {
        return completed;
    });

    if (finished)
    {
        std::cout << "Thread_" << self << " is completed" << std::endl;
    }
    else
    {
        std::cout << "Thread_" << self << " time out!" << std::endl;
    }
}

// Sets `completed` to true under the mutex and then wakes every waiter.
void Completed()
{
    std::cout << "Thread_" << std::this_thread::get_id() << " set completed" << std::endl;

    {
        // Hold the lock only for the write; it is released before notifying.
        std::lock_guard<std::mutex> guard(mtRun);
        completed = true;
    }

    cvRun.notify_all();
}

// Notifies all waiters WITHOUT setting the flag (it is written as false), which
// lets the demo show that predicate-based waits ignore such wake-ups.
void FakeCompleted()
{
    std::cout << "Thread_" << std::this_thread::get_id() << " not set completed" << std::endl;

    {
        // Hold the lock only for the write; it is released before notifying.
        std::lock_guard<std::mutex> guard(mtRun);
        completed = false;
    }

    cvRun.notify_all();
}

// Demo driver. Exactly one scenario is active at a time; the other two are kept
// as commented-out alternatives because they reuse the same variable names.
int main()
{
    // Scenario "Wait": the waiter blocks until Completed() sets the flag.
    completed = false;
    std::thread thWait(Wait);
    Sleep(3000);                        // let the waiter start blocking first
    std::thread thCompleted(Completed);
    thCompleted.join();
    // Join the waiter instead of detaching it and sleeping: main then returns
    // only after the waiter has really finished, independent of timing, and no
    // thread is left touching the globals or std::cout during shutdown.
    thWait.join();

    // Scenario "Wait_For": the flag is set within the waiter's time-out.
    //completed = false;
    //std::thread thWait(Wait_For);
    //Sleep(3000);
    //std::thread thCompleted(Completed);
    //thCompleted.join();
    //thWait.join();

    // Scenario "Fake": waiters are notified but the flag stays false, so
    // Wait_For keeps waiting and eventually reports a time-out.
    /*completed = false;
    std::thread thWait(Wait_For);
    Sleep(3000);
    std::thread thCompleted(FakeCompleted);
    thCompleted.join();
    thWait.join();*/
    return 0;
}

原子变量

// Convenience aliases for std::atomic<T>, one per integral type. `atomic` is
// unqualified here, as it would be inside namespace std — presumably this
// listing is excerpted from a standard library <atomic> header.
typedef atomic<bool> atomic_bool;
typedef atomic<char> atomic_char;
typedef atomic<signed char> atomic_schar;
typedef atomic<unsigned char> atomic_uchar;
typedef atomic<short> atomic_short;
typedef atomic<unsigned short> atomic_ushort;
typedef atomic<int> atomic_int;
typedef atomic<unsigned int> atomic_uint;
typedef atomic<long> atomic_long;
typedef atomic<unsigned long> atomic_ulong;
typedef atomic<long long> atomic_llong;
typedef atomic<unsigned long long> atomic_ullong;
typedef atomic<char16_t> atomic_char16_t;
typedef atomic<char32_t> atomic_char32_t;
typedef atomic<wchar_t> atomic_wchar_t;
typedef atomic<int8_t> atomic_int8_t;
typedef atomic<uint8_t> atomic_uint8_t;
typedef atomic<int16_t> atomic_int16_t;
typedef atomic<uint16_t> atomic_uint16_t;
typedef atomic<int32_t> atomic_int32_t;
typedef atomic<uint32_t> atomic_uint32_t;
typedef atomic<int64_t> atomic_int64_t;
typedef atomic<uint64_t> atomic_uint64_t;
typedef atomic<int_least8_t> atomic_int_least8_t;
typedef atomic<uint_least8_t> atomic_uint_least8_t;
typedef atomic<int_least16_t> atomic_int_least16_t;
typedef atomic<uint_least16_t> atomic_uint_least16_t;
typedef atomic<int_least32_t> atomic_int_least32_t;
typedef atomic<uint_least32_t> atomic_uint_least32_t;
typedef atomic<int_least64_t> atomic_int_least64_t;
typedef atomic<uint_least64_t> atomic_uint_least64_t;
typedef atomic<int_fast8_t> atomic_int_fast8_t;
typedef atomic<uint_fast8_t> atomic_uint_fast8_t;
typedef atomic<int_fast16_t> atomic_int_fast16_t;
typedef atomic<uint_fast16_t> atomic_uint_fast16_t;
typedef atomic<int_fast32_t> atomic_int_fast32_t;
typedef atomic<uint_fast32_t> atomic_uint_fast32_t;
typedef atomic<int_fast64_t> atomic_int_fast64_t;
typedef atomic<uint_fast64_t> atomic_uint_fast64_t;

typedef atomic<intptr_t> atomic_intptr_t;
typedef atomic<uintptr_t> atomic_uintptr_t;
typedef atomic<size_t> atomic_size_t;
typedef atomic<ptrdiff_t> atomic_ptrdiff_t;
typedef atomic<intmax_t> atomic_intmax_t;
typedef atomic<uintmax_t> atomic_uintmax_t;
// Underlying types of the fixed-width, least-width, fast and maximum-width
// integer aliases used above. These mappings are platform-specific; the ones
// shown are one implementation's choices (NOTE(review): presumably from the
// <stdint.h> of the toolchain used for this article — confirm).
typedef signed char        int8_t;
typedef short              int16_t;
typedef int                int32_t;
typedef long long          int64_t;
typedef unsigned char      uint8_t;
typedef unsigned short     uint16_t;
typedef unsigned int       uint32_t;
typedef unsigned long long uint64_t;

typedef signed char        int_least8_t;
typedef short              int_least16_t;
typedef int                int_least32_t;
typedef long long          int_least64_t;
typedef unsigned char      uint_least8_t;
typedef unsigned short     uint_least16_t;
typedef unsigned int       uint_least32_t;
typedef unsigned long long uint_least64_t;

typedef signed char        int_fast8_t;
typedef int                int_fast16_t;
typedef int                int_fast32_t;
typedef long long          int_fast64_t;
typedef unsigned char      uint_fast8_t;
typedef unsigned int       uint_fast16_t;
typedef unsigned int       uint_fast32_t;
typedef unsigned long long uint_fast64_t;

typedef long long          intmax_t;
typedef unsigned long long uintmax_t;
// Two equivalent spellings of the same declaration (atomic_int is a typedef for
// atomic<int>); a real class would contain only one of these lines.
std::atomic_int                             m_standbyIdIndex;
std::atomic<int>                            m_standbyIdIndex;

call_once&once_flag

// m_flag records whether the callable has already run; std::call_once invokes
// the lambda (and therefore StopExecute()) at most once for this flag, even if
// several threads reach the call concurrently.
std::once_flag m_flag;
std::call_once(m_flag, [this](){StopExecute(); });

同步队列

基本同步队列

#include <iostream>
#include <thread>  
#include <mutex>         
#include <list>
#include <condition_variable>  
#include <windows.h>

// Minimal blocking FIFO of ints: every operation takes the internal mutex, and
// Pop() sleeps on a condition variable until an element is available.
class SyncQueue
{
public:
    SyncQueue()
    {
    }

    // Appends a copy of x and wakes all threads blocked in Pop().
    void Push(const int& x)
    {
        std::unique_lock<std::mutex> guard(m_mutex);
        m_queue.push_back(x);
        guard.unlock();  // release before notifying, as the scoped lock did
        m_notEmpty.notify_all();
    }

    // Blocks until the queue is non-empty, then moves the oldest element into x.
    void Pop(int& x)
    {
        std::unique_lock<std::mutex> guard(m_mutex);
        while (m_queue.empty())
        {
            m_notEmpty.wait(guard);
        }
        x = m_queue.front();
        m_queue.pop_front();
    }

    // Snapshot of emptiness; may be stale as soon as the lock is released.
    bool Empty()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        const bool noItems = m_queue.empty();
        return noItems;
    }

    // Snapshot of the element count; may be stale as soon as the lock is released.
    size_t Size()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        const size_t count = m_queue.size();
        return count;
    }

private:
    std::list<int>          m_queue;     // pending elements, oldest at the front
    std::mutex              m_mutex;     // guards m_queue
    std::condition_variable m_notEmpty;  // signalled after every Push()
};

// Queue shared by the producer (SetData) and the consumer (GetData).
SyncQueue queue;

// Consumer: pops values and prints them until the sentinel value 0 is received.
void GetData()
{
    int x = 0;
    // Pop() already blocks until an element is available, so loop
    // unconditionally and let the sentinel end the loop. Gating the loop on
    // queue.Empty() made the consumer quit without reading anything whenever
    // an element happened to be queued at the moment the condition was tested.
    for (;;)
    {
        queue.Pop(x);
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop " << x << std::endl;
        if (x == 0)
        {
            break;
        }
    }
    std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop End!" << std::endl;
}

// Producer: pushes 10, 9, ..., 0 at one-second intervals, logging each value
// before it is queued.
void SetData()
{
    int value = 10;
    while (value >= 0)
    {
        Sleep(1000);
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Push " << value << std::endl;
        queue.Push(value);
        --value;
    }

    // Short pause before the final log line.
    Sleep(500);
    std::cout << "Thread_" << std::this_thread::get_id() << "---- Push End!" << std::endl;
}

// Starts the consumer and the producer and waits for both to finish.
int main()
{
    std::thread thGet(GetData);
    std::thread thSet(SetData);
    thSet.join();
    // Join the consumer instead of detaching it: a detached thread could still
    // be using the global queue or std::cout while main returns and static
    // objects are destroyed. GetData ends once it has popped the 0 sentinel.
    thGet.join();
    return 0;
}
//output
Thread_27072---- Push 10
Thread_26712---- Pop 10
Thread_27072---- Push 9
Thread_26712---- Pop 9
Thread_27072---- Push 8
Thread_26712---- Pop 8
Thread_27072---- Push 7
Thread_26712---- Pop 7
Thread_27072---- Push 6
Thread_26712---- Pop 6
Thread_27072---- Push 5
Thread_26712---- Pop 5
Thread_27072---- Push 4
Thread_26712---- Pop 4
Thread_27072---- Push 3
Thread_26712---- Pop 3
Thread_27072---- Push 2
Thread_26712---- Pop 2
Thread_27072---- Push 1
Thread_26712---- Pop 1
Thread_27072---- Push 0
Thread_26712---- Pop 0
Thread_26712---- Pop End!
Thread_27072---- Push End!

带外部控制的同步队列

#include <iostream>
#include <thread>  
#include <mutex>         
#include <list>
#include <condition_variable>  
#include <windows.h>
#include <atomic>

// Minimal blocking FIFO of ints. All member functions lock the internal mutex,
// so one instance can be shared between producer and consumer threads.
class SyncQueue
{
public:
    SyncQueue() = default;

    // Appends a copy of x, then wakes all threads blocked in Pop().
    void Push(const int& x)
    {
        {
            std::lock_guard<std::mutex> lck(m_mutex);
            m_queue.push_back(x);
        }
        // Notify after the lock is released so woken threads can take it at once.
        m_notEmpty.notify_all();
    }

    // Blocks until an element is available, then stores the oldest one in x.
    void Pop(int& x)
    {
        std::unique_lock<std::mutex> lck(m_mutex);
        m_notEmpty.wait(lck, [this]() {
            return !m_queue.empty();
        });
        x = m_queue.front();
        m_queue.pop_front();
    }

    // Returns whether the queue was empty at the time of the call. Const, so it
    // can be used through a const reference; the result may be stale once the
    // lock is released.
    bool Empty() const
    {
        std::lock_guard<std::mutex> lck(m_mutex);
        return m_queue.empty();
    }

    // Returns the number of queued elements at the time of the call (const,
    // same staleness caveat as Empty()).
    size_t Size() const
    {
        std::lock_guard<std::mutex> lck(m_mutex);
        return m_queue.size();
    }

private:
    std::list<int>          m_queue;     // pending elements, oldest at the front
    mutable std::mutex      m_mutex;     // mutable so the const observers can lock it
    std::condition_variable m_notEmpty;  // signalled after every Push()
};

// Queue shared by the producer (SetData) and the consumer (GetData).
SyncQueue queue;
// Stop request for the consumer: written by SetData, polled by GetData.
// Direct-initialised because std::atomic is not copyable, so the
// `= false` copy-initialisation form is rejected by pre-C++17 compilers.
std::atomic_bool getStop{false};

// Consumer: pops and prints values until the producer raises getStop.
void GetData()
{
    int x = 0;
    // Loop on the stop flag only. Pop() already blocks while the queue is
    // empty; additionally gating the loop on queue.Empty() made the consumer
    // quit whenever an element happened to be queued when the condition was
    // tested, regardless of getStop.
    while (!getStop)
    {
        queue.Pop(x);
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop " << x << std::endl;
    }
    std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop End!" << std::endl;
}

// Producer: pushes 10, 9, ..., 0 at one-second intervals and asks the consumer
// to stop once the value 5 has been delivered.
void SetData()
{
    for (int i = 10; i >= 0; i--)
    {
        Sleep(1000);
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Push " << i << std::endl;
        if (i == 5)
        {
            // Raise the flag BEFORE pushing 5. The consumer is blocked in Pop()
            // at this point, so it is guaranteed to see the flag right after
            // it has consumed 5. Setting the flag after the push raced with
            // the consumer's next loop check and could let it pop one more value.
            getStop = true;
        }
        queue.Push(i);
    }

    std::cout << "Thread_" << std::this_thread::get_id() << "---- Push End!" << std::endl;
}

// Starts the consumer and the producer and waits for both to finish.
int main()
{
    std::thread thGet(GetData);
    std::thread thSet(SetData);
    thSet.join();
    // Join the consumer instead of detaching it, so no thread is still using
    // the global queue or std::cout when main returns. GetData leaves its loop
    // once getStop is set, and the producer keeps pushing after that, so the
    // consumer cannot stay blocked in Pop().
    thGet.join();
    return 0;
}
//output
Thread_29616---- Push 10
Thread_30076---- Pop 10
Thread_29616---- Push 9
Thread_30076---- Pop 9
Thread_29616---- Push 8
Thread_30076---- Pop 8
Thread_29616---- Push 7
Thread_30076---- Pop 7
Thread_29616---- Push 6
Thread_30076---- Pop 6
Thread_29616---- Push 5
Thread_30076---- Pop 5
Thread_30076---- Pop End!
Thread_29616---- Push 4
Thread_29616---- Push 3
Thread_29616---- Push 2
Thread_29616---- Push 1
Thread_29616---- Push 0
Thread_29616---- Push End!

带超时的同步队列

#include <iostream>
#include <thread>  
#include <mutex>         
#include <list>
#include <condition_variable>  
#include <windows.h>
#include <atomic>


// Blocking FIFO of ints whose Pop() gives up after a time-out. All member
// functions lock the internal mutex, so one instance can be shared between
// producer and consumer threads.
class SyncQueue
{
public:
    SyncQueue()
    {
    }

    // Appends a copy of x, then wakes all threads blocked in Pop().
    void Push(const int& x)
    {
        {
            std::unique_lock<std::mutex> lck(m_mutex);
            m_queue.push_back(x);
        }
        m_notEmpty.notify_all();
    }

    // Waits up to `timeout` for an element.
    // @param x        receives the oldest element on success; untouched on time-out
    // @param timeout  maximum time to wait; defaults to the previous fixed value
    //                 of one second, so existing Pop(x) calls behave as before
    // @return true if an element was popped, false if the wait timed out
    bool Pop(int& x, std::chrono::milliseconds timeout = std::chrono::seconds(1))
    {
        std::unique_lock<std::mutex> lck(m_mutex);

        // wait_for returns the final value of the predicate, i.e. false only
        // when the queue is still empty after the time-out.
        const bool hasItem = m_notEmpty.wait_for(lck, timeout, [this]() {
            return !m_queue.empty();
        });
        if (!hasItem)
        {
            return false;
        }

        x = m_queue.front();
        m_queue.pop_front();
        return true;
    }

    // Returns whether the queue was empty at the time of the call.
    bool Empty()
    {
        std::lock_guard<std::mutex> lck(m_mutex);
        return m_queue.empty();
    }

    // Returns the number of queued elements at the time of the call.
    size_t Size()
    {
        std::lock_guard<std::mutex> lck(m_mutex);
        return m_queue.size();
    }

private:
    std::list<int>          m_queue;     // pending elements, oldest at the front
    std::mutex              m_mutex;     // guards m_queue
    std::condition_variable m_notEmpty;  // signalled after every Push()
};


// Queue shared by the producer (SetData) and the consumer (GetData).
SyncQueue queue;
// Stop request for the consumer: written by SetData, polled by GetData.
// Direct-initialised because std::atomic is not copyable, so the
// `= false` copy-initialisation form is rejected by pre-C++17 compilers.
std::atomic_bool getStop{false};


// Consumer: repeatedly tries to pop a value, reporting either the value or a
// time-out, until the producer raises getStop.
void GetData()
{
    int x = 0;
    // Loop on the stop flag only. Pop() handles the empty-queue case itself by
    // timing out; additionally gating the loop on queue.Empty() made the
    // consumer quit whenever an element happened to be queued when the
    // condition was tested, regardless of getStop.
    while (!getStop)
    {
        if (queue.Pop(x))
        {
            std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop " << x << std::endl;
        }
        else
        {
            std::cout << "Thread_" << std::this_thread::get_id() << "---- Get Data Time out" << std::endl;
        }
    }

    std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop End!" << std::endl;
}


// Producer: pushes 10, 9, ..., 0. Values above 5 follow each other quickly;
// from 5 downwards every push is followed by a two-second pause. The stop flag
// is raised once everything has been pushed.
void SetData()
{
    for (int value = 10; value >= 0; --value)
    {
        Sleep(100);
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Push " << value << std::endl;
        queue.Push(value);

        if (value > 5)
        {
            continue;
        }
        // Pause longer than the consumer's Pop() time-out.
        Sleep(2000);
    }

    getStop = true;
    Sleep(500);
    std::cout << "Thread_" << std::this_thread::get_id() << "---- Push End!" << std::endl;
}


// Starts the consumer and the producer and waits for both to finish.
int main()
{
    std::thread thGet(GetData);
    std::thread thSet(SetData);

    thSet.join();
    // Join the consumer instead of detaching it, so no thread is still using
    // the global queue or std::cout when main returns. GetData leaves its loop
    // after getStop is set, at the latest when its current Pop() times out.
    thGet.join();

    return 0;
}

//output
Thread_18908---- Push 10
Thread_2204---- Pop 10
Thread_18908---- Push 9
Thread_2204---- Pop 9
Thread_18908---- Push 8
Thread_2204---- Pop 8
Thread_18908---- Push 7
Thread_2204---- Pop 7
Thread_18908---- Push 6
Thread_2204---- Pop 6
Thread_18908---- Push 5
Thread_2204---- Pop 5
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_18908---- Push 4
Thread_2204---- Pop 4
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_18908---- Push 3
Thread_2204---- Pop 3
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_18908---- Push 2
Thread_2204---- Pop 2
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_18908---- Push 1
Thread_2204---- Pop 1
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_18908---- Push 0
Thread_2204---- Pop 0
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_2204---- Pop End!
Thread_18908---- Push End!
上一篇 下一篇

猜你喜欢

热点阅读