大数据,机器学习,人工智能大数据玩转大数据

Flink继续实践:从日志清洗到实时统计内容PV等多个指标

2019-08-21  本文已影响340人  LittleMagic

前言

最近正在将一些原本用Spark Streaming实现的流计算任务迁移到Flink,最简单也是最有代表性的就是实时点击量(PV)统计。除了PV之外,我们还希望同时将内容的回复、点赞、收藏指标也一并统计出来,用来后续确定内容的热度。这个作业会涉及到与消息队列的对接、常用算子的使用、事件时间、窗口、水印、状态等几乎所有Flink应用中的基础内容,所以本文来记录一下过程,使用Flink版本为1.8.1。

日志清洗

数据来源是Nginx日志,原始格式如下:

log_format  main  '$remote_addr $hostname   [$time_local]   "$request"  $status $request_length $body_bytes_sent    "$http_referer" "$http_user_agent"  "$http_x_forwarded_for" "$http_host"    "$request_time" "$upstream_response_time"   "$upstream_status"  "$request_body"';

日志会先预处理成JSON串,并打进RocketMQ,所以RocketMQ就是Flink程序的Source。清洗完毕之后的日志会Sink到Kafka,方便其他业务复用。

下面创建日志清洗Flink程序的StreamExecutionEnvironment。在这个阶段我们暂时还不关心事件时间,只需要将感兴趣的日志过滤出来就行,所以时间特征选用处理时间,顺便设定检查点相关的参数。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

然后接入RocketMQ Source(预先写好Consumer参数,即consumerProps属性),使用filter()算子过滤掉不合法的数据,再用flatMap()算子将需要的日志转换成统一的四元组格式,即(用户ID, 行为对象ID, 时间戳, 行为类别)。Sink到Kafka时仍然是JSON字符串,简单方便。代码主体如下。

    env
      .addSource(new RocketMQSource<>(new StringDeserializationSchema(), consumerProps))
      .name("rocketmq-source-log-forward-forumapi")
      .filter(str -> str.contains("token=") && str.charAt(0) == '{')
      .flatMap((String str, Collector<String> out) -> {
        JSONObject record = JSON.parseObject(str);
        String url = record.getString("url").toLowerCase();
        long userId = URLHelper.getUserIdFromUrl(url);
        if (userId < 0) {
          return;
        }

        url = URLHelper.getAddress(url);
        LocalDateTime dateTime = LocalDateTime.parse(record.getString("time"), DATE_TIME_FORMATTER);
        long timestamp = dateTime.toInstant(UTC8_ZONE_OFFSET).getEpochSecond();
        Map<String, String> postData = URLHelper.getParamMap(record.getString("post_data").toLowerCase());

        String action = "", itemId = "";
        switch (url) {
          case "/group/topic/get":
            if (postData.getOrDefault("last_id", "0").equals("0") &&
              postData.getOrDefault("only_owner", "0").equals("0")) {
              action = ActionType.FORUM_TOPIC_VIEW;
              itemId = postData.getOrDefault("topic_id", "");
            }
            break;
          case "/group/topic/reply":
            action = ActionType.FORUM_TOPIC_REPLY;
            itemId = postData.getOrDefault("topic_id", "");
            break;
          // 其他情况略去......
          default: break;
        }

        if (!action.equals("") && !itemId.equals("")) {
          JSONObject result = new JSONObject();
          result.put("userId", userId);
          result.put("itemId", itemId);
          result.put("timestamp", timestamp);
          result.put("action", action);
          out.collect(result.toJSONString());
        }
      })
      .returns(TypeInformation.of(String.class))
      .addSink(new FlinkKafkaProducer011<>(
        "p-kafka-01:9092,p-kafka-02:9092,p-kafka-03:9092,p-kafka-04:9092,p-kafka-05:9092",
        "log_forward_user_forum_behavior",
        new SimpleStringSchema()
      ))
      .name("kafka-sink-log-forward-user-forum-behavior");

注意在Flink程序中使用Java Lambda表达式时,由于类型擦除,程序是无法推断出泛型类型的,所以在上述代码的flatMap()算子之后,必须要显式调用returns()方法提供type hint,这点在官方文档中已经有提及。另外,Kafka Sink的topic要预先创建,为了简化问题,创建时指定分区数为1。

看一下数据进到Kafka了没,然后下一步。

转化为POJO与设定水印

