从0到1成为Flink源码Contributor之自建版本Hel

2022-03-16  本文已影响0人  CodeRap

前置要求

关于Flink版本自建可以参考上一篇文章

新建flink-test项目测试自建Flink

在 flink-in-depth 目录下新建一个 flink-test 目录,并添加 pom.xml 文件,添加自建Flink的custom-test版本依赖

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <flink.version>custom-test</flink.version>
    <scala.version>2.12.7</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <calcite.version>1.26.0</calcite.version>
    <hadoop.version>2.7.3</hadoop.version>
</properties>
  
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.22</version>
        <scope>provided</scope>
    </dependency>
  
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
  
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
  
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
  
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
<dependencies>

除了添加Flink依赖之外,我们还添加了junit单元测试、lombok快捷工具、scala语言包

下面就开始我们的Flink自建版本本地Local MiniCluster的单元测试吧
新建一个名为FlinkFrameworkTest的Java类,然后添加如下代码:

private static final StreamExecutionEnvironment streamExecutionEnvironment;
  
static {
    Configuration configuration = new Configuration();
    configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);
    configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
    configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
  
    streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
}
  
@Test
public void testFlinkHelloWorld() throws Exception {
    DataStreamSource<String> lines = streamExecutionEnvironment.socketTextStream("localhost", 8080);
    lines.print();
  
    streamExecutionEnvironment.execute();
}

HelloWorld程序很简单,使用静态代码块创建Flink的流式执行环境StreamExecutionEnvironment,然后从本地的8080网络端口读取Socket链接数据并进行输出,最后是使用执行环境执行流式应用程序
其中执行环境我们通过Configuration类配置了只有一个并行度,一个TaskManager,每个TaskManager只有一个Slot
由于我们使用了带有Flink WebUI的流式执行环境,所以需要 flink-runtime-web 依赖

由以上的HelloWorld可以看出,Flink的流式应用编程三步曲

1) 创建流式执行环境StreamExecutionEnvironment
2) 基于StreamExecutionEnvironment执行环境进行DataStream的算子操作(包括Source、Transformation、Sink)
3) 使用StreamExecutionEnvironment执行环境执行程序

本例子中只有Source与Sink,其实Source与Sink操作本质也是Transaformation操作,但更多与外部系统或集合交互

接下来我们运行一下HelloWorld例子
先通过nc程序打开8080网络端口

nc -lk 8080
image

然后点击单元测试运行HelloWorld单元测试


image

可以看到我们的HelloWorld已经创建了本地Local MiniCluster集群并运行了起来

通过浏览器找开 http://localhost:8081 页面(8081端口是Flink WebUI的默认端口)

image
从Flink WebUI概览图上,我们可以看出目前集群的配置是只有一个TaskManager,只有一个Slot,与我们代码里Configuration的配置一致,同时只有一个Job在运行,就是我们的HelloWorld程序,这个程序使用了默认的名称,叫做Flink Streaming Job

我们点击 Flink Streaming Job 这个程序名称,就可以打开正在运行的HelloWorld的算子流图


image

从流图中我们可以看出目前有一个Souce为Socket Stream,一个Sink为Print to Std. Out,与我们的代码一致

我们通过nc程序输入数据看一下效果


image

我们输入什么控制台上就输出什么

以上就是我们使用自建Flink版本进行的HelloWorld流式程序测试

上一篇下一篇

猜你喜欢

热点阅读