
Flink Data Processing

2020-11-02  羋学僧

1.1 Flink Data Sources

1.1.1 Introduction to sources

A source is the data input of a program. You add a source to your job with StreamExecutionEnvironment.addSource(sourceFunction). Flink ships with many ready-made source implementations, and you can also write your own custom source (a Kafka example is sketched after the connector list below).

Ways to obtain a source

Additional connectors:
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
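
Any of these connectors is plugged in through addSource(). Below is a minimal sketch of a Kafka source; the topic name, broker address, group id, and the flink-connector-kafka dependency it relies on are assumptions for illustration, not part of the original text.

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // connection properties for the Kafka consumer (placeholder addresses)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "bigdata02:9092");
        props.setProperty("group.id", "flink-demo");

        // FlinkKafkaConsumer comes from the flink-connector-kafka dependency
        DataStreamSource<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("demo_topic", new SimpleStringSchema(), props));

        kafkaStream.print().setParallelism(1);
        env.execute("KafkaSourceDemo");
    }
}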

1.1.2 Collection-based sources

StreamingSourceFromCollection.java

public class StreamingSourceFromCollection {
    public static void main(String[] args) throws Exception {
        // Step 1: get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        // Step 2: mock some data
        ArrayList<String> data = new ArrayList<String>();
        data.add("hadoop");
        data.add("spark");
        data.add("flink");
        // Step 3: create the data source
        DataStreamSource<String> dataStream = env.fromCollection(data);
        // Step 4: transformation
        SingleOutputStreamOperator<String> addPreStream = dataStream.map(new MapFunction<String, String>() {

            public String map(String word) throws Exception {
                return "mi_" + word;
            }
        });
        // Step 5: handle the result (print it)
        addPreStream.print().setParallelism(1);
        // Step 6: start the job
        env.execute("StreamingSourceFromCollection");

    }
}

1.1.3 Custom single-parallelism source

MyNoParalleSource.java

/**
 * The element type this source emits is Long.
 *
 * This source only supports a parallelism of 1 (single parallelism).
 */
public class MyNoParalleSource implements SourceFunction<Long> {
    private long number = 1L;
    private boolean isRunning = true;
    @Override
    public void run(SourceContext<Long> sct) throws Exception {
        while (isRunning){
            // emit data downstream
            sct.collect(number);
            number++;
            // generate one record per second
            Thread.sleep(1000);
        }

    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}

StreamingDemoWithMyNoPralalleSource.java

public class StreamingDemoWithMyNoPralalleSource {
    public static void main(String[] args) throws Exception {

        /**
         * 1. Get the execution environment
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());


        /**
         * 2. Get the data source
         */

        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);

        /**
         * 3. Process the data
         */
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received data: " + value);
                return value;
            }
        }).setParallelism(2);

        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                // keep only even numbers
                return number % 2 == 0;
            }
        }).setParallelism(2);

        filterDataStream.print().setParallelism(1);

        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}

RichParallelSourceFunction supports a parallelism greater than 1. The difference between RichParallelSourceFunction and RichSourceFunction is that the former lets the user set a parallelism greater than 1, while the latter does not: its default parallelism is 1, and calling setParallelism() with a larger value fails with the following error:

Exception in thread "main" java.lang.IllegalArgumentException: The maximum parallelism of non parallel operator must be 1.
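
For reference, a source that both runs with multiple parallel instances and has access to the rich lifecycle methods (open()/close()) can extend RichParallelSourceFunction. Below is a minimal sketch; the class name MyRichParalleSource is made up for illustration and is not part of the original code.

/**
 * Minimal sketch of a rich, parallel source (hypothetical class, for illustration only):
 * each parallel subtask runs its own copy of run().
 */
public class MyRichParalleSource extends RichParallelSourceFunction<Long> {
    private long number = 1L;
    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open connections / initialize resources here, once per subtask
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(number);
            number++;
            Thread.sleep(1000); // one record per second
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        // release resources here
    }
}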

1.1.4 Custom multi-parallelism source

MyParalleSource.java

/**
 * This source supports a parallelism greater than 1.
 */
public class MyParalleSource implements ParallelSourceFunction<Long> {
    private long number = 1L;
    private boolean isRunning = true;
    @Override
    public void run(SourceContext<Long> sct) throws Exception {
        while (isRunning){
            sct.collect(number);
            number++;
            // generate one record per second
            Thread.sleep(1000);
        }

    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}

StreamingDemoWithMyPralalleSource.java

public class StreamingDemoWithMyPralalleSource {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<Long> numberStream = env.addSource(new MyParalleSource()).setParallelism(2);

        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received data: " + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });

        filterDataStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyPralalleSource");
    }
}

1.2 Common Transformation Operations

1.2.1 map and filter

/**
 * Data source: 1 2 3 4 5 ... arriving continuously
 * map: print every element that is received
 * filter: keep only the even numbers
 */
public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        // Flink's map/flatMap correspond to Spark's map/flatMap and Scala's map/flatMap
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received data: " + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0; // keep the element when this returns true
            }
        });

        filterDataStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }


}

1.2.3 union

/**
 * union merges multiple streams; the resulting stream contains the data of all input streams.
 * The one restriction is that all merged streams must have the same data type.
 * The example also uses timeWindowAll.
 */
public class unionDemo {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get the data sources
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);// note: for this source the parallelism can only be 1

        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // merge text1 and text2
        DataStream<Long> text = text1.union(text2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Raw element received: " + value);
                return value;
            }
        });
        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
        // print the result
        sum.print().setParallelism(1);
        String jobName = unionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}

1.2.4 connect, CoMap, and CoFlatMap

/**
 * Similar to union, but connect can only join two streams. The two streams may have different
 * data types, and a different processing method is applied to each stream.
 */
public class ConnectionDemo {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data sources
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);// note: for this source the parallelism can only be 1

        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);


        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });

        // connect the two streams
        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);



        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
            // this method processes stream 1
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }
            // this method processes stream 2
            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });

        // print the result
        result.print().setParallelism(1);
        String jobName = ConnectionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}

1.2.5 Split and Select

/**
 * Split a stream into multiple streams according to some rule.
 * Typical use case:
 * in practice the source stream often mixes several types of data that need different processing rules,
 * so the stream can be split into multiple streams according to those rules,
 * and each resulting stream can then use its own processing logic.
 */
public class SplitDemo {
    public static void main(String[] args) throws  Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get the data source
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);// note: for this source the parallelism can only be 1

        // split the stream by whether the value is even or odd
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even");// even
                } else {
                    outPut.add("odd");// odd
                }
                return outPut;
            }
        });

        // select one or more of the split streams
        DataStream<Long> evenStream = splitStream.select("even");

        DataStream<Long> oddStream = splitStream.select("odd");
        DataStream<Long> moreStream = splitStream.select("odd","even");

        // print the results
        // print the even numbers
        evenStream.print().setParallelism(1);
        // print the odd numbers
//        oddStream.print().setParallelism(1);
        // print everything
//        moreStream.print().setParallelism(1);
        String jobName = SplitDemo.class.getSimpleName();
        env.execute(jobName);

    }
}

1.3 Common Sink Operations

1.3.1 print() / printToErr()

Prints the toString() value of each element to standard output or to the standard error stream.
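
A quick sketch (assuming a DataStream named dataStream already exists):

dataStream.print();        // write each element's toString() to stdout
dataStream.printToErr();   // write each element's toString() to stderr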

1.3.2 writeAsText()

/**
 * Data source: 1 2 3 4 5 ... arriving continuously
 * map: print every element that is received
 * filter: keep only the even numbers
 * writeAsText: write the filtered result to a text file
 */
public class WriteTextDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received data: " + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });

        filterDataStream.writeAsText("D:\\flinkout\\value.txt").setParallelism(1);

        filterDataStream.print();

        env.execute("StreamingDemoWithMyNoPralalleSource");
    }

}


1.3.3 Sinks provided by Flink

Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)

1.3.4 Custom sink

/**
 * Write data to Redis.
 */
public class SinkForRedisDemo {
    public static void main(String[] args) throws  Exception {
        // execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // data source
        DataStreamSource<String> text = env.socketTextStream("bigdata02", 8888, "\n");
        // lpush l_words word
        // wrap the data: convert the String into a Tuple2<String, String>
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                // key, value
                return new Tuple2<>("f", value);
            }
        });
        // create the Redis configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("bigdata04").setPort(6379).setPassword("bigdata04").build();

        // create the Redis sink
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());



        l_wordsData.addSink(redisSink);


        env.execute("StreamingDemoToRedis");

    }

    /**
     * The logic that writes the data into Redis.
     */
    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // extract the Redis key from the incoming record
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }
        // extract the Redis value from the incoming record
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}
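
The RedisSink, FlinkJedisPoolConfig, and RedisMapper classes used above are not part of the Flink core distribution; they commonly come from the Apache Bahir Redis connector. The exact coordinates depend on your Flink/Scala versions and are an assumption here:

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.0</version>
    </dependency>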

1.4 State

1.4.1 State overview

Apache Flink® — Stateful Computations over Data Streams
Let's revisit the word count example.

// count word occurrences in real time
public class WordCount {
    public static void main(String[] args) throws Exception{
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // data input
        DataStreamSource<String> myDataStream = env.socketTextStream("bigdata02", 1234);
        // data processing
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = myDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new Tuple2<>(word, 1));
                    //out.collect(Tuple2.of(word,1));
                }

            }
        }).keyBy(0)
                .sum(1);
        // data output
        result.print();
        // start the application
        env.execute("WordCount");

    }
}

Input

hadoop,hadoop 
hadoop 
hive,hadoop 

Output

4> (hadoop,1) 
4> (hadoop,2) 
4> (hadoop,3) 
1> (hive,1) 
4> (hadoop,4)

Notice that the word counts accumulate across lines of input. Without state management there would be no such accumulation, which is why Flink has the concept of state.

State generally refers to the state of a specific task/operator. State can be recorded and recovered after a failure. Flink has two basic kinds of state, Keyed State and Operator State, and both can exist in two forms: raw state and managed state.
Managed state: state managed by the Flink framework; this is what we normally use.
Raw state: the user manages the concrete data structure of the state, and during a checkpoint the framework reads and writes the state as byte[] without knowing anything about its internal structure. Managed state is recommended for state on a DataStream; raw state is typically only needed when implementing a custom operator. Since it is rarely used in day-to-day work, we will not cover it further.

1.4.2 State types

Operator State (per task; a minimal sketch follows below)

Keyed State (per key)

Understanding state

The data source here is Kafka.
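
Operator State is tied to a parallel operator instance rather than to a key. A common pattern is a sink that buffers elements and snapshots the buffer through the CheckpointedFunction interface. The class below is a minimal sketch with made-up names, not code from this article:

/**
 * Minimal sketch of operator state (hypothetical class, for illustration):
 * buffers elements and snapshots the buffer on every checkpoint.
 */
public class BufferingSink implements SinkFunction<Long>, CheckpointedFunction {

    private transient ListState<Long> checkpointedState;
    private final List<Long> bufferedElements = new ArrayList<>();

    @Override
    public void invoke(Long value, Context context) throws Exception {
        bufferedElements.add(value);
        // ... flush bufferedElements to the external system once the buffer is large enough ...
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // copy the in-memory buffer into the operator state on every checkpoint
        checkpointedState.clear();
        for (Long element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("buffered-elements", Long.class);
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            // restore the buffer after a failure
            for (Long element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}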

1.4.3 Keyed State examples

ValueState

public class CountWindowAverageWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

    /**
     * 1. ValueState is a kind of keyed state.
     * 2. A ValueState holds exactly one value per key.
     *
     * Idea:
     * f0: how many times the current key has appeared
     * f1: the accumulated value
     * if (f0 == 3) {
     *     avg = f1 / f0
     * }
     */

    private ValueState<Tuple2<Long, Long>> countAndSum;

    @Override
    public void open(Configuration parameters) throws Exception {

        ValueStateDescriptor<Tuple2<Long, Long>> average = new ValueStateDescriptor<>(
                "average",
                Types.TUPLE(Types.LONG, Types.LONG)
        );

        countAndSum = getRuntimeContext().getState(average);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {

        Tuple2<Long, Long> currentState = countAndSum.value();
        if (currentState == null) {
            currentState = Tuple2.of(0L, 0L);
        }
        // count how many times this key has appeared
        currentState.f0 += 1;
        // accumulate the value
        currentState.f1 += element.f1;
        countAndSum.update(currentState);

        if (currentState.f0 ==3){
            double avg =(double)currentState.f1/currentState.f0;
            out.collect(Tuple2.of(element.f0,avg));
            // clear the state for this key
            countAndSum.clear();
        }
    }
}

/**
 *  Requirement: whenever 3 elements with the same key have been received,
 *  compute the average of their values, i.e. the average of every 3 values
 *  per key in the keyed stream.
 *
 *  Example input and expected output:
 *  1,3
 *  1,7
 *  1,5   -> (1, 5.0)
 *
 *  2,4
 *  2,2
 *  2,5   -> (2, 3.666...)
 *
 *  Output type: key (Long), average (Double)
 */
public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        // execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Expected output:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState()) // flatMap/map + state feels like writing a custom aggregate function
                .print();

        env.execute("TestStatefulApi");
    }
}

Output:

ListState

public class CountWindowAverageWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

    /**
     * 1,3
     * 1,7
     * 1,5
     */
    private ListState<Tuple2<Long, Long>> elementsByKey;


    @Override
    public void open(Configuration parameters) throws Exception {

        ListStateDescriptor<Tuple2<Long, Long>> average = new ListStateDescriptor<>(
                "average",
                Types.TUPLE(Types.LONG, Types.LONG)
        );
        elementsByKey = getRuntimeContext().getListState(average);
    }


    @Override
    public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
        Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();
        if (currentState ==null){
            elementsByKey.addAll(Collections.emptyList());
        }
        elementsByKey.add(element);

        ArrayList<Tuple2<Long, Long>> allElements = Lists.newArrayList(elementsByKey.get());

        if (allElements.size() ==3){
            long count =0;
            long sum=0;
            for(Tuple2<Long,Long> ele:allElements){
                count++;
                sum +=ele.f1;
            }
            double avg =(double)sum/count;
            out.collect(Tuple2.of(element.f0,avg));

            elementsByKey.clear();
        }


    }
}
public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        // execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Expected output:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithListState()) // flatMap/map + state feels like writing a custom aggregate function
                .print();

        env.execute("TestStatefulApi");
    }
}

Output:

MapState

/**
 *  MapState<K, V>: this state keeps a Map for every key
 *      put()    put a key/value pair into the state
 *      values() get all values stored in the MapState
 *      clear()  clear the state
 */
public class CountWindowAverageWithMapState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // managed keyed state
    // 1. MapState: the map key is a unique value; the map value is the value of the incoming record for the same stream key

    // A state declared in our code can be thought of as a helper variable.


    // Map semantics: entries with the same map key overwrite each other
    /**
     * 1,3
     * 1,5
     * 1,7
     *
     */

    private MapState<String, Long> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // register the state
        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<String, Long>(
                        "average",  // state name
                        String.class, Long.class); // data types stored in the state
        mapState = getRuntimeContext().getMapState(descriptor);
    }

    /**
     * 1,3
     * 1,5
     * 1,7
     *
     * dfsfsdafdsf,3
     * dfsfxxxfdsf,5
     * xxxx323123,7
     *
     *
     * @param element
     * @param out
     * @throws Exception
     */
    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {

        mapState.put(UUID.randomUUID().toString(), element.f1);

        // if the current key has appeared 3 times, compute the average and emit it
        List<Long> allElements = Lists.newArrayList(mapState.values());

        if (allElements.size() == 3) {
            long count = 0;
            long sum = 0;
            for (Long ele : allElements) {
                count++;
                sum += ele;
            }
            double avg = (double) sum / count;
            //
            out.collect(Tuple2.of(element.f0, avg));

            // clear the state
            mapState.clear();
        }
    }
}

public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        // execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Expected output:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithMapState()) // flatMap/map + state feels like writing a custom aggregate function
                .print();

        env.execute("TestStatefulApi");
    }
}

Output:

ReducingState

/**
 *  ReducingState<T>: this state keeps one aggregated value per key
 *      get()   read the state value
 *      add()   add a record to the state, updating the aggregate
 *      clear() clear the state
 */
public class SumFunction
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // sumState: holds the final accumulated result (one Long per key)
    private ReducingState<Long> sumState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // register the state
        ReducingStateDescriptor<Long> descriptor =
                new ReducingStateDescriptor<Long>(
                        "sum",  // state name
                        new ReduceFunction<Long>() { // reduce function used to aggregate
                            @Override
                            public Long reduce(Long value1, Long value2) throws Exception {
                                return value1 + value2;
                            }
                        }, Long.class); // data type stored in the state
        sumState = getRuntimeContext().getReducingState(descriptor);
    }

    /**
     *
     * 3
     * 5
     * 7
     *
     * @param element
     * @param out
     * @throws Exception
     */
    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Long>> out) throws Exception {
        // add the record to the state
        sumState.add(element.f1);

        out.collect(Tuple2.of(element.f0, sumState.get()));
    }
}

public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        // execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Output:
        dataStreamSource
                .keyBy(0)
                .flatMap(new SumFunction()) // running sum per key
                .print();

        env.execute("TestStatefulApi");
    }
}

Output:

AggregatingState

public class ContainsValueFunction
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
    /**
     * 1, contains:3 and 5
     */
    private AggregatingState<Long, String> totalStr; // helper field

    @Override
    public void open(Configuration parameters) throws Exception {
        // register the state
        AggregatingStateDescriptor<Long, String, String> descriptor =
                new AggregatingStateDescriptor<Long, String, String>(
                        "totalStr",  // state name

                        // similar in spirit to a custom aggregate function (UDAF) in Spark SQL
                        new AggregateFunction<Long, String, String>() {
                            // initialization; runs only once
                            @Override
                            public String createAccumulator() {
                                return "Contains:";
                            }

                            @Override
                            public String add(Long value, String accumulator) {
                                if ("Contains:".equals(accumulator)) {
                                    return accumulator + value;
                                }
                                return accumulator + " and " + value;
                            }

                            @Override
                            public String merge(String a, String b) {
                                return a + " and " + b;
                            }

                            @Override
                            public String getResult(String accumulator) {
                                // e.g. "Contains:1"
                                // e.g. "Contains:1 and 3"
                                return accumulator;
                            }
                        }, String.class); // data type stored in the state
        totalStr = getRuntimeContext().getAggregatingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, String>> out) throws Exception {
        totalStr.add(element.f1);

        out.collect(Tuple2.of(element.f0, totalStr.get()));
    }
}
public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        // execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Output:
        dataStreamSource
                .keyBy(0)
                .flatMap(new ContainsValueFunction()) // flatMap/map + state feels like writing a custom aggregate function
                .print();

        env.execute("TestStatefulApi");
    }
}

Output:

1.5 State backend

1.5.1 Overview

State backends supported by Flink:

MemoryStateBackend
FsStateBackend
RocksDBStateBackend

1.5.2 MemoryStateBackend

By default, state is kept in the TaskManager's heap memory, and during a checkpoint the state is saved into the JobManager's heap memory.

1.5.3 FsStateBackend

State is kept in the TaskManager's heap memory; during a checkpoint the state is written to a configured file system (HDFS, etc.).

1.5.4 RocksDBStateBackend

State is stored in RocksDB (a key-value store) backed by local files; during a checkpoint the state is written to a configured file system (HDFS, etc.).

1.5.5 Configuring the state backend

(1) Per-job configuration

Set it in the job code: env.setStateBackend(new FsStateBackend("hdfs://bigdata02:9000/flink/checkpoints"));
or new MemoryStateBackend()
or new RocksDBStateBackend(filebackend, true); (requires a third-party dependency)

Third-party dependency:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>

(2) Global configuration

Edit flink-conf.yaml:

state.backend: filesystem
state.checkpoints.dir: hdfs://bigdata02:9000/flink/checkpoints
Note: state.backend accepts one of the following values: jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend).

1.6 Checkpoints (Fault Tolerance)

1.6.1 Checkpoint overview

Taking a snapshot

Restoring from a snapshot

1.6.2 Checkpoint configuration

Checkpointing is disabled by default and has to be enabled explicitly. Once enabled, there are two checkpoint modes, Exactly-once and At-least-once; the default is Exactly-once, which is the right choice for most applications. At-least-once may be useful for applications that require ultra-low latency (consistently a few milliseconds).

/**
 * state:
 * keyed
 * operator  -> checkpoint
 */
public class WordCount4 {
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //10s 15s
        // If the data volume is large, checkpointing roughly every 5 minutes is recommended.
        // (This is also what Alibaba recommends in practice.)
        env.enableCheckpointing(10000);// checkpoint every 10 s

        FsStateBackend fsStateBackend = new FsStateBackend("hdfs://bigdata02:9000/flink_1/checkpoint");

         MemoryStateBackend memoryStateBackend = new MemoryStateBackend();

        env.setStateBackend(fsStateBackend);
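        // note: the RocksDB call below overrides the FsStateBackend just set; the last setStateBackend() call wins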

        env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata02:9000/flink_2/checkpoint"));

        //setCheckpointingMode --- exactly-once or at-least-once (whether duplicates are allowed)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        //setMinPauseBetweenCheckpoints --- minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        //setCheckpointTimeout --- checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        //enableExternalizedCheckpoints --- retain the checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay between attempts
        ));

        DataStreamSource<String> dataStream = env.socketTextStream(hostname, port);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(0)
                .sum(1);

        result.print();

        env.execute("WordCount check point....");
    }
}

1.7 Recovering Data (Fault Tolerance)

1.7.1 Restart strategy overview (retries)

Flink supports different restart strategies that control how a job is restarted after a failure. The cluster starts with a default restart strategy, which is used whenever no job-specific strategy is defined. If a restart strategy is specified when the job is submitted, it overrides the cluster default. The default restart strategy is set in Flink's configuration file flink-conf.yaml; the parameter restart-strategy defines which strategy is used.

Commonly used restart strategies:

1.7.2 Restart strategies

Fixed delay

Option 1: global configuration in flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

Option 2: set it in the application code
 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay between attempts
 ));

Failure rate

Option 1: global configuration in flink-conf.yaml
 restart-strategy: failure-rate
 restart-strategy.failure-rate.max-failures-per-interval: 3
 restart-strategy.failure-rate.failure-rate-interval: 5 min
 restart-strategy.failure-rate.delay: 10 s

Option 2: set it in the application code
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // maximum number of failures within the interval
  Time.of(5, TimeUnit.MINUTES), // the interval over which failures are measured
  Time.of(10, TimeUnit.SECONDS) // delay between attempts
));

No restart

Option 1: global configuration in flink-conf.yaml
 restart-strategy: none
Option 2: set it in the application code
env.setRestartStrategy(RestartStrategies.noRestart());

1.7.3 Retaining multiple checkpoints

By default, when checkpointing is enabled, Flink keeps only the most recent successful checkpoint, and when the job fails it recovers from that checkpoint. It is often more flexible to retain several checkpoints and choose which one to recover from. For example, if we find that the data processed during the last 4 hours is wrong, we may want to roll the whole state back to 4 hours ago. Flink supports retaining multiple checkpoints; add the following setting to conf/flink-conf.yaml to specify the maximum number of checkpoints to keep:

state.checkpoints.num-retained: 20

With this setting in place, you can list the checkpoint directories stored on HDFS:

hdfs dfs -ls hdfs://bigdata02:9000/flink/checkpoints 

To roll back to a particular checkpoint, simply pass the path of that checkpoint when resuming the job, as sketched below.
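
For example, a job can be resumed from a retained checkpoint with the -s flag of the flink CLI (a sketch; the job id, checkpoint number, main class, and jar name below are placeholders, not from the original text):

flink run -s hdfs://bigdata02:9000/flink/checkpoints/<job-id>/chk-100 -c com.example.WordCount4 flink-demo.jar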
