【翻译】为何我们要使用boost strands

2016-11-30  本文已影响0人  whosemario

原文链接: http://www.crazygaze.com/blog/2016/03/17/how-strands-work-and-why-you-should-use-them/

如果你使用过Boost Asio,一般情况下你都使用过或者了解过strands

使用strands最显著的好处就是简化我们的代码,因为通过strand来维护handler不需要显式地同步线程。strands保证同属于一个strand的两个handler不会同时执行(在两个线程同时执行)。

如果你只使用一个IO线程(在Boost里面只有一个线程调用io_service::run),那么你不需要做任何的同步,此时已经是隐式的strand。但是如果你想提高性能,因此使用多个IO线程,那么你有两种选择,一种是在不同的handler进行显式的同步,另一种就是使用strand。

显式同步handler会提升代码的复杂度,从而导致bug产生。显式同步的另外一个负面影响就是会给线程产生没有必要的阻塞。

strands通过在你的应用代码和handler执行代码之间插入一层,从而避免工作线程直接执行你的handler,而是将handler插入队列,strand控制handler的执行顺序,就像下面这张图:

可能发生的情况

为了可视化地说明IO线程和handlers之间发生了什么,我使用了Remotery

测试代码模拟4个工作线程和8个连接。对于一个连接,handler会产生一个随机的工作时间(5ms到15ms之间),实际上你不需要随机设置这些工作时间,这只是为了更容易地说明问题。另外,我没有使用Boost Asio,我自己定制了一个strand。

那么,来看一下每一个工作线程的情况:

Conn N是我们的连接实例(每一个连接都运行在一个工作线程里面)。每一个连接有不同的颜色,现在让我们看一下每一个时间片,每一个Conn都在做什么。

上面的情况是各个工作线程并不关心每一个handler在做什么,一个线程尝试执行一个Conn,它发现在另外一个工作线程中Conn也在执行,因此第二个工作线程就阻塞了。

在这个案例中19%的时间浪费在阻塞和其他的负载中,也就是说工作线程只有81%的时间在做真正的工作:

注意:计算的结果是线程的总共运行时间减去线程工作时间,剩下的时间也就是线程用于同步的时间。

让我们看看使用strand的结果:

非常少的时间用于线程的内部同步。

Cache局部性

另一个使用strand的好处就是提高CPU的Cache利用率,一个工作线程会使用等多的时间处理数量少的handler。

不使用strand的情况:

使用strand的情况:

实现strands

作为练习,我自己实现一个strand,虽然没有达到产品的标准,但却可以用于实验。

首先,让我们思考一下strands具有哪些功能:

  1. 没有handler可以同时执行
    • 这要求我们检测是否有工作线程正在使用strand,如果有使用,strand处于running状态
    • 为了避免阻塞,strand需要一个handler队列,以至于如果strand正运行在其他线程上,此handler将会进入队列稍后执行
  2. handler仅仅执行于工作线程
    • 这通常也指明了strand的handler队列的任务,如果在一个非工作线程中加入一个handler,handler将会进入队列
  3. handler的执行顺序是不保证的
    • 因为我们尝试从不同的线程中将handler加入strand,所以我们无法保证handler的执行顺序

strand的实现围绕着三个方法:

现在我们可以画出这三个方法的流程图:

为了实现strands,我们需要使用到我先去讲到的两个类:

template <class T>
class Monitor {
private:
    mutable T m_t;
    mutable std::mutex m_mtx;

public:
    using Type = T;
    Monitor() {}
    Monitor(T t_) : m_t(std::move(t_)) {}
    template <typename F>
    auto operator()(F f) const -> decltype(f(m_t)) {
        std::lock_guard<std::mutex> hold{m_mtx};
        return f(m_t);
    }
};

strand的实现与上图的描述基本一致,但是为了说明内部的同步机制,我在代码里面还是做了很多注释:

#pragma once
#include "Callstack.h"
#include "Monitor.h"
#include <assert.h>
#include <queue>
#include <functional>
 
//
// A strand serializes handler execution.
// It guarantees the following:
// - No handlers executes concurrently
// - Handlers are only executed from the specified Processor
// - Handler execution order is not guaranteed
//
// Specified Processor must implement the following interface:
//
//  template <typename F> void Processor::push(F w);
//      Add a new work item to the processor. F is a callable convertible
// to std::function<void()>
//
//  bool Processor::canDispatch();
//      Should return true if we are in the Processor's dispatching function in
// the current thread.
//
template <typename Processor>
class Strand {
public:
    Strand(Processor& proc) : m_proc(proc) {}
 
    Strand(const Strand&) = delete;
    Strand& operator=(const Strand&) = delete;
 
