Flink SQL 自定义 UDTAF 函数(表聚合函数)

2021-08-19  本文已影响0人  wudl

1. 在 Flink SQL(Table API)中使用自定义函数

1.1 官网文档对此说得很详细:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

1.1.1 官网上面的例子:

/**
 * Accumulator for Top2.
 *
 * <p>{@code first} holds the largest value accumulated so far and {@code second} the
 * runner-up. Top2's createAccumulator() initialises both to Integer.MIN_VALUE, which then
 * acts as the "empty slot" marker.
 */
public class Top2Accum {
    public Integer first;
    public Integer second;
}

/**
 * The top2 user-defined table aggregate function.
 *
 * <p>Keeps the two largest values of a group and emits them as (value, rank) rows with rank
 * 1 or 2. Integer.MIN_VALUE marks an empty accumulator slot, so that value itself is never
 * reported.
 */
public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        return acc;
    }

    /**
     * Folds one input value into the accumulator.
     *
     * @param acc the accumulator for the current group
     * @param v   the input value; {@code null} (SQL NULL) is ignored, because unboxing it in
     *            the comparisons below would otherwise throw a NullPointerException
     */
    public void accumulate(Top2Accum acc, Integer v) {
        if (v == null) {
            return;
        }
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    /**
     * Merges partial accumulators into {@code acc} by re-accumulating their two top values.
     *
     * @param acc      the accumulator that receives the merged state
     * @param iterable the accumulators to merge in
     */
    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
        for (Top2Accum otherAcc : iterable) {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    /**
     * Emits (value, rank) for every slot that holds a real value.
     *
     * @param acc the accumulator for the current group
     * @param out collector receiving the (value, rank) rows
     */
    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        // emit the value and rank; slots still at the MIN_VALUE marker are empty
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}

// Register the function under a name, then apply it per key with flatAggregate.
// NOTE(review): tableEnv is assumed to be a TableEnvironment created elsewhere.
tableEnv.createTemporarySystemFunction("top2", new Top2());
Table orders = tableEnv.from("Orders");
Table result = orders
    .groupBy($("key"))
    .flatAggregate(call("top2", $("a")).as("v", "rank"))
    .select($("key"), $("v"), $("rank"));

2.自己实现

个人理解:对实时输入的数据按 id 分组,在每一组中找出 vc 最大的两个值(Top1 和 Top2)并输出。

package com.wudl.flink.sql;

import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggregate;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;


/**
 * @ClassName : Flink_Sql_Function_UDTFA
 * @Description : Demo of a user-defined table aggregate function (UDTAF) in Flink: for each
 *     sensor id, emit the two largest vc values seen so far, each tagged with its rank.
 * @Author :wudl
 * @Date: 2021-08-18 23:55
 */

public class Flink_Sql_Function_UDTFA {

    /** Socket host used when no host is given as the first program argument. */
    private static final String DEFAULT_HOST = "192.168.1.161";

    /** Socket port used when no port is given as the second program argument. */
    private static final int DEFAULT_PORT = 9999;

    /**
     * Reads "id,ts,vc" lines from a socket, groups them by id and prints the running top two
     * vc values of every id.
     *
     * @param args optional {@code [host [port]]}; defaults to 192.168.1.161 and 9999
     * @throws Exception if building or running the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        // 1. Create the stream and table execution environments.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Read "id,ts,vc" lines from the socket and convert them into WaterSensor beans.
        //    Fields are trimmed so input such as "sensor_1, 1000, 10" is accepted too.
        SingleOutputStreamOperator<WaterSensor> waterSensorDs = env.socketTextStream(host, port)
                .map(line -> {
                    String[] split = line.split(",");
                    return new WaterSensor(
                            split[0].trim(),
                            Long.parseLong(split[1].trim()),
                            Integer.parseInt(split[2].trim()));
                });

        // 3. Turn the stream into a dynamic table.
        Table table = tableEnv.fromDataStream(waterSensorDs);

        // 4. Register the function first, then use it under the same name.
        tableEnv.createTemporarySystemFunction("Top2", Top2.class);

        // 4.1 Table API: flatAggregate may emit several rows (top, rank) per group.
        //     NOTE(review): the Flink 1.12 docs describe table aggregate functions as
        //     Table-API-only, so there is no SQL variant here; confirm for the target version.
        //     Table#execute() submits the job itself and print() streams the results, so no
        //     env.execute() follows; calling it would submit the socket source/map part again
        //     as a separate job.
        table.groupBy($("id"))
                .flatAggregate(call("Top2", $("vc")).as("top", "rank"))
                .select($("id"), $("top"), $("rank"))
                .execute()
                .print();
    }

    /**
     * Table aggregate function that keeps the two largest Integer values of a group and emits
     * them as (value, "Top1") and (value, "Top2") rows.
     *
     * <p>Integer.MIN_VALUE is the "empty slot" marker, so a real MIN_VALUE input cannot be
     * told apart from an empty slot.
     */
    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, String>, VcTop2> {

        /**
         * Folds one input value into the accumulator.
         *
         * @param acc   accumulator holding the current top two values
         * @param value the new value; {@code null} (SQL NULL) is ignored instead of causing a
         *              NullPointerException when it is unboxed for comparison
         */
        public void accumulate(VcTop2 acc, Integer value) {
            if (value == null) {
                return;
            }
            if (value > acc.getTopOne()) {
                acc.setTopTwo(acc.getTopOne());
                acc.setTopOne(value);
            } else if (value > acc.getTopTwo()) {
                acc.setTopTwo(value);
            }
        }

        /**
         * Emits the current top values; slots still holding the MIN_VALUE marker are skipped,
         * so a group that has only seen NULLs emits nothing.
         *
         * @param acc accumulator holding the current top two values
         * @param out collector receiving the (value, rank label) rows
         */
        public void emitValue(VcTop2 acc, Collector<Tuple2<Integer, String>> out) {
            if (acc.getTopOne() > Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.getTopOne(), "Top1"));
            }
            if (acc.getTopTwo() > Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.getTopTwo(), "Top2"));
            }
        }

        /** Creates an accumulator whose two slots are both empty (Integer.MIN_VALUE). */
        @Override
        public VcTop2 createAccumulator() {
            return new VcTop2(Integer.MIN_VALUE, Integer.MIN_VALUE);
        }
    }

}

实体类

package com.wudl.flink.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @ClassName : VcTop2
 * @Description :
 * @Author :wudl
 * @Date: 2021-08-18 23:46
 */

@Data
@NoArgsConstructor
@AllArgsConstructor
public class VcTop2 {
    private int topOne;
    private int topTwo;
}

Flink-UDTFA函数.png
上一篇下一篇

猜你喜欢

热点阅读