Flink's Commonly Used DataSet and DataStream APIs
2021-09-03
一生逍遥一生
Differences and Connections Between DataSet and DataStream
The DataSet API is batch-oriented: a DataSet is typically built from files, tables, or Java collections. The DataStream API is stream-oriented: the source of a DataStream is usually message middleware such as Kafka.
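A minimal sketch of the two entry points (assuming Flink's Java API is on the classpath; the class name EnvironmentsDemo is just for illustration): the DataSet API starts from an ExecutionEnvironment, while the DataStream API starts from a StreamExecutionEnvironment.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentsDemo {
    public static void main(String[] args) throws Exception {
        // Batch: a DataSet built from an in-memory Java collection.
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dataSet = batchEnv.fromElements("a", "b", "c");
        dataSet.print(); // print() triggers execution in the batch API

        // Streaming: in production the source would usually be message middleware.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = streamEnv.fromElements("a", "b", "c");
        dataStream.print();
        streamEnv.execute("environments demo"); // the streaming API needs an explicit execute()
    }
}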
API Overview
import java.io.Serializable;

import lombok.Data;

// A simple POJO used as the stream's element type.
// Lombok's @Data already generates getters, setters, equals/hashCode, and toString,
// so the separate @Getter/@Setter/@ToString annotations were redundant and are dropped.
@Data
public class Item implements Serializable {
    private String name;
    private Integer id;
}
import java.util.Random;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A custom source that emits one random Item per second until cancelled.
public class CustomDataSource implements SourceFunction<Item> {

    // volatile so that cancel(), called from another thread, is visible to run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while (isRunning) {
            Item item = generateItem();
            ctx.collect(item);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    private Item generateItem() {
        int i = new Random().nextInt(100);
        Item item = new Item();
        item.setName("name " + i);
        item.setId(i);
        return item;
    }
}
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Reading from the custom source defined above:
        /**
         * DataStreamSource<Item> text = env.addSource(new CustomDataSource()).setParallelism(1);
         * DataStream<Item> item = text.map((MapFunction<Item, Item>) value -> value);
         * item.print().setParallelism(1);
         * String jobName = "user define streaming source";
         * env.execute(jobName);
         */
        // Map: takes exactly one element as input and emits exactly one element as output.
        /**
         * DataStreamSource<Item> items = env.addSource(new CustomDataSource()).setParallelism(1);
         * SingleOutputStreamOperator<Object> mapItems = items.map((MapFunction<Item, Object>) item -> item.getName());
         * mapItems.print().setParallelism(1);
         * String jobName = "user define streaming source";
         * env.execute(jobName);
         */
        // FlatMap: takes one element and returns zero, one, or many elements; a hedged sketch follows.
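        // A sketch (not in the original post), assuming the CustomDataSource above:
        // split each Item name into words. An anonymous FlatMapFunction avoids the
        // type-erasure problems Flink has with generic lambdas.
        // Needs: import org.apache.flink.api.common.functions.FlatMapFunction;
        //        import org.apache.flink.util.Collector;
        /**
         * DataStreamSource<Item> flatSource = env.addSource(new CustomDataSource()).setParallelism(1);
         * SingleOutputStreamOperator<String> words = flatSource.flatMap(new FlatMapFunction<Item, String>() {
         *     @Override
         *     public void flatMap(Item value, Collector<String> out) {
         *         for (String word : value.getName().split(" ")) {
         *             out.collect(word);
         *         }
         *     }
         * });
         * words.print().setParallelism(1);
         */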
        // Filter: drops the elements that do not satisfy a predicate; a hedged sketch follows.
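        // A sketch (not in the original post): keep only Items with an even id.
        // Needs: import org.apache.flink.api.common.functions.FilterFunction;
        /**
         * env.addSource(new CustomDataSource()).setParallelism(1)
         *    .filter((FilterFunction<Item>) value -> value.getId() % 2 == 0)
         *    .print().setParallelism(1);
         */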
        // KeyedStream: partitions the stream by a key or attribute so each group can be processed separately.
        // KeyBy: groups the data by key; when a few keys are hot, a common trick is to salt the key with a
        // random number (and aggregate twice) to avoid data skew. A hedged keyBy sketch follows.
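        // A sketch (not in the original post): an explicit KeySelector is the
        // non-deprecated alternative to positional keyBy(0) on tuples; it refers
        // to the `items` stream built below.
        // Needs: import org.apache.flink.api.java.functions.KeySelector;
        /**
         * items.keyBy((KeySelector<Tuple3<Integer, Integer, Integer>, Integer>) value -> value.f0)
         *      .max(2)
         *      .printToErr();
         */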
        // Aggregations: built-in rolling aggregations (sum, min, max, minBy, maxBy) on a KeyedStream.
        // Difference between min and minBy: min returns the minimum value of the given field (the other
        // fields may come from a different element), while minBy returns the whole element that holds
        // the minimum; a hedged sketch follows after the max demo below.
        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(0, 1, 0));
        data.add(new Tuple3<>(0, 1, 1));
        data.add(new Tuple3<>(0, 2, 2));
        data.add(new Tuple3<>(0, 1, 3));
        data.add(new Tuple3<>(1, 2, 5));
        data.add(new Tuple3<>(1, 2, 9));
        data.add(new Tuple3<>(1, 2, 11));
        data.add(new Tuple3<>(1, 2, 13));
        // The element type is Tuple3, not Item, because the collection holds tuples.
        DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);
        items.keyBy(0).max(2).printToErr();
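        // A sketch (not in the original post) contrasting min and minBy on the same stream:
        // min(2) tracks the minimum of field 2 but may mix in fields from earlier elements,
        // whereas minBy(2) emits the complete element that holds the minimum.
        /**
         * items.keyBy(0).min(2).printToErr();
         * items.keyBy(0).minBy(2).printToErr();
         */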
        env.execute();
        // Reduce runs on each group of a KeyedStream and folds the elements with a
        // user-defined aggregation function; a hedged sketch follows.
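        // A sketch (not in the original post): sum field 2 within each key group.
        // Needs: import org.apache.flink.api.common.functions.ReduceFunction;
        /**
         * items.keyBy(0)
         *      .reduce((ReduceFunction<Tuple3<Integer, Integer, Integer>>) (a, b) ->
         *              new Tuple3<>(a.f0, a.f1, a.f2 + b.f2))
         *      .printToErr();
         */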
    }
}