    // Executes the handler immediately if all the strand guarantees are met,
    // or posts the handler for later execution if the guarantees are not met
    // from inside this call
    template <typename F>
    void dispatch(F handler) {
        // If we are not currently in the processor dispatching function (in
        // this thread), then we cannot possibly execute the handler here, so
        // enqueue it and bail out
        if (!m_proc.canDispatch()) {
            post(std::move(handler));
            return;
        }
 
        // NOTE: At this point we know we are in a worker thread (because of the
        // check above)
 
        // If we are running the strand in this thread, then we can execute the
        // handler immediately without any other checks, since by design no
        // other threads can be running the strand
        if (runningInThisThread()) {
            handler();
            return;
        }
 
        // At this point we know we are in a worker thread, but not running the
        // strand in this thread.
        // The strand can still be running in another worker thread, so we need
        // to atomically enqueue the handler for the other thread to execute OR
        // mark the strand as running in this thread
        auto trigger = m_data([&](Data& data) {
            if (data.running) {
                data.q.push(std::move(handler));
                return false;
            } else {
                data.running = true;
                return true;
            }
        });
 
        if (trigger) {
            // Add a marker to the callstack, so the handler knows the strand is
            // running in the current thread
            Callstack<Strand>::Context ctx(this);
            handler();
 
            // Run any remaining handlers.
            // At this point we own the strand (It's marked as running in
            // this thread), and we don't release it until the queue is empty.
            // This means any other threads adding handlers to the strand will
            // enqueue them, and they will be executed here.
            run();
        }
    }
 
    // Post an handler for execution and returns immediately.
    // The handler is never executed as part of this call.
    template <typename F>
    void post(F handler) {
        // We atomically enqueue the handler AND check if we need to start the
        // running process.
        bool trigger = m_data([&](Data& data) {
            data.q.push(std::move(handler));
            if (data.running) {
                return false;
            } else {
                data.running = true;
                return true;
            }
        });
 
        // The strand was not running, so trigger a run
        if (trigger) {
            m_proc.push([this] { run(); });
        }
    }
 
    // Checks if we are currently running the strand in this thread
    bool runningInThisThread() {
        return Callstack<Strand>::contains(this) != nullptr;
    }
 
private:
    // Processes any enqueued handlers.
    // This assumes the strand is marked as running.
    // When there are no more handlers, it marks the strand as not running.
    void run() {
        Callstack<Strand>::Context ctx(this);
        while (true) {
            std::function<void()> handler;
            m_data([&](Data& data) {
                assert(data.running);
                if (data.q.size()) {
                    handler = std::move(data.q.front());
                    data.q.pop();
                } else {
                    data.running = false;
                }
            });
 
            if (handler)
                handler();
            else
                return;
        }
    }
 
    struct Data {
        bool running = false;
        std::queue<std::function<void()>> q;
    };
    Monitor<Data> m_data;
    Processor& m_proc;
};

用例

一个简单的用例:

#include "Strand.h"
#include "WorkQueue.h"
#include <random>
#include <stdlib.h>
#include <string>
#include <atomic>
 
// http://stackoverflow.com/questions/7560114/random-number-c-in-some-range
int randInRange(int min, int max) {
    std::random_device rd;   // obtain a random number from hardware
    std::mt19937 eng(rd());  // seed the generator
    std::uniform_int_distribution<> distr(min, max);  // define the range
    return distr(eng);
}
 
struct Obj {
    explicit Obj(int n, WorkQueue& wp) : strand(wp) {
        name = "Obj " + std::to_string(n);
    }
 
    void doSomething(int val) {
        printf("%s : doing %dn", name.c_str(), val);
    }
    std::string name;
    Strand<WorkQueue> strand;
};
 
void strandSample() {
    WorkQueue workQueue;
    // Start a couple of worker threads
    std::vector<std::thread> workerThreads;
    for (int i = 0; i < 4; i++) {
        workerThreads.push_back(std::thread([&workQueue] { workQueue.run(); }));
    }
 
    // Create a couple of objects that need strands
    std::vector<std::unique_ptr<Obj>> objs;
    for (int i = 0; i < 8; i++) {
        objs.push_back(std::make_unique<Obj>(i, workQueue));
    }
 
    // Counter used by all strands, so we can check if all work was done
    std::atomic<int> doneCount(0);
 
    // Add work to random objects
    const int todo = 20;
    for (int i = 0; i < todo; i++) {
        auto&& obj = objs[randInRange(0, objs.size() - 1)];
        obj->strand.post([&obj, i, &doneCount] {
            obj->doSomething(i);
            ++doneCount;
        });
    }
 
    workQueue.stop();
    for (auto&& t : workerThreads) {
        t.join();
    }
 
    assert(doneCount == todo);
}

输出:

Obj 2 : doing 0
Obj 1 : doing 1
Obj 1 : doing 3
Obj 1 : doing 4
Obj 3 : doing 6
Obj 5 : doing 2
Obj 4 : doing 5
Obj 6 : doing 11
Obj 3 : doing 8
Obj 5 : doing 10
Obj 5 : doing 12
Obj 6 : doing 17
Obj 3 : doing 9
Obj 3 : doing 13
Obj 5 : doing 18
Obj 0 : doing 14
Obj 2 : doing 15
Obj 3 : doing 16
Obj 5 : doing 19
Obj 1 : doing 7

小结

上一篇下一篇

猜你喜欢

热点阅读