数据大盘-搜索词统计(KeyedProcessFunction)

2019-11-12  本文已影响0人  LZhan
0.需求

实时输出当天的搜索词排名,即实时呈现热点搜索词

1.数据源读取

对接Kafka数据源,将消息转成实体类(实体类属性主要是关键词和搜索时间)

public class KafkaUtil {

    public static FlinkKafkaConsumerBase<String> text(String topic) throws IOException {
        return text(topic, "kafka.properties");
    }

    /**
     * @param topic 主题名称
     * @param configPath Kafka属性配置文件路径
     */
    public static FlinkKafkaConsumerBase<String> text(String topic,String configPath) throws IOException {

        //1.加载Kafka属性
        Properties prop=new Properties();
        //Class.getClassLoader.getResourceAsStream 默认是从ClassPath根下获取,path不能以“/"开头
        InputStream in=KafkaUtil.class.getClassLoader().getResourceAsStream(configPath);
        prop.load(in);
        //2.构造FlinkKafkaConsumer
        FlinkKafkaConsumerBase<String> consumer=new FlinkKafkaConsumer011<>(topic,new SimpleStringSchema(),prop);
        //todo 可以进行消费者的相关配置
        // 本地debug不提交offset consumer.setCommitOffsetsOnCheckpoints(false);
        return consumer;
    }
}

env.addSource(KafkaUtil.text("topic"));

2.搜索词统计

在进行窗口内统计时,首先需要根据yyyy-MM-dd的维度对搜索词消息进行keyby操作,形成KeyedStream,进一步调用实现了抽象类KeyedProcessFunction的方法。

2.1 KeyedProcessFunction介绍

顾名思义,KeyedProcessFunction,是针对具有相同key的stream进行元素处理的方法。
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
三个泛型分别是key的类型,流输入类型,流输出类型。

<1> 方法public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
处理流中的每一个元素,即每有一个元素进来,就会执行一次processElement方法。
该方法可以通过参数Collector<O> out来产生0个或者多个元素;
也可以通过Context ctx来更新state或者设置定时器timers。
ctx通过调用timerService()可以注册定时器。

Context的属性方法:

public abstract class Context {

        /**
         * Timestamp of the element currently being processed or timestamp of a firing timer.
         *
         * <p>This might be {@code null}, for example if the time characteristic of your program
         * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
         */
        public abstract Long timestamp();

        /**
         * A {@link TimerService} for querying time and registering timers.
         */
        public abstract TimerService timerService();

        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);

        /**
         * Get key of the element being processed.
         */
        public abstract K getCurrentKey();
    }

对于timestamp()方法,需要注意的是注释上说明可能会返回为null,如果设置的时间类型是ProcessingTime。

<2>方法public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
当通过某个定时器timer通过TimerService设置后,会调用onTimer方法。

2.2 具体实现

因为需要实时统计当天的搜索词热度,所以在KeyedProcessFunction实现方法中,需要使用到状态。
private MapState<String, DashboardKeyword> keywordState;
Map的键就是搜索词,值就是输出实体类(两个属性分别是关键词和搜索次数)

@Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        StateTtlConfig retainOneDay = StateTtlUtil.retainOneDay();
        MapStateDescriptor<String, DashboardKeyword> keywordStateDescriptor = new MapStateDescriptor<>(
                "keyword",
                Types.STRING,
                Types.POJO(DashboardKeyword.class)
        );
        keywordStateDescriptor.enableTimeToLive(retainOneDay);

        this.keywordState = getRuntimeContext().getMapState(keywordStateDescriptor);
    }

KeyedProcessFunction继承了AbstractRichFunction,因此可以使用到RuntimeContext。也进一步可以使用到状态,在open方法里面定义State并且设置State的存活时间为1天。


@Override
    public void processElement(Search search, Context context, Collector<Tuple2<Integer, DashboardKeyword>> collector) throws Exception {
        Result result = DicAnalysis.parse(search.getKeyword());
        for (Term term: result.getTerms()) {
            String key = term.getName();
            if (key.trim().length() < 2) continue;

            DashboardKeyword value;
            if (this.keywordState.get(key) != null) {
                value = this.keywordState.get(key);
            } else {
                value = new DashboardKeyword();
                value.setKeyword(key);
                value.setFrequency(0L);
            }
            value.setFrequency(value.getFrequency() + 1L);
            this.keywordState.put(key, value);
        }

        long coalescedTime = ((System.currentTimeMillis() + 5000) / 5000) * 5000;
        context.timerService().registerProcessingTimeTimer(coalescedTime);
    }

processElement里面的代码就是很常见的套路,如果MapState中已经存在当前搜索词了,即获取对应的value,并将其频次增加;如果MapState中并没有当前的搜索词,则将该关键词及对应value添加到Map当中。
这里是给定时器添加5s的时长,这里的写法((System.currentTimeMillis() + 5000) / 5000) * 5000是计时器合并的目的,Flink对于每个键和时间戳都只会维护一个计时器(计时器太多会影响性能),需要通过降低计时器的精度来合并计时器,从而减少计时器的数量。
假设现在是15s,那么定时器为20s,利用上面的写法,16s,17s,18s,19s的定时器都是20s。


 @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Integer, DashboardKeyword>> collector) throws Exception {
        java.util.stream.Collector<DashboardKeyword, ?, List<DashboardKeyword>> top50Collector = Comparators.greatest(
                50,
                Comparator.comparingLong(DashboardKeyword::getFrequency)
        );
        List<DashboardKeyword> top50 = Streams.stream(this.keywordState.values()).collect(top50Collector);

        for (int rank = 0; rank < top50.size(); rank++) {
            collector.collect(Tuple2.of(rank, top50.get(rank)));
        }
    }

统计前50搜索词。

3.数据存储

存储到redis中,
注意flink1.7版本之后,官方没有redis sink,可以去http://bahir.apache.org/(flink以及spark扩展库),
粘贴源码实现redis sink。

上一篇 下一篇

猜你喜欢

热点阅读