C++11:并发

2020-02-26  本文已影响0人  fck_13

线程

多线程的优缺点:
优点:轻量的进程 ,线程间的通讯更迅速
缺点:不好实现,不能运行在分布式系统上

一个线程的简单的例子:

#include <iostream>
#include <thread>
using namespace std;

void function_1(){
  cout<<"Beauty is only skin-deep"<<endl;
}

int main(){
  thread t1(function_1);  
  t1.join();               
  return 0;
}

thread t1(function_1) 声明了一个线程,同时启动了该线程,该线程开始执行function_1函数。

接着我们调用了join(),该函数会将主线程阻塞,直到子线程结束后,主线程才会继续执行。与此作用相反的一个函数是detach,这个函数会使线程成为一个守护线程,主线程不用在乎子线程是否执行结束,会直接往下运行。每个线程只能detach或者join一次。我们可以通过joinable函数来判断该线程是否可以执行join函数。

如果子线程使用了主线程的资源,例如在创建子线程的,会以引用的方式来使用主线程的数据,主线程要保证这些数据活的够久。

class Fctor{
  public:
  void operator()(){
    for(int ii = 0; i>-100; i++){
      cout<<"from t1: "<<i<<endl;
    }
  }
};

class Foo{
public:
  void print(){
    //do something
  }
};

thread t1(function_1);
void function_1(){
  cout<<"Beauty is only skin-deep"<<endl;
}
  
thread t1(function_1);  //调用普通函数
Factor f;
thread t2(f);           //调用仿函数
Foo f
thread t3(&Foo::print, &f);  //调用成员函数

thread的构造函数的第一个参数是callable对象,函数指针,functor或者lambda表达式。

如果我们这样声明thread t1(Fctor());,是错误的,编译器会认为这是一个返回值为thread类型,函数名叫做t1,并且其参数为Fctor的函数声明。解决方法是改写为thread t1((Fctor()));

如果想向thread传入参数的话,

class Fctor{
  public:
  void operator()(string msg){
    for(int ii = 0; i>-100; i++){
      cout<<"from t1: "<<i<<endl;
    }
  }
};

int main(){
  string s = "Where there is no trust, there is no love";
  std::thread t1((Fctor()), s);   // 值传递
  //std::thread t1((Fctor()), std::ref(s)); //引用传递
  cout<<"From main: "<<s<<endl;
  t1.join();
  return 0;
}

thread 不能被复制,只能被move

thread t2 = t1;// error
thread t2 = std::move(t1);//right

获取线程id

//在线程外获取某个线程的id
thread t1(func);
t1.get_id();

//在线程内获取当前线程的id
std::this_thread::get_id();

我们应该建立的线程数

std::thread::hardware_concurrency();

data race and mutex

首先看一个代码示例:

#include <thread>
#include <string>
using namespace std;

void function_1(){
  for(int i = 0; i>-1000; i--){
    cout<<"From t1: "<<i<<endl;
  }
}

int main(){
  thread t1(function_1);
  
  for(int i = 0; i<1000; i++){
    cout<<"From main: "<<i<<endl;
  }
  
  t1.join();
  
  return 0;
}

一部分执行结果如下:

屏幕快照 2020-02-24 下午8.33.49.png

我们可以从结果看出,两个线程交替的输出结果。并且,在一个线程的输出操作还未结束时,就开始了另一个线程的输出操作。两个线程都需要访问标准输入输出(cout),这就导致了两个线程间的data race。

为了解决这个问题,使两个线程能够正常的打印出结果,我们引入mutex(互斥量)。上述的黛娜可以修改为

#include <thread>
#include <string>
#include <iostream>
#include <mutex>
using namespace std;

mutex mu;

void shared_print(string msg, int i){
    mu.lock();
  cout<<msg<<id<<endl;
  mu.unlock();
}

void function_1(){
  for(int i = 0; i>-1000; i--){
    shared_print("From t1: ", i);
  }
}

