spark大数据学习

SparkStreaming

2020-03-20  本文已影响0人  TZX_0710

SparkStreaming是spark的一个子模块,用与快速构建可扩展,高吞吐量,高容错的流处理程序

  1. 通过高级API构建应用程序,简单易用。
  2. 支持多种语言 ,Java、Scala、Python
  3. 良好的容错性,SparkStreaming 支持快速从失败中恢复丢失的操作状态
  4. 能够和Spark其他模块无缝集成,将流处理与批处理完美结合
  5. SparkStreaming可以从HDFS、Flume、kafka、Twitter和ZeroMQ读取数据,也支持自定义数据源。
  1. 什么是流处理?
    在讲什么是流处理的时候我们可以回想一下以前在程序计算或者查询数据的时候是怎么做的。
    在流处理之前,数据通常存储在数据库,或者文件存储系统种。应用程序根据需要去查询数据计算数据。Hadoop采用Hdfs存储数据,采用Mapreduce处理数据,这是一个典型的静态数据处理架构。数据计算必须从存储系统当中获取数据,所以其中必须先存入存储系统我们才可以使用这一部分数据。那么有没有什么办法可以在数据产生的过程从就直接进行使用计算呢?答案肯定是有的。所以流处理应需而生。

流处理:对运动中搞的数据处理,在接受数据时直接计算。流出来带来的优势如下:

  1. 应用程序立即对数据做出响应:降低数据的滞后性,使得数据更具有时效性,更能反映未来的预期。
  2. 流处理可以处理更大的数据量:直接处理数据量,只保留有意义的数据,降低需要处理的数据量。
  3. 流处理更贴近显示的数据模型:在实际的环境中,一切数据都是持续变化的,最典型的案例就是股票市场,金融市场,流处理能更好的应对这些数据的连续性。
  4. 流处理分散和分离基础设施:流处理减少了对大型数据库的需求。每个流处理程序徐通过流处理框架维护自己的数据和状态。

DStream

Spark Streaming提供为离散流DStream高级抽象,用于表示连续的数据流。DStream可以来自kafka,flume等数据源的输入流创建,也可以由其他DStream转化而来。在内部,DStream表示为一系列RDD。

Spark&storm&Flink

storm 和 Flink 都是真正意义上的流计算框架,但 Spark Streaming 只是将数据流进行极小粒度的拆分,拆分为多个批处理,使得其能够得到接近于流处理的效果,但其本质上还是批处理(或微批处理)。
在windows下面开发测试采用 nc命令 安装包在百度云盘
链接:https://pan.baidu.com/s/1__dJ7dkOwtXDuK-xsQ_wxQ
提取码:hwjw
解压复制nc.exe 到C:/users/自己用户目录下就可以了
SparkStreaming示例
1.引入jar包
spark-streaming_2.11-2.1.2.jar

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStream_example {
  def main(args: Array[String]): Unit = {
    //指定时间间隔5s秒
    val sparkConf=new SparkConf().setAppName("SparkStreaming").setMaster("local[2]")
//StreamingContext是SparkStreaming的编程入口在创建的时候指明sparkConf和批次时间 
    val ssc=new StreamingContext(sparkConf,Seconds(5))
    //创建文本输入流进行词频统计  建立一个socket通道对node01的9999端口进行监听
    val lines=ssc.socketTextStream("node01",9999)//在windows下面的话就用localhost 此处采用hostname
    lines.flatMap(_.split("").map(x=>(x,1))).reduceByKey(_+_).print()
    //启动服务
    ssc.start()
   //等待服务结束
    ssc.awaitTermination()
  }
}


在示例代码中采用的是SocketTextStream来创建基于Socket的流,实际上Spark支持多种数据源,大体可以分为2类:
基本数据源:包括文件系统、Socket连接等
高级数据源:包括Kafka、Flume、Kinesis等
注:在基本数据源,Spark支持监听HDFS上指定目录,当有新文件加入时,会获取其文件内容作为输入流。创建方式如下:

//对于文本文件,指明监听目录就可以
streamingContext.textFileStream()
//对于其他文件,需要指定目录,以及键类型,值类型,和输入格式
streamingContext.fileStream()
//启动服务
streamingContext.start()
//使服务处于等待可用状态
streamingContext.awaitTermination()
//在遇到异常或者在需要情况下手动停用
streamingContext.stop()

DStream的Transformaction

DStream是SparkStreaming提供的基本抽象,但是本质上而言,应用于DStream底层还是会转换成RDD操作。所以DStream可以支持大部分RDD上的算子操作。除了RDD上的算子操作,DStream还提供部分仅有的Transformation算子。