下面开始写统计内容PV等指标的Flink程序。为了方便后续操作,先定义用户行为的POJO类。根据这篇文章中给出的建议,可以用继承TupleX类的方式达到可读性与易用性的平衡(毕竟是内置类型,还不用操心序列化的事情)。代码如下。

public class UserBehavior extends Tuple4<Long, String, Long, String> {
  private static final long serialVersionUID = -3144189553355270382L;

  public UserBehavior() {}

  public UserBehavior(long userId, String itemId, long timestamp, String action) {
    this.f0 = userId;
    this.f1 = itemId;
    this.f2 = timestamp;
    this.f3 = action;
  }

  public long getUserId() { return this.f0; }

  public void setUserId(long userId) { this.f0 = userId; }

  public String getItemId() { return this.f1; }

  public void setItemId(String itemId) { this.f1 = itemId; }

  public long getTimestamp() { return this.f2; }

  public void setTimestamp(long timestamp) { this.f2 = timestamp; }

  public String getAction() { return this.f3; }

  public void setAction(String action) { this.f3 = action; }
}

这次就需要用事件时间作为时间特征了。解析一遍新的JSON,将数据封装成上述UserBehavior实例,没什么好说的。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // 检查点配置略。但后面要用到状态,所以状态后端必须预先配置,在flink-conf.yaml或者这里均可
    env.setStateBackend(new MemoryStateBackend(true));

    DataStream<UserBehavior> forumBehaviorStream = env
      .addSource(new FlinkKafkaConsumer011<>(
        "log_forward_user_forum_behavior",
        new SimpleStringSchema(),
        consumerProps
      ))
      .name("kafka-source-log-forward-forum")
      .map(str -> {
        JSONObject record = JSON.parseObject(str);

        return new UserBehavior(
          record.getLong("userId"),
          record.getString("itemId"),
          record.getLong("timestamp"),
          record.getString("action")
        );
      })
      .returns(TypeInformation.of(UserBehavior.class))

然后就得在业务数据里抽取时间戳作为水印。在上述DataStream上调用assignTimestampsAndWatermarks()方法,它接受的参数类型有AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks两种,分别对应周期性水印和打点(特定事件触发)水印。关于水印的细节之后再说,把TimestampAssigner的继承关系贴一下得了。

如果数据不会乱序(时间单调递增),就可以用简单的AscendingTimestampExtractor。但我们的日志数据可能会乱序,所以用BoundedOutOfOrdernessTimestampExtractor来抽取时间戳与打水印。注意构造它时,需要传入maxOutOfOrderness参数,表示最大能容忍的乱序区间大小。也就是说它实际发射的水印为当前程序看见的最大事件时间减去maxOutOfOrderness,这里给了1分钟。

    DataStream<UserBehavior> forumBehaviorTimedStream = forumBehaviorStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor<UserBehavior>(Time.minutes(1)) {
        @Override
        public long extractTimestamp(UserBehavior userBehavior) {
          return userBehavior.getTimestamp() * 1000;
        }
      }
    );

开窗统计

先根据itemId生成KeyedStream。这种业务没有必要用滑动窗口,固定窗口就OK了。如果要用滑动窗口,就把TumblingEventTimeWindows换成SlidingEventTimeWindows。

    DataStream<TopicActionWindowStat> topicStatStream = forumBehaviorTimedStream
      .keyBy(1)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .aggregate(new CountAggregateFunction(), new ResultWindowFunction());

我们用aggregate()算子进行聚合。更加通用的apply()算子实际上也可以,但是apply()算子会将窗口中所有数据都缓存下来,等到窗口结束再一起计算,如果数据量大的话,压力也会比较大。如果我们自己编写聚合函数(AggregateFunction),再利用aggregate()算子进行预聚合,可以减少内存中维护的数据量。这与编写Spark程序时尽量采用map side预聚合的算子(aggregateByKey、reduceByKey)是异曲同工。

