如何在Calcite中注册函数

2019-05-12  本文已影响0人  ni_d58f

很多同学在使用Calcite的过程中需要自定义函数, 现在讲讲如何定自义函数

1. Calcite 内置函数和对应的流程

Calcite中内置的函数主要在SqlStdOperatorTable中, 包括常见的算术运算符、时间函数等。现在就以一个列子来说明在SqlStdOperatorTable 中添加函数以达到注册函数的功能

  1. SqlStdOperatorTable.java 中添加对应函数
public static final SqlFunction TIMESTR2LONG = new SqlFunction(
          new SqlIdentifier("TIMESTR2LONG", SqlParserPos.ZERO),
          //返回值为Long, 可以为NULL
          ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.BIGINT), SqlTypeTransforms.TO_NULLABLE),
          //输入类型推测为Varchar, 即java中的String
          InferTypes.VARCHAR_1024,
         //类型检查,如果类型不是String, 报错
          OperandTypes.family(SqlTypeFamily.STRING),
          Lists.newArrayList(new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT).createSqlType(SqlTypeName.VARCHAR)),
          SqlFunctionCategory.USER_DEFINED_FUNCTION);
  1. 在自已的代码中将Parser的OperatorTable 设置成上述 SqlStdOperatorTable
public class FunctionTestOne {
    public static final SqlTypeFactoryImpl TYPE_FACTORY = new SqlTypeFactoryImpl(
            RelDataTypeSystem.DEFAULT);
    public static final RelDataTypeSystem TYPE_SYSTEM = RelDataTypeSystem.DEFAULT;