1.updateStateBykey

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamTransoformations {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    //第一步创建SparkConf
    val sparkConf = new SparkConf().setAppName("DStream").setMaster("local[2]")
    //SparkStreaming程序入口
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    //保存检查点的信息 也就是每一次输入的词数据都会保存在这里 然后累加求词频的时候从检查点中取出数据
    ssc.checkpoint("hdfs://node01:8020/spark-streaming")
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" "))
      .map(x => (x, 1))
      .updateStateByKey[Int](updateFunction _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
//当前的值+上保存在检查点中的值
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val current = currentValues.sum
    val pre = preValues.getOrElse(0)
    Some(current + pre)
  }
}

删除无用的检查点
第一次
累加统计

Spark Streaming &&Flume

Apache Flumes是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中,Spark Streaming提供两种整合Flume的方式。
1.推送式方法
2.拉取式方法

下方示例在windows开发环境下测试

1. 推送式方法

推送式方法,SparkStreaming 程序需要对某个端口进行监听,Flume通过avro sink将数据源源不断推送到该端口。

pom文件添加依赖

     <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.7.7</version>
            </dependency>
            <!-- Spark Streaming-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- Spark Streaming整合Flume依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-flume_${scala.version}</artifactId>
                <version>2.4.3</version>
            </dependency>
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object spark_flume {

  def main(args: Array[String]): Unit = {
    //创建SparkConf
    val sparkConf=new SparkConf().setAppName("SparkFlume").setMaster("local[2]")
    //创建SparkStreaming编程入口
    val streamingContext = new StreamingContext(sparkConf,Seconds(5))
    //
    val flumeStream =FlumeUtils.createStream(streamingContext,"localhost",8888)
    //打印输入流的数据
    // 2.打印输入流的数据po
    flumeStream.map(line => new String(line.event.getBody.array()).trim)print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

windows下面使用tail 命令 解压放在C:\windows\System32下面
链接:https://pan.baidu.com/s/1FR7rwcxCFP5sSu8Y5JnEgA
提取码:uyec

解压flume压缩包
在解压的flume的conf文件目录下新建一个example.conf配置文件
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f demo.txt #tail命令windows环境下不能使用 所以需要自己配置一下tail在windows下使用
a1.sources.s1.channels = c1

#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

启动flume

采用如下命令在windows下面启动flume

flume-ng agent --conf ../conf --conf-file ../conf/example.conf --name a1 -property flume.root.logger=INFO,console

启动scala的sparkStreaming示例

控制台示例
powerShell追加内容到监控文件
控制台输出内容

2. 拉取式方法

拉取式方法是将数据推送到SparkSink接收器中,此时数据会保存缓冲状态,SparkStreaming定时从接收器中拉取数据。这种方法是基于事务的,只有SparkStreaming接受和复制完成数据后才会删除缓存数据。与第一种方式相比,具有更加可靠性和容错保证。

  <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.8.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-ipc -->
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-ipc</artifactId>
                <version>1.8.2</version>
            </dependency>
            <!-- Spark Streaming-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- Spark Streaming整合Flume依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-flume_${scala.version}</artifactId>
                <version>2.4.3</version>
            </dependency>
<!--部署的时候不需要scala-library、commons-lang3 因为Spark已经存在-->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.12.8</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.5</version>
            </dependency>
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f demo.txt
a1.sources.s1.channels = c1

#配置sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

把Spark-strem-flume-{scala-version}.jar 拷贝到flume下面的lib目录

启动flume

flume-ng agent --conf ../conf --conf-file ../conf/pull.conf --name a1 -property flume.root.logger=INFO,console

启动示例代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object spark_flume {

  def main(args: Array[String]): Unit = {
    //创建SparkConf
    val sparkConf=new SparkConf().setAppName("SparkFlume").setMaster("local[2]")
    //创建SparkStreaming编程入口
    val streamingContext = new StreamingContext(sparkConf,Seconds(5))
    //
    val flumeStream =FlumeUtils.createPollingStream(streamingContext,"localhost",8888)
    //打印输入流的数据
    // 2.打印输入流的数据po2
    flumeStream.map(line => new String(line.event.getBody.array()).trim)print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

向demo.txt追加

powershell追加
控制台输出

在整合的时候遇到的问题:

  1. 启动flume的时候报错
Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.
解决方案如下:添加spark-streaming-flume_{scala.version}.jar到flume的lib下面
  1. flume启动不报错、spark启动雪崩如下信息
 Did not receive events from Flume agent due to error on the Flume agent: Unknown Error
解决方案:在flume下面的lib包中的scala-library.jar 修改成pom文件中对应的版本
  1. 控制台出现如下错误
org.apache.avro.AvroRuntimeException: Unknown datum type:
 java.lang.Exception: java.lang.NoClassDefFoundError: 
Could not initialize class org.apache.spark.streaming.flume.sink.EventBatch
<!--解决方案:pom文件中的原来的下面是1.77的 修改版本信息为1.82高版本-->
<dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-ipc -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-ipc</artifactId>
            <version>1.8.2</version>
        </dependency>
上一篇下一篇

猜你喜欢

热点阅读