flink入门

flink学习之十-window & Processing Time

2019-03-19  本文已影响33人  AlanKim

这里先使用Processing Time,使用window来处理,看下demo

package myflink.job;

import com.alibaba.fastjson.JSON;
import myflink.model.UrlInfo;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Date;
import java.util.Properties;

/**
 * Demo Flink job: consumes JSON-encoded UrlInfo records from Kafka, keys the
 * stream by domain, and counts elements per domain in 30-second tumbling
 * processing-time windows, printing one aggregated record per key per window.
 */
public class WindowTest {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the time characteristic up front, before any sources/operators are
        // defined — the Flink docs recommend configuring this at the beginning of
        // the program so every time-based operator (timeWindow below) uses it.
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Kafka consumer configuration.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Source: read raw JSON strings from the "testjin" topic, then map each
        // record String -> UrlInfo and derive its domain.
        SingleOutputStreamOperator<UrlInfo> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        "testjin",// topic
                        new SimpleStringSchema(),
                        properties
                )
        ).setParallelism(1)
                .map(string -> {
                    UrlInfo urlInfo = JSON.parseObject(string, UrlInfo.class);
                    urlInfo.setDomain(urlInfo.generateDomain());
                    return urlInfo;
                });

        // Key by domain so each window aggregates per-domain traffic.
        KeyedStream<UrlInfo, String> keyedStream = dataStreamSource.keyBy(new KeySelector<UrlInfo, String>() {
            @Override
            public String getKey(UrlInfo urlInfo) throws Exception {
                return urlInfo.getDomain();
            }
        });

        // 30-second tumbling processing-time window. The reduce runs
        // incrementally as elements arrive; one aggregated UrlInfo per key is
        // emitted when the window fires.
        SingleOutputStreamOperator<UrlInfo> windowReduceStream = keyedStream.timeWindow(Time.seconds(30))
        .reduce((ReduceFunction<UrlInfo>) (accumulated, next) -> {
            UrlInfo urlInfo = new UrlInfo();

            // All elements in a keyed window share the same domain.
            urlInfo.setDomain(accumulated.getDomain());
            urlInfo.setUrl(urlInfo.getDomain() + "/reduce/" + DateFormatUtils.format(new Date(),"yyyy-MM-dd'T'HH:mm:ss"));
            urlInfo.setHash(DigestUtils.md5Hex(urlInfo.getUrl()));

            // NOTE(review): ReduceFunction is invoked n-1 times for n window
            // elements, with the first argument carrying the running aggregate.
            // "+ 1" yields the correct element count only if each raw element
            // arrives with count == 1; if UrlInfo.count defaults to 0 the
            // result is off by one — verify UrlInfo's default before trusting
            // these counts.
            urlInfo.setCount(accumulated.getCount() + 1);

            return urlInfo;
        }).returns(UrlInfo.class);

        windowReduceStream.addSink(new PrintSinkFunction<>());

        env.execute("execute window reduce info");
    }
}

可以看到,这里使用window,reduce会随元素到达增量执行,每个30秒的窗口结束时输出一次聚合结果,统计窗口内总共的数据个数。

由于用了window+reduce,这里30秒只有一个结果出来。

运行后,看下结果:

2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:35:56, hash=e7b48416a083727b703df80008dfe4e8, domain=so.com, count=16)
2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:35:59, hash=e478c32f727bd95507a409d6c6b08146, domain=baidu.com, count=6)

2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:36:26, hash=b22e6462ab7f2a263eb7934fa0fe110f, domain=baidu.com, count=3)
2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:36:29, hash=7da591487d9c624ae7209b7c2028eec0, domain=so.com, count=5)

2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:36:59, hash=f2a7487a54a4fb193d5acbac00a0d539, domain=so.com, count=5)
2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:36:53, hash=ce326552180fe4e1465a90ac7baeb380, domain=baidu.com, count=3)
上一篇下一篇

猜你喜欢

热点阅读