Flink returns(): setting the return type
Flink supports lambda expressions, but when generics are involved the type information must be declared explicitly.
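import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkFirstDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello everyone", "how are you")
            // Anonymous class: the generic types <String, String> stay visible to Flink.
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) {
                    // Emit one record per whitespace-separated word.
                    for (String s : value.split(" ")) {
                        out.collect(s);
                    }
                }
            })
            .print();
        System.out.println("hello");
    }
}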
Output
hello
everyone
how
are
you
hello
Following IntelliJ IDEA's suggestion, replace the anonymous inner class with a lambda expression. The code becomes:
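import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkFirstDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello everyone", "how are you")
            // Lambda version: the type parameter of the Collector is no longer visible to Flink.
            .flatMap((FlatMapFunction<String, String>) (value, out) -> {
                for (String s : value.split(" ")) {
                    out.collect(s);
                }
            })
            .print();
        System.out.println("hello");
    }
}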
Running it fails with the following error:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkFirstDemo.java:22)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
at com.river.FlinkFirstDemo.main(FlinkFirstDemo.java:27)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
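Flink cannot infer that the output Collector is parameterized with String, because the lambda carries no generic type information for it.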
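Here is a minimal, Flink-free sketch of why the anonymous class worked and the lambda did not. It uses java.util.function.BiConsumer<String, List<String>> as a stand-in for FlatMapFunction<String, String> (an assumption, so that the example runs on the JDK alone), and the names ErasureDemo, keepsGenericInfo, anonymousSplitter and lambdaSplitter are made up for illustration. An anonymous class records its generic interface in the class file, so reflection can read the type arguments back; the class the JVM generates for a lambda typically exposes only the raw interface. Flink's real type extraction is more involved than this, but it runs into the same missing information.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Illustrative only: BiConsumer stands in for Flink's FlatMapFunction.
class ErasureDemo {

    // True if the object's class still records type arguments
    // for the first interface it implements.
    static boolean keepsGenericInfo(Object function) {
        Type iface = function.getClass().getGenericInterfaces()[0];
        return iface instanceof ParameterizedType;
    }

    // Anonymous class: "BiConsumer<String, List<String>>" is written into the class file.
    static BiConsumer<String, List<String>> anonymousSplitter() {
        return new BiConsumer<String, List<String>>() {
            @Override
            public void accept(String value, List<String> out) {
                for (String s : value.split(" ")) {
                    out.add(s);
                }
            }
        };
    }

    // Lambda: same behavior, but the runtime-generated class implements raw BiConsumer.
    static BiConsumer<String, List<String>> lambdaSplitter() {
        return (value, out) -> {
            for (String s : value.split(" ")) {
                out.add(s);
            }
        };
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        lambdaSplitter().accept("how are you", words);
        System.out.println(words);
        System.out.println("anonymous keeps generics: " + keepsGenericInfo(anonymousSplitter()));
        System.out.println("lambda keeps generics:    " + keepsGenericInfo(lambdaSplitter()));
    }
}
```

Both functions split the input identically; only the amount of type information left for reflection differs, which is the gap an explicit type hint has to fill.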
To keep the lambda, the return type has to be declared explicitly by calling .returns(Types.STRING) on the result of flatMap:
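import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkFirstDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello everyone", "how are you")
            .flatMap((FlatMapFunction<String, String>) (value, out) -> {
                for (String s : value.split(" ")) {
                    out.collect(s);
                }
            })
            // Explicit type hint: the flatMap output is a DataSet of String.
            .returns(Types.STRING)
            .print();
        System.out.println("hello");
    }
}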
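This is a little annoying.
The official documentation explains it as well: Flink supports lambda expressions, but when generics are involved the type information must be declared explicitly.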
Java 8 introduced several new language features designed for faster and clearer coding. With the most important feature, the so-called “Lambda Expressions”, it opened the door to functional programming. Lambda expressions allow for implementing and passing functions in a straightforward way without having to declare additional (anonymous) classes.
Attention: Flink supports the usage of lambda expressions for all operators of the Java API, however, whenever a lambda expression uses Java generics you need to declare type information explicitly.
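One way an explicit declaration can carry generic information that a lambda cannot is by capturing it in an anonymous subclass. The sketch below shows that idea in plain Java; TypeToken and TypeTokenDemo are hypothetical names invented here, not Flink classes. Flink's own TypeHint (accepted by returns(...)) is used in a similar way, as new TypeHint<...>(){}, but treat the comparison as an assumption about its internals rather than a description of them.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical helper, not part of Flink: captures T from an anonymous subclass.
abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        // An anonymous subclass records "TypeToken<...>" as its generic superclass,
        // so the type argument can be read back at runtime.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

class TypeTokenDemo {

    // Captures a plain class.
    static Type stringType() {
        return new TypeToken<String>() {}.getType();
    }

    // Captures a nested generic type, which a Class object alone could not express.
    static Type listOfStringType() {
        return new TypeToken<List<String>>() {}.getType();
    }

    public static void main(String[] args) {
        System.out.println(stringType().getTypeName());
        System.out.println(listOfStringType().getTypeName());
    }
}
```

For a simple element type such as String, a constant like Types.STRING is enough; a captured type of this kind only becomes useful when the output type is itself generic.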

Reference
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/java_lambdas.html