PPL-并行模式库

2018-06-12  本文已影响326人  龙翱天际

PPL简介

并行模式库 (PPL) 提供命令式编程模型,以促进开发并发应用程序的可扩展性和易用性。 PPL 构建在并发运行时的计划和资源管理组件上。
通过提供并行作用于数据的泛型安全算法和容器,提高应用程序代码与基础线程机制之间的抽象级别。 使用 PPL 还可以开发通过为共享状态提供替代方案实现缩放的应用程序。

PPL 提供以下功能:

并行算法

并行模式库 (PPL) 提供了在数据集合并发执行工作的算法。 这些算法类似于标准模板库 (STL)所提供的。
并行算法由并发运行时中的现有功能组成。 例如, concurrency:: parallel_for 算法使用 concurrency:: structured_task_group 对象执行并行循环迭代。 parallel_for 算法以最佳方式工作分区给定可用数量的计算资源。

parallel_for 算法

简介

Concurrency:: parallel_for 算法重复地以并行方式执行相同的任务。 每个任务是由迭代值参数化。 当您具有不共享资源分布该循环的迭代循环主体时,此算法很有用。
parallel_for 算法以可最佳并行执行的方式对任务进行分区。 当工作负载不平衡时,此算法还会使用工作窃取算法和范围窃取来平衡这些分区。 当某个循环迭代以协作方式被阻塞时,则运行时将原本分配当前线程的迭代范围重新分配给其他线程或处理器。 同样,当某个线程完成迭代范围,则运行时将原本属于其他线程的工作重新分配给该线程。 parallel_for 算法还支持 嵌套并行。 当一个并行循环中包含另一个并行循环时,运行时之间以高效并行执行的方式协调处理循环主体的资源。

parallel_for 算法有多个重载版本。 第一个版本采用起始值、 结束值和工作函数 (lambda 表达式、 函数对象或函数指针)。 第二个版本所依据采用起始值、 结束值、 一个步长值和工作函数。 此函数的第一个版本使用 1 作为步长值。 其余版本采用分区程序对象,使您能够指定 parallel_for如何在线程之间对范围进行分区。 分区程序在 分区工作 中有更详细地介绍。
您可以将许多 for 循环,转换成 parallel_for。 但是, parallel_for 算法与 for 语句以下不同点︰

尽管 parallel_for 算法不支持任意终止条件,但你可以使用取消以停止所有的任务。 有关取消的详细信息,请参阅 取消。

注意
对负载均衡和取消之类的功能的支持,可能会抵消并行执行循环主体可能带来的好处,尤其当循环体相对较小时。
您可以在并行循环中使用分区程序来尽量减少此开销。 有关详细信息,请参阅 分区工作

示例

下面的示例演示的基本结构 parallel_for 算法。 本示例向控制台打印范围 [1,5] 并行中的每个值。

// parallel-for-structure.cpp
// compile with: /EHsc
#include <ppl.h>
#include <array>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Print each value from 1 to 5 in parallel.
   parallel_for(1, 6, [](int value) {
      wstringstream ss;
      ss << value << L' ';
      wcout << ss.str();
   });
}

该示例产生下面的示例输出︰
1 2 4 3 5

注意
因为 parallel_for 算法作用于并行每个项目,向控制台打印值的顺序将会不同。
有关完整的示例使用 parallel_for 算法,请参阅 如何︰ 编写 parallel_for 循环

parallel_for_each 算法

Concurrency:: parallel_for_each 算法对迭代容器(如 STL提供的)并行执行任务。 它跟parallel_for 算法一样使用相同的分区逻辑。

parallel_for_each 算法类似于 STL for_each 算法,不同之处在于 parallel_for_each 算法并发执行任务。 像其他并行算法一样, parallel_for_each 不会按特定顺序执行这些任务。

尽管 parallel_for_each 算法支持前向迭代器和随机访问迭代器,但随机访问迭代器表现更好。

