C++11-互斥量&锁

2020-02-29  本文已影响0人  JasonLiThirty

互斥量

基本类型

std::mutex:独占的互斥量,不能递归使用

std::timed_mutex:有超时功能的独占互斥量,不能递归使用

std::recursive_mutex:递归互斥量,能递归使用

std::recursive_timed_mutex:有超时功能的递归互斥量

std::shared_mutex(C++14)

基本操作

lock&unlock

// C++_Mutex_Sample.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <mutex>
#include <thread>
#include <iostream>

std::mutex g_lck;

void fun()
{
    g_lck.lock();
    std::cout << "thread " << std::this_thread::get_id() << " Do something_Begin " << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "thread " << std::this_thread::get_id() << " Do something_End" << std::endl;
    g_lck.unlock();
}

int main()
{
    std::thread thA(fun);
    std::thread thB(fun);
    std::thread thC(fun);

    thA.join();
    thB.join();
    thC.join();
    return 0;
 }

std::lock_gurad包装std::mutex

// C++_Mutex_Sample.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <mutex>
#include <thread>
#include <iostream>

std::mutex g_lck;

void fun()
{
    std::lock_guard<std::mutex> lck(g_lck);
    std::cout << "thread " << std::this_thread::get_id() << " Do something_Begin " << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "thread " << std::this_thread::get_id() << " Do something_End" << std::endl;
}


int main()
{
    std::thread thA(fun);
    std::thread thB(fun);
    std::thread thC(fun);

    thA.join();
    thB.join();
    thC.join();
    return 0;
}

std::recursive_mutex(解决递归死锁问题)

//死锁出错
struct Complex
{
    std::mutex mt;
    int i;

    Complex():i(1) {}

    void mul(int x)
    {
        std::lock_guard<std::mutex> lck(mt);
        i *= x;
        std::cout << "i = " << i << std::endl;
    }

    void div(int x)
    {
        std::lock_guard<std::mutex> lck(mt);
        i /= x;
        std::cout << "i = " << i << std::endl;
    }

    void Calc(int x, int y)
    {
        std::lock_guard<std::mutex> lck(mt);
        mul(x);
        div(y);
    }
};

int main()
{
    Complex complex;
    complex.Calc(12, 4);
    return 0;
}
//解决方案1
struct Complex
{
    std::recursive_mutex mt;
    int i;

    Complex():i(1) {}

    void mul(int x)
    {
        std::lock_guard<std::recursive_mutex> lck(mt);
        i *= x;
        std::cout << "i = " << i << std::endl;
    }

    void div(int x)
    {
        std::lock_guard<std::recursive_mutex> lck(mt);
        i /= x;
        std::cout << "i = " << i << std::endl;
    }

    void Calc(int x, int y)
    {
        std::lock_guard<std::recursive_mutex> lck(mt);
        mul(x);
        div(y);
    }
};

int main()
{
    Complex complex;
    complex.Calc(12, 4);
    return 0;
}

//解决方案2
struct Complex
{
    std::mutex mt;
    int i;

    Complex():i(1) {}

    void mul(int x)
    {
        std::lock_guard<std::mutex> lck(mt);
        i *= x;
        std::cout << "i = " << i << std::endl;
    }

    void div(int x)
    {
        std::lock_guard<std::mutex> lck(mt);
        i /= x;
        std::cout << "i = " << i << std::endl;
    }

    void Calc(int x, int y)
    {
        //std::lock_guard<std::mutex> lck(mt);
        mul(x);
        div(y);
    }
};

int main()
{
    Complex complex;
    complex.Calc(12, 4);
    return 0;
}

std::timed_mutex(基本示例)

std::timed_mutex g_timeLock;

void Work()
{
    while (true)
    {
        if (g_timeLock.try_lock_for(std::chrono::milliseconds(1000)))
        {
            std::cout << "thread " << std::this_thread::get_id() << " Do something with time mutex" << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(2000));

            g_timeLock.unlock();
        }
        else
        {
            std::cout << "thread " << std::this_thread::get_id() << " Do something without mutex" << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        }
        //std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    }

}

int main()
{
    std::thread t1(Work);
    std::thread t2(Work);

    t1.join();
    t2.join();

    return 0;
}

std::timed_mutex(try_lock_for/try_lock_until)

//一直让Work函数得不到g_timeLock,观察超时时间

