Flink(1.13) FlinkSql自定义函数
2021-08-25 本文已影响0人
万事万物
函数分类
官网介绍
Currently, Flink distinguishes between the following kinds of functions:
- Scalar functions:标量函数将标量值映射到一个新的标量值。
- Table functions:制表函数将标量值映射到新行(类似于列转行)。
- Aggregate functions:聚合函数将多行标量值映射为新标量值。
- Table aggregate functions:表聚合函数将多行标量值映射为新行,属于 Table functions 和 Aggregate functions 功能的合并。
- Async table functions:异步表函数是用于执行查找(lookup)的表源的特殊函数。
自定义 Scalar functions
- 准备一个类
import org.apache.flink.table.functions.ScalarFunction;
/**
 * Scalar function intended to convert a string to upper case.
 *
 * <p>NOTE: this version declares no {@code eval} method. Using it as-is makes
 * Flink fail with a {@code ValidationException} ("does not implement a method
 * named 'eval'"), so an {@code eval} method has to be added before it can be called.
 *
 * @author admin
 * @date 2021/8/21
 */
public class MyScalarFunctionByUppercase extends ScalarFunction {
}
- 异常:需要自定义 eval 方法。
org.apache.flink.table.api.ValidationException: Function class 'com.admin.flink.demo12.function.MyScalarFunctionByUppercase'
does not implement a method named 'eval'.
方法名必须叫 eval 且必须声明为 public,参数和返回值的类型可按需定义。
/**
 * Scalar function that converts a string to upper case.
 *
 * <p>Flink looks the evaluation method up by name, so it must be called {@code eval}.
 */
public class MyScalarFunctionByUppercase extends ScalarFunction {

    /**
     * Upper-cases the given string.
     *
     * @param words the input string; may be {@code null} (for example a NULL column value)
     * @return the upper-cased string, or {@code null} when {@code words} is {@code null}
     */
    public String eval(String words) {
        // A NULL input must not fail the whole job with a NullPointerException.
        if (words == null) {
            return null;
        }
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (e.g. under a Turkish locale "i" would otherwise become a dotted capital I).
        return words.toUpperCase(java.util.Locale.ROOT);
    }
}
- 应用
// Calls MyScalarFunctionByUppercase inline through the Table API and prints the result.
@Test
public void test1(){
// Set up the stream and table environments
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Sample data
DataStreamSource<String> source = env.fromElements("java", "google", "hello");
// Name the single field "words"
Table table = tableEnv.fromDataStream(source,$("words"));
// Call the function inline, passing the class directly instead of a registered name
table.select($("words"),call(MyScalarFunctionByUppercase.class,$("words")))
.execute()
.print();
}
- 查询
+----+--------------------------------+--------------------------------+
| op | words | _c1 |
+----+--------------------------------+--------------------------------+
| +I | java | JAVA |
| +I | google | GOOGLE |
| +I | hello | HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
- 注册后再使用
// Register the function under the name "toUppercase" first, then call it by that name
tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);
table.select($("words"),call("toUppercase",$("words")))
.execute()
.print();
- 查询
+----+--------------------------------+--------------------------------+
| op | words | _c1 |
+----+--------------------------------+--------------------------------+
| +I | java | JAVA |
| +I | google | GOOGLE |
| +I | hello | HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
- 在sql中使用,必须先注册
// Name the single field "words"
Table table = tableEnv.fromDataStream(source,$("words"));
// Register the function first; SQL can only refer to it by its registered name
tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);
// Use the function in SQL.
// NOTE(review): the Table object is concatenated into the query string, which relies on
// its string form being usable as a table name — confirm for the Flink version in use.
tableEnv.sqlQuery("select words,toUppercase(words) as upp_words from "+table)
.execute()
.print();
- 查询
+----+--------------------------------+--------------------------------+
| op | words | upp_words |
+----+--------------------------------+--------------------------------+
| +I | java | JAVA |
| +I | google | GOOGLE |
| +I | hello | HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
自定义 Table functions
- 自定义函数
/**
 * Table function that splits a phrase on spaces and emits one row per word.
 *
 * <p>The type parameter {@code Row} means every emitted record has several columns.
 * The {@code @FunctionHint} declares the output columns: {@code w} (the word, string)
 * and {@code len} (its length, int).
 *
 * @author admin
 * @date 2021/8/21
 */
