
Flink SQL custom UDTF functions

2021-08-12  wudl

1. Using custom functions in Flink SQL

1.1 The official documentation covers this in detail

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html

1.1.1 Example from the official documentation:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// function with overloaded evaluation methods
// but globally defined output type
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {

  public void eval(int a, int b) {
    collect(Row.of("Sum", a + b));
  }

  // overloading of arguments is still possible
  public void eval() {
    collect(Row.of("Empty args", -1));
  }
}

// decouples the type inference from evaluation methods,
// the type inference is entirely determined by the function hints
@FunctionHint(
  input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
  output = @DataTypeHint("INT")
)
@FunctionHint(
  input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
  output = @DataTypeHint("BIGINT")
)
@FunctionHint(
  input = {},
  output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {

  // an implementer just needs to make sure that a method exists
  // that can be called by the JVM
  public void eval(Object... o) {
    if (o.length == 0) {
      collect(false);
    }
    collect(o[0]);
  }
}
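
For context, a minimal usage sketch of my own (not from the official page): after registering the first OverloadedFunction above, it can be joined against a source table with LATERAL TABLE. The table name src, its INT columns a and b, and the table environment tableEnv are all assumptions here (section 2 below shows how such an environment is created).

// Usage sketch (assumptions: a registered table `src` with INT columns `a` and `b`,
// and an existing StreamTableEnvironment `tableEnv`).
tableEnv.createTemporarySystemFunction("OverloadedFunction", OverloadedFunction.class);

tableEnv.sqlQuery(
        "SELECT src.a, T.s, T.i " +
        "FROM src, LATERAL TABLE(OverloadedFunction(a, b)) AS T(s, i)")
    .execute()
    .print();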

2. My own implementation

Note: when implementing your own function, you have to define the parameters yourself and pay attention to how they are passed in.

package com.wudl.flink.sql;

import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * @ClassName : Flink_Sql_Function_UDTF
 * @Description : Custom UDTF for Flink SQL
 * @Author :wudl
 * @Date: 2021-08-11 22:55
 */

public class Flink_Sql_Function_UDTF {
    public static void main(String[] args) throws Exception {


        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2. Read data from the socket and convert each line into a WaterSensor bean
        SingleOutputStreamOperator<WaterSensor> waterSensorDs = env.socketTextStream("192.168.1.180", 9999)
                .map(line -> {
                    String[] split = line.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                });

        // 3. Convert the stream into a dynamic table
        Table table = tableEnv.fromDataStream(waterSensorDs);
        // 4. Register the function before using it
        tableEnv.createTemporarySystemFunction("mySpilt", mySpilt.class);
        // 4.1 Use the registered function (mySpilt) via the Table API
//        table.joinLateral(call("mySpilt", $("id")))
//                .select($("id"), $("s")).execute().print();
        // 4.2 Use the custom function via SQL
        tableEnv.sqlQuery("select id, s from " + table + ", lateral table(mySpilt(id))").execute().print();
        // 5. Execute the job
        env.execute();


    }

    // Custom table function (UDTF): splits the input string on "_" and emits one row per token
    @FunctionHint(output = @DataTypeHint("ROW<s STRING>"))
    public static class mySpilt extends TableFunction<Row> {
        public void eval(String value) {
            String[] split = value.split("_");
            for (String str : split) {
                collect(Row.of(str));
            }
        }
    }

}
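
The WaterSensor bean referenced above is not shown in the post. Below is a minimal sketch under my own assumptions: only the constructor signature (String, Long, Integer) and the id field are taken from the example; the names ts and vc for the other two fields are guesses.

package com.wudl.flink.bean;

/**
 * Minimal sketch of the WaterSensor POJO used above (not from the original post).
 * The field names ts and vc are assumptions; only the constructor signature
 * (String, Long, Integer) is taken from the map() call in the example.
 */
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;

    // Flink's POJO rules require a public no-arg constructor and getters/setters
    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }
}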

[Image: Flink-udft 函数.png]
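
For illustration (my own example, not from the post): if the socket on port 9999 receives a line such as sensor_1,1000,10, the id column is sensor_1; mySpilt splits it on "_", so the query prints two rows, one with s = sensor and one with s = 1, each paired with id = sensor_1.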