Flink入门技术分享之三(英文讲义)

2020-07-17  本文已影响0人  LittleMagic

今天用半小时为小伙伴们简单分享了Flink Streaming中窗口的一些基础扩展用法(增量聚合、触发器和双流join),将Markdown版讲义贴在下面。


Introducing Apache Flink - Part 3

Extended Usage of DataStream Windowing


Section A - Recap

Time Characteristics

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The rest of these notes use event time (watermark extractor, event-time windows, event-time
// timers, interval join). Under ProcessingTime Flink sets the auto-watermark interval to 0, so
// periodic watermarks are never emitted and event-time windows would never fire.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // ProcessingTime / IngestionTime

Event Time & Watermarking

// Event timestamps come from each record's ts field. The watermark trails the highest timestamp
// seen by 10 seconds, which tolerates records arriving up to 10 seconds out of order.
BoundedOutOfOrdernessTimestampExtractor<OrderDoneLogRecord> orderTsExtractor =
  new BoundedOutOfOrdernessTimestampExtractor<OrderDoneLogRecord>(Time.seconds(10)) {
    @Override
    public long extractTimestamp(OrderDoneLogRecord element) {
      return element.getTs();
    }
  };
// Other assigner flavours: AssignerWith[Periodic / Punctuated]Watermarks
DataStream<OrderDoneLogRecord> watermarkedStream =
  recordStream.assignTimestampsAndWatermarks(orderTsExtractor);

Windowing Basics

// Partition by site, then cut each site's stream into 10-second event-time windows that slide
// forward every 5 seconds (so every record belongs to two overlapping windows).
SlidingEventTimeWindows tenSecondsEveryFive = SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5));
KeyedStream<OrderDoneLogRecord, Tuple> siteKeyedStream =
  watermarkedStream.keyBy("siteId", "siteName");
WindowedStream<OrderDoneLogRecord, Tuple, TimeWindow> siteWindowedStream =
  siteKeyedStream.window(tenSecondsEveryFive);

Section B - Window Aggregation

AggregateFunction

/**
 * Mutable accumulator used by the window aggregation: the distinct buyer IDs seen so far and
 * the running GMV total. Getters and setters are generated by Lombok.
 */
@Getter
@Setter
public class BuyerAndGmvAccumulator {
  /** Distinct buyer IDs; a set, so a buyer with several orders is counted once. */
  private Set<Long> buyerIds = new HashSet<>();
  /** Running GMV total. */
  private long gmv = 0;

  /** Creates an empty accumulator: no buyers, zero GMV. */
  public BuyerAndGmvAccumulator() {
    // State is fully initialised by the field initialisers above.
  }

  /** Adds {@code gmv} to the running total. */
  public void addGmv(long gmv) {
    this.gmv = this.gmv + gmv;
  }

  /** Records one buyer ID (duplicates are ignored by the set). */
  public void addBuyerId(long buyerId) {
    buyerIds.add(buyerId);
  }

  /** Records every buyer ID in {@code buyerIds} (used when merging accumulators). */
  public void addBuyerIds(Collection<Long> buyerIds) {
    this.buyerIds.addAll(buyerIds);
  }
}

/**
 * Per-site, per-window output record: distinct buyer count and GMV.
 * BuyerAndGmvAggregateFunc sets buyerCount and gmv; BuyerAndGmvResultWindowFunc sets the site
 * key and the window bounds.
 * NOTE: the Lombok-generated toString() lists fields in declaration order, so reordering them
 * changes the printed output.
 */
@Getter
@Setter
@NoArgsConstructor
@ToString
public class BuyerAndGmvResult {
  // Site key, taken from the keyBy("siteId", "siteName") tuple.
  private long siteId;
  private String siteName;
  // Number of distinct buyer IDs in the window.
  private long buyerCount;
  // Summed quantity * merchandise price over the window.
  private long gmv;
  // Window bounds; BuyerAndGmvResultWindowFunc stores them as window start/end divided by 1000.
  private long windowStartTs;
  private long windowEndTs;
}
/**
 * Incrementally folds order records into a {@link BuyerAndGmvAccumulator} (distinct buyers plus
 * summed quantity * merchandise price). The produced result carries only buyerCount and gmv;
 * site and window fields are filled in by the downstream window function.
 */
