Flink

Flink实战双流join之interval Join

2021-03-14  本文已影响0人  bigdata张凯翔
Apache Flink.png

通俗易懂篇:

前面学习的Window Join必须要在一个Window中进行JOIN,那如果没有Window如何处理呢?

interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。也就是:流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且流B的元素的时间戳 ≤ 流A的元素时间戳 + 上界。

我们来看Flink官方的一张图。

image

我们看到,流A的每一个元素,都会和流B的一定时间范围的元素进行JOIN。
其中,上界和下界可以是负数,也可以是整数。Interval join目前只支持INNER JOIN。将连接后的元素传递给ProcessJoinFunction时,时间戳变为两个元素中最大的那个时间戳。

注意:
Interval Join只支持事件时间。

package com.istudy.work;

import com.istudy.bean.FactOrderItem;
import com.istudy.bean.Goods;
import com.istudy.bean.OrderItem;
import com.istudy.streamsource.GoodsSource;
import com.istudy.streamsource.OrderItemSource;
import com.istudy.watermark.GoodsWatermark;
import com.istudy.watermark.OrderItemWatermark;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;

/**
 * @projectname: HaiStream
 * @description:
 * @author: Mr.Zhang
 * @create: 2021-03-14 14:35
 **/
public class IntervalJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 构建商品数据流
        SingleOutputStreamOperator<Goods> goodsDS = env.addSource(new GoodsSource(), TypeInformation.of(Goods.class))
                .assignTimestampsAndWatermarks(new GoodsWatermark() {
                });
        // 构建订单明细数据流
        SingleOutputStreamOperator<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class))
                .assignTimestampsAndWatermarks(new OrderItemWatermark());

        // 进行关联查询
        //todo 1、这里我们通过keyBy将两个流join到一起
        SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
                //todo 2、interval join需要设置流A去关联哪个时间范围的流B中的元素。
                .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                //todo 此处,我设置的下界为-1、上界为0,
                .between(Time.seconds(-1), Time.seconds(0))
                //todo  且上界是一个开区间。表达的意思就是流A中某个元素的时间,对应上一秒的流B中的元素。
                .upperBoundExclusive()
                //todo process中将两个key一样的元素,关联在一起,并加载到一个新的FactOrderItem对象中
                .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                    @Override
                    public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(right.getGoodsId());
                        factOrderItem.setGoodsName(right.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(left.getCount()));
                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));

                        out.collect(factOrderItem);
                    }
                });

        factOrderItemDS.print();

        env.execute("Interval JOIN");
    }
}
image.png

运行结果:


image.png

深挖原理篇:

join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联。interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。

按照指定字段以及右流相对左流偏移的时间区间进行关联,即:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

image.png
interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。
示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。
clickRecordStream
  .keyBy(record -> record.getMerchandiseId())
  .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
  .between(Time.seconds(-30), Time.seconds(30))
  .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    @Override
    public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
      collector.collect(StringUtils.join(Arrays.asList(
        accessRecord.getMerchandiseId(),
        orderRecord.getPrice(),
        orderRecord.getCouponMoney(),
        orderRecord.getRebateAmount()
      ), '\t'));
    }
  })
  .print().setParallelism(1);

由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。

interval join 的实现原理
以下是 KeyedStream.process(ProcessJoinFunction) 方法调用的重载方法的逻辑。

public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);
    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
        new IntervalJoinOperator<>(
            lowerBound,
            upperBound,
            lowerBoundInclusive,
            upperBoundInclusive,
            left.getType().createSerializer(left.getExecutionConfig()),
            right.getType().createSerializer(right.getExecutionConfig()),
            cleanedUdf
        );
    return left
        .connect(right)
        .keyBy(keySelector1, keySelector2)
        .transform("Interval Join", outputType, operator);
}
上一篇下一篇

猜你喜欢

热点阅读