示例

下面的示例演示parallel_for_each 算法的基本结构。 此示例并行地将 std:: array对象中的每个值打印到控制台 。

// parallel-for-each-structure.cpp
// compile with: /EHsc
#include <ppl.h>
#include <array>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an array of integer values.
   array<int, 5> values = { 1, 2, 3, 4, 5 };

   // Print each value in the array in parallel.
   parallel_for_each(begin(values), end(values), [](int value) {
      wstringstream ss;
      ss << value << L' ';
      wcout << ss.str();
   });
}
/* Sample output:
   5 4 3 1 2
*/

输出如下:
4 5 1 2 3

注意
因为 parallel_for_each 算法并行作用于每个元素,向控制台打印值的顺序将会不同。
有关完整的示例使用 parallel_for_each 算法,请参阅 如何︰ 编写 parallel_for_each 循环

parallel_invoke 算法

Concurrency:: parallel_invoke 算法以并行方式执行一组任务。 每个任务完成之前不返回。 当您想要在同一时间执行多个独立的任务时,此算法很有用。

parallel_invoke 算法将一系列工作函数作为其参数 (lambda 函数、 函数对象或函数指针)。 parallel_invoke 算法被重载成以 2 到 10 个参数之间的形式。 传递给 parallel_invoke的每个函数必须都必须是不带参的。

像其他并行算法 parallel_invoke 不会按特定顺序执行这些任务。 任务并行 解释了parallel_invoke 算法如何关联不同的任务和任务组。

示例

下面的示例演示 parallel_invoke 算法的基本结构。 此示例用三个局部变量并发调用 twice函数,并输出结果到控制台。

// parallel-invoke-structure.cpp
// compile with: /EHsc
#include <ppl.h>
#include <string>
#include <iostream>

using namespace concurrency;
using namespace std;

// Returns the result of adding a value to itself.
template <typename T>
T twice(const T& t) {
   return t + t;
}

int wmain()
{
   // Define several values.
   int n = 54;
   double d = 5.6;
   wstring s = L"Hello";

   // Call the twice function on each value concurrently.
   parallel_invoke(
      [&n] { n = twice(n); },
      [&d] { d = twice(d); },
      [&s] { s = twice(s); }
   );

   // Print the values to the console.
   wcout << n << L' ' << d << L' ' << s << endl;
}

输出:
108 11.2 HelloHello

有关使用 parallel_invoke 算法的完整示例,请参阅 如何︰ 使用 parallel_invoke 来编写并行排序例程如何︰ 使用 parallel_invoke 来执行并行操作

parallel_transform 和 parallel_reduce 算法

Concurrency:: parallel_transformconcurrency:: parallel_reduce 算法分别是 STL 算法 std:: transformstd:: accumulate的并行版本。 并发运行时版本的行为类似于 STL 版本,只不过因为它们是并行执行的,所以操作顺序不确定。 当您使用的集合足够大,可从并行处理中获得性能和可扩展性优势时,请使用这些算法。
注意

因为这些迭代器会生成稳定的内存地址,所以 parallel_transform 算法和 parallel_reduce 算法仅支持随机访问、双向和向前迭代器。 
而且这些迭代器必须生成非 const 左值。

parallel_transform 算法

您可以使用 parallel transform 算法执行许多数据并行化操作。 例如,你可以:

下面的示例显示用于调用 parallel_transform 算法的基本结构。 此示例中采用两种方式对std::vector对象中的每个元素求反。 第一种方法是使用 lambda 表达式。 第二种方法是使用 std::negate, ,它派生自 std::unary_function

// basic-parallel-transform.cpp
// compile with: /EHsc
#include <ppl.h>
#include <random>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Create a large vector that contains random integer data.
    vector<int> values(1250000);
    generate(begin(values), end(values), mt19937(42));

    // Create a vector to hold the results.
    // Depending on your requirements, you can also transform the 
    // vector in-place.
    vector<int> results(values.size());

    // Negate each element in parallel.
    parallel_transform(begin(values), end(values), begin(results), [](int n) {
        return -n;
    });

    // Alternatively, use the negate class to perform the operation.
    parallel_transform(begin(values), end(values), begin(results), negate<int>());
}

