大数据

一篇文章学会spark-streaming

2017-03-10  本文已影响193人  bigdataer

版权申明:转载请注明出处。
文章来源:http://bigdataer.net

1.什么是spark-streaming?

实际生产中会有许多应用到实时处理的场景,比如:实时监测页面点击,实时监测系统异常,实时监测来自于外部的攻击。针对这些场景,twitter研发了实时数据处理工具storm,并在后来开源。spark针对这些场景设计了spark-streaming实时计算模型,它允许用户使用一系列批处理的API去处理实时数据,能做到代码逻辑的重复使用。
和spark中的rdd非常相似,spark-streaming中使用离散化流(discretized stream)作为抽象的表示,叫做DStream。它是随时间推移而收集数据的序列,每个时间段收集到的数据在DStream内部以一个RDD的形式存在。DStream支持从kafka,flume,hdfs,s3等获取输入。DStream也支持两种操作,即转化操作和输出操作(区别于RDD中的行动操作)。转化操作又分为无状态的转化操作和有状态的转化操作,无状态的转化操作有map,filter,flatmap,repartition等,是针对单个时间区间内的操作。而有状态的转化操作可以针对不同的时间区间,后面详述。

2.两个简单的例子

2.1 监听socket获取数据,代码如下:
这里使用nc -lk 9999 在ip为10.121.33.44的机器上发送消息

object SocketStream {
  def main(args: Array[String]): Unit = {
    //本地测试,设置4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒为一个批次
    val ssc = new StreamingContext(conf,Seconds(10))
    //接收消息
    val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER)
    //监测关键字error,出现则print
    dstream.filter(_.contains("error")).foreachRDD(rdd=>{
      rdd.foreach(println(_))
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

2.2 从kafka读取数据,比较常用

object KafkaStream {

  def main(args: Array[String]): Unit = {
    //本地测试,设置4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒为一个批次
    val ssc = new StreamingContext(conf,Seconds(10))

    val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
    val group_id = "realtime_data"

    //kafka相关参数
    val kafka_param = Map[String,String](
      "zookeeper.connect" ->zkQuorum,
      "group.id" -> group_id,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760"
    )
    val topic = Map[String,Int]("test_topic" -> 16)
    //接收消息
    val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    //监测关键字error,出现则print
    dstream.filter(_.contains("error")).foreachRDD(rdd=>{
      rdd.foreach(println(_))
    })
    
    ssc.start()
    ssc.awaitTermination()
  }
}

3.再来谈架构

通过上面两个例子,你可能对spark-streaming有了初步的了解,我们再来看一下它的架构。
Spark-streaming使用"微批次"的架构,把流式计算当做一系列微型的批处理操作来对待,每个时间段都产生一个RDD。如图:


wpc

作用于一个DStream上的无状态转化操作会对它其中的每个RDD生效,如针对一个输入为语句的DStream做flatMap操作的示意图如下:


shiyitu

4.转化操作

4.1 无状态的转化操作。
无状态转化操作就是简单的将转化作用于DStream的每个RDD上面。下面列举了一些常见的转化操作,其中最后一个transform表示可以试用自定义的转化函数,尽管它前面已经提供了很多现成的API。


wzt

4.2有状态的转化操作。
有状态的转化操作是跨时间段的数据操作,一些先前的批次也被用来在新的批次中做计算。主要有滑动窗口和updateStateByKey。前者以一个时间段为滑动窗口进行操作,后者则用来跟踪每个键的状态变化。有状态的转化操作需要打开检查点机制来保证容错性。即:给ssc.checkpoint()设置一个检查点目录。
(1)基于窗口的转化操作会在一个比ssc设置的更长的时间段内,通过整合多个批次的,计算出整个大的时间窗口的结果。基于窗口的操作需要两个参数,一个是窗口时长,一个是滑动步长。这两个参数是ssc设置的时长的整数倍。下面的图表示了一个时间窗口为3,滑动步长为2的窗口转化操作。


window
前面提到的监测关键字error的例子,现在需要每隔20s就对前面30s有error的日志记录做计数,代码如下:
object KafkaStream {

  def main(args: Array[String]): Unit = {
    //本地测试,设置4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒为一个批次
    val ssc = new StreamingContext(conf,Seconds(10))

    val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
    val group_id = "realtime_data"

    //kafka相关参数
    val kafka_param = Map[String,String](
      "zookeeper.connect" ->zkQuorum,
      "group.id" -> group_id,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760"
    )
    val topic = Map[String,Int]("test_topic" -> 16)
    //接收消息
    val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER)
      .map(_._2)
    //每隔20s对前30s出现error的日志做计数
    val errors = dstream.window(Seconds(30),Seconds(20))
        .filter(_.contains("error"))
        .count()
    errors.foreachRDD(rdd=>{
      rdd.foreach(println(_))
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

(2)updateStateByKey
updateStateByKey能对键值对的数据进行不同批次间的数据计算,使用updateStateByKey,需要传入一个update函数,这个函数接收某个key最新批次对应的values,以及该key之前对应的value,按照自定义的逻辑返回一个新的value。如需要计算一个实时日志中http响应码的计数,代码如下:

object KafkaStream {

  def main(args: Array[String]): Unit = {
    //输出目录
    val output = args(0)
    //本地测试,设置4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒为一个批次
    val ssc = new StreamingContext(conf,Seconds(10))

    val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
    val group_id = "realtime_data"

    //kafka相关参数
    val kafka_param = Map[String,String](
      "zookeeper.connect" ->zkQuorum,
      "group.id" -> group_id,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760"
    )
    val topic = Map[String,Int]("test_topic" -> 16)
    //接收消息
    val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    val rdd = dstream.map(_.split("\001"))
      .map(x=>(x(0),x(1).toLong))
      .updateStateByKey(update)
    //输出
    rdd.foreachRDD(_.saveAsTextFile(output))
    ssc.start()
    ssc.awaitTermination()
  }
  //update函数
  def update(new_values:Seq[Long],old_value:Option[Long]):Option[Long]={
    val current_num = new_values.size
    val result_num = current_num + old_value.getOrElse(0L)
    Some(result_num)
  }
}

(3)所有有状态转化操作


state

5.输出操作

输出操作比较简单,有以下几种:


save

6.作业稳定性

spark-streaming作业一般都要全天候不间断运行,那么作业的稳定性如何保证?主要有以下几点:
6.1 检查点机制。
其原理就是阶段性的将作业运行的数据存放到存储系统,如hdfs,s3等。当作业运行出现异常时可以从上述数据中恢复。
6.2 驱动器容错。
在创建实时计算作业的上下文时使用getOrCreate函数。代码如下:

    val ssc = StreamingContext.getOrCreate(cp_dir,createContext )
    def createContext(): StreamingContext  ={
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc,Seconds(10))
      ssc.checkpoint(cp_dir)
    }

更多文章请关注微信公众号:bigdataer

wx
上一篇下一篇

猜你喜欢

热点阅读