dibo大数据学习一

Spark Streaming

2019-12-03  本文已影响0人  Movle

目录
一.Spark Streaming基础
    1.Spark Streaming简介
    2.Spark Streaming的特点
    3.Spark Streaming的内部结构
二.Spark Streaming进阶
    1.StreamingContext对象详解
    2.离散流(DStreams):Discretized Streams
    3.DStream中的转换操作(transformation)
    4.窗口操作
    5.输入DStreams和接收器
    6.DStreams的输出操作
    7.DataFrame和SQL操作
    8.缓存/持久化
    9.检查点支持
三.高级数据源
    1.Spark Streaming接收Flume数据
    2.Spark Streaming接收Kafka数据
四.性能优化
    1.减少批数据的执行时间
    2.设置正确的批容量
    3.内存调优

一.Spark Streaming基础

1.Spark Streaming简介

    Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。而且,您还可以在数据流上应用Spark提供的机器学习和图处理算法。

2.Spark Streaming的特点

(1)易用:集成在Spark中
(2)容错性:底层RDD,RDD本身就具备容错机制。
(3)支持多种编程语言:Java Scala Python

3.Spark Streaming的内部结构

    在内部,它的工作原理如下。Spark Streaming接收实时输入数据流,并将数据切分成批,然后由Spark引擎对其进行处理,最后生成“批”形式的结果流。

    Spark Streaming将连续的数据流抽象为discretizedstream或DStream。在内部,DStream 由一个RDD序列表示。

二.Spark Streaming进阶

1.StreamingContext对象详解

(1)初始化StreamingContext
(a)方式一:从SparkConf对象中创建

val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")

val src = new StreamContext(conf, Second(5))

(b)方式二:从一个现有的SparkContext实例中创建

scala >import org.apache.spark.streaming.{Second,StreamingContext}

scala >val ssc=new StreamContext(sc,Second(1))

(2).程序中的几点说明:

(3).请务必记住以下几点:

2.离散流(DStreams):Discretized Streams

(1)DiscretizedStream或DStream 是Spark Streaming对流式数据的基本抽象。它表示连续的数据流,这些连续的数据流可以是从数据源接收的输入数据流,也可以是通过对输入数据流执行转换操作而生成的经处理的数据流。在内部,DStream由一系列连续的RDD表示,如下图:

(2)举例分析:在之前的NetworkWordCount的例子中,我们将一行行文本组成的流转换为单词流,具体做法为:将flatMap操作应用于名为lines的 DStream中的每个RDD上,以生成words DStream的RDD。如下图所示:

但是DStream和RDD也有区别,下面画图说明:

RDD的结构 DStream的结构
3.DStream中的转换操作(transformation)

最后两个transformation算子需要重点介绍一下:

(1)transform(func)

(a)通过RDD-to-RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD

(b)举例:在NetworkWordCount中,也可以使用transform来生成元组对

(2)updateStateByKey(func)
(a)操作允许不断用新信息更新它的同时保持任意状态。

(c)输出结果:

(3)注意:如果在IDEA中,不想输出log4j的日志信息,可以将log4j.properties文件(放在src的目录下)的第一行改为:

log4j.rootCategory=ERROR,console
4.窗口操作

    Spark Streaming还提供了窗口计算功能,允许您在数据的滑动窗口上应用转换操作。下图说明了滑动窗口的工作方式:

    如图所示,每当窗口滑过originalDStream时,落在窗口内的源RDD被组合并被执行操作以产生windowed DStream的RDD。在上面的例子中,操作应用于最近3个时间单位的数据,并以2个时间单位滑动。这表明任何窗口操作都需要指定两个参数。

    这两个参数必须是源DStream的批间隔的倍数(上图示例中为:1)。

    我们以一个例子来说明窗口操作。 假设您希望对之前的单词计数的示例进行扩展,每10秒钟对过去30秒的数据进行wordcount。为此,我们必须在最近30秒的pairs DStream数据中对(word, 1)键值对应用reduceByKey操作。这是通过使用reduceByKeyAndWindow操作完成的。

    一些常见的窗口操作如下表所示。所有这些操作都用到了上述两个参数 - windowLength和slideInterval。
(1)window(windowLength, slideInterval)

(2)countByWindow(windowLength, slideInterval)

(3)reduceByWindow(func, windowLength, slideInterval)