int main(){
  thread t1(function_1);
  
  for(int i = 0; i<1000; i++){
    shared_print("From main: ", i);
  }
  
  t1.join();
  return 0;
}

不会再出现上文中的状况了。这就是mutex的作用。

mu的lockunlock函数需要成对出现,但是我们会经常忘记调用unlock,借助C++的RAII机制,标准库给我门提供了一个类,std::lock_guard,上面的shared_print函数可以被修改为

void shared_print(string msg, int i){
  std::lock_guard<mutex> guard(mu);
  cout<<msg<<i<<endl;
}

这里还存在一个问题,就是cout是全局的,其他的线程仍然可以直接调用cout,还是存在data race。在一个系统中,我们应该规定大家使用统一的函数来调用需要被互斥量保护的资源。

死锁 dead lock

#include <thread>
#include <string>
#include <iostream>
#include <mutex>
using namespace std;

class LogFile{
public:
  LogFile(){}

  void shared_print1(string msg, int i){
    std::lock_guard<mutex> guard1(mu1);
    std::lock_guard<mutex> guard2(mu2);
    cout<<msg<<i<<endl;
  }

  void shared_print2(string msg, int i){
    std::lock_guard<mutex> guard2(mu2);
    std::lock_guard<mutex> guard1(mu1);
    cout<<msg<<i<<endl;
  }
private:
  mutex mu1;
  mutex mu2;
};

void function_1(LogFile& log){
  for(int i = 0; i>-1000; i--){
    log.shared_print1("From t1: ", i);
  }
}

int main(){
  LogFile log;
  thread t1(function_1, std::ref(log));
  
  for(int i = 0; i<1000; i++){
    log.shared_print2("From main: ", i);
  }
  
  t1.join();
  
  return 0;
}

结果运行如下:

屏幕快照 2020-02-24 下午9.08.06.png

我们可以看到,代码在输出完From main: 428之后卡死了,这就是发生了死锁。

为了避免死锁,我们希望所有的互斥量都能够以相同的顺序被调用。

标准库提供了std::lock函数来帮助我们同时锁住多个mutex,上面的代码就可以被改写为

class LogFile{
public:
  LogFile(){}

  void shared_print1(string msg, int i){
    std::lock(mu1, mu2);
    std::lock_guard<mutex> guard1(mu1, std::adopt_lock);
    std::lock_guard<mutex> guard2(mu2, std::adopt_lock);
    cout<<msg<<i<<endl;
  }

  void shared_print2(string msg, int i){
    std::lock(mu1, mu2);
    std::lock_guard<mutex> guard1(mu1, std::adopt_lock);
    std::lock_guard<mutex> guard2(mu2, std::adopt_lock);
    cout<<msg<<i<<endl;
  }
private:
  mutex mu1;
  mutex mu2;
};

参数std::adopt_lock的含义是将lock这件事转交给std::lock来做,unlockguard来做,其实就是将mutex的所有权转交了一下。

为了避免死锁,我们应该做到:

  1. 尽量只lock一个mutex;
  2. 避免lock一个mutex后再调用一个用户提供的函数;
  3. 使用std::lock来lock多个mutex;
  4. 以同样的顺序来lock 多个mutex;

unique_lock lazy initialization

class LogFile{
  std::mutex _mu;
  std::ofstream _f;
public:
  LogFile(){
    _f.open("log.txt");
  }
  
  void shared_printf(string id, int value){
    std::unique_lock<mutex> locker(_mu, std::defer);
    
    //do something else
    locker.lock();
    _f<<"From "<<id<<": "<<value<<std::endl;
    locker.unlock();
    
    locker.lock();
    //do something
    locker.unlock();
  }
};

std::unique_lock 的作用跟std::lock_guard类似,但是更加的灵活。std::unique_lock可以上锁解锁多次,还可以使用std::defer在适当的时候调用lock函数来上锁,而不是在构造函数中直接上锁。

std::unique_lock 可以被move,但是lock_guard 不行。

std::unique_lock<mutex> locker2 = std::move(locker);

