Spark Streaming(三)集成Kafka

2018-03-14  本文已影响0人  Sx_Ren

Kafka从0.8版本到0.10版本提供了一种新的消费者api,所以根据你的kafka版本不同相应的有2种包可用,分别是spark-streaming-kafka-0-8spark-streaming-kafka-0-8,前者可兼容kafka 0.8及其以上版本,后者只能兼容0.10及其以上的版本,由于本篇文件基于Spark 2.2.0、Kafka 0.9.0,那么集成包的选择应该是选择前者,也就是spark-streaming-kafka-0-8 ,两个集成包的区别如下(图片截自官网):

SparkStreamingIntegrateKafka.png
使用spark-streaming-kafka-0-8,有两种方式可以使用Spark Streaming从Kafka接收数据,第一种是Receiver-based方式,基于Receiver使用Kafka高级API,第二种是Direct 方式(Since Spark 1.3),没有Receiver

这种方式使用Receiver 接收数据,接收到的数据会保存在Spark executors里,启动Spark Streaming作业的时候再处理这些数据,这种方式很容易造成数据丢失,为了确保没有数据丢失,需要在Spark Streaming里开启Write Ahead Logs(从Spark 1.2引进)模式,数据处理前先把数据存储在(可以说是备份)分布式文件系统里(比如HDFS),这样在处理失败数据丢失的时候可以从文件系统恢复数据。


Receiver-Based
  1. 引入pom依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>${spark.version}</version>
 </dependency>
  1. Spark Streaming程序代码
if(args.length!=4){
    System.err.println("Usage KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
    System.exit(1)
}
val Array(zkQuorum,group,topics,numThreads) = args
    
 val sparkConf = new SparkConf()//.setAppName("KafkaReceiverWordCount").setMaster("local[2]")
 val ssc = new StreamingContext(sparkConf,Seconds(5))
    
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)    message.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
    
ssc.start()
ssc.awaitTermination()

Receiver这种方式需要注意三点:

  1. 打包到生产环境运行
    因为Spark Streaming集成Kafka的jar包并未打到程序包里,所以spark-submit启动的时候需要通过--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0添加该jar包,第一次会先去下载jar包,速度会稍慢,第二次就可以直接使用了,详细命令如下:
spark-submit \
--class com.yxzc.KafkaReceiverWordCount \
--master local[2] \
--name KafkaReceiverWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar  hadoop000:2181 test kafka_streaming_topic 1

如果生产环境不能连接外网,或者网速很差时,可以先从maven仓库下载该jar包,然后在spark-submit是通过--jars指定该jar,个人比较推荐这种方式

spark1.3开始,引入了Direct方式,这种方式没有Receiver,它会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch,使用的是Kafka简单API


Direct Approach

这种方式相比Receiver方式,有以下优势:

  1. 引入pom依赖 同Receiver方式一样
  2. Spark Streaming程序代码
if(args.length!=2){
   System.err.println("Usage KafkaDirectWordCount <brokers> <topics> ")
   System.exit(1)
}
val Array(brokers,topics) = args   
val sparkConf = new SparkConf()//.setAppName("KafkaReceiverWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Seconds(5))
    
val kafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
    
val topicsSet = topics.split(",").toSet
    
val message = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicsSet)
//    message.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
  1. 打包到生产环境运行 同Receiver方式一样
上一篇 下一篇

猜你喜欢

热点阅读