
Setting up Kafka on a Mac and consuming messages in real time with Spark Streaming

2018-04-30  学习之术

Kafka is a fast, scalable, high-throughput, fault-tolerant distributed publish-subscribe messaging system. It is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java.

Once apps took off, companies found themselves with large volumes of user behavior data, and putting that data to good use became a top priority. The first step is collecting it efficiently, and Kafka is a system companies commonly rely on to transport and collect user behavior data.

The algorithms we develop are deployed on servers, so to test locally you have to set up your own Kafka environment. By simulating user behavior data you can recreate a production-like environment, which makes debugging the code much easier.

Installing Kafka on a Mac

Use Homebrew, the go-to installer on macOS: just type brew install kafka in a terminal, and Homebrew will automatically install ZooKeeper, which Kafka depends on.

After installing with brew, the Kafka and ZooKeeper configuration files are at the paths below; in most cases they need no modification.

/usr/local/etc/kafka/server.properties
/usr/local/etc/kafka/zookeeper.properties

When the installation finishes, the terminal prints hints about how to start the services.

Use the following two commands to quickly start ZooKeeper and Kafka:

brew services start zookeeper
brew services start kafka
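
With both services running you can, if you like, pre-create the two topics used later in this post (by default Kafka will also auto-create a topic the first time a message is sent to it). Assuming the default ports:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic impression
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic event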

Producing Kafka messages that simulate a real environment

Although you can produce messages from the terminal, hand-crafting realistic JSON-formatted data that way is tedious, so I wrote a script to simulate production-like data automatically.

First, add the Spark Streaming Kafka Maven dependency to the project in IDEA:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
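
If your project uses sbt rather than Maven, the equivalent (assuming Scala 2.11) would be roughly the following; note that the streaming consumer shown later also needs spark-streaming itself on the classpath:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
)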

The code below produces messages to two topics, randomly generating user behavior events from an e-commerce app every 200 milliseconds.

import java.util.Properties
import org.codehaus.jettison.json.JSONObject
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import org.apache.log4j.{Level, Logger}
import scala.util.Random

object KafkaEventProducer {
  private val goodsid = Array("1100", "1975", "24724", "4542")
  private val userid = Array("20ad3455", "47c4ea08", "F4727214", "J9FE7E52")
  private val siteid = Array("67800", "60902")
  private val eventkey = Array("open", "goods_view", "addtobag", "impression", "checkout")
  private val pagename = Array("categories", "search", "goodsdetail", "Home")
  private val sid = Array("xie", "chen", "long")

  private val goodsLength = goodsid.length
  private val useridLength = userid.length
  private val siteidLength = siteid.length
  private val eventkeyLength = eventkey.length
  private val pagenameLength = pagename.length
  private val sidLength = sid.length

  private val random = new Random()
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val topic = Array("impression", "event")
    val brokers = "127.0.0.1:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    // the legacy producer API that ships in the kafka core jar
    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while(true) {
      val event = new JSONObject()
      event
        .put("userid", userid(random.nextInt(useridLength)))
        .put("eventtime", System.currentTimeMillis.toString)
        .put("siteid", siteid(random.nextInt(siteidLength)))
        .put("goodsid", goodsid(random.nextInt(goodsid.length)))
        .put("pagename", pagename(random.nextInt(pagenameLength)))
        .put("sid", sid(random.nextInt(sidLength)))
        .put("eventkey", eventkey(random.nextInt(eventkeyLength)))

      // send the event to one of the two topics, chosen at random
      producer.send(new KeyedMessage[String, String](topic(random.nextInt(2)), event.toString))
      println("Message sent: " + event)

      Thread.sleep(200)
    }
  }
}
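
Each message is a flat JSON object, so a line printed by the producer will look something like this (field values drawn at random from the arrays above; the timestamp is illustrative):

Message sent: {"userid":"47c4ea08","eventtime":"1525072800000","siteid":"67800","goodsid":"1975","pagename":"search","sid":"xie","eventkey":"goods_view"}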

Run the following command in a terminal to receive messages from the impression topic:

kafka-console-consumer --bootstrap-server localhost:9092 --topic impression --from-beginning

Consuming messages with Spark Streaming

Spark Streaming is an extension of the core Spark API that provides scalable, high-throughput, fault-tolerant processing of real-time data streams.

The program below uses Spark Streaming to print the data collected from Kafka once every 6 seconds.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


/**
  * @author XieChenlong
  * Stream-processing example
  */
object StreamingMab {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StreamingMab").setMaster("local[2]")
    // 6-second batch interval, as described above
    val ssc = new StreamingContext(sparkConf, Durations.seconds(6))

    val KAFKA_TOPIC_NAME = Array("impression", "event")
    val KAFKA_BROKERS = "127.0.0.1:9092"
    val KAFKA_PARAMS = Map[String, Object](
        "bootstrap.servers" -> KAFKA_BROKERS,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "xieTest",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](KAFKA_TOPIC_NAME, KAFKA_PARAMS)
    )

    // keep only the message payload (the JSON string) from each Kafka record
    val eventDStream = stream.map(record => record.value)

    // print every received message, skipping empty batches
    eventDStream.foreachRDD(
      rdd => if (!rdd.isEmpty()) {
        rdd.foreach { x =>
          println(x)
        }
      }
    )
    ssc.start()
    ssc.awaitTermination()
  }
}
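
Printing raw lines is just a smoke test. A natural next step is to parse the JSON and aggregate within each batch; here is a minimal sketch (reusing the jettison JSONObject from the producer) that could replace the foreachRDD block above, placed before ssc.start():

    // count events per eventkey within each 6-second micro-batch
    val counts = eventDStream
      .map { json =>
        val obj = new org.codehaus.jettison.json.JSONObject(json)
        (obj.getString("eventkey"), 1L)
      }
      .reduceByKey(_ + _)
    counts.print()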

