Apache Beam Java SDK 扩展

2019-03-28  本文已影响0人  HelloWide

1 Join-library

join -library提供了内部连接、外部左连接和外部右连接函数。其目的是简化连接到简单函数调用的最常见情况。
这些函数是泛型的,支持任何bean支持的类型的连接。连接函数的输入是键/值的PCollections。左右PCollections都需要相同类型的键。所有连接函数都返回键/值,其中键为连接键,值为键/值,其中键为左值,右为值。类似于 SQL 中的a join b on …(key)。
注:
对于外部连接,用户必须提供一个表示null的值,因为null不能序列化。
Example:

PCollection<KV<String, String>> leftPcollection = ...
PCollection<KV<String, Long>> rightPcollection = ...
PCollection<KV<String, KV<String, Long>>> joinedPcollection = Join.innerJoin(leftPcollection,rightPcollection);

目前在2.10.0 release中支持:


1.png

2 Sorter

模块提供SortValues算子,对输入的PCollection<KV<K, Iterable<KV<K2, V>>>> 返回一个对于每个主键K,匹配一个按照副键(K2)的字节编码进行排序的集合,PCollection<KV<K, Iterable<KV<K2, V>>>>。它是一种高效的、可伸缩的迭代排序器,即使它们很大(不适合内存)。
说明:

  1. 这个转换只执行值排序;每个键所附带的iterable是排序的,但是不同键之间没有关系,因为Beam不支持PCollection中不同元素之间定义的任何关系;
  2. 每一个集合排序都通过使用本地内存和磁盘的任务完成。这意味着SortValues在不同管道中使用时可能会存在性能和/或可伸缩性的瓶颈。因此,不鼓励用户在单个元素的PCollection上使用SortValues对大型PCollection进行全局排序。(粗略)估计分类溢出到磁盘时使用的磁盘空间字节数公式如下:
numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3

Options
如果排序导致内存耗尽或溢出,用户可以通过创建一个自定义的临时位置。在调用SortValues.create()时候可以传入一个自定义的实例BufferedExternalSorter.Options。
Example:

PCollection<KV<String, KV<String, Integer>>> input = ...
// Group by primary key, bringing <SecondaryKey, Value> pairs for the same key together.
PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
    input.apply(GroupByKey.<String, KV<String, Integer>>create());
// For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
    grouped.apply(
        SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));

原文参考链接:https://beam.apache.org/documentation/sdks/java-extensions/
Beam API DOC: https://beam.apache.org/releases/javadoc/2.9.0/
Apache Beam: 一个高级且统一的编程模型
让批处理和流数据处理的作业在任何执行引擎上都可以运行.

上一篇下一篇

猜你喜欢

热点阅读