Flink-sql自定义udtf 函数
2021-08-12 本文已影响0人
wudl
1. 在 Flink SQL 中使用自定义函数
1.1 官网文档也说得很详细
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html
1.1.1 官网上面的例子:
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
/**
 * Table function with overloaded evaluation methods that share one globally
 * declared output type, {@code ROW<s STRING, i INT>}.
 */
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {

    /** Zero-argument overload: emits a single placeholder row. */
    public void eval() {
        collect(Row.of("Empty args", -1));
    }

    /**
     * Two-argument overload: emits a single row carrying the sum of both arguments.
     *
     * @param a first addend
     * @param b second addend
     */
    public void eval(int a, int b) {
        int sum = a + b;
        collect(Row.of("Sum", sum));
    }
}
/**
 * Table function whose type inference is decoupled from its evaluation method:
 * the accepted signatures and their result types are determined entirely by the
 * function hints below, while a single varargs method serves all of them.
 */
@FunctionHint(
    input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
    output = @DataTypeHint("INT")
)
@FunctionHint(
    input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
    output = @DataTypeHint("BIGINT")
)
@FunctionHint(
    input = {},
    output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {

    /**
     * Emits {@code false} when called without arguments, otherwise emits the
     * first argument unchanged.
     *
     * <p>An implementer only needs to make sure that a method exists that can be
     * called by the JVM; the hints above describe the SQL-level signatures.
     *
     * @param o the call arguments; may be empty
     */
    public void eval(Object... o) {
        if (o.length == 0) {
            collect(false);
            // Stop here: falling through would read o[0] from an empty array
            // and throw ArrayIndexOutOfBoundsException.
            return;
        }
        collect(o[0]);
    }
}
2. 自己实现
需要注意的是:自己实现时,函数的入参由 eval 方法的参数定义,调用时按该定义传参。
package com.wudl.flink.sql;
import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
/**
 * Example of registering and calling a user-defined table function (UDTF) in Flink SQL.
 *
 * <p>Lines read from a socket are parsed into {@code WaterSensor} beans, exposed as a
 * table, and each row is laterally joined with the rows produced by {@link mySpilt}.
 *
 * <p>Author: wudl. Created: 2021-08-11 22:55.
 */
public class Flink_Sql_Function_UDTF {

    /**
     * Builds the pipeline, runs the lateral-join query and prints its result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environments.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Read lines from the socket and convert each one into a Java bean.
        //    Each line is expected to hold three comma-separated fields.
        SingleOutputStreamOperator<WaterSensor> waterSensorDs = env.socketTextStream("192.168.1.180", 9999)
                .map(line -> {
                    String[] split = line.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                });

        // 3. Convert the stream into a dynamic table.
        Table table = tableEnv.fromDataStream(waterSensorDs);

        // 4. Register the function first, then use it under the name "mySpilt".
        tableEnv.createTemporarySystemFunction("mySpilt", mySpilt.class);

        // 4.1 Table API variant of the same query:
        // table.joinLateral(call("mySpilt", $("id")))
        //         .select($("id"), $("s")).execute().print();

        // 4.2 SQL variant: the Table object is concatenated directly into the query text.
        tableEnv.sqlQuery("select id, s from " + table + ", lateral table(mySpilt(id))").execute().print();

        // No env.execute() here: Table.execute() above already submits the query as a job.
        // A further env.execute() would try to launch a separate DataStream job that has no sink.
    }

    /**
     * Table function that splits a string on {@code "_"} and emits one single-column
     * row (column {@code s}) per resulting part.
     */
    @FunctionHint(output = @DataTypeHint("ROW<s STRING>"))
    public static class mySpilt extends TableFunction<Row> {

        /**
         * Emits one row for every underscore-separated part of {@code value}.
         *
         * @param value the string to split; a {@code null} value emits no rows
         */
        public void eval(String value) {
            // A SQL NULL arrives as a Java null; emit nothing instead of failing the job with an NPE.
            if (value == null) {
                return;
            }
            for (String part : value.split("_")) {
                collect(Row.of(part));
            }
        }
    }
}
Flink-udft 函数.png