更灵活意味着std::unique_lock 比 std::lock_guard 复杂,在某些简单的场景下,可能后者是更好的选择。

在上面的例子中,存在下面这种情况,就是我们可能从始至终都没有用到shared_print,但是我们却已经将文件打开了,此时打开文件不是必须的。

对于这种情况,我们的解决方案是lazy initialization。我们可以将上面的代码改为

class LogFile{
  std::mutex _mu;
  std::ofstream _f;
  std::once_flag _flag;
public:
  LogFile(){}
  
  void shared_printf(string id, int value){
    std::call_once(_flag, [&]{_f.open("log.txt");});
    std::unique_lock<mutex> locker(_mu, std::defer);
    
    //do something else
    locker.lock();
    _f<<"From "<<id<<": "<<value<<std::endl;
    locker.unlock();
  }
};

这里我们把对log文件的打开操作放到了shared_print中来,只有当真正的需要log文件的时候,它才会被打开。但是如果我们使用

if(!_f.is_open()){
  _f.open("log.txt");
}

来打开文件,会有多线程重入,多次打开文件的风险,显然是线程不安全的,即便我们对其加锁进行保护

if(!_f.is_open()){
  std::lock_guard<mutex> another_guard(another_mutex);
  _f.open("log.txt");
}

这样仍然是线程不安全的。

正确的做法是使用标准库提供的std::call_once 函数,这个函数所做的正如它的名字一样,只会调用一次,就避免了多线程重入的问题。

condition variable

首先我们来看一下经典的生产者消费者模型:

std::deque<int> q;
std::mutex mu;

std::condition_variable cond;

void produce(){
  int count = 10;
  while(count>0){
    std::unique_lock<mutex> locker(mu);
    q.push_front(count);
    locker.unlock();
    cond.notify_one();
    std::this_thread::sleep_for(chrono::seconds(1));
    count--;
  }
}

void consume(){
  int data = 0;
  while(data!=1){
    std::unique_lock<mutex> locker(mu);
    cond.wait(locker);
    data = q.back();
    q.pop_back();
    locker.unlock();
    cout<<"t2 got a value from t1: "<<data<<endl;
  }
}

int main(){
  std::thread t1(produce);
  std::thread t2(consume);
  t1.join();
  t2.join();
  return 0;
}

这里我们使用条件变量来让producer和consumer跑起来,当produce往q里放入一个数据后,条件变量就会调用notify_one,用来通知一个线程来进行操作。在consume中,执行到cond.wait(locker)的时候,线程进入sleep状态,直到notify_one将其唤醒,进行操作。这样就保证了线程安全。

实际上,即便cond.wait使线程2进入睡眠状态,线程2还是还是会被非条件变量的notify函数唤醒,这种唤醒称之为虚假唤醒(spurious wake)。为了避免这种情况,我们将consume中的函数修改为

cond.wait(locker, []{return !q.empty();})

如果 q 为空,t2会返回睡眠状态,非空的话就会继续往下执行。

std::notify_one 只会唤醒一个线程,如果想要唤醒多个线程的话,请使用notify_all。

Future and promise

#include <future>

void factorial(int N){
  int res = 1;
  for(int i = N; i>1; i--){
    res*=i;
  }
  std::cout<<"Result is: "<<res<<std::endl;
}

int main(){
  std::thread t1(factorial, 4);
  t1.join();
  return 0;
}

上述代码只能将线程执行的结果打印在标准输出上,如果我们想在线程执行结束的时候获取到这个值,我们有下面几种方法:

#include <future>

void factorial(int N, int& x){
  int res = 1;
  for(int i = N; i>1; i--){
    res*=i;
  }
  x = res;
}

int main(){
  int x = 0;
  std::thread t1(factorial, 4, std::ref(x));
  t1.join();
  return 0;
}

这样虽然能够达到我们的目的,但是在多线程的情况下要使用互斥量和锁来保证线程安全。

#include <future>

