Flink源码解析玩转大数据大数据&云计算

写给大忙人的Flink的Data Types

2020-03-16  本文已影响0人  shengjk1

一.Flink 中 Data Type 组成

二、Flink 是如何处理 Data Type 的
首先Flink会根据自身的序列化器进行序列化,如果不行,则默认回退到 Kryo 序列化器进行序列化。

可能碰到的问题,如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.registerType(KuduTableDesc.class);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(SomeType.class);
        
DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(new TypeHint<SomeType.class});
TypeInformation<String> info = TypeInformation.of(String.class);

TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});

三、常见的 returns 的使用

.returns(Types.TUPLE(Types.INT,Types.INT))
.returns(Types.STRING)
.returns(TypeInformation.of(String.class))
.returns(new TypeHint<Tuple2<String, String>>(){})
.returns(TypeInformation.of(new TypeHint<Tuple2<ConsumerRecord, String>>() {}))
.returns(SomeType.class)
上一篇下一篇

猜你喜欢

热点阅读