Flink in Practice: A Flink SQL Demo for the Batch Scenario
2020-04-11
北邮郭大宝
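I will be using Flink SQL at work soon, so I spent the weekend learning it and wrote this demo as a record. For the full code, see GitHub.
The demo is based on Flink 1.9.1 and developed with Java 8.
This example applies Flink SQL to a batch job: it reads student information from the students and scores tables and computes the average scores of each class.
1. Prepare the data
students.txt holds the student information, one record per line with space-separated fields: id, name, classname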
1 张三 1班
2 李四 1班
3 王五 2班
4 赵六 2班
5 郭大宝 2班
scores.txt holds the scores, one record per line with space-separated fields: id, chinese, math, english
1 100 90 80
2 97 87 74
3 70 50 43
4 100 99 99
5 80 81 82
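To make the file layout concrete, here is a minimal plain-Java sketch (no Flink involved) that parses one line of each file the same way the job later does, by splitting on a single space. The class name LineFormatDemo and its methods are hypothetical helpers for illustration and are not part of the original project; the field-count check is an added safeguard, not something the demo itself performs.

```java
import java.util.Arrays;

/** Hypothetical helper, not part of the original project: parses one line of each input file. */
final class LineFormatDemo {

    /** Splits a line on single spaces and checks the expected number of fields. */
    static String[] fields(String line, int expected) {
        String[] parts = line.trim().split(" ");
        if (parts.length != expected) {
            throw new IllegalArgumentException(
                    "expected " + expected + " fields but got " + parts.length + ": " + line);
        }
        return parts;
    }

    /** students.txt line -> {id, name, classname}; the id must be an integer. */
    static String[] parseStudent(String line) {
        String[] parts = fields(line, 3);
        Integer.parseInt(parts[0]); // fail fast on a non-numeric id
        return parts;
    }

    /** scores.txt line -> {id, chinese, math, english} as ints. */
    static int[] parseScore(String line) {
        String[] parts = fields(line, 4);
        int[] values = new int[4];
        for (int i = 0; i < 4; i++) {
            values[i] = Integer.parseInt(parts[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        // One sample line from each file in this section.
        System.out.println(Arrays.toString(parseStudent("3 王五 2班")));
        System.out.println(Arrays.toString(parseScore("3 70 50 43")));
    }
}
```

A line with a missing field or a non-numeric score raises an IllegalArgumentException here, which is easier to diagnose than an ArrayIndexOutOfBoundsException thrown from inside a running job.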
2. Create the project
Following the official documentation, create the Flink project with mvn:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0
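After the project is created, open it in IDEA. The project structure is shown in the figure; save the two data files prepared above under resources.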
1586602165374.jpg
Edit pom.xml, mainly to add the Flink dependencies:
<dependencies>
    <!--flink core-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--flink-table-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--kafka-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
3. Implement the job
Create a Java class named SQLBatch that implements the job.
package com.cmbc.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;

public class SQLBatch {

    public static void main(String[] args) throws Exception {
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // read files
        DataSet<String> s_students = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/students.txt");
        DataSet<String> s_score = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/scores.txt");

        // prepare data
        DataSet<Tuple3<Integer, String, String>> students = s_students.map(new MapFunction<String, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> map(String s) throws Exception {
                String[] line = s.split(" ");
                return new Tuple3<Integer, String, String>(Integer.valueOf(line[0]), line[1], line[2]);
            }
        });

        DataSet<Tuple4<Integer, Integer, Integer, Integer>> score = s_score.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String s) throws Exception {
                String[] line = s.split(" ");
                return new Tuple4<Integer, Integer, Integer, Integer>(Integer.valueOf(line[0]), Integer.valueOf(line[1]),
                        Integer.valueOf(line[2]), Integer.valueOf(line[3]));
            }
        });

        // join data
        DataSet<Tuple6<Integer, String, String, Integer, Integer, Integer>> data = students.join(score)
                .where(0)
                .equalTo(0)
                .projectFirst(0,1,2)
                .projectSecond(1,2,3);

        // register to a table
        tEnv.registerDataSet("Data", data, "id, name, classname, chinese, math, english");

        // do sql
        Table sqlQuery = tEnv.sqlQuery("SELECT classname, AVG(chinese) as avg_chinese, AVG(math) as avg_math, AVG(english) as avg_english, " +
                "AVG(chinese + math + english) as avg_total " +
                "FROM Data " +
                "GROUP BY classname " +
                "ORDER BY avg_total"
        );

        // to sink
        DataSet<Info> result = tEnv.toDataSet(sqlQuery, Info.class);
        result.writeAsText("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/info.txt", OVERWRITE);
        tEnv.execute("do flink sql demo in batch");
    }

    public static class Info {
        public String classname;
        public Integer avg_chinese;
        public Integer avg_math;
        public Integer avg_english;
        public Integer avg_total;

        public Info() {
        }

        public Info(String classname, Integer avg_chinese, Integer avg_math, Integer avg_english, Integer avg_total) {
            this.classname = classname;
            this.avg_chinese = avg_chinese;
            this.avg_math = avg_math;
            this.avg_english = avg_english;
            this.avg_total = avg_total;
        }

        @Override
        public String toString() {
            return
                    "classname=" + classname +
                    ", avg_chinese=" + avg_chinese +
                    ", avg_math=" + avg_math +
                    ", avg_english=" + avg_english +
                    ", avg_total=" + avg_total +
                    "";
        }
    }
}
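The logic is fairly simple. In brief:
- Initialize the Flink execution environment
- Read the file data; here the two tables come from students.txt and scores.txt
- Preprocess the data: join the two tables on the id field into a single DataSet
- Register the DataSet as a table and run the SQL
- Save the result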
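As a cross-check of what the join and the GROUP BY query compute, the steps above can be reproduced in plain Java without Flink. This is only a sketch under stated assumptions: the class ClassAverageCheck and its method are hypothetical and not part of the original project, the sample rows are the ones from section 1, and the averages are computed as exact doubles rather than with Flink's own AVG semantics.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical cross-check, not part of the original project: recomputes the averages without Flink. */
final class ClassAverageCheck {

    /** Joins students and scores on id; returns classname -> [avgChinese, avgMath, avgEnglish, avgTotal]. */
    static Map<String, double[]> averages(String[] students, String[] scores) {
        // id -> classname, taken from students.txt lines ("id name classname")
        Map<Integer, String> classOf = new LinkedHashMap<>();
        for (String line : students) {
            String[] f = line.split(" ");
            classOf.put(Integer.valueOf(f[0]), f[2]);
        }
        // classname -> [sumChinese, sumMath, sumEnglish, sumTotal, rowCount]
        Map<String, double[]> sums = new LinkedHashMap<>();
        for (String line : scores) {
            String[] f = line.split(" ");
            String cls = classOf.get(Integer.valueOf(f[0]));
            if (cls == null) {
                continue; // inner join: a score row without a matching student is dropped
            }
            double[] acc = sums.computeIfAbsent(cls, k -> new double[5]);
            int total = 0;
            for (int i = 0; i < 3; i++) {
                int v = Integer.parseInt(f[i + 1]);
                acc[i] += v;
                total += v;
            }
            acc[3] += total; // chinese + math + english for this student
            acc[4] += 1;
        }
        Map<String, double[]> result = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : sums.entrySet()) {
            double[] a = e.getValue();
            result.put(e.getKey(), new double[] {a[0] / a[4], a[1] / a[4], a[2] / a[4], a[3] / a[4]});
        }
        return result;
    }

    public static void main(String[] args) {
        String[] students = {"1 张三 1班", "2 李四 1班", "3 王五 2班", "4 赵六 2班", "5 郭大宝 2班"};
        String[] scores = {"1 100 90 80", "2 97 87 74", "3 70 50 43", "4 100 99 99", "5 80 81 82"};
        List<Map.Entry<String, double[]>> rows = new ArrayList<>(averages(students, scores).entrySet());
        rows.sort(Comparator.comparingDouble(e -> e.getValue()[3])); // mirrors ORDER BY avg_total
        for (Map.Entry<String, double[]> row : rows) {
            double[] a = row.getValue();
            System.out.printf("%s chinese=%.2f math=%.2f english=%.2f total=%.2f%n",
                    row.getKey(), a[0], a[1], a[2], a[3]);
        }
    }
}
```

For the sample data the exact averages are about 83.33, 76.67, 74.67 and 234.67 for 2班, and 98.5, 88.5, 77 and 264 for 1班, with 2班 listed first because its total is lower. The Info class in the job declares Integer fields, so the job's own output holds integers; assuming AVG over integer columns truncates toward zero, those would be the integer parts of the values above.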
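4. Run and results
- Start Flink in local mode: run the start-cluster.sh script found under the Flink installation path
- Build the jar with mvn: mvn clean package, or do this step in IDEA; the jar is written to the project's target directory
- Submit the job: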
flink run -c com.cmbc.flink.SQLBatch flinksql-1.0-SNAPSHOT.jar
- Result:
1586602913833.jpg