Spark从入门到放弃—Spark Streaming介绍

2021-03-04  本文已影响0人  HaloZhang

简介

Spark Streaming是核心Spark API的扩展,可对实时数据流进行可扩展,高吞吐量,容错处理。实时流可以有许多数据来源(例如Kafka,Flume,Kinesis或TCP套接字)等,并可以使用高级功能(如map,reduce,join和window)组成的复杂算法来处理数据。经过处理后的数据可以写入到文件系统、数据库、实时仪表盘等。 Spark Streaming总览
在Spark内部,Spark Streaming接收实时输入流,并将其分成若干批,这些批次被送进Spark Engine中处理,最后按批次产生最后的结果。Spark Streaming的处理管道示意图如下: Spark Streaming 处理过程

背景知识

什么是流处理

在正式介绍Spark Streaming之前,我们先来介绍一下什么叫做数据流。

数据流(data stream)是一组有序,有起点和终点的字节的数据序列。数据流传输是一种用于传输数据的技术,以便可以将数据作为稳定而连续的流进行处理。随着互联网的发展,流处理技术愈发变得重要。 流示意图
流处理是连续处理新到来的数据以更新计算结果的行为。在流处理中,输入数据是无边界的,没有预定的开始或结束。它是一系列到达流处理系统的事件(例如,信用卡交易,单击网站动作,或从物联网I o T传感器读取的数据),用户应用程序对此事件流可以执行各种查询操作(例如,跟踪每种事件类型的发生次数,或将这些事件按照某时间窗口聚合)。应用程序在运行时将输出多个版本的结果,或者在某外部系统 (如键值存储)中持续保持最新的聚合结果。

当然,我们可以将流处理(stream processing)与批处理(batch processing)进行比较,批处理是在固定大小输入数据集上进行计算的。通常,这可能是数据仓库中的大规模数据集,其包含来自应用程序的所有历史事件(例如,过去一个月的所有网站访问记录或传感器记录的 数据)。批处理也可以进行查询计算,与流处理差不多,但只计算一次结果。

流处理的应用场景

流处理系统主要有以下6个应用场景:

流处理的优点

在大多数情况下,批处理更容易理解、更容易调试、也更容易编写应用程序。此外,批量处理数据也使得数据处理的吞吐量大大高于许多流处理系统。然而,流处理对于以下两种情况非常必要。

Spark Streaming

Spark Streaming是用来处理实时数据流数据的,它是Spark Core API的一个非常有用的扩展。Spark Streaming可以对实时数据流进行高吞吐量、包含容错的流式处理。
Spark Streaming提供了一个高级抽象对象,名为discretized stream或者也叫DStream,用于代表连续的数据流。DStreams可以从输入数据源比如Kafka, Flume, and Kinesis等中创建得到,也可以通过对已有的DStream进行高阶操作得到。DStream代表的其实是一系列的RDDs。

Spark Streaming特性

Spark 流处理主要有以下6个特点:

Spark Streaming工作流程

Spark Streaming工作流程分为四个阶段。第一个是从各种数据源中获取流数据。这些数据源可以是流式数据源比如Akka, Kafka, Flume, AWS或者Parquet这样的实时数据流;也包括HBase, MySQL, PostgreSQL, Elastic Search, Mongo DB 以及 Cassandra这些用于静态或者批量流的数据源。一旦得到了数据流,Spakr可以在其之上使用Spark MLib API来进行机器学习算法处理,也可以使用Spark SQL来执行相关操作。最终,这些流的输出可以被存储在各种类型的数据存储系统中,比如HBase, Cassandra, MemSQL, Kafka, Elastic Search, HDFS 以及本地文件系统。 Spark 流处理总过程

Spark Streaming 基础组件

1. Streaming Context

Streaming Context用来消耗数据流。通过对它注册一个输入数据流,可以产生一个Receiver对象。这是使用Spark Streaming流处理功能的入口。Spark对包括Twitter、Akka和ZeroMQ等一些列数据源提供了默认的数据流读取实现。