@FunctionHint(output = @DataTypeHint("row<w string,len int>"))
public class MyTableFunctionByRowToColumn extends TableFunction<Row> {

    /**
     * Emits a {@code (word, length)} row for every space-separated word in the phrase.
     *
     * <p>A {@code null} phrase emits nothing. The length is {@link String#length()},
     * i.e. the number of UTF-16 code units.
     *
     * @param phrase the text to split; may be {@code null}
     */
    public void eval(String phrase) {
        // A NULL input must not fail the job; it simply produces no rows.
        if (phrase == null) {
            return;
        }
        for (String word : phrase.split(" ")) {
            // Leading or consecutive spaces produce empty tokens, which are not words.
            if (!word.isEmpty()) {
                collect(Row.of(word, word.length()));
            }
        }
    }
}
查询
+----+--------------------------------+--------------------------------+-------------+
| op | phrase | w | len |
+----+--------------------------------+--------------------------------+-------------+
| +I | hello world! | hello | 5 |
| +I | hello world! | world! | 6 |
| +I | 明天 你好! | 明天 | 2 |
| +I | 明天 你好! | 你好! | 3 |
| +I | 数码 宝贝 | 数码 | 2 |
| +I | 数码 宝贝 | 宝贝 | 2 |
| +I | 名侦探 柯南! | 名侦探 | 3 |
| +I | 名侦探 柯南! | 柯南! | 3 |
+----+--------------------------------+--------------------------------+-------------+
8 rows in set
- 应用 table api 使用(内联)
/**
 * Applies {@code MyTableFunctionByRowToColumn} inline through the Table API
 * and prints the joined rows.
 */
@Test
public void test1() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> phrases =
            env.fromElements("hello world!", "明天 你好!", "数码 宝贝", "名侦探 柯南!");
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Expose the stream as a table with a single column named "phrase".
    Table phraseTable = tableEnv.fromDataStream(phrases, $("phrase"));

    // Goal: explode every phrase into one row per word via a lateral join.
    Table exploded =
            phraseTable.joinLateral(call(MyTableFunctionByRowToColumn.class, $("phrase")));
    exploded.execute().print();
}
- 应用 sql 使用
/**
 * Registers {@code MyTableFunctionByRowToColumn} and applies it from a SQL query,
 * then prints the joined rows.
 */
@Test
public void test2() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> phrases =
            env.fromElements("hello world!", "明天 你好!", "数码 宝贝", "名侦探 柯南!");
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Table phraseTable = tableEnv.fromDataStream(phrases, $("phrase"));

    // Make the table addressable from SQL as the temporary view "t".
    tableEnv.createTemporaryView("t", phraseTable);

    // Goal: explode every phrase into one row per word.
    // The function has to be registered before SQL can call it by name.
    tableEnv.createFunction("rowToColumn", MyTableFunctionByRowToColumn.class);

    // Run the query.
    String query =
            "select phrase , w ,len from t join lateral table (rowToColumn(phrase)) on true";
    Table exploded = tableEnv.sqlQuery(query);
    exploded.execute().print();
}
- 取别名:as T(w1,len1) 中的 T 不是内置函数,而是为 lateral table 的结果指定的表别名,括号内的 w1、len1 是对应输出列的别名
<!-- Assign aliases: T names the lateral table result, and (w1,len1) rename its columns -->
<sql id="tableFunction2">
select
phrase , w1 ,len1
from #{tableName}
join lateral table (rowToColumn(phrase))
as T(w1,len1)
on true
</sql>
查询
+----+--------------------------------+--------------------------------+-------------+
| op | phrase | w1 | len1 |
+----+--------------------------------+--------------------------------+-------------+
| +I | hello world! | hello | 5 |
| +I | hello world! | world! | 6 |
| +I | 明天 你好! | 明天 | 2 |
| +I | 明天 你好! | 你好! | 3 |
| +I | 数码 宝贝 | 数码 | 2 |
| +I | 数码 宝贝 | 宝贝 | 2 |
| +I | 名侦探 柯南! | 名侦探 | 3 |
| +I | 名侦探 柯南! | 柯南! | 3 |
+----+--------------------------------+--------------------------------+-------------+