int factorial(int N){
  int res = 1;
  for(int i = N; i>1; i--){
    res*=i;
  }
  return res;
}

int main(){
  int x = 0;
  std::future<int> fu = std::async(std::launch::async, factorial, 4);
  x = fu.get();
  return 0;
}

std::async会启动一个线程来执行factorial函数,并将返回的结果放到future中。当我们执行了fu.get() 之后,如果再次执行的话,会导致程序崩溃。

我们来稍微详细的讲一下std::async的第一个参数,实际上,这个参数的取值有四种

(1)不使用该参数,等同于下面的第四种

(2)该参数取值为std::launch::async,则std::async会创建一个新的线程来执行factorial函数。

(3)该参数的取值为std::launch::deferred,则std::async不会创建新的线程,而是lazy evaluation。也就是说,只有当fu.get()被执行的时候,factorial才会被执行。

(4)该参数的取值为 std::launch::async | std::launch::deferred,它的行为可能是第二种情况,也可能是第三种情况,这是实现相关的。

接下来是std::promise,其中文意思为承诺,是对谁的承诺呢,当然是对未来(std::future)的承诺。

#include <future>

int factorial(std::future<int>& f){
  int res = 1;
  int N = f.get();
  for(int i = N; i>1; i--){
    res*=i;
  }
  return res;
}

int main(){
  int x = 0;
  std::promise<int> p;
  std::future<int> f = p.get_future();
  std::future<int> fu = std::async(std::launch::async, factorial, std::ref(f));
  p.set_value(4);
  x = fu.get();
  return 0;
}

在main函数中,我们先给了一个承诺,这个承诺是对future f的,然后我们在需要的时候给promise一个值,这个值就会被future获取到。

这个例子中,promise和future的用法其实是为了从主线程向子线程中传递参数。

传入到自线程中的future,我们使用的是以引用的方式传值,这是因为future是不可以拷贝的,只能move。

但是,会有这样一种情况,就是多个线程都会使用同样的future的值,这时候难道我们需要声明多个含有同样值的future么?标准库给出了std::shared_future来解决这个问题,代码如下:

#include <future>

int factorial(std::shared_future<int> sf){
  int res = 1;
  int N = sf.get();
  for(int i = N; i>1; i--){
    res*=i;
  }
  return res;
}

int main(){
  int x = 0;
  std::promise<int> p;
  std::future<int> f = p.get_future();
  std::shared_future<int> sf = f.share();
  std::future<int> fu1 = std::async(std::launch::async, factorial, sf);
  std::future<int> fu2 = std::async(std::launch::async, factorial, sf);
  std::future<int> fu3 = std::async(std::launch::async, factorial, sf);
  std::future<int> fu4 = std::async(std::launch::async, factorial, sf);
  p.set_value(4);
  return 0;
}

从上面的代码中我们就能够看出,shared_future是可以拷贝的。

packaged_task

Packaged task就是对可调用的对象进行了封装,然后使得这些可调用对象异步的执行:

int factorial(int N){
  int res = 1;
  for(int i = N; i>1; i--){
    res*=i;
  }
  cout<<"Result is : "<<res<<endl;
  return res;
}

std::deque<std::packaged_task<int()> >task_q;
void thread_1(){
  std::packaged_task<int()> t;
  t = std::move(task_q.front());
  t();
}

int main(){
  std::thread t1(thread_1);
  std::packaged_task<int()> t(std::bind(factorial, 6));
  std::future<int> fu = t.get_future();
  task_q.push_back(t);
  
  cout<<fu.get()<<endl;
  
  t1.join();
  return 0;
}

上述代码中,通过packaged_task 对factorial 进行封装,并使其在子线程中运行,并通过future获取到运行结果。

packaged_task 的不同之处在于,线程构建了就会开始进行,而packaged_task可以等到我们需要的时候再去执行。

NOTE:上述的代码只是示例packaged_task的用法,并没有对data race进行处理。

上一篇 下一篇

猜你喜欢

热点阅读