Spark从入门到精通36:Spark Streaming:集成
1、Streaming定位
是Spark体系内的流式处理框架(和Storm对比)
2、和Storm对比
Storm:数据像水流一样,最基本的单位是tuple——毫秒级
Streaming:把水状的数据,按照时间进行离散化处理——秒级
3、和Spark Core对比
1.关系:
Spark Core是核心的计算引擎,支撑了很多项目,Streaming是其中一个
2.算子:
Core中,算子有两类:transformation和action ——> 懒惰机制
Streaming,算子有两类:transformation和output ——> 懒惰机制
3.开发形式:
Core:针对RDD开发,处理结构以DAG形式表现
Streaming:针对Dstream开发,处理结构以DstreamGraph形式表现
Dstream和RDD的关系:
1)Dstream:内部包含多个RDD,代表了一系列的连续的RDD,每一个RDD包含特定的时间间隔
2)DStream里面的各种操作是可以映射到内部的RDD上进行的
3)DStream的操作可以通过RDD的transformation生成新的Dstream
4)DStream的算子和RDD的算子不一样:
1)transformation
2)output:
执行算子:forEachRDD:对接外部服务:Hbase、Kafka、Hive
输出算子:saveAsTextFile:直接做输出
5、时间窗口
例如:统计最近一个小时内的PV量,要求每一分钟更新一次
1)窗口总长度(window length)—— 一个小时
2)滑动时间间隔(slide interval)—— 一分钟
6、Streaming架构:
master:分配任务:任务结构(Graph)
worker:处理任务,包含:接收数据+处理数据
client:喂数据
处理数据模式,2类:
1)recevier模式:被动——异步
优点:快
缺点:启动多executor
2)direct模式:主动——同步
优点:一个executor占用资源少
缺点:慢
架构图:
D-Stream lineage:DstreamGraph图
worker:Input receiver:对接client作数据接收
worker: Task excution:对数据做进一步计算
worker: Block manager、Comm Manager:数据管理模块
7、容错——数据容错WAL
streaming中wal工作原理:
当Driver失败重启后,恢复的流程:
8、streaming+kafka数据挤压,下游处理速度慢的问题解决
1)数据分布,调节offset(紧急情况)
2)并发调大(executor并发度,core数目等),需要kafka配合(增加分区数),提高线程数量
3)控制批次的规模——max.poll.records
4)控制数据处理超时时间(timeout)——max.poll.interval.ms
9、spark-core实现wordcount(本地提交方式)
1、wordcount实现代码:
package cn.spark.study.core
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCount {
def main(args: Array[String]){
val conf = new SparkConf()
.setAppName("WordCount");
val sc = new SparkContext(conf)
val lines = sc.textFile("file:///usr/local/spark-study/wc.txt",1)
val words = lines.flatMap { line => line.split(" ")}
val pairs = words.map{word => (word, 1)}
val wordCounts = pairs.reduceByKey{_+_}
wordCounts.foreach(wordCount =>println(wordCount._1+" appeared "+wordCount._2+" times."))
}
}
2、打wordcount.jar包,放入/usr/local/spark-study目录下
3、在/usr/local/spark-study目录下新建文件wc.txt,内容例如:hello word
4、执行脚本:
/usr/local/src/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
--class cn.spark.study.core.WordCount \
--master local[1] \
/usr/local/spark-study/wordcount.jar
5、运行结果
hello 1
word 1
10、spark-core实现wordcount(本地运行方式)
1、代码编写
import org.apache.spark.{SparkConf, SparkContext}
object wordcount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("wordcount")
val sc = new SparkContext(conf)
val lines = sc.textFile("D:/test_file/wc.txt",1)
val words = lines.flatMap { line => line.split(" ")}
val pairs = words.map{word => (word, 1)}
val wordCounts = pairs.reduceByKey{_+_}
wordCounts.foreach(wordCount => println(wordCount._1+" "+wordCount._2))
}
}
2、在本地idea上直接右键运行,结果如下:
11、spark-streaming实现wordcount
1、wordcount实现代码:
package cn.spark.study.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
object WordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("192.168.42.10", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
wordCounts.saveAsTextFiles("D:/stream_out", "txt")
ssc.start()
ssc.awaitTermination()
}
}
2、安装nc工具:yum install nc
3、在192.168.42.10机器节点上开服务:
nc -l 9998
4、右键,点击Run运行代码
5、在nc服务上输入如下:
5、运行窗口打印结果如下:
12.streaming往hbase写入数据
streaming 代码:
package com.badou.streaming
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
object HbaseHandler {
def insert(row: String, column: String, value: String) {
// Hbase配置
val tableName = "sparkstream_kafkahbase_table" // 定义表名
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
hbaseConf.set("hbase.defaults.for.version.skip", "true")
val hTable = new HTable(hbaseConf, tableName)
val thePut = new Put(row.getBytes)
thePut.add("info".getBytes,column.getBytes,value.getBytes)
hTable.setAutoFlush(false, false)
// 写入数据缓存
hTable.setWriteBufferSize(3*1024*1024)
hTable.put(thePut)
// 提交
hTable.flushCommits()
}
}
object kafkaStreamHbase {
def main(args: Array[String]) {
val zkQuorum = "master:2181,slave1:2181,slave2:2181"
val group = "group_1"
val topics = "topic_1013"
val numThreads = 1
var output="hdfs://master:9000/stream_out/spark-log.txt"
val sparkConf = new SparkConf().setAppName("kafkaStreamHbase").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("hdfs://master:9000/hdfs_checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
// lines.print()
// lines.saveAsTextFiles("hdfs://master:9000/stream_out/sparkstreaming_hbasetest.log")
val line = lines.flatMap(_.split("\n"))
val words = line.map(_.split("\\|"))
words.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
partitionOfRecords.foreach(pair => {
val key = pair(0)
val col = pair(1)
val value = pair(2)
println(key + "_" + col + " : " + value)
HbaseHandler.insert(key, col, value)
})
})
})
ssc.start()
ssc.awaitTermination()
}
}
补充
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
测试
结果
13.打通kafka-streaming-hbase
1.启动kafka:
]# ./bin/kafka-server-start.sh config/server.properties
2 查看topic list
./bin/kafka-topics.sh --list --zookeeper localhost:2181
3.streaming代码:
package com.badou.streaming
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
object HbaseHandler {
def insert(row: String, column: String, value: String) {
// Hbase配置
val tableName = "sparkstream_kafkahbase_table" // 定义表名
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
hbaseConf.set("hbase.defaults.for.version.skip", "true")
val hTable = new HTable(hbaseConf, tableName)
val thePut = new Put(row.getBytes)
thePut.add("info".getBytes,column.getBytes,value.getBytes)
hTable.setAutoFlush(false, false)
// 写入数据缓存
hTable.setWriteBufferSize(3*1024*1024)
hTable.put(thePut)
// 提交
hTable.flushCommits()
}
}
object kafkaStreamHbase {
def main(args: Array[String]) {
val zkQuorum = "master:2181,slave1:2181,slave2:2181"
val group = "group_1"
val topics = "topic_1013"
val numThreads = 1
var output="hdfs://master:9000/stream_out/spark-log.txt"
val sparkConf = new SparkConf().setAppName("kafkaStreamHbase").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("hdfs://master:9000/hdfs_checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
// lines.print()
// lines.saveAsTextFiles("hdfs://master:9000/stream_out/sparkstreaming_hbasetest.log")
val line = lines.flatMap(_.split("\n"))
val words = line.map(_.split("\\|"))
words.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
partitionOfRecords.foreach(pair => {
val key = pair(0)
val col = pair(1)
val value = pair(2)
println(key + "_" + col + " : " + value)
HbaseHandler.insert(key, col, value)
})
})
})
ssc.start()
ssc.awaitTermination()
}
}
启动hbase
创建表
测试
启动producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_0309
idea运行代码
结果: