
Integrating Spark Streaming with Flume

2019-05-23  董二弯

The previous chapters covered Flume, Spark Streaming basics, and advanced Spark Streaming. In this chapter we look at integrating Spark Streaming with Flume.

Overview

Flume is a framework for real-time log collection, and it can be connected to Spark Streaming for real-time processing: Flume continuously produces data, and Spark Streaming consumes and processes it. There are two ways to connect them. In the push approach, Flume pushes events to a receiver running inside Spark Streaming; in the pull approach, Spark Streaming pulls data from Flume. The pull approach is the more common choice in practice: the SparkSink on the Flume side acts as a buffer, a reliable Flume receiver in Spark Streaming pulls data from that sink, and the pulled data is stored with replication, which improves fault tolerance and stability. The pull is also transactional, so data is only removed from the sink after Spark Streaming has received it. With the push approach, data can be lost, so only the pull approach is demonstrated here.
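
For reference, the two approaches correspond to two different FlumeUtils factory methods. Below is a minimal sketch; the wrapper function, host, and port values are placeholders, and ssc is a StreamingContext:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.flume.FlumeUtils

def compareApproaches(ssc: StreamingContext): Unit = {
  // Push: Spark Streaming starts an Avro receiver on this host/port and a
  // Flume avro sink pushes events to it; buffered events can be lost if the
  // receiver goes down.
  val pushStream = FlumeUtils.createStream(ssc, "0.0.0.0", 41414)

  // Pull: Flume writes events into the buffering SparkSink and Spark Streaming
  // polls it; received blocks are replicated by default
  // (StorageLevel.MEMORY_AND_DISK_SER_2) and removal from the sink is transactional.
  val pullStream = FlumeUtils.createPollingStream(ssc, "192.168.30.131", 8888)
}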

Integration Steps

Flume side: define an agent configuration (exec-memory-spark.conf) whose exec source tails /root/data/data.log, buffers events in a memory channel, and writes them to a SparkSink that Spark Streaming will later pull from:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/data/data.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.30.131
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the Flume agent with this configuration:

../bin/flume-ng agent -n a1 -c conf/ -f exec-memory-spark.conf -Dflume.root.logger=INFO,console

Spark application side: add the following dependencies to the project's pom.xml. (Per the Spark Streaming + Flume integration guide, the pull approach also requires the spark-streaming-flume-sink jar, together with scala-library and commons-lang3, on the Flume agent's classpath.)

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.5</version>
</dependency>

Code

The Spark application pulls events from the SparkSink and runs a word count on each one-second batch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Pull-based integration of Spark Streaming with Flume:
  * poll events from a Flume SparkSink and run a word count on them.
  */
object FlumePullWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Create a polling stream that pulls events from the Flume SparkSink
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    // The event body is a byte buffer: decode it to a string, split it into
    // words, and count occurrences within each batch
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
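
Besides the single host/port form used above, FlumeUtils.createPollingStream also has an overload that takes a list of sink addresses and an explicit storage level, which is useful when several Flume agents each run a SparkSink. A minimal sketch, reusing the ssc from the code above; the second address is a placeholder:

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel

// Pull from two SparkSinks and keep received blocks serialized with two
// replicas (the same level the polling stream uses by default)
val addresses = Seq(
  new InetSocketAddress("192.168.30.131", 8888),
  new InetSocketAddress("192.168.30.132", 8888))
val multiStream = FlumeUtils.createPollingStream(
  ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)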

Package the application into a jar and submit it with spark-submit (the last two arguments are the SparkSink's hostname and port):

./spark-submit --master local[2] --name FlumePullWordCount --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 --class com.imooc.spark.FlumePullWordCount ~/lib/sparktrain-1.0.jar 192.168.30.131 8888

Once both the Flume agent and the Spark application are running, appending lines to /root/data/data.log should print word counts to the console for each one-second batch.