
聊聊flink的Broadcast State

2018-12-26  本文已影响79人  go4it

本文主要研究一下 Flink 的 Broadcast State


    public void testBroadcastState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> originStream = env.addSource(new RandomWordSource());

        MapStateDescriptor<String, String> descriptor = new MapStateDescriptor("dynamicConfig", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        BroadcastStream<Tuple2<String,String>> configStream = env.addSource(new DynamicConfigSource()).broadcast(descriptor);

        BroadcastConnectedStream<String, Tuple2<String,String>> connectStream = originStream.connect(configStream);
        connectStream.process(new BroadcastProcessFunction<String, Tuple2<String,String>, Void>() {
            public void processElement(String value, ReadOnlyContext ctx, Collector<Void> out) throws Exception {
                ReadOnlyBroadcastState<String,String> config = ctx.getBroadcastState(descriptor);
                String configValue = config.get("demoConfigKey");
                //do some process base on the config
                LOGGER.info("process value:{},config:{}",value,configValue);

            public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<Void> out) throws Exception {
                LOGGER.info("receive config item:{}",value);
                //update state


public class DynamicConfigSource implements SourceFunction<Tuple2<String,String>> {

    private volatile boolean isRunning = true;

    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        long idx = 1;
        while (isRunning){
            ctx.collect(Tuple2.of("demoConfigKey","value" + idx));

    public void cancel() {
        isRunning = false;



public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {

    private static final long serialVersionUID = 1L;

     * Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
     * @param name The name of the {@code MapStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueSerializer The type serializer for the values in the state.
    public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
        super(name, new MapSerializer<>(keySerializer, valueSerializer), null);

     * Create a new {@code MapStateDescriptor} with the given name and the given type information.
     * @param name The name of the {@code MapStateDescriptor}.
     * @param keyTypeInfo The type information for the keys in the state.
     * @param valueTypeInfo The type information for the values in the state.
    public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
        super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);

     * Create a new {@code MapStateDescriptor} with the given name and the given type information.
     * <p>If this constructor fails (because it is not possible to describe the type via a class),
     * consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
     * @param name The name of the {@code MapStateDescriptor}.
     * @param keyClass The class of the type of keys in the state.
     * @param valueClass The class of the type of values in the state.
    public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
        super(name, new MapTypeInfo<>(keyClass, valueClass), null);

    public Type getType() {
        return Type.MAP;

     * Gets the serializer for the keys in the state.
     * @return The serializer for the keys in the state.
    public TypeSerializer<UK> getKeySerializer() {
        final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
        if (!(rawSerializer instanceof MapSerializer)) {
            throw new IllegalStateException("Unexpected serializer type.");

        return ((MapSerializer<UK, UV>) rawSerializer).getKeySerializer();

     * Gets the serializer for the values in the state.
     * @return The serializer for the values in the state.
    public TypeSerializer<UV> getValueSerializer() {
        final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
        if (!(rawSerializer instanceof MapSerializer)) {
            throw new IllegalStateException("Unexpected serializer type.");

        return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer();



    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements
     * are broadcast to every parallel instance of the next operation. In addition,
     * it implicitly creates as many {@link org.apache.flink.api.common.state.BroadcastState broadcast states}
     * as the specified descriptors, which can be used to store the elements of the stream.
     *
     * @param broadcastStateDescriptors the descriptors of the broadcast states to create.
     * @return A {@link BroadcastStream} which can be used in the {@link #connect(BroadcastStream)} to
     * create a {@link BroadcastConnectedStream} for further processing of the elements.
     */
    public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
        final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
        return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);

    /**
     * Internal function for setting the partitioner for the DataStream.
     *
     * @param partitioner
     *            Partitioner to set.
     * @return The modified DataStream.
     */
    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        return new DataStream<>(this.getExecutionEnvironment(), new PartitionTransformation<>(this.getTransformation(), partitioner));

    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements
     * are broadcast to every parallel instance of the next operation.
     *
     * @return The DataStream with broadcast partitioning set.
     */
    public DataStream<T> broadcast() {
        return setConnectionType(new BroadcastPartitioner<T>());



    /**
     * Creates a new {@link ConnectedStreams} by connecting
     * {@link DataStream} outputs of (possibly) different types with each other.
     * The DataStreams connected using this operator can be used with
     * CoFunctions to apply joint transformations.
     *
     * @param dataStream
     *            The DataStream with which this stream will be connected.
     * @return The {@link ConnectedStreams}.
     */
    public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
        return new ConnectedStreams<>(environment, this, dataStream);

    /**
     * Creates a new {@link BroadcastConnectedStream} by connecting the current
     * {@link DataStream} or {@link KeyedStream} with a {@link BroadcastStream}.
     *
     * <p>The latter can be created using the {@link #broadcast(MapStateDescriptor[])} method.
     *
     * <p>The resulting stream can be further processed using the {@code BroadcastConnectedStream.process(MyFunction)}
     * method, where {@code MyFunction} can be either a
     * {@link org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction KeyedBroadcastProcessFunction}
     * or a {@link org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction BroadcastProcessFunction}
     * depending on the current stream being a {@link KeyedStream} or not.
     *
     * @param broadcastStream The broadcast stream with the broadcast state to be connected with this stream.
     * @return The {@link BroadcastConnectedStream}.
     */
    public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
        return new BroadcastConnectedStream<>(



public class BroadcastConnectedStream<IN1, IN2> {

    private final StreamExecutionEnvironment environment;
    private final DataStream<IN1> inputStream1;
    private final BroadcastStream<IN2> inputStream2;
    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;

    protected BroadcastConnectedStream(
            final StreamExecutionEnvironment env,
            final DataStream<IN1> input1,
            final BroadcastStream<IN2> input2,
            final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
        this.environment = requireNonNull(env);
        this.inputStream1 = requireNonNull(input1);
        this.inputStream2 = requireNonNull(input2);
        this.broadcastStateDescriptors = requireNonNull(broadcastStateDescriptors);

    public StreamExecutionEnvironment getExecutionEnvironment() {
        return environment;

     * Returns the non-broadcast {@link DataStream}.
     * @return The stream which, by convention, is not broadcasted.
    public DataStream<IN1> getFirstInput() {
        return inputStream1;

     * Returns the {@link BroadcastStream}.
     * @return The stream which, by convention, is the broadcast one.
    public BroadcastStream<IN2> getSecondInput() {
        return inputStream2;

     * Gets the type of the first input.
     * @return The type of the first input
    public TypeInformation<IN1> getType1() {
        return inputStream1.getType();

     * Gets the type of the second input.
     * @return The type of the second input
    public TypeInformation<IN2> getType2() {
        return inputStream2.getType();

     * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
     * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
     * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
     * @param <KS> The type of the keys in the keyed stream.
     * @param <OUT> The type of the output elements.
     * @return The transformed {@link DataStream}.
    public <KS, OUT> SingleOutputStreamOperator<OUT> process(final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function) {

        TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(

        return process(function, outTypeInfo);

     * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
     * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
     * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
     * @param outTypeInfo The type of the output elements.
     * @param <KS> The type of the keys in the keyed stream.
     * @param <OUT> The type of the output elements.
     * @return The transformed {@link DataStream}.
    public <KS, OUT> SingleOutputStreamOperator<OUT> process(
            final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
            final TypeInformation<OUT> outTypeInfo) {

        Preconditions.checkArgument(inputStream1 instanceof KeyedStream,
                "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");

        TwoInputStreamOperator<IN1, IN2, OUT> operator =
                new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);
        return transform("Co-Process-Broadcast-Keyed", outTypeInfo, operator);

     * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given
     * {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream.
     * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
     * @param <OUT> The type of the output elements.
     * @return The transformed {@link DataStream}.
    public <OUT> SingleOutputStreamOperator<OUT> process(final BroadcastProcessFunction<IN1, IN2, OUT> function) {

        TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(

        return process(function, outTypeInfo);

     * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given
     * {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream.
     * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
     * @param outTypeInfo The type of the output elements.
     * @param <OUT> The type of the output elements.
     * @return The transformed {@link DataStream}.
    public <OUT> SingleOutputStreamOperator<OUT> process(
            final BroadcastProcessFunction<IN1, IN2, OUT> function,
            final TypeInformation<OUT> outTypeInfo) {

        Preconditions.checkArgument(!(inputStream1 instanceof KeyedStream),
                "A BroadcastProcessFunction can only be used on a non-keyed stream.");

        TwoInputStreamOperator<IN1, IN2, OUT> operator =
                new CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
        return transform("Co-Process-Broadcast", outTypeInfo, operator);

    private <OUT> SingleOutputStreamOperator<OUT> transform(
            final String functionName,
            final TypeInformation<OUT> outTypeInfo,
            final TwoInputStreamOperator<IN1, IN2, OUT> operator) {

        // read the output type of the input Transforms to coax out errors about MissingTypeInfo

        TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<>(

        if (inputStream1 instanceof KeyedStream) {
            KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
            TypeInformation<?> keyType1 = keyedInput1.getKeyType();
            transform.setStateKeySelectors(keyedInput1.getKeySelector(), null);

        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<OUT> returnStream = new SingleOutputStreamOperator(environment, transform);


        return returnStream;

    protected <F> F clean(F f) {
        return getExecutionEnvironment().clean(f);




