
2022-05-22-Flink-46 (Part 5)

2022-05-22  冰菓_

1. Data Partitioning

Shuffle (random) and round-robin partitioning (rebalance / rescale), broadcast, and global partitioning:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class RichRescaleDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallel source: each subtask emits the numbers whose parity matches its subtask index
        DataStreamSource<Integer> source = env.addSource(new RichParallelSourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                for (int i = 1; i < 10; i++) {
                    if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                        System.out.println(getRuntimeContext().getIndexOfThisSubtask());
                        ctx.collect(i);
                    }
                }
            }

            @Override
            public void cancel() {

            }
        });

        source.setParallelism(2);
        // rescale: round-robin within local groups of downstream subtasks
        source.rescale().print("rescale").setParallelism(4);
        // rebalance: round-robin across all downstream subtasks
        source.rebalance().print("rebalance").setParallelism(4);
        // broadcast: every record goes to every downstream subtask
        source.broadcast().print("broadcast").setParallelism(4);
        // global: every record goes to the first downstream subtask only
        source.global().print("global").setParallelism(4);


        env.execute();
    }
}
Custom repartitioning (partitionCustom), sketched below
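
The notes only name the topic, so here is a minimal sketch of partitionCustom; the parity-based Partitioner and the identity KeySelector are hypothetical choices for illustration.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        // send even keys to subtask 0 and odd keys to subtask 1
                        return key % 2;
                    }
                }, new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) {
                        return value; // use the element itself as the partitioning key
                    }
                })
                .print().setParallelism(2);

        env.execute();
    }
}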

2. Sink

Writing to the file system
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

public class ToFileDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小红", "0002", 200000L),
                new Event("小黄", "0003", 200000L)

        );
        // Row-format streaming file sink: roll the part file when it reaches 1 GB,
        // every 15 minutes, or after 15 minutes of inactivity
        StreamingFileSink<String> fileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(15))
                                .build()
                ).build();
        source.map(Event::toString).addSink(fileSink);

        env.execute();
    }
}
Writing to Kafka

In real production environments there is often a need to write data processed by Flink quickly into a distributed, high-throughput, highly available message middleware that can guarantee exactly-once semantics, so that other applications can consume the processed data. Kafka is Flink's best partner here: Flink can not only consume data from Kafka, it can also write the processed data back to Kafka, with high throughput, data safety, and exactly-once guarantees.
Flink integrates with multiple Kafka versions, such as 0.11.x, 1.x, and 2.x. Since Flink 1.9, the Kafka 2.2 client is used, so the Kafka version used here is 2.2.2, together with the latest API.
The following example writes data to Kafka. First, define a class implementing the KafkaSerializationSchema interface with a type parameter; String means the data written to Kafka is of type String. This class specifies the serialization schema for the data written to Kafka: override the serialize method to turn each record into a byte array, wrap it in a ProducerRecord, and return it.

import java.nio.charset.Charset;

import javax.annotation.Nullable;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Custom Kafka serialization schema for String records
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {
    private String topic;
    private String charset;
    // constructors take the target topic and the charset; UTF-8 by default
    public KafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }
    public KafkaStringSerializationSchema(String topic, String charset) {
        this.topic = topic;
        this.charset = charset;
    }
    // called to serialize each record
    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, @Nullable Long timestamp) {
        // convert the record to a byte array
        byte[] bytes = element.getBytes(Charset.forName(charset));
        // wrap it in a ProducerRecord
        return new ProducerRecord<>(topic, bytes);
    }
}

Then set the Kafka-related parameters in a Properties object and create a FlinkKafkaProducer, passing the target topic name, the Kafka serialization schema, the Properties, and the Semantic for writing to Kafka as constructor arguments. Finally, call addSink and pass in the FlinkKafkaProducer. Although the code below specifies EXACTLY_ONCE semantics, it cannot actually take effect without checkpointing enabled (see the snippet after the example).

DataStream<String> dataStream = …
// topic to write to
String topic = "test";
// Kafka-related parameters
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node-1:9092,node-2:9092,node-3:9092");
// create the FlinkKafkaProducer
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
        topic,                                      // target topic
        new KafkaStringSerializationSchema(topic),  // serialization schema
        properties,                                 // Kafka parameters
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE    // write with EXACTLY_ONCE semantics
);
// add the Kafka sink
dataStream.addSink(kafkaProducer);
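
As noted above, EXACTLY_ONCE only takes effect once checkpointing is enabled. A minimal fragment-style sketch; the 5-second interval is an arbitrary illustrative value, and the lowered transaction timeout is needed because brokers reject producer transaction timeouts above their transaction.max.timeout.ms (15 minutes by default), while FlinkKafkaProducer defaults to 1 hour:

// enable checkpointing so the Kafka sink's two-phase commit can complete
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// keep the producer's transaction timeout within the broker's default cap
properties.setProperty("transaction.timeout.ms", "900000");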
Writing to Redis
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.12</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>

Define a class (or a static inner class) implementing RedisMapper with a type parameter, here Tuple2<String, Integer>, the type of the data written to Redis, and implement three methods. The first, getCommandDescription, returns a RedisCommandDescription instance whose constructor specifies the Redis write command, HSET here, and the additionalKey, i.e. the outer key of the HASH value; the second, getKeyFromData, supplies the field key inside the HASH value; the third, getValueFromData, supplies the field value inside the HASH value.

public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // write with HSET: the value is a HASH whose outer key is "WORD_COUNT"
        return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
    }
    }
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0; // the field key inside the HASH value
    }
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return data.f1.toString(); // the field value inside the HASH value
    }
}
DataStream<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
// configure Redis parameters such as host, port and password
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPassword("123456").build();

// write the data to Redis
result.addSink(new RedisSink<>(conf, new RedisWordCountMapper()));
Writing to Elasticsearch
Writing to MySQL
Custom sinks (a combined MySQL-via-custom-sink sketch follows)
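
The original notes leave these headings empty; here is a minimal sketch that covers the last two at once: a custom sink built on RichSinkFunction that writes word counts to MySQL over plain JDBC. The JDBC URL, the credentials, and the word_count table are hypothetical.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySqlSink extends RichSinkFunction<Tuple2<String, Integer>> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // one connection per parallel sink instance
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "123456");
        statement = connection.prepareStatement(
                "INSERT INTO word_count (word, cnt) VALUES (?, ?) "
                        + "ON DUPLICATE KEY UPDATE cnt = ?");
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // upsert the latest count for this word
        statement.setString(1, value.f0);
        statement.setInt(2, value.f1);
        statement.setInt(3, value.f1);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}

It would be attached like any other sink: result.addSink(new MySqlSink());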

3. Time and Windows

Time semantics
  1. Processing time
  2. Event time
Watermarks

In practice, when the data volume is large, many records will carry the same timestamp. Extracting a timestamp and inserting a watermark for every single record is wasteful, so the usual strategy is to insert watermarks periodically.

Periodic watermark generation on out-of-order streams

Considering processing efficiency when large amounts of data arrive at once, we can also generate watermarks periodically on an out-of-order stream. But out-of-order data then causes computation errors: a late record lands in a window it does not belong to, so the earlier window loses the record while the later window counts data that is not its own.
To let windows collect late data correctly, we can wait a little, say two seconds: the timestamp of the watermark to insert is the maximum timestamp of the data seen so far minus two seconds. (We assume records are at most two seconds late; a record that is even later can still cause the window to lose data.) A sketch of this generator follows.
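
As a concrete illustration of the "max timestamp minus delay" idea, here is a minimal sketch of a periodic generator, assuming the Event class used throughout these notes; it is essentially what WatermarkStrategy.forBoundedOutOfOrderness does internally.

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {

    private final long delay = 2000L;                        // tolerated lateness, in ms
    private long maxTimestamp = Long.MIN_VALUE + delay + 1;  // avoid underflow before data arrives

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // just track the largest timestamp seen so far; nothing is emitted per event
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // called once per autoWatermarkInterval: emit "max timestamp minus the delay"
        output.emitWatermark(new Watermark(maxTimestamp - delay - 1));
    }
}

It would be wired in via WatermarkStrategy.forGenerator(ctx -> new BoundedOutOfOrdernessGenerator()).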

Characteristics of watermarks
  1. A watermark is a marker inserted into the data stream; it can be viewed as a special record
  2. A watermark's content is a timestamp, indicating the current progress of event time
  3. Watermarks are generated from the timestamps of the data
  4. Watermark timestamps must be monotonically increasing, so the task's event-time clock only moves forward
  5. A watermark can carry a configured delay to handle out-of-order data correctly
  6. A watermark Watermark(t) means event time in the stream has reached timestamp t, i.e. all data up to t have arrived; no record with timestamp t' <= t should appear afterwards
Watermark generation strategies
  1. Watermark generation for ordered streams
import DAY2.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OrderedWatermarkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        // emit a watermark every 100 ms (the default is 200 ms)
        env.getConfig().setAutoWatermarkInterval(100L);
        env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小红", "0002", 200000L),
                new Event("小黄", "0003", 200000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        })).print();

        env.execute();
    }
}
  2. Watermark generation for out-of-order streams
import DAY2.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class OutOfOrderWatermarkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.getConfig().setAutoWatermarkInterval(100L);
        env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小红", "0002", 200000L),
                new Event("小黄", "0003", 200000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(1L)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        })).print();

        env.execute();
    }
}
  3. Assigner with periodic watermarks (the custom generator sketched earlier is of this kind)
  4. Assigner with punctuated watermarks (see the sketch below)
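
The notes stop at the name, so here is a minimal sketch of a punctuated generator, reusing the Event type and the WatermarkGenerator interface from above. Emitting on every event is an assumption for illustration; real uses usually guard the emission with a condition on special marker events.

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // punctuated: emit a watermark for every record, right when it arrives
        output.emitWatermark(new Watermark(eventTimestamp - 1));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // nothing to do here: watermarks are driven by the events themselves
    }
}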
Watermark propagation
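
When a task reads from multiple upstream partitions, it keeps a separate watermark for each input channel, takes the minimum of them as its own event-time clock, and broadcasts each advance of that minimum to all downstream partitions.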

4. Windows

Basic window concepts

A window is a means of slicing unbounded data into finite chunks for processing.

Window classification (the corresponding assigners are sketched below)
  1. Time windows and count windows
  2. Tumbling windows, sliding windows, session windows (the only kind that merges)
  3. Global windows (only fire with a custom trigger)
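
A fragment-style illustration of the assigners behind this classification; keyedStream is an assumed KeyedStream<Event, String>, and the sizes and gaps are arbitrary:

// tumbling event-time window, 10 s wide
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10)));
// sliding event-time window, 10 s wide, sliding every 5 s
keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)));
// session window: 5 s of inactivity closes the session, so windows can merge
keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
// count window: fires after every 100 elements per key
keyedStream.countWindow(100);
// global window: collects everything, needs a custom trigger to ever fire
keyedStream.window(GlobalWindows.create());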
Window API types (compared in the sketch below)
  1. Non-keyed windows
  2. Keyed windows
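
The difference between the two API types, on an assumed DataStream<Event> named stream:

// keyed windows: partition first, then window; evaluated per key, in parallel
stream.keyBy(data -> data.user)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)));

// non-keyed windows: one window operator with parallelism 1 sees all the data
stream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));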
Window functions
  1. Incremental aggregation functions
    Reduce function (ReduceFunction)

import DAY2.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;

public class ReduceWindowDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // custom web UI port
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(1000L);
        env.setParallelism(1);


        // randomly generated click events

        DataStreamSource<Event> addSource = env.addSource(new SourceFunction<Event>() {
            private Boolean flag = true;

            @Override
            public void run(SourceContext<Event> ctx) throws Exception {
                Random random = new Random();
                String[] uid = new String[]{"小", "中", "大"};
                String[] url = new String[]{"./click", "./show", "./slide"};
                while (flag) {
                    ctx.collect(new Event(uid[random.nextInt(uid.length)], url[random.nextInt(url.length)], System.currentTimeMillis()));
                    Thread.sleep(100L);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        // out-of-order data: tolerate up to 2 seconds of lateness
        SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

        // map to (user, 1) pairs and key by user
        outputStreamOperator.map(data -> Tuple2.of(data.user,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG))
                .keyBy(data -> data.f0)
                // sliding window (size 10 s, slide 5 s):
                //.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                // tumbling window of 1 s
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return Tuple2.of(value1.f0,value1.f1+ value2.f1);
                    }
                }).print();

        env.execute();

    }
}

  2. Aggregate function (AggregateFunction)

import DAY2.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Date;
import java.util.Random;

public class WindowAggregateAvgDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // custom web UI port
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(1000L);
        env.setParallelism(1);


        // randomly generated click events

        DataStreamSource<Event> addSource = env.addSource(new SourceFunction<Event>() {
            private Boolean flag = true;

            @Override
            public void run(SourceContext<Event> ctx) throws Exception {
                Random random = new Random();
                String[] uid = new String[]{"小", "中", "大"};
                String[] url = new String[]{"./click", "./show", "./slide"};
                while (flag) {
                    ctx.collect(new Event(uid[random.nextInt(uid.length)], url[random.nextInt(url.length)], System.currentTimeMillis()));
                    Thread.sleep(200L);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        // out-of-order data: tolerate up to 2 seconds of lateness
        SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));


        // per user and per 2-second tumbling window, compute the average event timestamp
        outputStreamOperator.keyBy(data -> data.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .aggregate(new AggregateFunction<Event, Tuple3<Long,Integer,String>, Tuple2<String,Date>>() {
                    @Override
                    public Tuple3<Long, Integer, String> createAccumulator() {
                        // initialize the accumulator: (timestamp sum, count, user)
                        return Tuple3.of(0L,0,"");
                    }

                    @Override
                    public Tuple3<Long, Integer, String> add(Event value, Tuple3<Long, Integer, String> accumulator) {
                        // incremental update: add the timestamp and bump the count
                        return Tuple3.of(accumulator.f0+value.timestamp , accumulator.f1+1,value.user);
                    }

                    @Override
                    public Tuple2<String, Date> getResult(Tuple3<Long, Integer, String> accumulator) {
                        // final result: the user and the average timestamp as a Date
                        return Tuple2.of(accumulator.f2, new Date(accumulator.f0 / accumulator.f1));
                    }
                    // merge is only needed for merging (session) windows
                    @Override
                    public Tuple3<Long, Integer, String> merge(Tuple3<Long, Integer, String> a, Tuple3<Long, Integer, String> b) {
                        return Tuple3.of(a.f0+b.f0,a.f1+b.f1,b.f2);
                    }
                }).print();

        env.execute();


    }
}

  3. Full window functions
    Unlike incremental aggregation functions, a full window function first collects the window's data and caches it internally, then takes it all out for computation when the window is ready to emit its result. There are two variants:
    Window function (WindowFunction)
    Process window function (ProcessWindowFunction)

import DAY2.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.HashSet;
import java.util.Random;

public class ProcessWindowDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // custom web UI port
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);


        // randomly generated click events

        DataStreamSource<Event> addSource = env.addSource(new SourceFunction<Event>() {
            private Boolean flag = true;

            @Override
            public void run(SourceContext<Event> ctx) throws Exception {
                Random random = new Random();
                String[] uid = new String[]{"小", "中", "大"};
                String[] url = new String[]{"./click", "./show", "./slide"};
                while (flag) {
                    ctx.collect(new Event(uid[random.nextInt(uid.length)], url[random.nextInt(url.length)], System.currentTimeMillis()));
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });
        addSource.print();
        // assign timestamps with zero tolerated out-of-orderness
        SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

        // map to (user, 1) pairs and route everything to a single key,
        // so one window instance sees all the data
        outputStreamOperator.map(data -> Tuple2.of(data.user,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG))
                .keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Tuple2<String, Long>, String, Boolean, TimeWindow>() {
                    @Override
                    public void process(Boolean aBoolean, ProcessWindowFunction<Tuple2<String, Long>, String, Boolean, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                        // count the distinct users seen in this window
                        HashSet<String> uidSet = new HashSet<>();
                        for (Tuple2<String, Long> element : elements) {
                            uidSet.add(element.f0);
                        }
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        out.collect("window [" + new Timestamp(start) + ", " + new Timestamp(end) + "): "
                                + uidSet.size() + " distinct users");

                    }
                }).print();

        env.execute();
    }
}

The effect of the configured delay on output order

  4. Combining the two kinds of window functions (sketched below)
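
The notes don't include code for this combination, so here is a minimal sketch under the same assumptions as the examples above (an Event stream with event-time timestamps and watermarks already assigned): the AggregateFunction counts incrementally, and the ProcessWindowFunction receives only that single pre-aggregated value plus the window metadata.

stream.keyBy(data -> data.user)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .aggregate(
                // incremental part: count events one by one as they arrive
                new AggregateFunction<Event, Long, Long>() {
                    @Override
                    public Long createAccumulator() { return 0L; }
                    @Override
                    public Long add(Event value, Long acc) { return acc + 1; }
                    @Override
                    public Long getResult(Long acc) { return acc; }
                    @Override
                    public Long merge(Long a, Long b) { return a + b; }
                },
                // full-window part: decorate the final count with window metadata
                new ProcessWindowFunction<Long, String, String, TimeWindow>() {
                    @Override
                    public void process(String user, Context context, Iterable<Long> elements, Collector<String> out) {
                        // the iterable holds exactly one element: the pre-aggregated count
                        Long count = elements.iterator().next();
                        out.collect(user + ": " + count + " events in window ending at "
                                + new Timestamp(context.window().getEnd()));
                    }
                })
        .print();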
Other window APIs
Handling late data (see the sketch below)
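
A fragment-style sketch of the late-data tools, reusing the (user, 1) stream from the reduce example above; the tag name "late" and the 1-minute lateness are arbitrary illustrative values:

// anything arriving after watermark + allowed lateness is routed here
OutputTag<Tuple2<String, Long>> lateTag = new OutputTag<Tuple2<String, Long>>("late") {};

SingleOutputStreamOperator<Tuple2<String, Long>> result = outputStreamOperator
        .map(data -> Tuple2.of(data.user, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(data -> data.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .allowedLateness(Time.minutes(1))     // keep the window state for 1 extra minute
        .sideOutputLateData(lateTag)          // collect whatever is later than even that
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

result.print("result");
result.getSideOutput(lateTag).print("late"); // consume the late records separately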