flink简单使用教程

flink使用16-正确打包Flink程序并使用Cli提交任务

2019-11-15  本文已影响0人  CheckChe

本文的计划是使用正确的maven插件打包当前教程代码库batch模块下的WordCount代码,并通过命令行的方式提交到Flink来启动任务。WordCount类即为Flink主方法类,该部分代码是Flink官方example的简单修改,只是对map方法填加了一点sleep来方便观察运行情况。

package.png

项目的运行环境使用Docker来部署Flink, Flink镜像可以从Docker hub上拉去,其Docker-Compose文件如下:

version: "2.1"
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

正确启动Flink之后,就可以在WebUI上看到我们的环境了。

webUI.png

下面就开始打包我们的应用程序了。

官方推荐我们使用maven-shade-plugin插件,复制一下代码到POM中指定我们的主方法类即可。

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>my.programs</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

需要注意的是一般来说我们是不会将flink的一些相关的包直接打到项目里,通常有两种方案:

打包好后就可以直接是用 FLink Cli 提交到集群来开始job了 。

Flink Cli 一般来讲主要作用有:提交并执行任务、取消任务、获取任务状态信息、列出正在运行和等待的任务、触发savepoint等。

我们将已经打包好的jar包放到docker中

docker cp /opt/flink/wordcount.jar flink_jobmanager_1:/opt/

然后就可以通过命令行启动任务了,启动完成后我们可以在webUI上看到任务的执行情况。

docker exec -ti flink_jobmanager_1 bash -c 'flink run /opt/wordcount.jar'
runningJob.png

Flink Cli 的命令有很多,具体的内容可以参考官网示例:

Flink Cli Examples

上一篇 下一篇

猜你喜欢

热点阅读