std::t
imed_mutex g_timeLock;
void Work()
{
    while (true)
    {
        if (g_timeLock.try_lock_for(std::chrono::milliseconds(3000)))
        //std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
        //if (g_timeLock.try_lock_until(now + std::chrono::milliseconds(3000)))
        {
            auto t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
            std::cout << "thread " << std::this_thread::get_id() << " Do something with time mutex" << std::put_time(std::localtime(&t), "%Y-%m-%d %H.%M.%S") <<std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(2000));

            g_timeLock.unlock();
        }
        else
        {
            auto t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
            std::cout << "thread " << std::this_thread::get_id() << " Do something without mutex" << std::put_time(std::localtime(&t), "%Y-%m-%d %H.%M.%S") << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        }
    }
}

int main()
{
    std::lock_guard<std::timed_mutex> lck(g_timeLock);
    std::thread t1(Work);
    t1.join();
    return 0;
}

锁(互斥量管理类)

std::lock_guard

  locking (1)           explicit lock_guard (mutex_type& m);
  adopting (2)      lock_guard (mutex_type& m, adopt_lock_t tag);
  copy [deleted](3) lock_guard (const lock_guard&) = delete;

lock_guard 对象的拷贝构造和移动构造(move construction),赋值运算符均被禁用,因此 lock_guard 对象不可被拷贝构造或移动构造。

template<class _Mutex>
    class lock_guard<_Mutex>
    {   // specialization for a single mutex
      public:
        typedef _Mutex mutex_type;
      
        explicit lock_guard(_Mutex& _Mtx)
            : _MyMutex(_Mtx)
            {   // construct and lock
            _MyMutex.lock();
            }
      
        lock_guard(_Mutex& _Mtx, adopt_lock_t)
            : _MyMutex(_Mtx)
            {   // construct but don't lock
            }
      
        ~lock_guard() _NOEXCEPT
            {   // unlock
            _MyMutex.unlock();
            }
      
        lock_guard(const lock_guard&) = delete;
        lock_guard& operator=(const lock_guard&) = delete;
      private:
        _Mutex& _MyMutex;
    };

std::unique_lock

template<class _Mutex>
    class unique_lock
    {   // whizzy class with destructor that unlocks mutex
public:
    typedef unique_lock<_Mutex> _Myt;
    typedef _Mutex mutex_type;

    // CONSTRUCT, ASSIGN, AND DESTROY
    //默认构造函数不管理任何Mutex对象
    unique_lock() _NOEXCEPT
        : _Pmtx(0), _Owns(false)
        {   // default construct
        }

    //传入Mutex对象,并尝试调用Mutex对象的lock()进行上锁
    //如果有另外的unique_lock对象已经管理了该Mutex对象,则当前线程会被阻塞
    explicit unique_lock(_Mutex& _Mtx)
        : _Pmtx(&_Mtx), _Owns(false)
        {   // construct and lock
        _Pmtx->lock();
        _Owns = true;
        }
    //传入Mutex对象,该Mutex对象已经被当前进程锁住了
    unique_lock(_Mutex& _Mtx, adopt_lock_t)
        : _Pmtx(&_Mtx), _Owns(true)
        {   // construct and assume already locked
        }
    //传入Mutex对象,不会锁住该Mutex对象
    unique_lock(_Mutex& _Mtx, defer_lock_t) _NOEXCEPT
        : _Pmtx(&_Mtx), _Owns(false)
        {   // construct but don't lock
        }
    //传入Mutex对象,并尝试调用Mutex对象的try_lock()进行上锁
    //上锁不成功,当前线程也不会阻塞
    unique_lock(_Mutex& _Mtx, try_to_lock_t)
        : _Pmtx(&_Mtx), _Owns(_Pmtx->try_lock())
        {   // construct and try to lock
        }

    //传入Mutex对象,并尝试调用Mutex对象的try_lock_for(rel_time)进行上锁
    //锁住一段时间
    template<class _Rep,
        class _Period>
        unique_lock(_Mutex& _Mtx,
            const chrono::duration<_Rep, _Period>& _Rel_time)
        : _Pmtx(&_Mtx), _Owns(_Pmtx->try_lock_for(_Rel_time))
        {   // construct and lock with timeout
        }

        }

    //传入Mutex对象,并尝试调用Mutex对象的try_lock_until(rel_time)进行上锁
    //在某个时间点前锁住
    template<class _Clock,
        class _Duration>
        unique_lock(_Mutex& _Mtx,
            const chrono::time_point<_Clock, _Duration>& _Abs_time)
        : _Pmtx(&_Mtx), _Owns(_Pmtx->try_lock_until(_Abs_time))
        {   // construct and lock with timeout
        }

    unique_lock(_Mutex& _Mtx, const xtime *_Abs_time)
        : _Pmtx(&_Mtx), _Owns(false)
        {   // try to lock until _Abs_time
        _Owns = _Pmtx->try_lock_until(_Abs_time);
        }

    unique_lock(unique_lock&& _Other) _NOEXCEPT
        : _Pmtx(_Other._Pmtx), _Owns(_Other._Owns)
        {   // destructive copy
        _Other._Pmtx = 0;
        _Other._Owns = false;
        }

    unique_lock& operator=(unique_lock&& _Other)
        {   // destructive copy
        if (this != &_Other)
            {   // different, move contents
            if (_Owns)
                _Pmtx->unlock();
            _Pmtx = _Other._Pmtx;
            _Owns = _Other._Owns;
            _Other._Pmtx = 0;
            _Other._Owns = false;
            }
        return (*this);
        }

    ~unique_lock() _NOEXCEPT
        {   // clean up
        if (_Owns)
            _Pmtx->unlock();
        }

    unique_lock(const unique_lock&) = delete;
    unique_lock& operator=(const unique_lock&) 
    ...
    };

