Flink Learning Part 10: Window & ProcessingTime
2019-03-19
AlanKim
Here we start with Processing Time and process the stream with a window. Let's look at the demo:
package myflink.job;

import com.alibaba.fastjson.JSON;
import myflink.model.UrlInfo;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Date;
import java.util.Properties;

public class WindowTest {

    public static void main(String[] args) throws Exception {
        // Read data from Kafka
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<UrlInfo> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        "testjin",  // topic
                        new SimpleStringSchema(),
                        properties
                )
        ).setParallelism(1)
                // map: transform one stream into another, here String --> UrlInfo
                .map(string -> {
                    UrlInfo urlInfo = JSON.parseObject(string, UrlInfo.class);
                    urlInfo.setDomain(urlInfo.generateDomain());
                    return urlInfo;
                });

        // keyBy on domain
        KeyedStream<UrlInfo, String> keyedStream = dataStreamSource.keyBy(new KeySelector<UrlInfo, String>() {
            @Override
            public String getKey(UrlInfo urlInfo) throws Exception {
                return urlInfo.getDomain();
            }
        });

        // Set the time characteristic to Processing Time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Use a 30-second timeWindow
        SingleOutputStreamOperator<UrlInfo> windowReduceStream = keyedStream.timeWindow(Time.seconds(30))
                .reduce((ReduceFunction<UrlInfo>) (t1, t2) -> {
                    UrlInfo urlInfo = new UrlInfo();

                    // all elements in one window share the same key, so the domain is the same
                    urlInfo.setDomain(t1.getDomain());
                    urlInfo.setUrl(urlInfo.getDomain() + "/reduce/" + DateFormatUtils.format(new Date(), "yyyy-MM-dd'T'HH:mm:ss"));
                    urlInfo.setHash(DigestUtils.md5Hex(urlInfo.getUrl()));

                    urlInfo.setCount(t1.getCount() + 1);  // accumulate the count inside reduce
                    return urlInfo;
                }).returns(UrlInfo.class);

        windowReduceStream.addSink(new PrintSinkFunction<>());

        env.execute("execute window reduce info");
    }
}
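The UrlInfo model class is not shown in this post. Judging from the getters/setters used above and the toString format in the output below, it is roughly the following POJO. This is only a minimal sketch: the Lombok @Data annotation and the generateDomain() implementation are assumptions, not the original code.

package myflink.model;

import lombok.Data;

// Sketch of the UrlInfo POJO assumed by the demo above; fields inferred from the calls,
// toString format (via Lombok @Data) inferred from the printed output.
@Data
public class UrlInfo {
    private long id;
    private String url;
    private String hash;
    private String domain;
    private long count;

    // assumption: the domain is everything in the url before the first '/'
    public String generateDomain() {
        if (url == null) {
            return null;
        }
        int idx = url.indexOf('/');
        return idx > 0 ? url.substring(0, idx) : url;
    }
}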
As you can see, with a 30-second window the reduce is evaluated once per window and counts the total number of records that fell into it.
Because window + reduce is used, only one result per key (per domain) is emitted every 30 seconds.
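If you want results more often while still aggregating over 30 seconds of data, a sliding processing-time window can be used instead of the tumbling one. A minimal variant of the window line above (the 10-second slide is just an example value):

// tumbling window: one result per key every 30 seconds
keyedStream.timeWindow(Time.seconds(30))

// sliding window: a 30-second window evaluated every 10 seconds,
// so each element is counted in three overlapping windows
keyedStream.timeWindow(Time.seconds(30), Time.seconds(10))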
After running it, the output looks like this:
2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:35:56, hash=e7b48416a083727b703df80008dfe4e8, domain=so.com, count=16)
2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:35:59, hash=e478c32f727bd95507a409d6c6b08146, domain=baidu.com, count=6)
2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:36:26, hash=b22e6462ab7f2a263eb7934fa0fe110f, domain=baidu.com, count=3)
2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:36:29, hash=7da591487d9c624ae7209b7c2028eec0, domain=so.com, count=5)
2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:36:59, hash=f2a7487a54a4fb193d5acbac00a0d539, domain=so.com, count=5)
2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:36:53, hash=ce326552180fe4e1465a90ac7baeb380, domain=baidu.com, count=3)
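The leading "2>" is the index of the print-sink subtask that emitted the record; each line is one reduce result, i.e. the record count for one domain within one 30-second window.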