SparkStream与kafka对接

2023-02-09  本文已影响0人  CoderInsight

1.官网简介

SparkStream与kafka对接.png

2.参考资料

1,必读:再讲Spark与kafka 0.8.2.1+整合
2,必读:Spark与kafka010整合
3,spark streaming kafka 整合(010-Consumer)

3.两版本代码整理

(1).spark-streaming-kafka-0-8

1),当前版本依赖的jar包

第一和第二个jar包可以直接在当前版本的lib目录中拷贝。第三个jar包需要从本地下载好,然后上传执行。所以在本地的jar路径中我保存了一份完整的jar包。

[root@localhost jars]# pwd
/usr/local/spark/jars

[root@localhost jars]# ll | grep kafka
-rw-r--r--. 1 root root   3954430 Mar 20 11:34 kafka_2.11-0.8.2.1.jar
-rw-r--r--. 1 root root    324010 Mar 20 11:34 kafka-clients-0.8.2.1.jar
-rw-r--r--. 1 root root    298522 Mar 19 16:07 spark-streaming-kafka-0-8_2.11-2.1.0.jar

2).producer(生产者)

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

 // 指定再在哪个kafka服务器运行
val brokers="localhost:9092"
// 指定topic
val topic="demo01"
// 指定每秒钟向topic放多少消息
val messagesPerSec=3
// 每个消息中包含多少个单词
val wordsPerMessage=5

// 以下四行代码一般是固定的写法,之后直接复制粘贴就可以
// 使用键值对的方式将
val props = new HashMap[String, Object]()
// 指定 运行 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
// 对key和value都去序列化,且因为我定义的Key和Value的类型都是String的,所以直接用的 StringSerializer去序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
// 生产者
val producer = new KafkaProducer[String, String](props)

// 在while循环中不断的生成数据
while(true) {
  // 控制一秒钟生成多少数据(1 to XX 会生成 Range 类型变量)
  (1 to messagesPerSec.toInt).foreach { messageNum =>
      // 控制一条消息中有几个单词 1 to XXX,接下来对集合中每一个元素执行map操作
    val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) // Random的nextInt(10)的作用是 生成0-9之间的一个随机数
      .mkString(" ") // 指定生成随机数根据什么符号进行分割
      print(str)
      println()
    // 生产者 生产出来的消息类型都是 ProducerRecord 类型的,
      // 指定 主题、key、value
    val message = new ProducerRecord[String, String](topic, null, str)
    // 将消息对象发送出去
    producer.send(message)
  }
 Thread.sleep(3000) // 线程休眠(3秒)
}

3).consumer(消费者)

import _root_.kafka.serializer.StringDecoder

import org.apache.spark._
import org.apache.spark.SparkConf

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

// 设置与brokers获取连接
val brokers="localhost:9092"
// 可以指定多个分区,然后使用 "," 分割
val topics="demo01"

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount2").setMaster("local[2]")
// 创建具有2秒批处理间隔的上下文
val ssc = new StreamingContext(sparkConf, Seconds(2))

// 为了 容错 (目录不存在的时候,其可以自动创建)
ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") // 设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动hadoop

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

// 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print
// 由于Producer生产的数据,第一个值是null,我们用不到,所以直接去数据的第二个数据
val lines = messages.map(_._2)
// 拿到数据进行拆分,拿到每一个单词
val words = lines.flatMap(_.split(" "))
// 对每一个单词,再去做map变换,然后进行计数
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
// 将结果打印
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()

4).知识补充

(2).spark-streaming-kafka-0-10

1).当前依赖的jar包

第一个jar包可以直接在当前版本的lib目录中拷贝。第二个jar包需要从本地下载好,然后上传执行。所以在本地的jar路径中我保存了一份完整的jar包。

[root@master jars]# pwd
/usr/local/spark/jars

[root@master jars]# ll | grep kafka
-rw-r--r--. 1 root root   946811 Mar 20 16:02 kafka-clients-0.10.2.0.jar
-rw-r--r--. 1 root root   190413 Mar 20 14:51 spark-streaming-kafka-0-10_2.11-2.3.0.jar

2).Producer(生产者)

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
// 注意当前版本的stream对象是与旧版本不一样的
import org.apache.spark.streaming.kafka010._


// 指定再在哪个kafka服务器运行
val brokers="localhost:9092"
// 指定topic
val topic="demo01"
// 指定每秒钟向topic放多少消息
val messagesPerSec=3
// 每个消息中包含多少个单词
val wordsPerMessage=5

// 以下四行代码一般是固定的写法,之后直接复制粘贴就可以
// 使用键值对的方式将
val props = new HashMap[String, Object]()
// 指定 运行 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
// 对key和value都去序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
// 生产者
val producer = new KafkaProducer[String, String](props)

// 在while循环中不断的生成数据
while(true) {
  // 控制一秒钟生成多少数据(1 to XX 会生成 Range 类型变量)
  (1 to messagesPerSec.toInt).foreach { messageNum =>
      // 控制一条消息中有几个单词 1 to XXX,接下来对集合中每一个元素执行map操作
    val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) // Random的nextInt(10)的作用是 生成0-9之间的一个随机数
      .mkString(" ") // 指定生成随机数根据什么符号进行分割
      print(str)
      println()
    // 生产者 生产出来的消息类型都是 ProducerRecord 类型的,
      // 指定 主题、key、value
    val message = new ProducerRecord[String, String](topic, null, str)
    // 将消息对象发送出去
    producer.send(message)
  }
 Thread.sleep(3000) // 线程休眠(3秒)
}

3).Consumer(消费者)

// 代码是在jupyter中运行的,所以没有加函数体等
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// 创建配置文件对象
var sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkKafKaTest")
// 创建具有5秒批处理间隔的上下文
val ssc = new StreamingContext(sparkConf, Seconds(5))

// 如果指定多个topic,那么在这里直接使用","进行分割
val topics = Array("demo01")
// 配置kafka的相关参数
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// 创建 DirectStream 对象(DStream对象:实际上就是一堆RDD的集合)
// 此时的数据来源是从kafka来的;每行数据是以(K,V)的形式存放??
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// 将DStream中数据进行map变换,将每一行记录中的key和value值进行整理,然后打印输出
stream.map(record => (record.key, record.value)).print()
/*
这里是将计数操作直接写在了一行代码中:
1.将数据中value取出来;(数据是以key,value的形式存的吗?????)
2.将每一行数据进行单词的切分,
3.对切分之后的单词进行map变换,重新映射为另一种形式
4.然后对转换之后的数据进行单词计数
5.最后将结果打印在屏幕上
*/
stream.map(record =>  record.value).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

// 启动当前程序
ssc.start() 
// 等待一个终止命令
ssc.awaitTermination() 
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] {...}
上一篇 下一篇

猜你喜欢

热点阅读