std::unique_lock的主要成员函数

template<class _Mutex>
    class unique_lock
    {   // whizzy class with destructor that unlocks mutex
public:
    ...
    // LOCK AND UNLOCK
    void lock()
        {   // lock the mutex
        _Validate();
        _Pmtx->lock();
        _Owns = true;
        }

    bool try_lock()
        {   // try to lock the mutex
        _Validate();
        _Owns = _Pmtx->try_lock();
        return (_Owns);
        }

    template<class _Rep,
        class _Period>
        bool try_lock_for(const chrono::duration<_Rep, _Period>& _Rel_time)
        {   // try to lock mutex for _Rel_time
        _Validate();
        _Owns = _Pmtx->try_lock_for(_Rel_time);
        return (_Owns);
        }

    template<class _Clock,
        class _Duration>
        bool try_lock_until(
            const chrono::time_point<_Clock, _Duration>& _Abs_time)
        {   // try to lock mutex until _Abs_time
        _Validate();
        _Owns = _Pmtx->try_lock_until(_Abs_time);
        return (_Owns);
        }

    bool try_lock_until(const xtime *_Abs_time)
        {   // try to lock the mutex until _Abs_time
        _Validate();
        _Owns = _Pmtx->try_lock_until(_Abs_time);
        return (_Owns);
        }

    void unlock()
        {   // try to unlock the mutex
        if (!_Pmtx || !_Owns)
            _THROW_NCEE(system_error,
                _STD make_error_code(errc::operation_not_permitted));

        _Pmtx->unlock();
        _Owns = false;
        }

    // MUTATE
    void swap(unique_lock& _Other) _NOEXCEPT
        {   // swap with _Other
        _STD swap(_Pmtx, _Other._Pmtx);
        _STD swap(_Owns, _Other._Owns);
        }

    _Mutex *release() _NOEXCEPT
        {   // disconnect
        _Mutex *_Res = _Pmtx;
        _Pmtx = 0;
        _Owns = false;
        return (_Res);
        }

    // OBSERVE
    bool owns_lock() const _NOEXCEPT
        {   // return true if this object owns the lock
        return (_Owns);
        }

    explicit operator bool() const _NOEXCEPT
        {   // return true if this object owns the lock
        return (_Owns);
        }

    _Mutex *mutex() const _NOEXCEPT
        {   // return pointer to managed mutex
        return (_Pmtx);
        }
        ...
      }

std::unique_lock的使用

std::mutex mtx;

void print_thread_id(int id)
{
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
    lock.lock();
    std::cout << "thread #" << id << '\n';
    lock.unlock();
}

int main()
{
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; i++){
        threads.emplace_back(print_thread_id, i + 1);
    }
    for (auto &th : threads){
        th.join();
    }
}

//output
thread #1
thread #5
thread #3
thread #10
thread #2
thread #6
thread #7
thread #8
thread #9
thread #4
std::mutex mtx;

