笔记汇总

2021-04-06  本文已影响0人  风筝flying

Hive Join

  1. common join
    如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join.整个过程包含Map、Shuffle、Reduce阶段。
  1. map join
    在map 端进行join,其原理是broadcast join,即把小表作为一个完整的驱动表来进行join操作。通常情况下,要连接的各个表里面的数据会分布在不同的Map中进行处理。即同一个Key对应的Value可能存在不同的Map中。这样就必须等到 Reduce中去连接。要使MapJoin能够顺利进行,那就必须满足这样的条件:除了一份表的数据分布在不同的Map中外,其他连接的表的数据必须在每 个Map中有完整的拷贝。MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率也会高很多。

Spark Join

spark提供了三种join实现:hash join,sort merge join以及broadcast join。

  1. hash join:
    通过分区的形式将大批量的数据通过hash划分成n份较小的数据集进行并行计算。

2、sort join:
hash join对于实现大小表比较合适,但是两个表都非常大时,对内存计算造成很大的压力。

3、broadCast join:
Broadcast不会内存溢出,因为数据保存级别StoreageLevel是MEMORY_AND_DISK模式

Full Outer Join

Spark RDD

弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

Spark RDD缓存

Spark RDD 是惰性求值的,而有时候希望能多次使用同一个 RDD。如果简单地对 RDD 调用行动操作,Spark 每次都会重算 RDD 及它的依赖,这样就会带来太大的消耗。为了避免多次计算同一个 RDD,可以让 Spark 对数据进行持久化。
Spark 可以使用 persist 和 cache 方法将任意 RDD 缓存到内存、磁盘文件系统中。缓存是容错的,如果一个 RDD 分片丢失,则可以通过构建它的转换来自动重构。被缓存的 RDD 被使用时,存取速度会被大大加速。一般情况下,Executor 内存的 60% 会分配给 cache,剩下的 40% 用来执行任务。
cache 是 persist 的特例,将该 RDD 缓存到内存中。persist 可以让用户根据需求指定一个持久化级别。

谓词下推

谓词下推的基本思想:将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据。
hive官网上给出了outer join的谓词下推规则

Spark DAG&Stage

https://www.studytime.xin/article/spark-knowledge-rdd-stage.html

Spark Shuffle机制及调优

https://blog.csdn.net/qichangjian/article/details/88039576

MR&Spark Shuffle详解

https://mp.weixin.qq.com/s?__biz=MzUxOTU5Mjk2OA==&mid=2247485991&idx=1&sn=79c9370801739813b4a624ae6fa55d6c&chksm=f9f60740ce818e56a18f8782d21d376d027928e434f065ac2c251df09d2d4283710679364639&scene=21#wechat_redirect

Spark为什么快

Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。

Select month_id, sum(sales) from T group by month_id;

这个查询只有一次 shuffle 操作,此时,也许 Hive HQL 的运行时间也许比 Spark 还快,反正 shuffle 完了都会落一次盘,或者都不落盘。
结论 :Spark 快不是绝对的,但是绝大多数,Spark 都比 Hadoop 计算要快。这主要得益于其对 mapreduce 操作的优化以及对 JVM 使用的优化

Spark reduce by key& group by key

  1. 都作用于RDD[k,v]
  2. 都根据Key来分组聚合
  3. 默认分区数量是不变的,但都可以通过参数指定分区数量
  1. group by key默认没有聚合函数,得到的返回值是RDD[k,Iterable[V]]
  2. reduce by key必须传聚合函数,得到的返回值是RDD[k,聚合后的V]
    3.groupbykey.map()=reducebykey

Spark读取文件,分片

  1. 调用textFile方法,需要传入文件路径,分区数
  2. 调用hadoopFile方法,获取Hadoop configuration并广播,设置读取的文件路径,实例化Hadoop RDD
  3. HadoopRDD的getPartitions()方法,设置分片,分片规则如下:
    Math.max(minSize,Math.min(goalSize,blockSize))

Spark统一内存模型

1.Spark统一内存模型
相关内存设置参数:
spark.executor.memory=8G;
spark.executor.memoryOverhead=6G;
spark.memory.fraction=0.6;
其中spark.memory.fraction不能设置太高,需要为otherememory留一些富裕内存因为spark内存统计信息收集是由延迟的,如果该值太大,且spill较重的情况下,会导致内存释放不及时而oom。
jvm堆内的内存分为四个部分:
Unified Memory:统一内存,包含Storage内存和Execution内存,由spark.memory.fraction控制(spark2.0+默认为0.6,占可用内存(系统内存减去预留内存)的60%,spark1.6默认0.75)
Storage内存:主要用于rdd的缓存(比如Broadcast的数据),缓存数据,由spark.storage.storageFraction控制(默认0.5,占统一内存的50%)
Execution内存:用于缓存在执行shuffle过程中产生的中间数据(由1-spark.storage.storageFraction控制),用户spark的计算,shuffle,sort,aggregation这些计算会用到的内存
Storage内存和Execution内存之间存在动态占用机制,若己方不足对方空余则可占用对方,Execution内存被对方占用后可强制回收
Other Memory:其它,默认占可用内存的40%,用于spark内部的一些元数据,用户的数据结构,防止在稀疏和异常大的记录的情况下出现对内存估计不足导致oom时的内存缓冲
reservedMemory:预留内存300M,用于保障spark的正常运行

execution内存和storage内存动态占用机制的理解:
1.不适用缓存(storage)的应用程序可以将整个空间用于执行(execution),从而避免不必要的磁盘溢写
2.storage曾经想execution借用了空间,它缓存的数据可能非常的多,然后execution又不需要那么大的空间,假设现在storage占了80%,execution占了20%,然后当execution空间不足时,execution会向内存管理器发信号把storage曾经占用的超过50%数据的那部分强制挤掉(注意:drop后数据会不会丢失主要是看你在程序设置的storage_level来决定你是Drop到哪里,可能是
drop到磁盘)
3.execution空间不足的情况下,除了选择向storage借用空间以外,也可以把一部分数据spill到磁盘,但很多时候基于性能调优的考虑不想把数据溢写到磁盘,会优先选择向storage借空间。如果此时storage实际占用不足50%,则会借空间给execution。但当storage发现自己空间不足时(指不能放下一个完整的block),只能等execution释放空间。

Flink相关问题

https://www.cnblogs.com/qiu-hua/p/13767131.html

上一篇 下一篇

猜你喜欢

热点阅读