Hive-Spark-Flink 大小表join

2022-08-21  本文已影响0人  Eqo

在大数据离线批处理中,需求【大表(事实表)与小表(维度表)】关联字段,进行分析

Hive 默认开启Map端Join

  1. 先加载小表数据 存储到Hash table文件中
    2.将文件中的数据 存放到分布式缓存中
    3.大表中的每个task从分布式缓存中拉取数据
    MapJoin只有maptask 没有reduceTask 没有shuffle 提高了性能

Spark Broadcast Join

spark.sql.autoBroadcastJoinThreshold 值为-1

Flink

1_广播变量Broadcast 小表数据广播广播变量
2_分布式缓存 小表数据分布式缓存

广播变量Broadcast

package cn.itcast.flink.batch;
//
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

/**测试  Flink  小表数据广播到TaskManager内存中,当TM的Slot槽运行subTask子任务时,获取广播的变量值进行处理
 * @author ado
 */
public class BatchBroadcastDemo {

    public static void main(String[] args) throws Exception {
        // 1. 执行环境-env
        // 将执行环境设置成批处理 
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 数据源-source
        //大表数据
        DataSource<Tuple3<Integer, String, Integer>> scoreDataSet = env.fromCollection(
                Arrays.asList(
                        Tuple3.of(1, "语文", 50),
                        Tuple3.of(1, "数学", 70),
                        Tuple3.of(1, "英语", 86),
                        Tuple3.of(2, "语文", 80),
                        Tuple3.of(2, "数学", 86),
                        Tuple3.of(2, "英语", 96),
                        Tuple3.of(3, "语文", 90),
                        Tuple3.of(3, "数学", 68),
                        Tuple3.of(3, "英语", 92)
                )
        );

        // 小表数据
        DataSource<Tuple2<Integer, String>> studentDataSeT = env.fromCollection(Arrays.asList(

                Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五")
        )); 

        // 3. 数据转换-transformation
        MapOperator<Tuple3<Integer, String, Integer>, Object> result = scoreDataSet.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Object>() {

            HashMap<Integer, String> supMap = new HashMap<>();


            // todo 2 在open方法中获取 上下文对象
            @Override
            public void open(Configuration parameters) throws Exception {


                //todo 3使用上下文对象获取 广播小表中的数据
                List<Tuple2<Integer, String>> list = getRuntimeContext().getBroadcastVariable("students");
                //todo 4将小表dataset0中的数据存放到集合当中 准备join key->要关联的字段 value->值
                for (Tuple2<Integer, String> value : list) {
                    supMap.put(value.f0, value.f1);

                }
            }

            @Override
            // 实现map算子 完成两个表的join
            public String map(Tuple3<Integer, String, Integer> value) throws Exception {
                        /*
                            value -> (1, "数学", 70)  ,  依据学生标号:1 到map集合中获取学生姓名
                         */
                //获取要join的字段 然后从集合中获取对应的值
                Integer stuId = value.f0;
                //使用 获取默认值
                String stuName = supMap.getOrDefault(stuId, "未知");
                return stuName + "," + value.f1 + "," + value.f2;
            }

            //todo step1 在大表数据的处理算子后面加with...(小表的名称, 小表的广播名称)
        }).withBroadcastSet(studentDataSeT, "students");
        result.print();


        // 4. 数据终端-sink

        // 5. 触发执行-execute
//        env.execute("dsadajb");

}  }
上一篇 下一篇

猜你喜欢

热点阅读