注意
本示例演示 parallel_transform 的基本用法。 由于工作函数不会执行大量工作,因此本示例中不会有显著的性能提升。

parallel_transform 算法有两个重载。 第一个重载采用一个输入范围和一个一元函数。 该一元函数可以是采用一个参数的 lambda 表达式、一个函数对象或从 unary_function 派生的一个类型。 第二个重载采用两个输入范围和一个二元函数。 该二元函数可以采用两个参数、 函数对象或派生自的类型的 lambda 表达式 std:: binary_function。 下面的示例阐释了这些差异。

    //
    // Demonstrate use of parallel_transform together with a unary function.

    // This example uses a lambda expression.
    parallel_transform(begin(values), end(values), 
        begin(results), [](int n) { 
            return -n;
        });

    // Alternatively, use the negate class:
    parallel_transform(begin(values), end(values), 
        begin(results), negate<int>());

    //
    // Demonstrate use of parallel_transform together with a binary function.

    // This example uses a lambda expression.
    parallel_transform(begin(values), end(values), begin(results), 
        begin(results), [](int n, int m) {
            return n * m;
        });

    // Alternatively, use the multiplies class:
    parallel_transform(begin(values), end(values), begin(results), 
        begin(results), multiplies<int>());

注意
您为 parallel_transform 的输出提供的迭代器必须与输入迭代器完全重叠或根本不重叠。 如果输入迭代器和输出迭代器部分重叠,则此算法的行为是未指定的。

parallel_reduce 算法

当您具有满足关联属性的操作序列时,parallel_reduce 算法很有用。 (此算法不要求可交换属性。)以下是可以使用 parallel_reduce 执行的一些操作:

下面的基本示例演示如何使用 parallel_reduce 算法将一个字符串序列组合为一个字符串。 与 parallel_transform 的示例一样,此基本示例中不会有性能提升。

// basic-parallel-reduce.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <string> 
#include <vector>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Create a vector of strings.
    vector<wstring> words;
    words.push_back(L"Lorem ");
    words.push_back(L"ipsum ");
    words.push_back(L"dolor ");
    words.push_back(L"sit ");
    words.push_back(L"amet, ");
    words.push_back(L"consectetur ");
    words.push_back(L"adipiscing ");
    words.push_back(L"elit.");

    // Reduce the vector to one string in parallel.
    wcout << parallel_reduce(begin(words), end(words), wstring()) << endl;
}

/* Output:
   Lorem ipsum dolor sit amet, consectetur adipiscing elit.
*/

在许多情况下,您可以将 parallel_reduce 当成是parallel_for_eachconcurrency:: combinable 类一起使用的的简写形式。

示例︰ 并行执行映射和归约

一个 映射 操作将函数应用于序列中的每个值。 一个 归约 操作会将序列中的每一个元素组合成一个值。 可以使用标准模板库 (STL) std:: transformstd:: accumulate 类来执行映射和归约操作。 但是,对于许多问题,您可以使用 parallel_transform 算法并行执行映射操作,并使用 parallel_reduce 算法并行执行归约操作。

下面的示例将按串行方式计算质数和所需的时间与按并行方式计算质数和所需的时间进行比较。 映射阶段会将非质数值转换为 0,而归约阶段将对这些值求和。

// parallel-map-reduce-sum-of-primes.cpp
// compile with: /EHsc
#include <windows.h>
#include <ppl.h>
#include <array>
#include <numeric>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Determines whether the input value is prime.
bool is_prime(int n)
{
   if (n < 2)
      return false;
   for (int i = 2; i < n; ++i)
   {
      if ((n % i) == 0)
         return false;
   }
   return true;
}