private static class BuyerAndGmvAggregateFunc
  implements AggregateFunction<OrderDoneLogRecord, BuyerAndGmvAccumulator, BuyerAndGmvResult> {

  /** Starts every window with an empty accumulator. */
  @Override
  public BuyerAndGmvAccumulator createAccumulator() {
    return new BuyerAndGmvAccumulator();
  }

  /** Adds one order's buyer and GMV contribution, mutating and returning the accumulator. */
  @Override
  public BuyerAndGmvAccumulator add(OrderDoneLogRecord record, BuyerAndGmvAccumulator acc) {
    BuyerAndGmvAccumulator target = acc;
    target.addBuyerId(record.getUserId());
    target.addGmv(record.getQuantity() * record.getMerchandisePrice());
    return target;
  }

  /** Converts the accumulator into a partial result (buyer count and GMV only). */
  @Override
  public BuyerAndGmvResult getResult(BuyerAndGmvAccumulator acc) {
    BuyerAndGmvResult partial = new BuyerAndGmvResult();
    int distinctBuyers = acc.getBuyerIds().size();
    partial.setBuyerCount(distinctBuyers);
    partial.setGmv(acc.getGmv());
    return partial;
  }

  /** Folds {@code acc2} into {@code acc1} and returns {@code acc1}. */
  @Override
  public BuyerAndGmvAccumulator merge(BuyerAndGmvAccumulator acc1, BuyerAndGmvAccumulator acc2) {
    BuyerAndGmvAccumulator into = acc1;
    BuyerAndGmvAccumulator from = acc2;
    into.addBuyerIds(from.getBuyerIds());
    into.addGmv(from.getGmv());
    return into;
  }
}

WindowFunction

/**
 * Completes the pre-aggregated result with its site key and window bounds, then emits it.
 * Window bounds are converted by integer division by 1000 (milliseconds to seconds).
 */
private static class BuyerAndGmvResultWindowFunc
  implements WindowFunction<BuyerAndGmvResult, BuyerAndGmvResult, Tuple, TimeWindow> {
  @Override
  public void apply(
    Tuple keys,
    TimeWindow window,
    Iterable<BuyerAndGmvResult> agg,
    Collector<BuyerAndGmvResult> out
  ) throws Exception {
    // Only the first element is read: it is the value produced by the AggregateFunction.
    BuyerAndGmvResult enriched = agg.iterator().next();

    // keyBy("siteId", "siteName") yields a two-field tuple key; cast it once.
    @SuppressWarnings("unchecked")
    Tuple2<Long, String> siteKey = (Tuple2<Long, String>) keys;
    enriched.setSiteId(siteKey.f0);
    enriched.setSiteName(siteKey.f1);

    long startSeconds = window.getStart() / 1000;
    long endSeconds = window.getEnd() / 1000;
    enriched.setWindowStartTs(startSeconds);
    enriched.setWindowEndTs(endSeconds);

    out.collect(enriched);
  }
}

Performing the Aggregation

// Incremental aggregation: the AggregateFunction folds records as they arrive, and the
// WindowFunction only decorates the single pre-aggregated value with key and window bounds.
BuyerAndGmvAggregateFunc incrementalAgg = new BuyerAndGmvAggregateFunc();
BuyerAndGmvResultWindowFunc windowDecorator = new BuyerAndGmvResultWindowFunc();
DataStream<BuyerAndGmvResult> gmvResultStream =
  siteWindowedStream.aggregate(incrementalAgg, windowDecorator);
/**
 * Ranks window results by GMV and emits the best {@code topN}, highest GMV first, as one
 * formatted string when the event-time timer registered for their window fires.
 *
 * <p>A size-bounded min-heap holds the current best {@code topN}. Its head is the weakest kept
 * result, so each newcomer only needs to be compared against the head.
 *
 * <p>NOTE(review): {@code minHeap} is an ordinary field, not Flink keyed state. It is shared by
 * every key handled by this parallel instance and is not checkpointed. Keyed {@code ListState}
 * would be needed for per-key, fault-tolerant ranking.
 */
private static class GmvTopProcessFunc
  extends KeyedProcessFunction<Tuple, BuyerAndGmvResult, String> {
  /** Maximum number of results kept and emitted per firing. */
  private final int topN;
  /** Min-heap ordered by GMV, holding at most {@code topN} results. */
  private PriorityQueue<BuyerAndGmvResult> minHeap;

  /**
   * @param topN number of results to rank; must be positive
   * @throws IllegalArgumentException if {@code topN} is not positive
   */
  public GmvTopProcessFunc(int topN) {
    // Fail fast at job-construction time instead of later in open() (PriorityQueue rejects a
    // capacity below 1) or with an NPE on peek() in processElement.
    if (topN <= 0) {
      throw new IllegalArgumentException("topN must be positive, but was " + topN);
    }
    this.topN = topN;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    minHeap = new PriorityQueue<>(topN, Comparator.comparingLong(BuyerAndGmvResult::getGmv));
  }

  @Override
  public void close() throws Exception {
    minHeap.clear();
    super.close();
  }

  /**
   * Offers {@code value} to the bounded heap and schedules the ranking output for its window.
   */
  @Override
  public void processElement(BuyerAndGmvResult value, Context ctx, Collector<String> out) throws Exception {
    if (minHeap.size() < topN) {
      minHeap.offer(value);
    } else if (minHeap.peek().getGmv() < value.getGmv()) {
      // The newcomer beats the weakest kept result: evict the head so the heap keeps the
      // largest topN GMVs.
      minHeap.poll();
      minHeap.offer(value);
    }
    // BuyerAndGmvResultWindowFunc stores windowEndTs as window.getEnd() / 1000 (seconds), while
    // event-time timers use the record timestamp unit (milliseconds). Convert back; otherwise the
    // timer sits far behind the watermark and fires on the next watermark advance rather than
    // after this window's end.
    ctx.timerService().registerEventTimeTimer(value.getWindowEndTs() * 1000 + 1);
  }

  /** Drains the heap and emits the ranking, highest GMV first, framed by separator lines. */
  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    // Polling a min-heap yields ascending GMV; reverse afterwards to rank highest first.
    List<BuyerAndGmvResult> ranking = new ArrayList<>(minHeap.size());
    for (int k = 0; k < topN && !minHeap.isEmpty(); k++) {
      ranking.add(minHeap.poll());
    }
    Collections.reverse(ranking);

    StringBuilder output = new StringBuilder();
    output.append("-----------------\n");
    for (BuyerAndGmvResult result : ranking) {
      output.append(result).append('\n');
    }
    output.append("-----------------\n");
    out.collect(output.toString());
  }
}

