boost Strands Internal
我对boost.asio的使用不深入,所以总是过了一段时间就会对它的理解上又变得模糊起来。
还是先回头看看boost.asio的一些基础知识。
ASIO攻破!!!这篇文章有一个蛮不错的比喻:
其实io_service就像是劳工中介所,而一个线程就是一个劳工,调用post的模块相当于富人们,他们去中介所委托任务,而劳工们就听候中介所的调遣去执行这些任务,任务的内容就写在富人们给你的handler上,也就是函数指针,指针指向具体实现就是任务的实质内容。其实在整个过程中,富人们都不知道是哪个劳工帮他们做的工作,只知道是中介所负责完成这些就可以了。这使得逻辑上的耦合降到了最低。不过这样的比喻也有个不恰当的地方,如果硬要这样比喻的话,我只能说:其实劳工里面也有很多富人的o! 。很多劳工在完成任务的过程中自己也托给中介所一些任务,然后这些任务很可能还是自己去完成。这也难怪,运行代码的总是这些线程,那么调用post的肯定也会有这些线程了,不过不管怎么说,如此循环往复可以解决问题就行。
多说一句,允许有多个劳工,也就允许有多个中介所。换句话说,你可以实现1+n模式(让一个中介所为多个劳工服务),也可以实现为n*(1+1)模式(一个中介所只为一个劳工服务,有多个这样的搭配),当然还可以实现为n+m模式(多个中介所共同为多个劳工服务)。
对于n*(1+1)模式,其实相当于对那一部分数据实现了单线程处理模型,类似于一个spsc队列。这里暂不讨论。
对于1+n模式和n+m模式,情况会复杂一些。
把boost asio io_service与 strand 分析 一文的例子拿来用,他采用了n+m模式(2+3)来验证strand的作用。这种模式也就是官方文档中说的线程池模型:
Multiple threads may call the run()
function to set up a pool of threads from which the io_service
may execute handlers. All threads that are waiting in the pool are equivalent and the io_service
may choose any one of them to invoke a handler.
首先是使用了strand的版本。
#include <iostream>
#include <boost/shared_ptr.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace std;
using namespace boost;
using namespace asio;
typedef boost::asio::io_service ioType;
typedef boost::asio::strand strandType;
ioType m_service;
strandType m_strand(m_service);
boost::mutex m_mutex;
void print( int fatherID)
{
// boost::mutex::scoped_lock lock(m_mutex);
static int count = 0;
cout<<"fatherID "<<fatherID<<" "<<endl;
sleep(1);
cout<<"count "<<count++<<endl;
}
void ioRun1()
{
while(1)
{
m_service.run();
}
}
//
void ioRun2()
{
while(1)
{
m_service.run();
}
}
void print1()
{
m_strand.dispatch(bind(print,1));
// cout<<"over"<<endl;
}
void print2()
{
m_strand.post(bind(print,2));
}
void print3()
{
m_strand.post(bind(print,3));
}
int main()
{
boost::thread t0(ioRun1);
boost::thread t(ioRun2);
boost::thread t1(print1);
boost::thread t2(print2);
boost::thread t3(print3);
cout<<"111111"<<endl;
t1.join();
t2.join();
t3.join();
t0.join();
t.join();
cout<<"ads"<<endl;
return 0;
}
最终输出结果:
fatherID 1
count 0
fatherID 2
count 1
fatherID 3
count 2
说明这是线程安全的!
但是 而 io_service 不能保证:更改程序:
void print1()
{
m_service.dispatch(bind(print,1));
// cout<<"over"<<endl;
}
void print2()
{
m_service.post(bind(print,2));
}
void print3()
{
m_service.post(bind(print,3));
}
输出结果:
fatherID 3
fatherID 2
count 0
fatherID 1
count 1
count 2
很显然,这里存在并发的问题。所以strand就是为了解决这样的问题而出现的。
How strands guarantee correct execution of pending events in boost.asio一文的这个回答说:
The strand manages all handlers posted to it in a
FIFO
queue. When the queue is empty and a handler is posted to the strand, then the strand will post an internal handle to the io_service. Within the internal handler, a handler will be dequeued from the strand's FIFO queue, executed, and then if the queue is not empty, the internal handler posts itself back to the io_service.
翻译过来,就是说strand使用一个FIFO
队列来管理handlers(也就是函数对象)。
asio::async_write and strand的问题是,
asio::async_write(m_socket, asio::buffer(buf, bytes),
custom_alloc(m_strand.wrap(custom_alloc(_OnSend))));
Does this code guarantee that all asynchronous operation handlers(calls to async_write_some) inside async_write are called through strand? (or it's just for my_handler?)
Tanner Sansbury的回答很细致:
With the following code:
asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...)));
For this composed operation, all calls to stream.async_write_some()
will be invoked within m_strand
if all of the following conditions are true:
-
The initiating
async_write(...)
call is running withinm_strand()
:assert(m_strand.running_in_this_thread()); asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...)));
-
The return type from
custom_alloc
is either: -
the exact type returned from
strand::wrap()
template <typename Handler>
Handler custom_alloc(Handler) { ... }
- a custom handler that appropriate chains invocations of
asio_handler_invoke()
:
template <class Handler>
class custom_handler
{
public:
custom_handler(Handler handler)
: handler_(handler)
{}
template <class... Args>
void operator()(Args&&... args)
{
handler_(std::forward<Args>(args)...);
}
template <typename Function>
friend void asio_handler_invoke(
Function intermediate_handler,
custom_handler* my_handler)
{
// Support chaining custom strategies incase the wrapped handler
// has a custom strategy of its own.
using boost::asio::asio_handler_invoke;
asio_handler_invoke(intermediate_handler, &my_handler->handler_);
}
private:
Handler handler_;
};
template <typename Handler>
custom_handler<Handler> custom_alloc(Handler handler)
{
return {handler};
}
See this answer for more details on strands, and this answer for details on asio_handler_invoke
.