int wmain()
{   
   // Create an array object that contains 200000 integers.
   array<int, 200000> a;

   // Initialize the array such that a[i] == i.
   iota(begin(a), end(a), 0);

   int prime_sum;
   __int64 elapsed;

   // Compute the sum of the numbers in the array that are prime.
   elapsed = time_call([&] {
       transform(begin(a), end(a), begin(a), [](int i) { 
         return is_prime(i) ? i : 0; 
      });
      prime_sum = accumulate(begin(a), end(a), 0);
   });   
   wcout << prime_sum << endl;   
   wcout << L"serial time: " << elapsed << L" ms" << endl << endl;

   // Now perform the same task in parallel.
   elapsed = time_call([&] {
      parallel_transform(begin(a), end(a), begin(a), [](int i) { 
         return is_prime(i) ? i : 0; 
      });
      prime_sum = parallel_reduce(begin(a), end(a), 0);
   });
   wcout << prime_sum << endl;
   wcout << L"parallel time: " << elapsed << L" ms" << endl << endl;
}
/* Sample output:
   1709600813
   serial time: 7406 ms
   
   1709600813
   parallel time: 1969 ms
*/

注意
有关执行映射和化简操作的并行的另一个示例,请参阅 如何︰ 并行执行映射和归约操作

分区工作

若要在数据源上并行化一个操作,不可或缺的步骤为将操作源 分区成可被多线程并发操作的几块。 分区程序将指定并行算法如何在线程间划分区间范围。 如本文档前面所述,PPL 使用的是默认分区机制,该默认分区机制创建初始工作负荷并在工作负荷不平衡时使用工作窃取算法和范围窃取来平衡这些分区。 例如,当某个循环迭代完成一个迭代范围时,运行时会将其他线程的工作重新分配给该线程。 但是,在某些方案中,您可能希望指定另一个更适用于您的问题的分区机制。

parallel_forparallel_for_eachparallel_transform 算法提供有一附加参数 _Partitioner 的重载版本。 此参数定义了用于划分工作的分区程序类型。 以下是 PPL 定义的分区程序种类:

concurrency::affinity_partitioner
将工作划分为一个固定数量的范围(通常是可用于在循环中工作的辅助线程的数量)。 此分区程序类型与 static_partitioner 类似,但通过将范围映射到辅助线程的方式改善了缓存的关联。 当在相同数据集中多次执行一个循环(例如一个循环内的循环)且数据适合缓存时,此分区程序类型可提高性能。 此分区程序不完全适用取消。 它也不使用协作阻塞语义,因此不能与具有前向依赖关系的并行循环一起使用。

concurrency::auto_partitioner
将工作划分为一个初始数量的范围(通常是可用于在循环中工作的辅助线程的数量)。 当您不调用采用 _Partitioner 参数的重载的并行算法时,运行时默认使用此类型。 每个范围可以划分为子范围,从而实现负载平衡。 当一个工作范围完成时,运行时会将其他线程工作的子范围重新分配给该线程。 如果您的工作负荷不在另外一个类别下或者您需要完全支持取消或协作阻塞,请使用该分区程序。

concurrency::simple_partitioner
将工作划分到范围中,使每个范围至少拥有给定区块大小所指定的迭代的数目。 此分区程序类型加入了负载平衡;然而,运行时未将范围划分为子范围。 对于每个辅助,运行时将在 _Chunk_size 迭代完成后检查取消情况并执行负载平衡。

concurrency::static_partitioner
将工作划分为一个固定数量的范围(通常是可用于在循环中工作的辅助线程的数量)。 此分区程序类型不使用工作窃取,开销较小,可以一定程度上提高性能。当一个并行循环的每次迭代执行固定和统一数量的工作而且您不需要支持取消或前向协作阻塞时,请使用此分区程序类型。

上一篇 下一篇

猜你喜欢

热点阅读