BOOST

boost Strands Internal

2017-09-29  本文已影响17人  Brent姜

我对boost.asio的使用不深入,所以总是过了一段时间就会对它的理解上又变得模糊起来。
还是先回头看看boost.asio的一些基础知识。

ASIO攻破!!!这篇文章有一个蛮不错的比喻:

其实io_service就像是劳工中介所,而一个线程就是一个劳工,调用post的模块相当于富人们,他们去中介所委托任务,而劳工们就听候中介所的调遣去执行这些任务,任务的内容就写在富人们给你的handler上,也就是函数指针,指针指向具体实现就是任务的实质内容。其实在整个过程中,富人们都不知道是哪个劳工帮他们做的工作,只知道是中介所负责完成这些就可以了。这使得逻辑上的耦合降到了最低。不过这样的比喻也有个不恰当的地方,如果硬要这样比喻的话,我只能说:其实劳工里面也有很多富人的o! 。很多劳工在完成任务的过程中自己也托给中介所一些任务,然后这些任务很可能还是自己去完成。这也难怪,运行代码的总是这些线程,那么调用post的肯定也会有这些线程了,不过不管怎么说,如此循环往复可以解决问题就行。

多说一句,允许有多个劳工,也就允许有多个中介所。换句话说,你可以实现1+n模式(让一个中介所为多个劳工服务),也可以实现为n*(1+1)模式(一个中介所只为一个劳工服务,有多个这样的搭配),当然还可以实现为n+m模式(多个中介所共同为多个劳工服务)。

对于n*(1+1)模式,其实相当于对那一部分数据实现了单线程处理模型,类似于一个spsc队列。这里暂不讨论。

对于1+n模式和n+m模式,情况会复杂一些。
boost asio io_service与 strand 分析 一文的例子拿来用,他采用了n+m模式(2+3)来验证strand的作用。这种模式也就是官方文档中说的线程池模型:

Multiple threads may call the run()
function to set up a pool of threads from which the io_service
may execute handlers. All threads that are waiting in the pool are equivalent and the io_service
may choose any one of them to invoke a handler.

首先是使用了strand的版本。

#include <iostream>
#include <boost/shared_ptr.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace std;
using namespace boost;
using namespace asio;

typedef boost::asio::io_service ioType;
typedef boost::asio::strand strandType;
ioType m_service;
strandType m_strand(m_service);
boost::mutex m_mutex;


void print( int fatherID)
{
//  boost::mutex::scoped_lock lock(m_mutex);
  static int count = 0;
  cout<<"fatherID "<<fatherID<<" "<<endl;
  sleep(1);
  cout<<"count "<<count++<<endl;
}

void ioRun1()
{
  while(1)
    {
      m_service.run();
    }
}
//
void ioRun2()
{
  while(1)
    {
      m_service.run();
    }
}

void print1()
{
  m_strand.dispatch(bind(print,1));
//  cout<<"over"<<endl;
}

void print2()
{
  m_strand.post(bind(print,2));
}

void print3()
{
  m_strand.post(bind(print,3));
}

int main()
{
  boost::thread t0(ioRun1);
  boost::thread t(ioRun2);
  boost::thread t1(print1);
  boost::thread t2(print2);
  boost::thread t3(print3);
  cout<<"111111"<<endl;
  t1.join();
  t2.join();
  t3.join();
  t0.join();
  t.join();
  cout<<"ads"<<endl;
  return 0;
}

最终输出结果:

   fatherID 1 
 count 0
 fatherID 2 
 count 1
 fatherID 3 
   count 2

说明这是线程安全的!

但是 而 io_service 不能保证:更改程序:

void print1()
{
  m_service.dispatch(bind(print,1));
//  cout<<"over"<<endl;
}

void print2()
{
  m_service.post(bind(print,2));
}

void print3()
{
  m_service.post(bind(print,3));
}

输出结果:

fatherID 3 
fatherID 2 
count 0
fatherID 1 
count 1
count 2

很显然,这里存在并发的问题。所以strand就是为了解决这样的问题而出现的。

How strands guarantee correct execution of pending events in boost.asio一文的这个回答说:

The strand manages all handlers posted to it in a FIFO queue. When the queue is empty and a handler is posted to the strand, then the strand will post an internal handle to the io_service. Within the internal handler, a handler will be dequeued from the strand's FIFO queue, executed, and then if the queue is not empty, the internal handler posts itself back to the io_service.

翻译过来,就是说strand使用一个FIFO队列来管理handlers(也就是函数对象)。

asio::async_write and strand的问题是,
asio::async_write(m_socket, asio::buffer(buf, bytes),
custom_alloc(m_strand.wrap(custom_alloc(_OnSend))));

Does this code guarantee that all asynchronous operation handlers(calls to async_write_some) inside async_write are called through strand? (or it's just for my_handler?)

Tanner Sansbury的回答很细致:
With the following code:

asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...)));

For this composed operation, all calls to stream.async_write_some() will be invoked within m_strand if all of the following conditions are true:

        template <typename Handler> 
        Handler custom_alloc(Handler) { ... }
        template <class Handler>
        class custom_handler
        {
        public:
          custom_handler(Handler handler)
            : handler_(handler)
          {}

          template <class... Args>
          void operator()(Args&&... args)
          {
            handler_(std::forward<Args>(args)...);
          }

          template <typename Function>
          friend void asio_handler_invoke(
            Function intermediate_handler,
            custom_handler* my_handler)
          {
            // Support chaining custom strategies incase the wrapped handler
            // has a custom strategy of its own.
            using boost::asio::asio_handler_invoke;
            asio_handler_invoke(intermediate_handler, &my_handler->handler_);
          }

        private:
          Handler handler_;
        };

        template <typename Handler>
        custom_handler<Handler> custom_alloc(Handler handler)
        {
          return {handler};
        }

See this answer for more details on strands, and this answer for details on asio_handler_invoke.

上一篇下一篇

猜你喜欢

热点阅读