C++多线程

{ 3 }CPP_并行算法的尝试、识别线程以及共享数据带来的问题

2018-11-26  本文已影响13人  庄阿莫

一、并行算法初步尝试

template<typename Iterator,typename T>
struct accumulate_block
{
    void operator()(Iterator first,Iterator last,T& result)
    {
        result = std::accumulate(first, last, result);//累积所有任务块的总和
    }
};


template<typename Iterator,typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{

    unsigned long const length = std::distance(first, last);//distance是计算两个iterator的距离。

    if(!length)
        return init;


    unsigned long const min_per_thread = 25;

    /*
     * 用范围内元素的总数量(length + min_per_thread)除以线程(块)中最小任务数,从而确定启动线程的最大数量。
     * 这样能避免无谓的计算资源的浪费。
     * */
    unsigned long const max_threads =
            (length+min_per_thread-1) / min_per_thread;


    unsigned long const hardware_threads =
            std::thread::hardware_concurrency();//返回能同时并发在一个程序中的线程数量


    /*
     * 计算量的最大值和硬件支持线程数中,较小的值为启动线程的数量。
     * 因为上下文频繁的切换会降低线程的性能,所以你肯定不想启动的线程数多于硬件支持的线程数量。
     *
     * 当 std::thread::hardware_concurrency() 返回0,你可以选择一个合适的数作为你的选择;在本例中,我选择了"2"。
     * 也就是说,无论如何都会开启两个线程
     * */
    unsigned long const num_threads =
            std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);


    //每个线程中处理的元素数量,是范围中元素的总量除以线程的个数得出的。
    unsigned long const block_size = length / num_threads;


    std::vector<T> results(num_threads);//用于存放当前能开启的线程中,每个线程处理的任务块
    std::vector<std::thread> threads(num_threads-1);//存放总共能开启多少线程,因为在启动之前已经有了一个线程(主线程), 所以-1。

    Iterator block_start = first;
    for(unsigned long i=0; i < (num_threads-1); ++i)
    {

        Iterator block_end = block_start;

        std::advance(block_end, block_size);//block_end迭代器指向当前任务块的末尾

        threads[i] = std::thread(
                accumulate_block<Iterator,T>(),
                block_start, block_end, std::ref(results[i]) );//启动一个新线程为当前块累加结果

        block_start = block_end;//当迭代器指向当前块的末尾时,启动下一个块
    }

    /*
     * 启动所有线程后,线程会处理最终块的结果。对于分配不均,因为知道最终块是哪一个,那么这个块中有多少个元素就无所谓了。
     * */
    accumulate_block<Iterator,T>()(block_start,last,results[num_threads-1]);

    /*
     * 把容器中的thread对象逐个join
     * */
    std::for_each(threads.begin(),threads.end(),
            std::mem_fn(&std::thread::join));

    return std::accumulate(results.begin(),results.end(),init);//使用 std::accumulate 将所有结果进行累加
}

此程序的一些要求和缺点:

二、识别线程

线程标识类型是 std::thread::id,有两种方式可以获取它:

std::thread::id 的对象可以自由的拷贝和对比:

C++标准库允许程序员将 std::thread::id 的对象做为容器的键值,同时也提供std::hash<std::thread::id> 容器,所以 std::thread::id 也可以作为无序容器的键值。

三、共享数据带来的问题

当一个或多个线程要修改共享数据时,就会产生很多麻烦。
例子:
从一个列表中删除一个节点的步骤如下(不变量为此双链表)
a. 找到要删除的节点N
b. 更新前一个节点指向N的指针,让这个指针指向N的下一个节点
c. 更新后一个节点指向N的指针,让这个指正指向N的前一个节点
d. 删除节点N

image.png
b) 和 c) 在相同的方向上指向和原来已经不一致了,修改了共享数据,同时也破坏了不变量。在并发程序中,极有可能因为其他线程访问刚刚被删除的数据,而引发错误:条件竞争

四、条件竞争

条件竞争的定义

如上例子,并发中竞争条件的形成,取决于一个以上线程的相对执行顺序,每个线程都抢着完成自己的任务,也被称为 “恶性条件竞争” 。

C++标准也定义了数据竞争这个术语:并发的去修改一个独立对象,数据竞争时可怕的未定义行为的起因。

如何避免恶性条件竞争?

1. 通过加锁或者无锁的方式来避免

2.使用事务的方式去处理数据结构的更新

上一篇下一篇

猜你喜欢

热点阅读