Section C - Window Trigger

Revisiting the Window Life Cycle

Built-in Triggers

// Use the windowed stream built in Section A. DeltaTrigger keeps the first element (and later
// each element that fired) in state, serialized with the serializer below. It FIREs whenever the
// order ID has advanced by more than the threshold since that element. It replaces the default
// trigger, so the window no longer fires just because the watermark passes its end.
siteWindowedStream.trigger(DeltaTrigger.of(
  100.0,   // Order ID offset threshold of 100
  (oldPoint, newPoint) -> newPoint.getOrderId() - oldPoint.getOrderId(),
  TypeInformation.of(OrderDoneLogRecord.class).createSerializer(env.getConfig())
));

Customizing a Trigger

Section D - Window Joining

Inner Join

// Window inner join: a click and an order are paired only when they share a merchandise ID and
// fall into the same 10-second tumbling event-time window; records without a partner are dropped.
clickRecordStream
  .join(orderRecordStream)
  .where(record -> record.getMerchandiseId())    // key from left stream
  .equalTo(record -> record.getMerchandiseId())  // key from right stream
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    /** Formats one matched pair as a tab-separated line. */
    @Override
    public String join(AnalyticsAccessLogRecord click, OrderDoneLogRecord order) throws Exception {
      // Columns: merchandise ID, price, coupon money, rebate amount.
      return StringUtils.join(
        Arrays.asList(
          click.getMerchandiseId(),
          order.getPrice(),
          order.getCouponMoney(),
          order.getRebateAmount()),
        '\t');
    }
  });

Interval Inner Join

// Interval join on keyed streams: a click at time t is paired with every order of the same
// merchandise ID whose timestamp lies in [t - 5s, t + 15s]. Flink supports this only in event time.
clickRecordStream
  .keyBy(record -> record.getMerchandiseId())
  .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
  .between(Time.seconds(-5), Time.seconds(15))    // lower & upper bounds
  .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    /** Emits one tab-separated line per matched (click, order) pair. */
    @Override
    public void processElement(AnalyticsAccessLogRecord click, OrderDoneLogRecord order, Context ctx, Collector<String> out) throws Exception {
      // Columns: merchandise ID, price, coupon money, rebate amount.
      String line = StringUtils.join(
        Arrays.asList(
          click.getMerchandiseId(),
          order.getPrice(),
          order.getCouponMoney(),
          order.getRebateAmount()),
        '\t');
      out.collect(line);
    }
  });

Left/Right Outer Join

// Left outer join via coGroup: every click is emitted. A click with no order for the same
// merchandise ID in the same 10-second processing-time window gets NO_ORDER_PRICE as its price.
clickRecordStream
  .coGroup(orderRecordStream)
  .where(record -> record.getMerchandiseId())    // key from left stream
  .equalTo(record -> record.getMerchandiseId())  // key from right stream
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
    // Flink's TupleSerializer rejects null tuple fields (NullFieldException) as soon as a record
    // is serialized, e.g. across a network shuffle. Unmatched clicks therefore carry a sentinel
    // price instead of null.
    private static final long NO_ORDER_PRICE = -1L;

    @Override
    public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>> collector) throws Exception {
      // Whether the right side is empty is the same for every click in this group.
      boolean hasOrders = orderRecords.iterator().hasNext();
      for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
        if (!hasOrders) {
          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), NO_ORDER_PRICE));
          continue;
        }
        for (OrderDoneLogRecord orderRecord : orderRecords) {
          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
        }
      }
    }
  });

THE END

上一篇下一篇

猜你喜欢

热点阅读