    public static void main(String[] args) {
        CalciteSchema rootSchema = CalciteSchema
                .createRootSchema(false, false);

        //添加表Test
        rootSchema.add("test", new AbstractTable() {
            @Override
            public RelDataType getRowType(RelDataTypeFactory typeFactory) {
                RelDataTypeFactory.Builder builder = new RelDataTypeFactory
                        .Builder(TYPE_FACTORY);
                //列id, 类型int
                builder.add("id", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.INTEGER));
                //列name, 类型为varchar
                builder.add("name", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.VARCHAR));
                builder.add("time_str", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.VARCHAR));
                return builder.build();
            }
        });

        SqlParser.ConfigBuilder builder = SqlParser.configBuilder();
        //以下需要设置成大写并且忽略大小写
        builder.setQuotedCasing(Casing.TO_UPPER);
        builder.setUnquotedCasing(Casing.TO_UPPER);
        builder.setCaseSensitive(false);

        final FrameworkConfig config = Frameworks.newConfigBuilder()
                .defaultSchema(rootSchema.plus())
                .parserConfig(builder.build())
                //注意用你自已的SqlStdOperatorTable, 此处之所以同名
                //目的是覆盖calcite中SqlStdOperatorTable
                .operatorTable(SqlStdOperatorTable.instance())
                .build();

        Planner planner = Frameworks.getPlanner(config);

        //now start to parser

        try {
            SqlNode originSqlNode = planner.parse("select name, timestr2long(time_str) from test where id < 5");
            SqlNode sqlNode = planner.validate(originSqlNode);
            RelRoot root = planner.rel(sqlNode);
            System.out.println(RelOptUtil.toString(root.rel, ALL_ATTRIBUTES));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

稍微解释一下上述代码中主要步骤

  1. 创建rootSchema, 你可以把其理解为Linux中root目录, 当往rootSchema添加table是相当于添加一个普通文件,当往rootSchema添加schema时相当于添加一个目录, 相应的rootSchema就是整个数据库的根,可以往根数据库中添加函数、数据库、表等,添加的子数据库又可以把函数、数据库、表放置其中,不断递归

  2. 然后往rootSchema添加表后,然后设置Parser和OperatorTable 后生成Planner, 至于中间是怎么用的,后面会详细说明

  3. 最后进行Parser/Validate并转化成RelNode

现在简要说明一下函数注册主要过程

Parser阶段,在Parser.jj文件中 有以下内容:

LOOKAHEAD( [<SPECIFIC>] FunctionName() <LPAREN>)
e = NamedFunctionCall()

// NamedFunctionCall的主要内容如下
...
SqlNode NamedFunctionCall() :
    qualifiedName = FunctionName() 
    createCall(qualifiedName, s.end(this), funcType, quantifier, args);
...

FunctionName() 主要是获取函数名,如例中的函数名 timestr2long或者是Calcite内置的一些固定的函数名,如ABS, AVG等
createCall()是生成SqlCall(函数的主要形式), 而createCall()最终会调用

 protected SqlCall createCall(
      SqlIdentifier funName,
      SqlParserPos pos,
      SqlFunctionCategory funcType,
      SqlLiteral functionQualifier,
      SqlNode[] operands) {
    SqlOperator fun = null;

    // First, try a half-hearted resolution as a builtin function.
    // If we find one, use it; this will guarantee that we
    // preserve the correct syntax (i.e. don't quote builtin function
    /// name when regenerating SQL).
    if (funName.isSimple()) {
      final List<SqlOperator> list = new ArrayList<>();
      //这里opTab的值为SqlStdOperatorTable.instance();
      opTab.lookupOperatorOverloads(funName, funcType, SqlSyntax.FUNCTION, list);
      if (list.size() == 1) {
        fun = list.get(0);
      }
    }

    // Otherwise, just create a placeholder function.  Later, during
    // validation, it will be resolved into a real function reference.
    if (fun == null) {
      fun = new SqlUnresolvedFunction(funName, null, null, null, null,
          funcType);
    }

    return fun.createCall(functionQualifier, pos, operands);
  }

经过以上步骤可以发现注册过程流程为:

  1. 在SqlStdOperatorTable按照格式注册自已的函数名
  2. 在Parser过程中,会根据函数名自动在SqlStdOperatorTable查找对应函数
  3. 如果没有找到,会自动将函数解析成SqlUnresolvedFunction

那问题来了,SqlUnresolvedFunction什么时候会解析成具体的函数?答案是: Validate阶段, 在Validate阶段,Calcite 会针对函数做以下工作

  1. 根据函数名在所有函数注册表中查找函数(之所以说是所有函数注册表是因为Calcite还可以在其它地方注册函数, 后面的ChainedSqlOperatorTable, ListSqlOperatorTable等)查找函数
  2. 验证函数的入参:包话参数类型、个数、函数类型等
  3. 递归验证每个入参是否合法,具体过程后续会详细说明

其它方式注册函数

上述注册函数略微tricky, 需要要本地搭建一个org.apache.calcite.sql.fun包并且从calcite-core中复制SqlStdOperatorTable.java的内容以覆盖内置的SqlStdOperatorTable,最后添加自已的函数,通过这种方式实现并不那么优雅,其实还有其它相对优雅的方式进行函数注册,主要有以下两种方式

1. 修改Parser.jj 注册

假如我要实现以下两个方法

    public static Integer func1(String s) {
        return s == null ? null : Integer.valueOf(s);
    }

    public static String func2(Integer i) {
        return i == null ? null : String.valueOf(i);
    }

要在Calcite中注册以上两个函数,只需要在parser.jj中添加以下内容, 见代码

/*
*   Create User-defined function
*/
SqlNode extraFunction() :
{
    SqlNode node;
    SqlNode e;
    List<SqlNode> args = null;
    final Span s;
}
{   (
        (
            <FUNC1> { s = span(); }
            <LPAREN>
            e = Expression(ExprContext.ACCEPT_NON_QUERY)
            {
                startList(e);
            }
            <LPAREN>
            {
                node = FunctionUtil.FUNC1.createCall(s.end(this), args);
            }
        )
        |
        (
            <FUNC2> { s = span(); }
            <LPAREN>
            e = Expression(ExprContext.ACCEPT_NON_QUERY)
            {
                startList(e);
            }
            <LPAREN>
            {
                node = FunctionUtil.FUNC2.createCall(s.end(this), args);
            }

        )
    )
    {
        return node;
    }
}


<DEFAULT, DQID, BTID> TOKEN :
...
|   < FUNCTION: "FUNCTION" >
|   < FUNC1: "FUNC1"> //添加的内容
|   < FUNC2: "FUNC2"> //添加的内容
...

FunctionUtil 内容如下, 见代码

public class FunctionUtil {

    public static final SqlFunction FUNC1 = new SqlFunction(
            new SqlIdentifier("FUNC1", SqlParserPos.ZERO),
            ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.INTEGER), SqlTypeTransforms.TO_NULLABLE),
            InferTypes.VARCHAR_1024,
            OperandTypes.family(SqlTypeFamily.STRING),
            Lists.newArrayList(TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR)),
            SqlFunctionCategory.USER_DEFINED_FUNCTION);

    public static final SqlFunction FUNC2 = new SqlFunction(
            new SqlIdentifier("FUNC2", SqlParserPos.ZERO),
            ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
            InferTypes.FIRST_KNOWN,
            OperandTypes.family(SqlTypeFamily.INTEGER),
            Lists.newArrayList(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER)),
            SqlFunctionCategory.USER_DEFINED_FUNCTION);
}

最后的测试代码如下,见代码

            //now test func
            //将创建的函数放入SqlOperatorTable()中
            ListSqlOperatorTable listSqlOperatorTable = new ListSqlOperatorTable();
            listSqlOperatorTable.add(FUNC1);
            listSqlOperatorTable.add(FUNC2);

            final FrameworkConfig funcConfig = Frameworks.newConfigBuilder()
                    .defaultSchema(rootSchema.plus())
                    .parserConfig(builder.build())
                    //添加一个专们用于添加函数的 listSqlOperatorTable
                    .operatorTable(ChainedSqlOperatorTable.of(listSqlOperatorTable,
                            SqlStdOperatorTable.instance()))
                    .build();

            Planner planner2 = Frameworks.getPlanner(funcConfig);
            SqlNode func1SqlNodeOrg = planner2.parse("select func1(name) from test where id > 4");
            SqlNode func1SqlNode = planner2.validate(func1SqlNodeOrg);
            RelRoot func1Root = planner2.rel(func1SqlNode);
            System.out.println("-------- func1 test -------");
            System.out.println(RelOptUtil.toString(func1Root.rel, ALL_ATTRIBUTES));

            Planner planner3 = Frameworks.getPlanner(funcConfig);
            SqlNode func2SqlNodeOrg = planner3.parse("select func2(id) from test where id > 4");
            SqlNode func2SqlNode = planner3.validate(func2SqlNodeOrg);
            RelRoot func2Root = planner3.rel(func2SqlNode);
            System.out.println("-------- func2 test -------");
            System.out.println(RelOptUtil.toString(func2Root.rel, ALL_ATTRIBUTES));

2. Schema 注册

细心的读者可以发现,以上两种方式都要对calcite做侵入式修改。内置方法需要覆盖SqlStdOpeartorTable, 而修改Parser.jj则需要自已书写Parser逻辑,并在Validate阶段注册函数。考虑到一下场景,需要类似于Hive 注册UDF那样,动态的注册函数,上述两种方式是无法实现的,那如何实现动态注册函数呢? 用schema 注册, 下面以一个例子说明, 代码在

 //主要代码
 public static RelRoot sqlToRelNode(String sql) {

        try {
            SchemaPlus plus = ROOT_SCHEMA.plus();
            plus.add("FUNC1", ScalarFunctionImpl.create(
                    FunctionUtil.class, "func1"));

            plus.add("FUNC2", ScalarFunctionImpl.create(
                    FunctionUtil.class, "func2"));


            SqlParser parser = SqlParser.create(sql, config.getParserConfig());
            SqlNode sqlNode = parser.parseStmt();

            //这里需要注意大小写问题,否则表会无法找到
            Properties properties = new Properties();
            properties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
                    String.valueOf(config.getParserConfig().caseSensitive()));

            CalciteCatalogReader calciteCatalogReader =  new CalciteCatalogReader(
                    CalciteSchema.from(rootSchema(plus)),
                    CalciteSchema.from(config.getDefaultSchema()).path(null),
                    TYPE_FACTORY,
                    new CalciteConnectionConfigImpl(properties));

            //to supported user' define function
            SqlOperatorTable sqlOperatorTable = ChainedSqlOperatorTable
                    .of(config.getOperatorTable(), calciteCatalogReader);

            TestSqlValidatorImpl validator = new TestSqlValidatorImpl(
                    sqlOperatorTable,
                    calciteCatalogReader,
                    TYPE_FACTORY,
                    SqlConformanceEnum.DEFAULT);

            //try to union trait set
            //addRelTraitDef for is HepPlanner has not effect in fact
            VolcanoPlanner volcanoPlanner = new VolcanoPlanner();
            SqlNode validateSqlNode = validator.validate(sqlNode);
            final RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY);
            RelOptCluster cluster = RelOptCluster.create(volcanoPlanner, rexBuilder);

            final SqlToRelConverter.Config sqlToRelConverterConfig
                    = SqlToRelConverter.configBuilder()
                    .withConfig(config.getSqlToRelConverterConfig())
                    .withTrimUnusedFields(false)
                    .withConvertTableAccess(false)
                    .build();

            final SqlToRelConverter sqlToRelConverter =
                    new SqlToRelConverter(null, validator,
                            calciteCatalogReader, cluster, config.getConvertletTable(),
                            sqlToRelConverterConfig);

            RelRoot root =
                    sqlToRelConverter.convertQuery(validateSqlNode, false, true);
            root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
            final RelBuilder relBuilder = sqlToRelConverterConfig
                    .getRelBuilderFactory().create(cluster, null);

            //change trait set of TableScan
            return root.withRel(
                    RelDecorrelator.decorrelateQuery(root.rel, relBuilder));

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

Schema 注册相对于前两者来说过程复杂一点,相好处是可以定制化, 主要有以下特点

  1. 随时注册 随时使用
  2. 引入了schema空间,同一个function可以注册到不同的schema并可以隔离

采用Schema 注册的主要步骤在CalciteCatalogReader读取schema, 限于篇幅,我会专门用一个篇文章详细说明CalciteCatalogReader

3. 总结

以上为三种方式在Calcite注册函数,那么这三种有什么区别

  1. 从灵活度来说,前两种不如通过schema注册,而且通过schema注册也不需要去修改calcite核心代码,适合于初学者使用。更重要的是通过schema注册可以实现函数隔离,可以实现不同数据库级别的函数之持。
  2. 从实现容易度来说, 第一种更为简单,只需要简单修改一个内置的表即可

那么对于一个项目来说 采用哪里方式更好?我的建议是

  1. 如果需要动态注册或者schema级别函数隔离,建议采用第三种
  2. 如果函数相对固定而且函数数量较多,建议采用第一种
  3. 如果需要更加细致定制函数且函数数量不多,可以采用第二种
上一篇下一篇

猜你喜欢

热点阅读