我爱编程

Spark基础

2018-01-25  本文已影响0人  cjyang

Spark基础


几个重要的概念:


RDD

RDD提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销。一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集来创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和groupBy)而创建得到新的RDD。

  1. RDD的两种创建方式(均通过SparkContext创建):
    1. 读取文件中的数据集:sc.textFile(filePath)
    2. 在Driver中已经存在的一个集合上创建:sc.parallelize(array)
  2. RDD创建好后,会进行两种操作:
    1. 转换:对现有数据集进行处理,结果还是数据集。常见转换操作有
      • filter(func) :筛选
      • map(func) :对每个元素进行操作
      • groupByKey() :专用于键值对。按key进行分组
      • reduceByKey(func) :专用于键值对。对相同key的value进行聚合操作
    2. 行动:在数据集上进行运算,产生结果。常见行动操作有
      • count() :返回集合中的元素个数
      • collect() :以数组形式返回集合中的所有元素(会把所有worker结点上的RDD都抓取过来,有内存溢出的风险,可以先take一部分)
      • first() :返回数据集中的第一个元素
      • take(n) :以数组形式返回数据集中的前n个元素
      • reduce(func) :对数据集进行聚合操作
      • foreach(func) :对数据集的每个元素进行操作
    3. 由于惰性机制,==每次==在有行动操作后才会==从头进行计算==。为避免多次行动重复计算,可以用对计算结果进行持久化persist。调用cache方法时会调用persist(MEMORY_ONLY)

共享变量

默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是有时候,需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。

  1. 广播变量:允许程序开发人员在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。通过这种方式,就可以非常高效地给每个节点(机器)提供一个大的输入数据集的副本。Spark的“动作”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播。通过广播方式进行传播的变量,会经过序列化,然后在被任务使用时再进行反序列化。这就意味着,显式地创建广播变量只有在下面的情形中是有用的:当跨越多个阶段的那些任务需要相同的数据,或者当以反序列化方式对数据进行缓存是非常重要的。
  2. 累加器:一个数值型的累加器,可以通过调用sc.longAccumulator()sc.doubleAccumulator()来创建。运行在集群中的任务,就可以使用add方法来把数值累加到累加器上,但是,这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值

数据操作

  1. 文件数据读写
    • sc.textFile(filePath)
  2. Hbase数据库
    • Hbase是一个分布式,面向列的非关系型分布式数据库,列可以划分列族
    • 通过java API调用
  3. Spark SQL
    • 主要结构为DataFrame。RDD数据内部结构是不可知的,但是DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息。共同点为同样为惰性运算。
    • 创建方式
      • 直接创建
      • RDD转换
      • 读写Parquet:是一种列式存储格式,可以高效的存储具有嵌套字段的记录。存储非常高效
      • JDBC连接MySQL
      • Hive

流计算

  1. 静态数据对应批量计算(hadoop),流数据对应流计算(实时计算)
  2. 流计算处理过程:
    • 实时采集
    • 实时计算
    • 实时查询


      流计算过程
  3. Spark Streaming的基本原理是将实时输入数据流以时间片(秒级)为单位进行拆分(DStream),然后经Spark引擎以类似批处理的方式处理每个时间片数据
  4. Spark Streaming和Storm最大的区别在于,Spark Streaming无法实现毫秒级的流计算,而Storm可以实现毫秒级响应。
    Spark Streaming无法实现毫秒级的流计算,是因为其将流数据按批处理窗口大小(通常在0.5~2秒之间)分解为一系列批处理作业,在这个过程中,会产生多个Spark 作业,且每一段数据的处理都会经过Spark DAG图分解、任务调度过程,因此,无法实现毫秒级相应。Spark Streaming难以满足对实时性要求非常高(如高频实时交易)的场景,但足以胜任其他流式准实时计算场景。相比之下,Storm处理的单位为Tuple,只需要极小的延迟。
  5. 编写Spark Streaming程序的基本步骤是:
    1.通过创建输入DStream来定义输入源
    2.通过对DStream应用转换操作和输出操作来定义流计算。
    3.用streamingContext.start()来开始接收数据和处理流程。
    4.通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。
    5.可以通过streamingContext.stop()来手动结束流计算进程
  6. 输入源:
    • 文件流:支持以秒为单位时间监听文件变化,对新增的文件进行实时计算ssc.textFileStream(filePath)
    • 套接字流:可以通过socket端口监听文件数据变化ssc.socketTextStream(socketPath)
    • RDD队列流:ssc.queueStream(rddQueue)
    • 流行日志采集系统:KafkaFlume
  7. 转换操作
    • 无状态转换:每个批次的处理不依赖以前的处理结果。eg:监听时,只处理本次监听过程中获取的输入数据,不会与之前监听到的数据处理结果有关系
    • 有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。有状态转换包括
      • 基于滑动窗口的转换:eg:val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2)。当滑动窗口到达一个新的位置时,原来之前被窗口框住的部分数据离开了窗口,又有新的数据被窗口框住,但是,这时计算窗口内单词的词频时,不需要对当前窗口内的所有单词全部重新执行统计,而是只要把窗口内新增进来的元素,增量加入到统计结果中,把离开窗口的元素从统计结果中减去,这样,就大大提高了统计的效率。
      • 追踪状态变化的转换:在跨批次之间维护状态updateStateByKey()
  8. 输出操作:
    • 输出到文本:saveAsTextFiles(filePath)
    • 写入数据库:conn = java.sql.DriverManager.getConnection(url, user, password)
上一篇下一篇

猜你喜欢

热点阅读