一个StreamContext对象可以从SparkContext对象构造出来。SparkContext对象表示与Spark集群的连接关系,可以用来在该集群上创建RDD、累加器和广播变量。
可以通过以下两种方式来创建一个StreamingContext对象。

  1. 通过已有的SparkContext对象来创建:
import org.apache.spark._
import org.apache.spark.streaming._
var ssc = new StreamingContext(sc,Seconds(1)) //这个处理间隔时间要根据具体业务来设定
  1. 通过SparkCconf对象来创建:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1)) //这个处理间隔时间要根据具体业务来设定

在定义好StreamingContext对象之后,可以做以下事情:

注意事项:

2. DStream

Discretized Stream 或者 DStream 是Spark Streaming提供的基础抽象对象。它代表了连续的数据流,可以是从数据源中获取到的数据流,也可以是通过对输入流进行Transformation操作之后得到的处理后的数据流。在Spark内部,DStream代表的是一个RDD序列。DStream中的每个RDD包含了数据流按照指定时间间隔分割后的某一段。如下图所示: 任何对DStreams的操作,实际上会应用到底层的RDD上去。比如我们需要统计流数据中的Word数量,那么我们对DStream执行flatMap操作,在底层,这个操作会被施加到每个RDD中,示意图如下: 底层的RDD的Transformation操作是由Spark Engine来执行的,DStream封装了大部分操作,给用户提供更高抽象级别的API。
对DStream的Transformation操作与RDD类似,并且在普通RDD上的Transformation操作大多数在DStream上也可用。 DStream Transformation操作 DStream上面使用较为广泛的Transformation操作:

这里再补充一些Transformation操作:

3. Input DStream

Input DStream是代表从原始数据流源头得到的数据流。每个Input DStream都有一个Receiver对象与之搭配,Receiver对象负责从源头获取数据流然后保存在Spark内存中用于处理。

Spark Streaming的内置数据源有两种分类,如下:

4. Output DStream

输出操作允许将DStream的数据写入到外部的系统中,比如数据库,文件系统。与RDD中的惰性求值类似,OutPut操作执行的时候,才会触发所有应用在DStream上的Transformation操作的实际执行。

输出操作一般有:

5. Caching

DStream可以允许开发者在内存中缓存或者持久化流数据,如果DStream中的数据会被多次计算的话,这通常是很有用的一种做法。可以通过调用DStream的persist()方法来实现缓存操作。 数据缓存过程

对于从网络中接收到的数据流,比如Kafka,Flume,Socket等,默认的数据持久化级别是将数据复制两份,然后存储到两个节点中以实现容错机制。

6. CheckPoint

一个流程序必须24/7全天候运行,因此必须能够抵抗与应用程序逻辑无关的故障,比如系统错误、JVM崩溃等。为了实现这种功能,Spark Streaming需要保存足够的信息到容错存储系统中,以便能够从故障中恢复。CheckPoint有两种类型:

简单描述一下,Metadata checkpointing 主要是用于从驱动错误等中恢复;对于某些有状态的Transformation操作,如果期间出现了错误,可以使用Data checkpointing从错误中恢复。

如果应用程序有以下要求,那么必须使用Checkpointing技术:

对于普通的流式程序,即没有执行带状态的Transformation操作,那么无需打开checkpointing操作。
通过在容错的、可靠的文件系统上设置一个检查点目录即可启动Spark的CheckPointing功能,即可以保存检查点的信息,以便令程序可以从错误中恢复。可以通过执行streamingContext.checkpoint(checkpointDirectory)来实现这个功能。如果应用程序要使用带状态的Transformation操作,这一步是必须的。创建Checkpointing的示意代码如下:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果checkpointDirectory目录存在,那么context会从checkpoint数据中重建;如果目录不存在,那么函数functionToCreateContext会被调用以创建一个新的context对象,并且设置DStream。

参考

上一篇下一篇

猜你喜欢

热点阅读