下面就来编写一个聚合函数,它对一篇帖子的阅读、回复、收藏和点赞量进行累加。其中TopicActionAcc是一个特别简单的POJO,看官读源码就能知道怎么实现,因此不再贴了。

  public static class CountAggregateFunction
    implements AggregateFunction<UserBehavior, TopicActionAcc, TopicActionAcc> {
    private static final long serialVersionUID = 8926737679356974335L;

    @Override
    public TopicActionAcc createAccumulator() {
      return new TopicActionAcc();
    }

    @Override
    public TopicActionAcc add(UserBehavior value, TopicActionAcc accumulator) {
      switch (value.getAction()) {
        case ActionType.FORUM_TOPIC_VIEW: accumulator.addRead(1); break;
        case ActionType.FORUM_TOPIC_REPLY: accumulator.addReply(1); break;
        case ActionType.FORUM_TOPIC_FAVOR: accumulator.addFavor(1); break;
        case ActionType.FORUM_TOPIC_LIKE: accumulator.addLike(1); break;
        default: break;
      }
      return accumulator;
    }

    @Override
    public TopicActionAcc getResult(TopicActionAcc accumulator) {
      return accumulator;
    }

    @Override
    public TopicActionAcc merge(TopicActionAcc a, TopicActionAcc b) {
      a.addRead(b.getRead());
      a.addReply(b.getReply());
      a.addFavor(b.getFavor());
      a.addLike(b.getLike());
      return a;
    }
  }

aggregate()算子的第二个参数是做什么的呢?它可以将聚合函数输出的结果再进行处理,称作窗口函数(WindowFunction)。由于它对窗口本身是有感知的,所以我们可以方便地获取到窗口的开始与结束时间(左闭右开区间)。代码如下。

  public static class ResultWindowFunction
    implements WindowFunction<TopicActionAcc, TopicActionWindowStat, Tuple, TimeWindow> {
    private static final long serialVersionUID = 3431146528087070967L;

    @Override
    public void apply(
      Tuple key,
      TimeWindow window,
      Iterable<TopicActionAcc> input,
      Collector<TopicActionWindowStat> out
    ) throws Exception {
      String itemId = ((Tuple1<String>) key).f0;
      TopicActionAcc acc = input.iterator().next();

      out.collect(new TopicActionWindowStat(
        itemId,
        window.getEnd(),
        acc.getRead(),
        acc.getReply(),
        acc.getFavor(),
        acc.getLike()
      ));
    }
  }

这里的TopicActionWindowStat POJO就封装了一个窗口内对一篇帖子的统计信息,包括ID、窗口结束时间戳、阅读、回复、收藏和点赞量。

使用状态输出结果

由于数据有可能是乱序的,所以我们必须保证一个窗口内的所有数据都收集齐全之后才能输出,这就需要用到状态。先根据上面的窗口结束时间戳进行分组,然后编写ProcessFunction来处理它。

    topicStatStream
      .keyBy(1)
      .process(new OutputProcessFunction());

    env.execute();

ProcessFunction是Flink中的低级(亦即自由度较大)Function,可以将它近似地理解为Spark中的transform()算子。它能够提供基于TimerService的定时器功能,只要注册一个事件时间戳为windowEndTimestamp + 1的定时器(多次注册则只有第一次有效),在收到该时间戳的水印时,就表示该窗口内的所有数据都已到达,可以输出了。

维护已经到达的数据则需要使用状态,以保证在程序崩溃时不丢失数据。回忆一下基础知识:Flink中的状态分为Keyed State(KeyedStream级别的状态)和Operator State(算子级别的状态)。我们在这里使用Keyed State中的ListState,即基于列表的多个状态。其他常用的状态还有单值状态ValueState、映射状态MapState、Reduce状态ReduceState等。

说了这么多,还是看代码,很简单。

  public static class OutputProcessFunction
    extends KeyedProcessFunction<Tuple, TopicActionWindowStat, String> {
    private ListState<TopicActionWindowStat> state;

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      ListStateDescriptor<TopicActionWindowStat> stateDescriptor = new ListStateDescriptor<>(
        "topic-action-window-stat-state",
        TopicActionWindowStat.class
      );
      state = getRuntimeContext().getListState(stateDescriptor);
    }

    @Override
    public void processElement(TopicActionWindowStat value, Context ctx, Collector<String> out) throws Exception {
      state.add(value);
      ctx.timerService().registerEventTimeTimer(value.getWindowEndTimestamp() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
      for (TopicActionWindowStat stat : state.get()) {
        System.out.println(stat.toString());
        out.collect(stat.toString());
      }

      state.clear();
    }
  }

为了看起来直观,这里将结果直接打到了标准输出,实际应用中Sink到HBase或者Redis等进行持久化。如果要实现窗口TopN的话,排个序就行了。下面是部分输出的截图。

The End

洗洗睡了。民那晚安。

上一篇下一篇

猜你喜欢

热点阅读