大数据,机器学习,人工智能大数据玩转大数据

Spark Streaming整合Kafka

2019-05-26  本文已影响2人  董二弯

前几章介绍了KafkaSpark Streaming入门Spark Streaming进阶。在这一章一起学习Spark Streaming和Kafka的整合。

概述

kafka作为一个实时的分布式消息队列,实时的生产和消费消息,这里我们可以利用SparkStreaming实时计算框架实时地读取kafka中的数据然后进行计算。在spark1.3版本后,kafkaUtils里面提供了两个创建dstream的方法,一种为KafkaUtils.createDstream(需要receiver接收),另一种为KafkaUtils.createDirectStream。其中推荐使用KafkaUtils.createDirectStream的方式相比基于Receiver方式有几个优点:

综上我们只演示KafkaUtils.createDirectStream的方式进行整合。

整合流程

zkServer.sh start
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafka_spark
//查看创建的topic,有记录说明创建成功
kafka-topics.sh --list --zookeeper localhost:2181
./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_spark
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>2.4.2</version>
</dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>2.4.3</version>
</dependency>

这里spark-streaming的版本我选择的是spark-streaming_2.12:2.42版本,这是由于我本地用的Scala的环境是2.12.8,spark-streaming这个版本中用到的Scala版本就是2.12.8。之前我使用的是spark-streaming_2.12:2.11.8版本,项目启动时报环境不匹配的问题。所以在本地演示时需要选择合适的版本。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
  * Spark Streaming整合Kafka
  *
  * @author zhiying.dong@hand-china.com 2019/05/24 16:54
  */
object KafkaDirectWordCount{
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("DirectKafka")
      .setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(2))

    val topicsSet = Array("kafka_spark")
    val kafkaParams = mutable.HashMap[String, String]()
    //必须添加以下参数,否则会报错
    kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
    kafkaParams.put("group.id", "group1")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams
      )
    )

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
image.png

观察控制台发现可以统计字符出现的此时,说明Spark Streaming可以消费到Kafka中生产的消息


image.png
./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.3 --class com.imooc.spark.Test ~/lib/sparktrain-1.0.jar
上一篇下一篇

猜你喜欢

热点阅读