Flink实战—Flink SQL在Batch场景的Demo

2020-04-11  本文已影响0人  北邮郭大宝

最近工作会用到Flink SQL,周末学习了一下,写个demo做记录,全部代码请参考Github.

本文基于Flink 1.9.1,使用Java 8开发。

本例是Flink SQL在Batch场景下的应用,目标是从students、scores表中读取学生的信息,计算班级平均分。

1. 准备数据

students.txt 保存学生信息:id,name,classname

1 张三 1班
2 李四 1班
3 王五 2班
4 赵六 2班
5 郭大宝 2班

scores.txt 保存成绩:id,chinese,math,english

1 100 90 80
2 97 87 74
3 70 50 43
4 100 99 99
5 80 81 82

2. 创建工程

根据官网的提示,通过mvn创建flink项目

   $ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.9.0

创建后使用IDEA打开,项目结构如图,把创建好的两份数据保存在resources中.


1586602165374.jpg

编辑pom.xml,主要是引入一些flink的依赖:

<dependencies>
<!--
  Dependency list for the Flink SQL batch demo.
  ${flink.version} and ${scala.binary.version} are Maven properties that are
  not defined in this fragment; they are expected to come from the
  <properties> section of the enclosing pom.xml.
-->
<!-- Flink core: Java API, streaming APIs (Java and Scala) and the client module -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--
      Flink Table API / SQL: Java bridge, both planners (legacy and Blink) and
      the common table classes. The SQLBatch demo imports
      org.apache.flink.table.api.java.BatchTableEnvironment.
      NOTE(review): presumably only the legacy planner is exercised by that
      class; confirm before removing the Blink planner.
    -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--
      Kafka 0.10 connector. The SQLBatch demo reads local text files and has no
      Kafka imports, so this dependency looks optional for the batch example.
    -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

3. 实现功能

创建名为SQLBatch的Java类,实现上述功能。

package com.cmbc.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;


public class SQLBatch {
    public static void main(String[] args) throws Exception {
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
​
        // read files
        DataSet<String> s_students = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/students.txt");
        DataSet<String> s_score = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/scores.txt");
​
        // prepare data
        DataSet<Tuple3<Integer, String, String>> students = s_students.map(new MapFunction<String, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> map(String s) throws Exception {
                String[] line = s.split(" ");
                return new Tuple3<Integer, String, String>(Integer.valueOf(line[0]), line[1], line[2]);
            }
        });
​
        DataSet<Tuple4<Integer, Integer, Integer, Integer>> score = s_score.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String s) throws Exception {
                String[] line = s.split(" ");
                return new Tuple4<Integer, Integer, Integer, Integer>(Integer.valueOf(line[0]), Integer.valueOf(line[1]),
                        Integer.valueOf(line[2]), Integer.valueOf(line[3]));
            }
        });
​
        // join data
        DataSet<Tuple6<Integer, String, String, Integer, Integer, Integer>> data = students.join(score)
                .where(0)
                .equalTo(0)
                .projectFirst(0,1,2)
                .projectSecond(1,2,3);
​
​
        // register to a table
        tEnv.registerDataSet("Data", data, "id, name, classname, chinese, math, english");
​
​
        // do sql
        Table sqlQuery = tEnv.sqlQuery("SELECT classname, AVG(chinese) as avg_chinese, AVG(math) as avg_math, AVG(english) as avg_english, " +
                "AVG(chinese + math + english) as avg_total " +
                "FROM Data " +
                "GROUP BY classname " +
                "ORDER BY avg_total"
        );
​
        // to sink
        DataSet<Info> result = tEnv.toDataSet(sqlQuery, Info.class);
        result.writeAsText("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/info.txt", OVERWRITE);
        tEnv.execute("do flink sql demo in batch");
​
    }
​
    public static class Info {
        public String classname;
        public Integer avg_chinese;
        public Integer avg_math;
        public Integer avg_english;
        public Integer avg_total;
​
        public Info() {
        }
​
        public Info(String classname, Integer avg_chinese, Integer avg_math, Integer avg_english, Integer avg_total) {
            this.classname = classname;
            this.avg_chinese = avg_chinese;
            this.avg_math = avg_math;
            this.avg_english = avg_english;
            this.avg_total = avg_total;
        }
​
        @Override
        public String toString() {
            return
                    "classname=" + classname +
                    ", avg_chinese=" + avg_chinese +
                    ", avg_math=" + avg_math +
                    ", avg_english=" + avg_english +
                    ", avg_total=" + avg_total +
                    "";
        }
    }
}

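功能比较简单,简单说一下:先用readTextFile读取students.txt和scores.txt,通过map把每行文本解析成Tuple;再按id把两个DataSet做join,并注册成名为Data的表;然后用SQL按classname分组,计算各科成绩及总分的平均值,并按avg_total排序;最后把查询结果转换成Info对象的DataSet,写入info.txt。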

4. 运行和结果

flink run -c com.cmbc.flink.SQLBatch flinksql-1.0-SNAPSHOT.jar

5. 参考

上一篇下一篇

猜你喜欢

热点阅读