Big Data

flink 自定义函数

2020-01-15  本文已影响0人  盗梦者_56f2

用户自定义的函数是一个重要的特性,因为它们展示扩展了查询的表达能力。

注册用户自定义的函数

在大多数情况下,必须先注册用户定义的函数,然后才能在查询中使用它。TableEnvironment通过调用registerFunction()方法来注册函数。

标量函数(Scalar Functions)

如果内置函数中不包含必需的标量函数,则可以为Table API和SQL定义用户自定义的标量函数。用户定义的标量函数将零个,一个或多个标量值映射到新的标量值。
为了定义标量函数,必须扩展org.apache.flink.Table.function中的基类ScalarFunction和实现(一个或多个)评估方法。标量函数的行为由评估方法决定。必须公开声明一个评估方法并命名为eval。评估方法的参数类型和返回类型也决定标量函数的参数和返回类型。还可以通过实现名为eval的多个评估方法来重载评估方法。评估方法也可以支持变量参数,比如eval(String…str)

以下示例展示如何定义标量函数,在TableEnvironment中注册它,并在查询中调用它。

public class HashCode extends ScalarFunction {
  private int factor = 12;
  
  public HashCode(int factor) {
      this.factor = factor;
  }
  //一个约定方法
  public int eval(String s) {
      return s.hashCode() * factor;
  }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");

默认情况下,评估方法的结果类型由Flink的类型提取工具决定。对于基本类型或简单pojo,这已经足够了。但是对于更复杂的、自定义的或复合类型,这可能是错误的。在这些情况下,可以通过通过重写ScalarFunctiongetResultType()手动定义结果类型的类型信息。

表值函数(Table Functions)

与用户定义的标量函数类似,用户定义的表函数将零个,一个或多个标量值作为输入参数。但是,与标量函数相比,它可以返回任意数量的行作为输出而不是单个值。返回的行可以包含一个或多个列。
为了定义表函数,必须扩展org.apache.flink.table.function中的基类TableFunction和实现(一个或多个)评估方法。表函数的行为由其评估方法决定。一个评估方法必须声明为public并命名为eval。可以通过实现名为eval的多个评估方法来重载TableFunction。评估方法的参数类型决定了表函数的所有有效参数。评估方法也可以支持变量参数,比如eval(String…str)。返回的表的类型由TableFunction的泛型类型决定。评估方法使用受保护的collect(T)方法发出输出行。
在Table API中,表函数与.join(Table). leftouterjoin(Table)一起使用。joinLateral 操作将外部表(操作符左侧的表)中的每一行与表值函数(操作符右侧的表)生成的所有行关联起来。leftOuterJoin操作符将外部表(操作符左侧的表)中的每一行与表值函数(操作符右侧的表)生成的所有行连接起来,并保留表函数返回空表的外部行。在SQL中,使用带有CROSS JOIN或LEFT JOIN且具有ON TRUE连接条件的LATERAL TABLE(<Table Function>)。

以下示例展示如何定义表值函数,在TableEnvironment中注册它,并在查询中调用它。

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
    private String separator = " ";
    
    public Split(String separator) {
        this.separator = separator;
    }
    //输入一行一列输出多行两个列
    public void eval(String str) {
        for (String s : str.split(separator)) {
            // use collect(...) to emit a row
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ...         // table schema: [a: String]

// Register the function.
tableEnv.registerFunction("split", new Split("#"));

// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");
myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");

默认情况下,评估方法的结果类型由Flink的类型提取工具决定。对于基本类型或简单pojo,这已经足够了。但是对于更复杂的、自定义的或复合类型,这可能是错误的。在这些情况下,可以通过通过重写TableFunctiongetResultType()手动定义结果类型的类型信息。

聚合函数

用户定义的聚合函数将一个表的一个或多个行并且具有一个或多个属性聚合为标量值。
用户定义的聚合函数是通过扩展AggregateFunction类实现的。AggregateFunction的工作原理如下。首先,需要定义一个累加器,它是保存聚合的中间结果的数据结构。然后是利用AggregateFunctioncreateAccumulator()方法创建一个空的累加器。接下来,对每个输入行调用函数的accumulate()方法来更新累加器。处理完所有行之后,调用函数的getValue()方法来计算并返回最终结果。
除了上述所必须的方法之外,还有一些可选择性实现的约定方法。虽然其中一些方法允许系统更高效地执行查询,但是其他方法对于某些用例则必需的。e.g. retract():在有界OVER窗口上聚合是必需的。merge():许多批处理聚合和会话窗口聚合都需要。resetAccumulator():许多批处理聚合都需要。
AggregateFunction的所有方法都必须声明为public的,而不是static

/**
 * Accumulator for WeightedAvg.
 */
public static class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

/**
 * Weighted Average user-defined aggregate function.
 */
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }

    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }
    //可选择性的实现
    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }
     //可选择性的实现
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }
     //可选择性的实现
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}

// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());

// use function
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");

将UDF与运行时集成

有时,用户定义的函数可能需要获取全局运行时信息,或者在实际工作之前进行一些设置/清理工作。用户自定义的函数可以通过覆盖open()和close()方法实现。
open()方法在评估方法之前调用一次。在最后一次调用评估方法之后在调用close()方法。
open()方法提供一个FunctionContext,其中包含有关用户定义函数在其中执行的上下文的信息。

public class HashCode extends ScalarFunction {

    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // access "hashcode_factor" parameter
        // "12" would be the default value if parameter does not exist
        factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12")); 
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);

// register the function
tableEnv.registerFunction("hashCode", new HashCode());

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
上一篇 下一篇

猜你喜欢

热点阅读