Tips | Flink 使用 union 代替 join、co

2020-10-04  本文已影响0人  大数据羊说

本系列每篇文章都比较短小,不定期更新,从一些实际的 case 出发抛砖引玉,提高小伙伴的姿♂势水平。本文介绍在满足原有需求、实现原有逻辑的场景下,在 Flink 中使用 union 代替 cogroup(或者join) ,简化任务逻辑,提升任务性能的方法,阅读时长大概一分钟,话不多说,直接进入正文!

需求场景分析

需求场景

需求诱诱诱来了。。。数据产品妹妹想要统计单个短视频粒度的点赞,播放,评论,分享,举报五类实时指标,并且汇总成 photo_id、1 分钟时间粒度的实时视频消费宽表(即宽表字段至少为:photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp)产出至实时大屏。

问题在于对同一个视频,五类视频消费行为的触发机制以及上报时间是不同,也就决定了对实时处理来说五类行为日志对应着五个不同的数据源。sql boy 们自然就想到了 join 操作将五类消费行为日志合并,可是实时 join(cogroup) 真的那么完美咩~,下文细谈。

source 输入以及特点

首先我们分析下需求中的 source 特点:

sink 输出以及特点

sink 特点如下:

source、sink 样例数据

source 数据:

photo_id timestamp user_id 说明
1 2020/10/3 11:30:33 3 播放
1 2020/10/3 11:30:33 4 播放
1 2020/10/3 11:30:33 5 播放
1 2020/10/3 11:30:33 4 点赞
2 2020/10/3 11:30:33 5 点赞
1 2020/10/3 11:30:33 5 评论

sink 数据:

photo_id timestamp play_cnt like_cnt comment_cnt
1 2020/10/3 11:30:00 3 1 1
2 2020/10/3 11:30:00 0 1 0

我们已经对数据源输入和输出有了完整的分析,那就瞧瞧有什么方案可以实现上述需求吧。

实现方案

我们先上 cogroup 方案的示例代码。

cogroup

cogroup 实现示例如下,示例代码直接使用了处理时间(也可替换为事件时间~),因此对数据源的时间戳做了简化(直接干掉):

public class Cogroup {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
        // Long -> photo_id 播放一次
        DataStream<Long> play = SourceFactory.getDataStream(xxx);
        // Long -> photo_id 点赞一次
        DataStream<Long> like = SourceFactory.getDataStream(xxx);
        // Long -> photo_id 评论一次
        DataStream<Long> comment = SourceFactory.getDataStream(xxx);
        // Long -> photo_id 分享一次
        DataStream<Long> share = SourceFactory.getDataStream(xxx);
        // Long -> photo_id 举报一次
        DataStream<Long> negative = SourceFactory.getDataStream(xxx);

        // Tuple3<Long, Long, Long> -> photo_id + play_cnt + like_cnt 播放和点赞的数据合并
        DataStream<Tuple3<Long, Long, Long>> playAndLikeCnt = play
            .coGroup(like)
            .where(KeySelectorFactory.get(Function.identity()))
            .equalTo(KeySelectorFactory.get(Function.identity()))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
            .apply(xxx1);

        // Tuple4<Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt 播放、点赞、评论的数据合并
        DataStream<Tuple4<Long, Long, Long, Long, Long>> playAndLikeAndComment = playAndLikeCnt
            .coGroup(comment)
            .where(KeySelectorFactory.get(playAndLikeModel -> playAndLikeModel.f0))
            .equalTo(KeySelectorFactory.get(Function.identity()))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
            .apply(xxx2);

        // Tuple5<Long, Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt 播放、点赞、评论、分享的数据合并
        DataStream<Tuple5<Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = playAndLikeAndComment
            .coGroup(share)
            .where(KeySelectorFactory.get(playAndLikeAndCommentModel -> playAndLikeAndCommentModel.f0))
            .equalTo(KeySelectorFactory.get(Function.identity()))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
            .apply(xxx2);

        // Tuple7<Long, Long, Long, Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp 播放、点赞、评论、分享、举报的数据合并
        // 同上~
        DataStream<Tuple7<Long, Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = ***;

        env.execute();
    }
}

粗暴一想,上面这样一搞不就结束了么,事情没那么简单,我们来做一个详细点的分析。

上述实现可能会存在的问题点

数据产品妹妹:🤩,小哥哥好棒,既然问题点都分析出来了,技术小哥哥就帮人家解决一下嘛~

头文字 ∩ 技术小哥哥:搞(欢迎关注mangodata gzh)。

头文字 ∩ 技术小哥哥:既然可能由于过多的窗口导致数据产出延迟,job 不稳定,那有没有什么方法减少窗口数量呢,思路转换一下。我们直接以整个 job 中只包含一个窗口算子操作为基点,逆推一下,则有以下数据链路。

逆推链路

1 - 5 为逆推的整条链路。

话不多说直接上 union 方案代码。

union

public class Union {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tuple2<Long, String> -> photo_id + "PLAY"标签
        DataStream<Tuple2<Long, String>> play = SourceFactory.getDataStream(xxx);
        // Tuple2<Long, String> -> photo_id + "LIKE"标签
        DataStream<Tuple2<Long, String>> like = SourceFactory.getDataStream(xxx);
        // Tuple2<Long, String> -> photo_id + "COMMENT"标签
        DataStream<Tuple2<Long, String>> comment = SourceFactory.getDataStream(xxx);
        // Tuple2<Long, String> -> photo_id + "SHARE"标签
        DataStream<Tuple2<Long, String>> share = SourceFactory.getDataStream(xxx);
        // Tuple2<Long, String> -> photo_id + "NEGATIVE"标签
        DataStream<Tuple2<Long, String>> negative = SourceFactory.getDataStream(xxx);

        // Tuple5<Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + window_start_timestamp
        DataStream<Tuple3<Long, Long, Long>> playAndLikeCnt = play
            .union(like)
            .union(comment)
            .union(share)
            .union(negative)
            .keyBy(KeySelectorFactory.get(i -> i.f0))
            .timeWindow(Time.seconds(60))
            .process(xxx);

        env.execute();
    }
}

可以发现,无论上游数据源怎样进行变化,上述 union 方案中始终可以保持只有一个窗口算子处理和计算数据,则可以解决之前列举的数据延迟以及 flink 任务算子过多的问题。

在数据源的 schema 相同(或者不同但经过处理之后可以 format 成相同格式)的情况下,或者处理逻辑相同的话,可以使用 union 进行逻辑简化。

总结

本文首先介绍了我们的需求场景,第二部分分析了使用 cogroup(案例代码)是如何解决此需求场景,再分析了此实现方案可能会存在一些问题,并引出了 union 解决方案的逆推和设计思路。
在第三部分针对此场景使用 union 代替 cogroup 进行了一定程度上的优化。如果针对此场景,大佬们有更好的优化方案的话,期待留言喔。

上一篇 下一篇

猜你喜欢

热点阅读