杂物间程序员首页投稿(暂停使用,暂停投稿)

Spark Streaming + Kakfa 编程指北

2016-03-27  本文已影响2821人  牛肉圆粉不加葱

本文简述如何结合 Spark Streaming 和 Kakfa 来做实时计算。截止目前(2016-03-27)有两种方式:

  1. 使用 kafka high-level API 和 Receivers,不需要自己管理 offsets
  2. 不使用 Receivers 而直接拉取 kafka 数据,需要自行管理 offsets

两种方式在编程模型、运行特性、语义保障方面均不相同,让我们进一步说明。
如果你对 Receivers 没有概念,请先移步:揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入

方式一:Receiver-based

这种方法使用一个 Receiver 来接收数据。在该 Receiver 的实现中使用了 Kafka high-level consumer API。Receiver 从 kafka 接收的数据将被存储到 Spark executor 中,随后启动的 job 将处理这些数据。

在默认配置下,该方法失败后会丢失数据(保存在 executor 内存里的数据在 application 失败后就没了),若要保证数据不丢失,需要启用 WAL(即预写日志�至 HDFS、S3等),这样再失败后可以从日志文件中恢复数据。WAL 相关内容请参见:http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications


接下来讨论如何在 streaming application 中应用这种方法。使用 KafkaUtils.createStream,实例代码如下:

def main(args: Array[String]) {
  if (args.length < 4) {
    System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
    System.exit(1)
  }

  StreamingExamples.setStreamingLogLevels()

  val Array(zkQuorum, group, topics, numThreads) = args
  val sparkConf = new SparkConf().setAppName("KafkaWordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(2))
  ssc.checkpoint("checkpoint")

  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1L))
    .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
  wordCounts.print()

  ssc.start()
  ssc.awaitTermination()
}

需要注意的点:

方式二:Without Receiver

自 Spark-1.3.0 起,提供了不需要 Receiver 的方法。替代了使用 receivers 来接收数据,该方法定期查询每个 topic+partition 的 lastest offset,并据此决定每个 batch 要接收的 offsets 范围。需要注意的是,该特性在 Spark-1.3(Scala API)是实验特性。

该方式相比使用 Receiver 的方式有以下好处:

当然,方式二相比于方式一也有缺陷,即不会自动更新消费的 offsets 至 Zookeeper,从而一些监控工具就无法看到消费进度。方式二需要自行保存消费的 offsets,这在 topic 新增 partition 时会变得更加麻烦。

下面来说说怎么使用方式二,示例如下:

 import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

Kafka 参数中,需要指定 metadata.broker.listbootstrap.servers。默认会从每个 topic 的每个 partition 的 lastest offset 开始消费,也可以通过将 auto.offset.reset 设置为 smallest 来从每个 topic 的每个 partition 的 smallest offset 开始消费。

使用其他重载的 KafkaUtils.createDirectStream 函数也支持从任意 offset 消费数据。另外,如果你想在每个 bath 内获取消费的 offset,可以按下面的方法做:

// Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array[OffsetRange]()
    
 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
           ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }

你可以用上面的方法获取 offsets 并保存到Zookeeper或数据库中等。

需要注意的是,RDD partition 与 Kafka partition 的一一对应关系在shuffle或repartition之后将不复存在( 如reduceByKey() 或 window() ),所以要获取 offset 需要在此之前。

另一个需要注意的是,由于方式二不使用 Receiver,所以任何 Receiver 相关的配置,即spark.streaming.receiver.*均不生效,需要转而使用 spark.streaming.kafka.*。一个重要的参数是 spark.streaming.kafka.maxRatePerPartition,用来控制每个 partition 每秒能接受的数据条数的上限。

参考

  1. http://spark.apache.org/docs/latest/streaming-kafka-integration.html

欢迎关注我的微信公众号:FunnyBigData

FunnyBigData
上一篇 下一篇

猜你喜欢

热点阅读