Flink(1.13) FlinkSql自定义函数

2021-08-25  本文已影响0人  万事万物

函数分类

官网介绍
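Currently, Flink distinguishes between the following kinds of functions: scalar functions, table functions, aggregate functions, table aggregate functions, and async table functions. 本文只演示其中的 Scalar functions 和 Table functions。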

自定义 Scalar functions

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Scalar function intended to convert a string to upper case.
 *
 * <p>NOTE: at this step the class body is empty; no {@code eval} method is defined yet.
 *
 * @author admin
 * @date 2021/8/21
 */
public class MyScalarFunctionByUppercase extends ScalarFunction {

}
org.apache.flink.table.api.ValidationException: Function class 'com.admin.flink.demo12.function.MyScalarFunctionByUppercase' 
does not implement a method named 'eval'.

方法名必须叫 eval,并且必须声明为 public、非 static;参数和返回值的类型可以按需定义(需是 Flink 支持的数据类型),也可以重载多个 eval 方法。

/**
 * Scalar function that converts a string to upper case.
 */
public class MyScalarFunctionByUppercase extends ScalarFunction {

    /**
     * Upper-cases the given value.
     *
     * @param words the input string; may be {@code null} (for example a SQL NULL column value)
     * @return the upper-cased string, or {@code null} when {@code words} is {@code null}
     */
    public String eval(String words){
        // A null input would otherwise throw a NullPointerException while the job runs.
        if (words == null) {
            return null;
        }
        // Locale.ROOT keeps the result independent of the JVM default locale.
        return words.toUpperCase(java.util.Locale.ROOT);
    }
}
    /**
     * Calls the upper-case function inline: the function class is passed straight to
     * {@code call} without being registered under a name first.
     */
    @Test
    public void test1(){
        // Set up the stream environment and the table environment on top of it.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Sample input data.
        final DataStreamSource<String> source = env.fromElements("java", "google", "hello");

        // Expose the stream as a table whose single column is named "words".
        final Table table = tableEnv.fromDataStream(source, $("words"));

        // Select the original column next to the result of the function call.
        final Table projected = table.select(
                $("words"),
                call(MyScalarFunctionByUppercase.class, $("words")));

        projected.execute().print();
    }
+----+--------------------------------+--------------------------------+
| op |                          words |                            _c1 |
+----+--------------------------------+--------------------------------+
| +I |                           java |                           JAVA |
| +I |                         google |                         GOOGLE |
| +I |                          hello |                          HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
        // 先注册再使用
        tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);

        table.select($("words"),call("toUppercase",$("words")))
                .execute()
                .print();
+----+--------------------------------+--------------------------------+
| op |                          words |                            _c1 |
+----+--------------------------------+--------------------------------+
| +I |                           java |                           JAVA |
| +I |                         google |                         GOOGLE |
| +I |                          hello |                          HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
        // Name the single column "words".
        Table table = tableEnv.fromDataStream(source, $("words"));

        // Register the function before using it.
        tableEnv.createFunction("toUppercase", MyScalarFunctionByUppercase.class);

        // Use the registered function from SQL; concatenating the Table object
        // appends its string form as the table reference in the query text.
        String sql = "select words,toUppercase(words) as upp_words from " + table;
        tableEnv.sqlQuery(sql)
                .execute()
                .print();
+----+--------------------------------+--------------------------------+
| op |                          words |                      upp_words |
+----+--------------------------------+--------------------------------+
| +I |                           java |                           JAVA |
| +I |                         google |                         GOOGLE |
| +I |                          hello |                          HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set

自定义 Table functions

/**
 * Table function that turns one row into several rows: the input phrase is split on single
 * spaces and one row is emitted per resulting token.
 *
 * <p>The type parameter {@code Row} lets every emitted row carry several columns. The
 * {@code @FunctionHint} annotation declares the output columns: {@code w} (string) and
 * {@code len} (int).
 *
 * @author admin
 * @date 2021/8/21
 */
@FunctionHint(output = @DataTypeHint("row<w string,len int>"))
public class MyTableFunctionByRowToColumn extends TableFunction<Row> {

    /**
     * Emits one {@code (token, token length)} row for every space-separated token of the phrase.
     *
     * @param phrase the text to split; a {@code null} value produces no rows
     */
    public void eval(String phrase){
        // A null value has nothing to split; skip it instead of throwing a NullPointerException.
        if (phrase == null) {
            return;
        }

        for (String token : phrase.split(" ")) {
            collect(Row.of(token, token.length()));
        }
    }

}

查询

+----+--------------------------------+--------------------------------+-------------+
| op |                         phrase |                              w |         len |
+----+--------------------------------+--------------------------------+-------------+
| +I |                   hello world! |                          hello |           5 |
| +I |                   hello world! |                         world! |           6 |
| +I |                     明天 你好! |                           明天 |           2 |
| +I |                     明天 你好! |                          你好! |           3 |
| +I |                      数码 宝贝 |                           数码 |           2 |
| +I |                      数码 宝贝 |                           宝贝 |           2 |
| +I |                   名侦探 柯南! |                         名侦探 |           3 |
| +I |                   名侦探 柯南! |                          柯南! |           3 |
+----+--------------------------------+--------------------------------+-------------+
8 rows in set
    /**
     * Table API variant: applies the table function inline through a lateral join and prints
     * the joined result.
     */
    @Test
    public void test1(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> phrases = env.fromElements("hello world!", "明天 你好!", "数码 宝贝", "名侦探 柯南!");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table phraseTable = tableEnv.fromDataStream(phrases, $("phrase"));

        // Goal: expand every input row by joining it with the rows the table function emits for it.
        Table exploded = phraseTable.joinLateral(
                call(MyTableFunctionByRowToColumn.class, $("phrase")));
        exploded.execute().print();
    }
    /**
     * SQL variant: registers the input as a temporary view and the table function under a name,
     * then runs the lateral join as a SQL query and prints the result.
     */
    @Test
    public void test2(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> phrases = env.fromElements("hello world!", "明天 你好!", "数码 宝贝", "名侦探 柯南!");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table phraseTable = tableEnv.fromDataStream(phrases, $("phrase"));

        // Expose the table to SQL under the name "t".
        tableEnv.createTemporaryView("t", phraseTable);

        // Register the table function so that SQL can call it by name.
        tableEnv.createFunction("rowToColumn", MyTableFunctionByRowToColumn.class);

        // Goal: expand every input row; the lateral join pairs each row with the rows emitted for it.
        String sql = "select phrase , w ,len from t join lateral table (rowToColumn(phrase)) on true";
        Table exploded = tableEnv.sqlQuery(sql);
        exploded.execute().print();
    }
    <!-- Alias the function's output as table T and name its columns w1 and len1 -->
    <sql id="tableFunction2">
        select
        phrase , w1 ,len1
        from #{tableName}
        join lateral table (rowToColumn(phrase))
        as T(w1,len1)
        on true
    </sql>

查询

+----+--------------------------------+--------------------------------+-------------+
| op |                         phrase |                             w1 |        len1 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                   hello world! |                          hello |           5 |
| +I |                   hello world! |                         world! |           6 |
| +I |                     明天 你好! |                           明天 |           2 |
| +I |                     明天 你好! |                          你好! |           3 |
| +I |                      数码 宝贝 |                           数码 |           2 |
| +I |                      数码 宝贝 |                           宝贝 |           2 |
| +I |                   名侦探 柯南! |                         名侦探 |           3 |
| +I |                   名侦探 柯南! |                          柯南! |           3 |
+----+--------------------------------+--------------------------------+-------------+
上一篇下一篇

猜你喜欢

热点阅读