大数据安全大数据我爱编程

Google 大数据引擎 Apache Beam Java SD

2017-01-23  本文已影响318人  许伦

本文将带你执行你的第一个 Beam 管线 —— 运行一个由 Beam 的 Java SDK 编写的 WordCount 示例,于你选定一个的 runner 上。

Apache Beam 代言

设置开发环境

  1. 下载并安装 Java Development Kit (JDK) 1.7 或更高版本。检查 JAVA_HOME 环境变量已经设置并指向你的 JDK 安装目录。
  2. 照着 Maven 的 安装指南 下载并安装适合你的操作系统的 Apache Maven

获取 WordCount 代码

获得一份 WordCount 管线代码拷贝最简单的方法,就是使用下列指令来生成一个简单的、包含基于 Beam 最新版的 WordCount 示例和构建的 Maven 项目:

$ mvn archetype:generate \
      -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots \
      -DarchetypeGroupId=org.apache.beam \
      -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
      -DarchetypeVersion=LATEST \
      -DgroupId=org.example \
      -DartifactId=word-count-beam \
      -Dversion="0.1" \
      -Dpackage=org.apache.beam.examples \
      -DinteractiveMode=false

这将创建一个叫 word-count-beam 的目录,其中包含了一份简单的 pom.xml 文件和一套示例管线,用来计算某个文本文件中的各个单词的数量。

$ cd word-count-beam/

$ ls
pom.xml src

$ ls src/main/java/org/apache/beam/examples/
DebuggingWordCount.java WindowedWordCount.java  common
MinimalWordCount.java   WordCount.java

关于这些示例中用到的 Beam 的概念的详细介绍,请阅读 WordCount Example Walkthrough 一文。这里我们只聚焦于如何执行 WordCount.java 上。

运行 WordCount

一个单 Beam 管线可以运行于多种 Beam runner 上,包括 ApexRunnerFlinkRunnerSparkRunnerDataflowRunner 等。

在你选好用哪个 runner 以后:

  1. 确保你已经正确配置了该 runner 。
  2. 这样来构造你的命令行:
  3. --runner=<runner> 选项指定你选定的 runner (缺省为 DirectRunner)
  4. 添加特定于该 runner 的必需选项
  5. 选择该 runner 能访问到的输入文件和输出位置。(例如,当你在外部集群上运行管线的时候是无法访问本地文件的。)
  6. 运行你的第一个 WordCount 管线。
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts --runner=ApexRunner" -Papex-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -Pflink-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
                  --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
     -Pdataflow-runner

检视结果

一旦管线完成运行,你可以查看结果。你会注意到有多个以 count 打头的输出文件。具体会有几个这样的文件是由 runner 决定的。这样能方便 runner 进行高效的分布式执行。

$ ls counts*
$ ls counts*
$ ls counts*
$ ls /tmp/counts*
$ ls counts*
$ gsutil ls gs://<your-gcs-bucket>/counts*

当你查看文件内容的时候,你会看到里面包含每个单词的出现数量。文件中的元素顺序可能会和这里看到的不同。因为 Beam 模型通常并不保障顺序,以便于 runner 优化效率。

$ more counts*
api: 9
bundled: 1
old: 4
Apache: 2
The: 1
limitations: 1
Foundation: 1
...
$ cat counts*
BEAM: 1
have: 1
simple: 1
skip: 4
PAssert: 1
...
$ more counts*
The: 1
api: 9
old: 4
Apache: 2
limitations: 1
bundled: 1
Foundation: 1
...
$ more /tmp/counts*
The: 1
api: 9
old: 4
Apache: 2
limitations: 1
bundled: 1
Foundation: 1
...
$ more counts*
beam: 27
SF: 1
fat: 1
job: 1
limitations: 1
require: 1
of: 11
profile: 10
...
$ gsutil cat gs://<your-gcs-bucket>/counts*
feature: 15
smother'st: 1
revelry: 1
bashfulness: 1
Bashful: 1
Below: 2
deserves: 32
barrenly: 1
...

下一步

如果你遇到任何问题,请千万不要犹豫 跟我们联系

英文原文: https://beam.apache.org/get-started/quickstart/

上一篇下一篇

猜你喜欢

热点阅读