
Spark from Beginner to Expert, Part 36: Spark Streaming Integration

2020-04-15  勇于自信
1. Positioning of Spark Streaming

Spark Streaming is the stream-processing framework inside the Spark ecosystem (commonly compared with Storm).

2. Comparison with Storm

Storm: data flows continuously, like water; the basic processing unit is the tuple (millisecond-level latency).
Spark Streaming: the flowing data is discretized by time into micro-batches (second-level latency).

3. Comparison with Spark Core

1. Relationship:
Spark Core is the core compute engine; it underpins many projects, and Streaming is one of them.
2. Operators:
In Core, operators come in two classes: transformations and actions (lazy evaluation).
In Streaming, operators come in two classes: transformations and output operations (also lazy evaluation).
3. Development model:
Core: you program against RDDs; the processing structure is expressed as a DAG.



Streaming: you program against DStreams; the processing structure is expressed as a DStreamGraph.



4. Relationship between DStream and RDD:
1) A DStream contains multiple RDDs internally; it represents a continuous series of RDDs, each covering a specific time interval.
2) Operations on a DStream are mapped onto its underlying RDDs.
3) DStream operations produce new DStreams via RDD transformations.
4) DStream operators differ from RDD operators:
1) transformations
2) output operations:
execution operator foreachRDD: hands each batch to user code, e.g. to talk to external services such as HBase, Kafka, or Hive

output operator saveAsTextFiles: writes the output directly
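
A minimal sketch of the two kinds of output operators (assuming a DStream[(String, Int)] named wordCounts, as in the WordCount examples later in this post):

// Output operator: write every batch directly as text files
wordCounts.saveAsTextFiles("hdfs://master:9000/stream_out/wc", "txt")

// Execution operator: hand each batch's RDD to user code, e.g. to push records to an external store
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // open a connection to HBase/Kafka/Hive here, write the records, then close it
    records.foreach(println)
  }
}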

5. Time Windows

Example: count page views (PV) over the last hour, refreshed every minute.
1) Window length: one hour
2) Slide interval: one minute
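
As a sketch (assuming a DStream of (url, 1) pairs named pageViews; the slide interval must be a multiple of the batch interval), the window operators take exactly these two durations:

import org.apache.spark.streaming.Minutes

val hourlyPV = pageViews.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // merge counts inside the window
  Minutes(60),               // window length: one hour
  Minutes(1))                // slide interval: one minute
hourlyPV.print()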

6. Spark Streaming Architecture

master: assigns tasks; the task structure is the Graph
worker: processes tasks, covering both receiving data and processing data
client: feeds data in
Two modes of processing the data:
1) Receiver mode: passive, asynchronous
Pros: fast
Cons: requires launching extra executors for the receivers
2) Direct mode: active, synchronous
Pros: a single executor uses fewer resources
Cons: slower
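
A minimal sketch of the two modes against Kafka (based on the 0.8-style integration used later in this post; an existing StreamingContext named ssc is assumed, and hosts, group, and topic names are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// 1) Receiver mode: a long-running receiver pulls data and stores it on executors
val receiverStream = KafkaUtils.createStream(
  ssc, "master:2181", "group_1", Map("topic_1013" -> 1)).map(_._2)

// 2) Direct mode: each batch reads its own offset range straight from the Kafka brokers
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "master:9092"), Set("topic_1013")).map(_._2)
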
Architecture diagram:



D-Stream lineage: the DStreamGraph
worker, Input receiver: receives data from the client
worker, Task execution: runs the actual computation on the data
worker, Block manager / Comm Manager: data management modules

7. Fault Tolerance: Data Fault Tolerance via the WAL (write-ahead log)

How the WAL works in Spark Streaming:



Recovery flow after the Driver fails and restarts:


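A minimal sketch of turning both pieces on (paths and app name are placeholders): the receiver-side WAL is enabled with a configuration flag, and on restart the Driver rebuilds its DStreamGraph from the checkpoint via StreamingContext.getOrCreate.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("walDemo")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // write received blocks to the WAL

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("hdfs://master:9000/hdfs_checkpoint") // metadata checkpoints and WAL live here
  // ... define the DStream pipeline here ...
  ssc
}

// On a fresh start this calls createContext; after a Driver restart it recovers from the checkpoint
val ssc = StreamingContext.getOrCreate("hdfs://master:9000/hdfs_checkpoint", createContext _)
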
8. Streaming + Kafka: Handling Data Backlog When Downstream Processing Is Too Slow

1) Redistribute the data / adjust offsets (emergency measure)
2) Raise the parallelism (number of executors, cores, etc.); Kafka must cooperate by adding partitions so the number of consuming threads can grow
3) Limit the batch size: max.poll.records
4) Control the processing timeout: max.poll.interval.ms
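
A minimal sketch of where those two limits are set (they are Kafka new-consumer settings, passed in through the Kafka parameter map; servers, group, and values are placeholders):

val kafkaParams = Map[String, Object](
  "bootstrap.servers"    -> "master:9092",
  "group.id"             -> "group_1",
  "key.deserializer"     -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "max.poll.records"     -> (500: java.lang.Integer),    // cap on records returned per poll, i.e. batch size
  "max.poll.interval.ms" -> (300000: java.lang.Integer)  // max time between polls before the consumer is kicked out
)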

9. WordCount with Spark Core (submitted locally via spark-submit)

1. WordCount code:

package cn.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {
  
  def main(args: Array[String]){
    val conf = new SparkConf()
        .setAppName("WordCount") // no master set here; spark-submit supplies it
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///usr/local/spark-study/wc.txt", 1) // read the input file as one partition
    val words = lines.flatMap { line => line.split(" ") }              // split each line into words
    val pairs = words.map { word => (word, 1) }                        // pair every word with a count of 1
    val wordCounts = pairs.reduceByKey(_ + _)                          // sum the counts per word
    wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))
  }
}

2. Build the program into wordcount.jar and place it in /usr/local/spark-study.
3. In /usr/local/spark-study, create a file wc.txt with content such as: hello word
4. Run the submit script:

/usr/local/src/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
--class cn.spark.study.core.WordCount \
--master local[1] \
/usr/local/spark-study/wordcount.jar

5. Output:
hello 1
word 1

10. WordCount with Spark Core (run locally in the IDE)

1. Code:

import org.apache.spark.{SparkConf, SparkContext}

object wordcount {
  def main(args: Array[String]): Unit = {
    // run locally with a single thread, directly from the IDE
    val conf = new SparkConf().setMaster("local").setAppName("wordcount")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("D:/test_file/wc.txt", 1) // local Windows path, read as one partition
    val words = lines.flatMap { line => line.split(" ")}
    val pairs = words.map{word => (word, 1)}
    val wordCounts = pairs.reduceByKey{_+_}
    wordCounts.foreach(wordCount => println(wordCount._1+" "+wordCount._2))
  }
}

2. Right-click and run the object directly in IDEA; the output looks like this:


11. WordCount with Spark Streaming

1. WordCount code:

package cn.spark.study.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
object WordCount {
  def main(args: Array[String]): Unit = {

    // local[2]: one core for the socket receiver, one for processing
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5)) // 5-second batch interval

    // read lines from the nc socket opened in the steps below
    val lines = ssc.socketTextStream("192.168.42.10", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()                                 // print a sample of each batch
    wordCounts.saveAsTextFiles("D:/stream_out", "txt") // also write every batch to local files
    ssc.start()
    ssc.awaitTermination()
  }
}

2. Install the nc tool: yum install nc
3. Start the listener on the 192.168.42.10 node (the code above connects to port 9999):
nc -l 9999
4. Right-click the object in IDEA and choose Run.
5. Type some input into the nc session, for example:



6. The run window prints results like this:


12. Writing Data from Streaming into HBase

Streaming code:

package com.badou.streaming

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object HbaseHandler {
  def insert(row: String, column: String, value: String) {
    // HBase configuration
    val tableName = "sparkstream_kafkahbase_table" // target table name
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("hbase.defaults.for.version.skip", "true")
    val hTable = new HTable(hbaseConf, tableName)
    val thePut = new Put(row.getBytes)
    thePut.add("info".getBytes,column.getBytes,value.getBytes)
    hTable.setAutoFlush(false, false)
    // client-side write buffer
    hTable.setWriteBufferSize(3*1024*1024)
    hTable.put(thePut)
    // flush the buffered puts to HBase
    hTable.flushCommits()
  }
}

object kafkaStreamHbase {
  def main(args: Array[String]) {

    val zkQuorum = "master:2181,slave1:2181,slave2:2181"
    val group = "group_1"
    val topics = "topic_1013"
    val numThreads = 1
    var output="hdfs://master:9000/stream_out/spark-log.txt"

    val sparkConf = new SparkConf().setAppName("kafkaStreamHbase").setMaster("local[2]")
    val ssc =  new StreamingContext(sparkConf, Seconds(10))
    ssc.checkpoint("hdfs://master:9000/hdfs_checkpoint")

    val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    // lines.print()
    // lines.saveAsTextFiles("hdfs://master:9000/stream_out/sparkstreaming_hbasetest.log")
    val line = lines.flatMap(_.split("\n"))
    val words = line.map(_.split("\\|"))
    words.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          val key = pair(0)
          val col = pair(1)
          val value = pair(2)
          println(key + "_" + col + " : " + value)
          HbaseHandler.insert(key, col, value)
        })
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
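
Note that HbaseHandler.insert opens a new HTable for every single record. A per-partition variant (a sketch using the same old HBase client API as above) creates one table handle per partition and amortizes the connection cost:

words.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    val hTable = new HTable(hbaseConf, "sparkstream_kafkahbase_table")
    partitionOfRecords.foreach(pair => {
      val thePut = new Put(pair(0).getBytes)
      thePut.add("info".getBytes, pair(1).getBytes, pair(2).getBytes)
      hTable.put(thePut)
    })
    hTable.flushCommits()
    hTable.close()
  })
})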

Supplement: a lazily created SQLContext singleton, useful when you need to run SQL inside foreachRDD:
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
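
A sketch of how the singleton is typically used inside foreachRDD (reusing the words DStream from the code above; the temp table name is a placeholder), so that one SQLContext is shared across all batches:

words.foreachRDD(rdd => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  val df = rdd.map(_.mkString("|")).toDF("line") // one column holding the raw record
  df.registerTempTable("lines")
  sqlContext.sql("SELECT COUNT(*) AS cnt FROM lines").show()
})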

Test:


Result:

13. Putting It Together: Kafka to Streaming to HBase

1. Start Kafka:
./bin/kafka-server-start.sh config/server.properties
2. List the topics:
./bin/kafka-topics.sh --list --zookeeper localhost:2181
3. Streaming code: the same HbaseHandler and kafkaStreamHbase code as in section 12.

Start HBase:
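
Assuming a standard tarball install, run from the HBase home directory:

./bin/start-hbase.sh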





Create the HBase table:
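
The code writes to table sparkstream_kafkahbase_table with column family info, so create it in the HBase shell first:

./bin/hbase shell
create 'sparkstream_kafkahbase_table', 'info'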


Test
Start a console producer (the topic has to match the one the streaming job subscribes to, topic_1013):
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_1013

Run the code in IDEA.

Result:
