FlinkFlink学习指南玩转大数据

Flink 使用之 TypeInformation

2022-01-28  本文已影响0人  AlienPaul

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

背景

本篇从近期遇到的Stream Java API 问题,引出TypeInformation的使用。

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(XXXTest.java:77)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

这个问题的原因是由于JVM运行时候会擦除类型(泛型类型),Flink无法准确的获取到数据类型。因此,在使用Java API的时候,我们需要手工指定类型。使用Scala的时候无需指定。

需要使用SingleOutputStreamOperatorreturns方法来指定算子的返回数据类型。

TypeInformation

TypeInformation是Flink类型系统的核心,是生成序列化/反序列化工具和Comparator的工具类。同时它还是连接schema和编程语言内部类型系统的桥梁。
我们可以使用of方法创建TypeInformation

TypeHint

由于泛型类型在运行时会被JVM擦除,所以说我们无法使用TypeInformation.of(XXX.class)方式指定带有泛型的类型。

为了可以支持泛型类型,Flink引入了TypeHint。例如我们需要获取Tuple2<String, Long>的类型信息,可以使用如下方式:

TypeInformation<Tuple2<String, Long>> info = TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){});
// 或者
TypeInformation<Tuple2<String, Long>> info = new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo();

Types

在Flink中经常使用的类型已经预定义在了Types中。它们的serializer/deserializer和Comparator已经定义好了。

Tuple类型既可以使用TypeHint指定又可以使用Types指定。例如Tuple2<String, Integer>类型我们可以使用如下

TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})

Types.TUPLE(Types.STRING, Types.INT)

方式定义。

上一篇下一篇

猜你喜欢

热点阅读