(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

(6)countByValueAndWindow(windowLength, slideInterval, [numTasks])

5.输入DStreams和接收器

    输入DStreams表示从数据源获取输入数据流的DStreams。在NetworkWordCount例子中,lines表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。

    输入DStreams表示从数据源获取的原始数据流。Spark Streaming拥有两类数据源:

下面通过具体的案例,详细说明:

(1)文件流:通过监控文件系统的变化,若有新文件添加,则将它读入并作为数据流

需要注意的是:

注意:要演示成功,需要在原文件中编辑,然后拷贝一份。

(2)RDD队列流
    使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream,用于调试Spark Streaming应用程序。

(3)套接字流:通过监听Socket端口来接收数据

6.DStreams的输出操作

    输出操作允许DStream的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。目前,定义了下面几种输出操作:

(1)foreachRDD的设计模式

DStream.foreachRDD是一个强大的原语,发送数据到外部系统中。

(a)第一步:创建连接,将数据写入外部数据库(使用之前的NetworkWordCount,改写之前输出结果的部分,如下)

出现以下Exception:

    原因是:Connection对象不是一个可被序列化的对象,不能RDD的每个Worker上运行;即:Connection不能在RDD分布式环境中的每个分区上运行,因为不同的分区可能运行在不同的Worker上。所以需要在每个RDD分区上单独创建Connection对象。

(b)第二步:在每个RDD分区上单独创建Connection对象,如下:

7.DataFrame和SQL操作

    我们可以很方便地使用DataFrames和SQL操作来处理流数据。您必须使用当前的StreamingContext对应的SparkContext创建一个SparkSession。此外,必须这样做的另一个原因是使得应用可以在driver程序故障时得以重新启动,这是通过创建一个可以延迟实例化的单例SparkSession来实现的。

    在下面的示例中,我们使用DataFrames和SQL来修改之前的wordcount示例并对单词进行计数。我们将每个RDD转换为DataFrame,并注册为临时表,然后在这张表上执行SQL查询。

8.缓存/持久化

    与RDD类似,DStreams还允许开发人员将流数据保留在内存中。也就是说,在DStream上调用persist() 方法会自动将该DStream的每个RDD保留在内存中。如果DStream中的数据将被多次计算(例如,相同数据上执行多个操作),这个操作就会很有用。对于基于窗口的操作,如reduceByWindow和reduceByKeyAndWindow以及基于状态的操作,如updateStateByKey,数据会默认进行持久化。 因此,基于窗口的操作生成的DStream会自动保存在内存中,而不需要开发人员调用persist()。

    对于通过网络接收数据(例如Kafka,Flume,sockets等)的输入流,默认持久化级别被设置为将数据复制到两个节点进行容错。

    请注意,与RDD不同,DStreams的默认持久化级别将数据序列化保存在内存中。

9.检查点支持

    流数据处理程序通常都是全天候运行,因此必须对应用中逻辑无关的故障(例如,系统故障,JVM崩溃等)具有弹性。为了实现这一特性,Spark Streaming需要checkpoint足够的信息到容错存储系统,以便可以从故障中恢复。

(1)一般会对两种类型的数据使用检查点:
(a)元数据检查点(Metadatacheckpointing) - 将定义流计算的信息保存到容错存储中(如HDFS)。这用于从运行streaming程序的driver程序的节点的故障中恢复。元数据包括以下几种:

(b)数据检查点(Datacheckpointing) - 将生成的RDD保存到可靠的存储层。对于一些需要将多个批次之间的数据进行组合的stateful变换操作,设置数据检查点是必需的。在这些转换操作中,当前生成的RDD依赖于先前批次的RDD,这导致依赖链的长度随时间而不断增加,由此也会导致基于血统机制的恢复时间无限增加。为了避免这种情况,stateful转换的中间RDD将定期设置检查点并保存到到可靠的存储层(例如HDFS)以切断依赖关系链。

    总而言之,元数据检查点主要用于从driver程序故障中恢复,而数据或RDD检查点在任何使用stateful转换时是必须要有的。

(2)何时启用检查点:
对于具有以下任一要求的应用程序,必须启用检查点:
(a)使用状态转:如果在应用程序中使用updateStateByKey或reduceByKeyAndWindow(具有逆函数),则必须提供检查点目录以允许定期保存RDD检查点。

(b)从运行应用程序的driver程序的故障中恢复:元数据检查点用于使用进度信息进行恢复。

(3)如何配置检查点:
    可以通过在一些可容错、高可靠的文件系统(例如,HDFS,S3等)中设置保存检查点信息的目录来启用检查点。这是通过使用streamingContext.checkpoint(checkpointDirectory)完成的。设置检查点后,您就可以使用上述的有状态转换操作。此外,如果要使应用程序从驱动程序故障中恢复,您应该重写streaming应用程序以使程序具有以下行为:
(a)当程序第一次启动时,它将创建一个新的StreamingContext,设置好所有流数据源,然后调用start()方法。

(b)当程序在失败后重新启动时,它将从checkpoint目录中的检查点数据重新创建一个StreamingContext。
使用StreamingContext.getOrCreate可以简化此行为

(4)改写之前的WordCount程序,使得每次计算的结果和状态都保存到检查点目录下

通过查看HDFS中的信息,可以看到相关的检查点信息,如下:

三.高级数据源

1.Spark Streaming接收Flume数据

(1)基于Flume的Push模式

    Flume被用于在Flume agents之间推送数据.在这种方式下,Spark Streaming可以很方便的建立一个receiver,起到一个Avro agent的作用.Flume可以将数据推送到改receiver.

(a)第一步:Flume的配置文件

(b)第二步:Spark Streaming程序

(c)第三步:注意除了需要使用Flume的lib的jar包以外,还需要以下jar包:

spark-streaming-flume_2.1.0.jar

(d)第四步:测试

(2)基于Custom Sink的Pull模式

    不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume sink。Flume将数据推送到sink中,并且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从sink拉取数据。只要当数据被接收并且被Spark Streaming备份后,转换器才运行成功。

    这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式需要为Flume配置一个正常的sink。

以下为配置步骤:

(a)第一步:Flume的配置文件

(b)第二步:Spark Streaming程序

(c)第三步:需要的jar包

spark-streaming-flume-sink_2.1.0.jar

(d)第四步:测试

2.Spark Streaming接收Kafka数据

Apache Kafka是一种高吞吐量的分布式发布订阅消息系统。

Kafka

(1)搭建ZooKeeper(Standalone):
(a)配置/root/training/zookeeper-3.4.10/conf/zoo.cfg文件

dataDir=/root/training/zookeeper-3.4.10/tmp
server.1=spark81:2888:3888

(b)在/root/training/zookeeper-3.4.10/tmp目录下创建一个myid的空文件

echo 1 > /root/training/zookeeper-3.4.6/tmp/myid

(2)搭建Kafka环境(单机单broker):
(a)修改server.properties文件

(b)启动Kafka

bin/kafka-server-start.sh config/server.properties &

出现以下错误:

需要修改bin/kafka-run-class.sh文件,将这个选项注释掉。

(c)测试Kafka

bin/kafka-topics.sh --create --zookeeper spark81:2181 -replication-factor 1 --partitions 3 --topic mydemo1
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
bin/kafka-console-consumer.sh --zookeeper spark81:2181 --topic mydemo1

(3)搭建Spark Streaming和Kafka的集成开发环境

    由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程。
下面是依赖的pom.xml文件:

(4)基于Receiver的方式

    这个方法使用了Receivers来接收数据。Receivers的实现使用到Kafka高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据。

(a)开发Spark Streaming的Kafka Receivers

(b)测试

bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1

(5)直接读取方式

    和基于Receiver接收数据不一样,这种方式定期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每个batch里面处理数据。当作业需要处理的数据来临时,spark通过调用Kafka的简单消费者API读取一定范围的数据。

(a)开发Spark Streaming的程序

(b)测试

bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1

四.性能优化

1、减少批数据的执行时间

在Spark中有几个优化可以减少批处理的时间:
(1)数据接收的并行水平

    通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上)接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream可以被切分为两个kafka输入流,每个接收一个topic。这将在两个worker上运行两个receiver,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作可以运用在合并的DStream上。

