Flink-sql自定义UDTFA函数
2021-08-19 本文已影响0人
wudl
1. 用自定义的函数在Flink Sql 中使用
1.1 官网也说的很详细
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html
1.1.1 官网上面的例子:
**
* Accumulator for Top2.
*/
public class Top2Accum {
public Integer first;
public Integer second;
}
/**
* The top2 user-defined table aggregate function.
*/
public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {
@Override
public Top2Accum createAccumulator() {
Top2Accum acc = new Top2Accum();
acc.first = Integer.MIN_VALUE;
acc.second = Integer.MIN_VALUE;
return acc;
}
public void accumulate(Top2Accum acc, Integer v) {
if (v > acc.first) {
acc.second = acc.first;
acc.first = v;
} else if (v > acc.second) {
acc.second = v;
}
}
public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
for (Top2Accum otherAcc : iterable) {
accumulate(acc, otherAcc.first);
accumulate(acc, otherAcc.second);
}
}
public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
// emit the value and rank
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
tEnv.registerFunction("top2", new Top2());
Table orders = tableEnv.from("Orders");
Table result = orders
.groupBy($("key"))
.flatAggregate(call("top2", $("a")).as("v", "rank"))
.select($("key"), $("v"), $("rank");
2.自己实现
个人理解:对实时输入的一组数据找到最大的一个
package com.wudl.flink.sql;
import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggregate;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
/**
* @ClassName : Flink_Sql_Function_UDTFA
* @Description : Flink自定义udtfa 函数
* @Author :wudl
* @Date: 2021-08-18 23:55
*/
public class Flink_Sql_Function_UDTFA {
public static void main(String[] args) throws Exception {
//1. 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2. 读取端口中的数据并且转化为javaBean
SingleOutputStreamOperator<WaterSensor> waterSensorDs = env.socketTextStream("192.168.1.161", 9999)
.map(line -> {
String[] split = line.split(",");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
});
// 3. 讲流 转化为动态表
Table table = tableEnv.fromDataStream(waterSensorDs);
// 5. 先注册在使用
tableEnv.createTemporarySystemFunction("Top2", Top2.class);
//5.1 使用table api 实现的方式
table.groupBy($("id"))
.flatAggregate(call("top2",$("vc")).as("top","rank"))
.select($("id"),$("top"),$("rank"))
.execute()
.print();
// 5.2 采用sql 的写法
//5. 执行任务
env.execute();
}
// 自定义函数类Udtf 求平均数
public static class Top2 extends TableAggregateFunction<Tuple2<Integer,String>,VcTop2> {
public void accumulate(VcTop2 acc, Integer value) {
if (value > acc.getTopOne()) {
acc.setTopTwo(acc.getTopOne());
acc.setTopOne(value);
} else if (value > acc.getTopTwo()) {
acc.setTopTwo(value);
}
}
public void emitValue(VcTop2 acc, Collector<org.apache.flink.api.java.tuple.Tuple2<Integer, String>> out) {
out.collect(new org.apache.flink.api.java.tuple.Tuple2<>(acc.getTopOne(), "Top1"));
if (acc.getTopTwo() > Integer.MIN_VALUE) {
out.collect(new org.apache.flink.api.java.tuple.Tuple2<>(acc.getTopTwo(), "Top2"));
}
}
@Override
public VcTop2 createAccumulator() {
return new VcTop2(Integer.MIN_VALUE, Integer.MIN_VALUE);
}
}
}
实体类
package com.wudl.flink.sql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @ClassName : VcTop2
* @Description :
* @Author :wudl
* @Date: 2021-08-18 23:46
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class VcTop2 {
private int topOne;
private int topTwo;
}
Flink-UDTFA函数.png