void print_star(int id)
{
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
    if (lock.try_lock()) {
        std::cout << "*";
    }
    else {
        std::cout << "x";
    }
    std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
int main()
{
    std::vector<std::thread> threads;

    for (int i = 0; i < 500; i++) {
        threads.emplace_back(print_star, i + 1);
    }

    for (auto &th : threads) {
        th.join();
    }
}

//output
*xxxxxxxxx*xxxxxxxxxxxx*xxxxxxxxxx*xxxxxxxxxxx*xxxxxxxxxxxxxx*xxxxxxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxx*xxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxx*xxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxx*xxxxxxxxxxxxxxx*xxxxxxxxxxxxxx*xxxxxxxxxxx*xxxxxxxxxxxx*xxxxxxxxx*xxxxxxxxx*xxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxxx*xxxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxxx*xxxxxx
std::timed_mutex mtt;

void fireworks()
{
    std::unique_lock<std::timed_mutex> lock(mtt, std::defer_lock);

    while (!lock.try_lock_for(std::chrono::microseconds(200))) {
        std::cout << "-";
    }

    std::this_thread::sleep_for(std::chrono::microseconds(1000));
    std::cout << "*\n";
}

int main()
{
    std::vector<std::thread> threads;

    for (int i = 0; i < 10; i++) {
        threads.emplace_back(fireworks);
    }

    for (auto &th : threads) {
        th.join();
    }
}
//output
*
---------*
--------*
-------------*
-----*
----*
---*
--*
-*
-*
std::timed_mutex cinderella;

void carriage() 
{
    std::unique_lock<std::timed_mutex> lock(cinderella, std::defer_lock);

    if (lock.try_lock_until(std::chrono::system_clock::now() + std::chrono::seconds(2))) {
        std::cout << "ride back home on carriage\n";
        lock.unlock();
    }
    else {
        std::cout << "carriage reverts to pumpkin\n";
    }
}

void ball()
{
    std::unique_lock<std::timed_mutex> lock(cinderella, std::defer_lock);
    lock.lock();
    std::cout << "at the ball...\n";
    std::this_thread::sleep_for(std::chrono::seconds(3));
}

int main()
{
    std::thread th1(ball);
    std::thread th2(carriage);

    th1.join();
    th2.join();
}
//output
at the ball...
carriage reverts to pumpkin
std::mutex mtx;
std::atomic<int> count;

void print_count_and_unlock(std::mutex* p_mtx)
{
    std::this_thread::sleep_for(std::chrono::microseconds(1000));
    std::cout << "count: " << count << '\n';
    p_mtx->unlock();
}

void task()
{
    std::unique_lock<std::mutex> lock(mtx);
    ++count;
    print_count_and_unlock(lock.release());
}

int main()
{
    count = 0;

    std::vector<std::thread> threads;

    for (int i = 0; i < 10; i++) {
        threads.emplace_back(task);
    }

    for (auto &th : threads) {
        th.join();
    }
}
//output
count: 1
count: 2
count: 3
count: 4
count: 5
count: 6
count: 7
count: 8
count: 9
count: 10
std::mutex mtx;

void print_star()
{
    std::unique_lock<std::mutex> lock(mtx, std::try_to_lock);

    //if (lock.owns_lock()) {
    if (lock) {
        std::cout << '*';
    }
    else {
        std::cout << 'x';
    }
    std::this_thread::sleep_for(std::chrono::microseconds(1000));
}

int main()
{
    std::vector<std::thread> threads;

    for (int i = 0; i < 500; i++) {
        threads.emplace_back(print_star);
    }

    for (auto &th : threads) {
        th.join();
    }
}
//output
*xxxxx*xxxxxxxxxxx*xxxxxxxxxxxx*xxxxxxxxxxxxxxxxx*xxxxxxxxxxxxxx*xxxxxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxxxx*xxxxxxxxxxxxx*xxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxxx*xxxxxxxxxxxxxx*xxxxxxxxxxxxxxxxxx*xxxxxxxxxxxx*xxxxxxxxx*xxxxxxxxxxxxxxx*xxxxxxxxxxxxxxx*xxxxxxxxx*xxxxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxxx*xxxxxxxxxxxx*xxxxxxxxxxxxx*xxxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxxxxxxxxxxxxx*xxxxx*xxxxxxxxxxx*xxxxxxxxxxxxxx
class MyMutex : public std::mutex {
    int m_id;
public:
    MyMutex(int id) :m_id(id) 
    {

    }

    int id()
    {
        return m_id;
    }
};

MyMutex mtx(100);

void print_ids(int id)
{
    std::unique_lock<MyMutex> lock(mtx);
    std::cout << "thread #" << id << " locked mutex " << lock.mutex()->id() << '\n';
    std::this_thread::sleep_for(std::chrono::microseconds(1000));
}


int main()
{
    std::vector<std::thread> threads;

    for (int i = 0; i < 10; i++) {
        threads.emplace_back(print_ids, i+1);
    }

    for (auto &th : threads) {
        th.join();
    }
}
//output
thread #1 locked mutex 100
thread #2 locked mutex 100
thread #3 locked mutex 100
thread #4 locked mutex 100
thread #5 locked mutex 100
thread #6 locked mutex 100
thread #7 locked mutex 100
thread #8 locked mutex 100
thread #9 locked mutex 100
thread #10 locked mutex 100

std::shared_lock

typedef std::shared_lock<std::shared_mutex> ReadLock;
typedef std::lock_guard<std::shared_mutex> WriteLock;
int ReconCoordinator::SearchPort(int jobId)
{
    int port = 0;
    ReadLock lck(m_mtConn);


    auto it = m_jobIdAndPort.find(jobId);
    if (it != m_jobIdAndPort.end()) {
        port = it->second;
    }
    return port;
}
void ReconCoordinator::FlushConn()
{
    int jobId = 0;
    bool flushFlag = false;
    {
        std::lock_guard<std::mutex> erupLock(m_mtErup);
        ReadLock lck(m_mtConn);
        for (auto it = m_jobIdAndConn.begin(); it != m_jobIdAndConn.end(); it++) {
            try {
                std::shared_ptr<ErupProcServiceClient> procClient = it->second->GetSrvClient<ErupProcServiceClient>();
                procClient->Alive();    
            }
            catch (TException&) {
                flushFlag = true;
                jobId = it->first;
                break;
            }
        }
    }


    if (flushFlag) {
        if (jobId > ERUP_STANDBY_ID_MAX) {
            rcLog.LOG_WARN("ReconCoordinator, Zombie erup conn flush, jobId: %d", jobId);
        }
        else {
            rcLog.LOG_WARN("ReconCoordinator, Buffered zombie erup conn flush");
        }


        DestroyProcess(jobId);
        NotifyError(jobId);
    }
}
void ReconCoordinator::ClearPortInfo(int jobId)
{
    WriteLock lck(m_mtInfo);
    m_requiredInfo->ClearProtInfo(jobId);
}
void ReconCoordinator::InitFailed(int jobId)
{
    //rcLog.LOG_WARN("Erup process initialize failed, jobId: %d", jobId);
    if (jobId <= ERUP_STANDBY_ID_MAX) 
    {
        {
            WriteLock lck(m_mtConn);
            m_jobIdAndPort.erase(jobId);
        }
        {
            WriteLock lck(m_mtState);
            m_jobIdAndState.erase(jobId);
        }
        {
            std::lock_guard<std::mutex> lck(m_mtProc);
            auto it = m_jobIdAndProcess.find(jobId);
            if (it == m_jobIdAndProcess.end()) {
                rcLog.LOG_INFO("Erup process not exists in NotifyInitFailed, jobId: %d", jobId);
            }
            else {
                ::TerminateProcess(it->second, 0);
                ::CloseHandle(it->second);
                m_jobIdAndProcess.erase(it);
            }
        }
    }
}

加锁策略参数

策略 tag type 描述
(默认) 请求锁,阻塞当前线程直到成功获得锁。
std::defer_lock std::defer_lock_t 不请求锁。
std::try_to_lock std::try_to_lock_t 尝试请求锁,但不阻塞线程,锁不可用时也会立即返回。
std::adopt_lock std::adopt_lock_t 假定当前线程已经获得互斥对象的所有权,所以不再请求锁。
策略 描述 std::lock_guard std::unique_lock std::shared_lock
(默认) 请求锁,阻塞当前线程直到成功获得锁。 √(共享)
std::defer_lock 不请求锁。 ×
std::try_to_lock 尝试请求锁,但不阻塞线程,锁不可用时也会立即返回。 ×
std::adopt_lock 假定当前线程已经获得互斥对象的所有权,所以不再请求锁。

官方解释:https://zh.cppreference.com/w/cpp/thread/lock_tag

原子锁操作

std::lock(mt1, mt2)
上一篇 下一篇

猜你喜欢

热点阅读