(2)数据处理的并行水平

    如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。默认的并发任务数通过配置属性来确定spark.default.parallelism。

(3)数据序列化

    可以通过改变序列化格式来减少数据序列化的开销。在流式传输的情况下,有两种类型的数据会被序列化:

    在上述两种情况下,使用Kryo序列化格式可以减少CPU和内存开销。

2.设置正确的批容量

    为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这可以通过流的网络UI观察得到。批处理时间应该小于批间隔时间。

    根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。可以考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能可以每2秒打印一次单词计数(批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。

    找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。

3.内存调优

在这一节,重点介绍几个强烈推荐的自定义选项,它们可以减少Spark Streaming应用程序垃圾回收的相关暂停,获得更稳定的批处理时间。
(1)Default persistence level of DStreams:
    和RDDs不同的是,默认的持久化级别是序列化数据到内存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即使保存数据为序列化形态会增加序列化/反序列化的开销,但是可以明显的减少垃圾回收的暂停。

(2)Clearing persistent RDDs:
    默认情况下,通过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。如果spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化RDD将会被定时的清理掉。正如前面提到的那样,这个值需要根据Spark Streaming应用程序的操作小心设置。然而,可以设置配置选项spark.streaming.unpersist为true来更智能的去持久化(unpersist)RDD。这个配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。

(3)Concurrent garbage collector:
    使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。尽管并发的垃圾回收会减少系统的整体吞吐量,但是仍然推荐使用它以获得更稳定的批处理时间。

上一篇 下一篇

猜你喜欢

热点阅读