flinkFlink

flink算子:union和connect

2021-01-29  本文已影响0人  王吉吉real

在合并数据流时,可以使用union和connect两种算子,两者的使用方式如下。

union

1、可以合并两个以上的数据流;
2、合并的各实时流数据类型必须相同;
3、合并的结果也是一个同类型数据流,两个DataStream合并结果为DataStream数据流,两个DataSet合并结果为DataSet流;

union方法代码如下,结果是创建了一个新的数据流

public final DataStream<T> union(DataStream<T>... streams) {
    List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
    unionedTransforms.add(this.transformation);

    for (DataStream<T> newStream : streams) {
        if (!getType().equals(newStream.getType())) {   //判断数据类型是否一致
            throw new IllegalArgumentException("Cannot union streams of different types: " + getType() + " and " + newStream.getType());
        }
        unionedTransforms.add(newStream.getTransformation());
    }
    //构建新的数据流
    return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));//通过使用 UnionTransformation 将多个 StreamTransformation 合并起来
}

使用方法:

//数据流 1 和 2
final DataStream<Integer> stream1 = env.addSource(...);
final DataStream<Integer> stream2 = env.addSource(...);
//union
stream1.union(stream2)

connect

1、只能用于连接两个DataStream流,不能用于DataSet;
2、连接的两个数据流数据类型可以不同
3、连接的结果为一个ConnectedStreams流
4、连接后两个流可以使用不同的处理方法,两个流可以共享状态;
5、连接后可以使用CoMapFunction或CoFlatMapFunction等方法进行处理

如果连接的两个流是DataStream的话,那么连接后的数据流为 ConnectedStreams,也是新创建的:

public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
    return new ConnectedStreams<>(environment, this, dataStream);
}

如果连接的数据流是一个 BroadcastStream(广播数据流),那么连接后的数据流是一个 BroadcastConnectedStream。

public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
    return new BroadcastConnectedStream<>(
            environment, this, Preconditions.checkNotNull(broadcastStream), 
            broadcastStream.getBroadcastStateDescriptor());
}

具体使用:

//1、连接 DataStream
DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
DataStream<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connected = src1.connect(src2);

//2、连接 BroadcastStream
DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
final BroadcastStream<String> broadcast = srcTwo.broadcast(utterDescriptor);
BroadcastConnectedStream<Tuple2<Long, Long>, String> connect = src1.connect(broadcast);
上一篇 下一篇

猜你喜欢

热点阅读