C++11:并发
线程
多线程的优缺点:
优点:轻量的进程 ,线程间的通讯更迅速
缺点:不好实现,不能运行在分布式系统上
一个线程的简单的例子:
#include <iostream>
#include <thread>
using namespace std;
void function_1(){
cout<<"Beauty is only skin-deep"<<endl;
}
int main(){
thread t1(function_1);
t1.join();
return 0;
}
thread t1(function_1)
声明了一个线程,同时启动了该线程,该线程开始执行function_1
函数。
接着我们调用了join()
,该函数会将主线程阻塞,直到子线程结束后,主线程才会继续执行。与此作用相反的一个函数是detach
,这个函数会使线程成为一个守护线程,主线程不用在乎子线程是否执行结束,会直接往下运行。每个线程只能detach
或者join
一次。我们可以通过joinable
函数来判断该线程是否可以执行join
函数。
如果子线程使用了主线程的资源,例如在创建子线程的,会以引用的方式来使用主线程的数据,主线程要保证这些数据活的够久。
class Fctor{
public:
void operator()(){
for(int ii = 0; i>-100; i++){
cout<<"from t1: "<<i<<endl;
}
}
};
class Foo{
public:
void print(){
//do something
}
};
thread t1(function_1);
void function_1(){
cout<<"Beauty is only skin-deep"<<endl;
}
thread t1(function_1); //调用普通函数
Factor f;
thread t2(f); //调用仿函数
Foo f
thread t3(&Foo::print, &f); //调用成员函数
thread
的构造函数的第一个参数是callable
对象,函数指针,functor
或者lambda
表达式。
如果我们这样声明thread t1(Fctor());
,是错误的,编译器会认为这是一个返回值为thread
类型,函数名叫做t1
,并且其参数为Fctor
的函数声明。解决方法是改写为thread t1((Fctor()));
。
如果想向thread
传入参数的话,
class Fctor{
public:
void operator()(string msg){
for(int ii = 0; i>-100; i++){
cout<<"from t1: "<<i<<endl;
}
}
};
int main(){
string s = "Where there is no trust, there is no love";
std::thread t1((Fctor()), s); // 值传递
//std::thread t1((Fctor()), std::ref(s)); //引用传递
cout<<"From main: "<<s<<endl;
t1.join();
return 0;
}
thread
不能被复制,只能被move
。
thread t2 = t1;// error
thread t2 = std::move(t1);//right
获取线程id
//在线程外获取某个线程的id
thread t1(func);
t1.get_id();
//在线程内获取当前线程的id
std::this_thread::get_id();
我们应该建立的线程数
std::thread::hardware_concurrency();
data race and mutex
首先看一个代码示例:
#include <thread>
#include <string>
using namespace std;
void function_1(){
for(int i = 0; i>-1000; i--){
cout<<"From t1: "<<i<<endl;
}
}
int main(){
thread t1(function_1);
for(int i = 0; i<1000; i++){
cout<<"From main: "<<i<<endl;
}
t1.join();
return 0;
}
一部分执行结果如下:
屏幕快照 2020-02-24 下午8.33.49.png我们可以从结果看出,两个线程交替的输出结果。并且,在一个线程的输出操作还未结束时,就开始了另一个线程的输出操作。两个线程都需要访问标准输入输出(cout)
,这就导致了两个线程间的data race。
为了解决这个问题,使两个线程能够正常的打印出结果,我们引入mutex
(互斥量)。上述的黛娜可以修改为
#include <thread>
#include <string>
#include <iostream>
#include <mutex>
using namespace std;
mutex mu;
void shared_print(string msg, int i){
mu.lock();
cout<<msg<<id<<endl;
mu.unlock();
}
void function_1(){
for(int i = 0; i>-1000; i--){
shared_print("From t1: ", i);
}
}
int main(){
thread t1(function_1);
for(int i = 0; i<1000; i++){
shared_print("From main: ", i);
}
t1.join();
return 0;
}
不会再出现上文中的状况了。这就是mutex
的作用。
mu的lock
和unlock
函数需要成对出现,但是我们会经常忘记调用unlock
,借助C++的RAII机制,标准库给我门提供了一个类,std::lock_guard
,上面的shared_print
函数可以被修改为
void shared_print(string msg, int i){
std::lock_guard<mutex> guard(mu);
cout<<msg<<i<<endl;
}
这里还存在一个问题,就是cout
是全局的,其他的线程仍然可以直接调用cout
,还是存在data race。在一个系统中,我们应该规定大家使用统一的函数来调用需要被互斥量保护的资源。
死锁 dead lock
#include <thread>
#include <string>
#include <iostream>
#include <mutex>
using namespace std;
class LogFile{
public:
LogFile(){}
void shared_print1(string msg, int i){
std::lock_guard<mutex> guard1(mu1);
std::lock_guard<mutex> guard2(mu2);
cout<<msg<<i<<endl;
}
void shared_print2(string msg, int i){
std::lock_guard<mutex> guard2(mu2);
std::lock_guard<mutex> guard1(mu1);
cout<<msg<<i<<endl;
}
private:
mutex mu1;
mutex mu2;
};
void function_1(LogFile& log){
for(int i = 0; i>-1000; i--){
log.shared_print1("From t1: ", i);
}
}
int main(){
LogFile log;
thread t1(function_1, std::ref(log));
for(int i = 0; i<1000; i++){
log.shared_print2("From main: ", i);
}
t1.join();
return 0;
}
结果运行如下:
屏幕快照 2020-02-24 下午9.08.06.png我们可以看到,代码在输出完From main: 428
之后卡死了,这就是发生了死锁。
为了避免死锁,我们希望所有的互斥量都能够以相同的顺序被调用。
标准库提供了std::lock
函数来帮助我们同时锁住多个mutex,上面的代码就可以被改写为
class LogFile{
public:
LogFile(){}
void shared_print1(string msg, int i){
std::lock(mu1, mu2);
std::lock_guard<mutex> guard1(mu1, std::adopt_lock);
std::lock_guard<mutex> guard2(mu2, std::adopt_lock);
cout<<msg<<i<<endl;
}
void shared_print2(string msg, int i){
std::lock(mu1, mu2);
std::lock_guard<mutex> guard1(mu1, std::adopt_lock);
std::lock_guard<mutex> guard2(mu2, std::adopt_lock);
cout<<msg<<i<<endl;
}
private:
mutex mu1;
mutex mu2;
};
参数std::adopt_lock
的含义是将lock
这件事转交给std::lock
来做,unlock
由guard
来做,其实就是将mutex的所有权转交了一下。
为了避免死锁,我们应该做到:
- 尽量只lock一个mutex;
- 避免lock一个mutex后再调用一个用户提供的函数;
- 使用std::lock来lock多个mutex;
- 以同样的顺序来lock 多个mutex;
unique_lock lazy initialization
class LogFile{
std::mutex _mu;
std::ofstream _f;
public:
LogFile(){
_f.open("log.txt");
}
void shared_printf(string id, int value){
std::unique_lock<mutex> locker(_mu, std::defer);
//do something else
locker.lock();
_f<<"From "<<id<<": "<<value<<std::endl;
locker.unlock();
locker.lock();
//do something
locker.unlock();
}
};
std::unique_lock
的作用跟std::lock_guard
类似,但是更加的灵活。std::unique_lock
可以上锁解锁多次,还可以使用std::defer
在适当的时候调用lock函数来上锁,而不是在构造函数中直接上锁。
std::unique_lock
可以被move,但是lock_guard 不行。
std::unique_lock<mutex> locker2 = std::move(locker);
更灵活意味着std::unique_lock 比 std::lock_guard 复杂,在某些简单的场景下,可能后者是更好的选择。
在上面的例子中,存在下面这种情况,就是我们可能从始至终都没有用到shared_print,但是我们却已经将文件打开了,此时打开文件不是必须的。
对于这种情况,我们的解决方案是lazy initialization。我们可以将上面的代码改为
class LogFile{
std::mutex _mu;
std::ofstream _f;
std::once_flag _flag;
public:
LogFile(){}
void shared_printf(string id, int value){
std::call_once(_flag, [&]{_f.open("log.txt");});
std::unique_lock<mutex> locker(_mu, std::defer);
//do something else
locker.lock();
_f<<"From "<<id<<": "<<value<<std::endl;
locker.unlock();
}
};
这里我们把对log文件的打开操作放到了shared_print中来,只有当真正的需要log文件的时候,它才会被打开。但是如果我们使用
if(!_f.is_open()){
_f.open("log.txt");
}
来打开文件,会有多线程重入,多次打开文件的风险,显然是线程不安全的,即便我们对其加锁进行保护
if(!_f.is_open()){
std::lock_guard<mutex> another_guard(another_mutex);
_f.open("log.txt");
}
这样仍然是线程不安全的。
正确的做法是使用标准库提供的std::call_once 函数,这个函数所做的正如它的名字一样,只会调用一次,就避免了多线程重入的问题。
condition variable
首先我们来看一下经典的生产者消费者模型:
std::deque<int> q;
std::mutex mu;
std::condition_variable cond;
void produce(){
int count = 10;
while(count>0){
std::unique_lock<mutex> locker(mu);
q.push_front(count);
locker.unlock();
cond.notify_one();
std::this_thread::sleep_for(chrono::seconds(1));
count--;
}
}
void consume(){
int data = 0;
while(data!=1){
std::unique_lock<mutex> locker(mu);
cond.wait(locker);
data = q.back();
q.pop_back();
locker.unlock();
cout<<"t2 got a value from t1: "<<data<<endl;
}
}
int main(){
std::thread t1(produce);
std::thread t2(consume);
t1.join();
t2.join();
return 0;
}
这里我们使用条件变量来让producer和consumer跑起来,当produce往q里放入一个数据后,条件变量就会调用notify_one,用来通知一个线程来进行操作。在consume中,执行到cond.wait(locker)的时候,线程进入sleep状态,直到notify_one将其唤醒,进行操作。这样就保证了线程安全。
实际上,即便cond.wait使线程2进入睡眠状态,线程2还是还是会被非条件变量的notify函数唤醒,这种唤醒称之为虚假唤醒(spurious wake)。为了避免这种情况,我们将consume中的函数修改为
cond.wait(locker, []{return !q.empty();})
如果 q 为空,t2会返回睡眠状态,非空的话就会继续往下执行。
std::notify_one 只会唤醒一个线程,如果想要唤醒多个线程的话,请使用notify_all。
Future and promise
#include <future>
void factorial(int N){
int res = 1;
for(int i = N; i>1; i--){
res*=i;
}
std::cout<<"Result is: "<<res<<std::endl;
}
int main(){
std::thread t1(factorial, 4);
t1.join();
return 0;
}
上述代码只能将线程执行的结果打印在标准输出上,如果我们想在线程执行结束的时候获取到这个值,我们有下面几种方法:
- 使用传出参数来返回结果,代码修改为
#include <future>
void factorial(int N, int& x){
int res = 1;
for(int i = N; i>1; i--){
res*=i;
}
x = res;
}
int main(){
int x = 0;
std::thread t1(factorial, 4, std::ref(x));
t1.join();
return 0;
}
这样虽然能够达到我们的目的,但是在多线程的情况下要使用互斥量和锁来保证线程安全。
- 使用std::future来存储运行结果,代码可修改为
#include <future>
int factorial(int N){
int res = 1;
for(int i = N; i>1; i--){
res*=i;
}
return res;
}
int main(){
int x = 0;
std::future<int> fu = std::async(std::launch::async, factorial, 4);
x = fu.get();
return 0;
}
std::async会启动一个线程来执行factorial函数,并将返回的结果放到future中。当我们执行了fu.get() 之后,如果再次执行的话,会导致程序崩溃。
我们来稍微详细的讲一下std::async的第一个参数,实际上,这个参数的取值有四种
(1)不使用该参数,等同于下面的第四种
(2)该参数取值为std::launch::async,则std::async会创建一个新的线程来执行factorial函数。
(3)该参数的取值为std::launch::deferred,则std::async不会创建新的线程,而是lazy evaluation。也就是说,只有当fu.get()被执行的时候,factorial才会被执行。
(4)该参数的取值为 std::launch::async | std::launch::deferred,它的行为可能是第二种情况,也可能是第三种情况,这是实现相关的。
接下来是std::promise,其中文意思为承诺,是对谁的承诺呢,当然是对未来(std::future)的承诺。
#include <future>
int factorial(std::future<int>& f){
int res = 1;
int N = f.get();
for(int i = N; i>1; i--){
res*=i;
}
return res;
}
int main(){
int x = 0;
std::promise<int> p;
std::future<int> f = p.get_future();
std::future<int> fu = std::async(std::launch::async, factorial, std::ref(f));
p.set_value(4);
x = fu.get();
return 0;
}
在main函数中,我们先给了一个承诺,这个承诺是对future f的,然后我们在需要的时候给promise一个值,这个值就会被future获取到。
这个例子中,promise和future的用法其实是为了从主线程向子线程中传递参数。
传入到自线程中的future,我们使用的是以引用的方式传值,这是因为future是不可以拷贝的,只能move。
但是,会有这样一种情况,就是多个线程都会使用同样的future的值,这时候难道我们需要声明多个含有同样值的future么?标准库给出了std::shared_future来解决这个问题,代码如下:
#include <future>
int factorial(std::shared_future<int> sf){
int res = 1;
int N = sf.get();
for(int i = N; i>1; i--){
res*=i;
}
return res;
}
int main(){
int x = 0;
std::promise<int> p;
std::future<int> f = p.get_future();
std::shared_future<int> sf = f.share();
std::future<int> fu1 = std::async(std::launch::async, factorial, sf);
std::future<int> fu2 = std::async(std::launch::async, factorial, sf);
std::future<int> fu3 = std::async(std::launch::async, factorial, sf);
std::future<int> fu4 = std::async(std::launch::async, factorial, sf);
p.set_value(4);
return 0;
}
从上面的代码中我们就能够看出,shared_future是可以拷贝的。
packaged_task
Packaged task就是对可调用的对象进行了封装,然后使得这些可调用对象异步的执行:
int factorial(int N){
int res = 1;
for(int i = N; i>1; i--){
res*=i;
}
cout<<"Result is : "<<res<<endl;
return res;
}
std::deque<std::packaged_task<int()> >task_q;
void thread_1(){
std::packaged_task<int()> t;
t = std::move(task_q.front());
t();
}
int main(){
std::thread t1(thread_1);
std::packaged_task<int()> t(std::bind(factorial, 6));
std::future<int> fu = t.get_future();
task_q.push_back(t);
cout<<fu.get()<<endl;
t1.join();
return 0;
}
上述代码中,通过packaged_task 对factorial 进行封装,并使其在子线程中运行,并通过future获取到运行结果。
packaged_task 的不同之处在于,线程构建了就会开始进行,而packaged_task可以等到我们需要的时候再去执行。
NOTE:上述的代码只是示例packaged_task的用法,并没有对data race进行处理。