Spark Streaming code
- \pom.xml
- \sparkStreaming\pom.xml
- \sparkStreaming\sparkstreaming_customerReceiver\pom.xml
- \sparkStreaming\sparkstreaming_customerReceiver\src\main\resources\log4j.properties
- \sparkStreaming\sparkstreaming_customerReceiver\src\main\scala\com\atguigu\streaming\CustomReceiver.scala
- \sparkStreaming\sparkstreaming_helloworld\pom.xml
- \sparkStreaming\sparkstreaming_helloworld\src\main\scala\com\atguigu\streaming\WorldCount.scala
- \sparkStreaming\sparkstreaming_kafka\pom.xml
- \sparkStreaming\sparkstreaming_kafka\src\main\scala\com\atguigu\streaming\KafkaStreaming.scala
- \sparkStreaming\sparkstreaming_kafka\src\main\scala\com\atguigu\streaming\PooledKafkaProducerAppFactory.scala
- \sparkStreaming\sparkstreaming_queueRdd\pom.xml
- \sparkStreaming\sparkstreaming_queueRdd\src\main\scala\com\atguigu\streaming\QueueRdd.scala
- \sparkStreaming\sparkstreaming_statefulWordCount\pom.xml
- \sparkStreaming\sparkstreaming_statefulWordCount\src\main\scala\com\atguigu\streaming\WorldCount.scala
- \sparkStreaming\sparkstreaming_windowWordCount\pom.xml
- \sparkStreaming\sparkstreaming_windowWordCount\src\main\resources\log4j.properties
- \sparkStreaming\sparkstreaming_windowWordCount\src\main\scala\com\atguigu\streaming\WorldCount.scala
\pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>spark</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>sparkCore</module>
<module>sparkStreaming</module>
<module>sparkSql</module>
<module>sparkGraphx</module>
<module>sparkMLlib</module>
</modules>
<properties>
<mysql.version>6.0.5</mysql.version>
<spring.version>4.3.6.RELEASE</spring.version>
<spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
<log4j.version>1.2.17</log4j.version>
<quartz.version>2.2.3</quartz.version>
<slf4j.version>1.7.22</slf4j.version>
<hibernate.version>5.2.6.Final</hibernate.version>
<camel.version>2.18.2</camel.version>
<config.version>1.10</config.version>
<jackson.version>2.8.6</jackson.version>
<servlet.version>3.0.1</servlet.version>
<net.sf.json.version>2.4</net.sf.json.version>
<activemq.version>5.14.3</activemq.version>
<spark.version>2.1.1</spark.version>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
<dependencies>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- Logging End -->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
\sparkStreaming\pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spark</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<artifactId>sparkStreaming</artifactId>
<modules>
<module>sparkstreaming_helloworld</module>
<module>sparkstreaming_customerReceiver</module>
<module>sparkstreaming_queueRdd</module>
<module>sparkstreaming_kafka</module>
<module>sparkstreaming_statefulWordCount</module>
<module>sparkstreaming_windowWordCount</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
</project>
\sparkStreaming\sparkstreaming_customerReceiver\pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sparkStreaming</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sparkstreaming_customerReceiver</artifactId>
<dependencies>
</dependencies>
<build>
<finalName>customwordcount</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.streaming.CustomReceiver</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
\sparkStreaming\sparkstreaming_customerReceiver\src\main\resources\log4j.properties
#
# Copyright (c) 2017. WuYufei All rights reserved.
#
log4j.rootLogger=warn,stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%t] %-c(line:%L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=spark.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%t] %-c(line:%L) : %m%n
\sparkStreaming\sparkstreaming_customerReceiver\src\main\scala\com\atguigu\streaming\CustomReceiver.scala
package com.atguigu.streaming
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
/**
* Created by wuyufei on 06/09/2017.
*/
// String is the type of the data this receiver produces
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
override def onStart(): Unit = {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
override def run() { receive() }
}.start()
}
override def onStop(): Unit = {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself once isStopped() returns true
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
// store() is provided by Receiver; it hands the received data to Spark
store(userInput)
userInput = reader.readLine()
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}
object CustomReceiver {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.receiverStream(new CustomReceiver("master01", 9999))
// Split each line into words
val words = lines.flatMap(_.split(" "))
//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
//ssc.stop()
}
}
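Because readLine() can block indefinitely, the onStop() above has to rely on the read loop eventually noticing isStopped(). A variant sketch (an assumption, not part of the original project; it reuses the imports from CustomReceiver.scala above) keeps a reference to the socket so onStop() can close it and unblock the reader:
// Sketch: same Receiver API, but onStop() closes the socket to interrupt a blocked readLine().
class ClosableSocketReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  @volatile private var socket: Socket = _
  override def onStart(): Unit = {
    new Thread("Socket Receiver") { override def run(): Unit = receive() }.start()
  }
  override def onStop(): Unit = {
    if (socket != null) socket.close() // makes the blocked readLine() throw, so the loop exits
  }
  private def receive(): Unit = {
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)
        line = reader.readLine()
      }
      reader.close()
      restart("Trying to connect again")
    } catch {
      case e: java.io.IOException => if (!isStopped) restart("Error receiving data", e)
    }
  }
}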
\sparkStreaming\sparkstreaming_helloworld\pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sparkStreaming</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sparkstreaming_helloworld</artifactId>
<dependencies>
</dependencies>
<build>
<finalName>networdcount</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.streaming.WorldCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
\sparkStreaming\sparkstreaming_helloworld\src\main\scala\com\atguigu\streaming\WorldCount.scala
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by wuyufei on 06/09/2017.
*/
object WorldCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("zk1", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
//ssc.stop()
}
}
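As a follow-up to the commented-out ssc.stop(): a small sketch (an assumption, not part of the original code) of stopping the context gracefully from a JVM shutdown hook, so batches already received finish processing before the process exits.
// Sketch: register before ssc.awaitTermination(); runs when the JVM receives a shutdown signal.
sys.addShutdownHook {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}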
\sparkStreaming\sparkstreaming_kafka\pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sparkStreaming</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sparkstreaming_kafka</artifactId>
<dependencies>
<!-- Provides a generic object pool implementation -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<!-- Client library used to connect to Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<finalName>kafkastreaming</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.streaming.KafkaStreaming</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
\sparkStreaming\sparkstreaming_kafka\src\main\scala\com\atguigu\streaming\KafkaStreaming.scala
package com.atguigu.streaming
import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.api.java.function.VoidFunction
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by wuyufei on 06/09/2017.
*/
// Singleton object
object createKafkaProducerPool{
// Returns the actual object pool (GenericObjectPool)
def apply(brokerList: String, topic: String): GenericObjectPool[KafkaProducerProxy] = {
val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic))
val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory)
// Configure the size of the Kafka producer pool
val poolConfig = {
val c = new GenericObjectPoolConfig
val maxNumProducers = 10
c.setMaxTotal(maxNumProducers)
c.setMaxIdle(maxNumProducers)
c
}
// Return the object pool
new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig)
}
}
object KafkaStreaming{
def main(args: Array[String]) {
// Set up the SparkConf
val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
// Create the StreamingContext
val ssc = new StreamingContext(conf, Seconds(1))
// Kafka broker addresses
val brobrokers = "192.168.56.150:9092,192.168.56.151:9092,192.168.56.152:9092"
// Source topic name
val sourcetopic="source1";
// Target topic name
val targettopic="target1";
// Consumer group id
var group="con-consumer-group"
// Kafka consumer configuration
val kafkaParam = Map(
"bootstrap.servers" -> brobrokers, // addresses used to bootstrap the connection to the cluster
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
// Identifies which consumer group this consumer belongs to
"group.id" -> group,
// Used when there is no initial offset, or the current offset no longer exists on the server;
// "latest" resets the offset to the latest available offset
"auto.offset.reset" -> "latest",
// If true, the consumer's offsets are committed automatically in the background
"enable.auto.commit" -> (false: java.lang.Boolean),
//ConsumerConfig.GROUP_ID_CONFIG
);
// Create the DStream that returns the received input data
var stream=KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(sourcetopic),kafkaParam))
// Each element of the stream is a ConsumerRecord
stream.foreachRDD(rdd => {
// The HasOffsetRanges cast only succeeds on the RDD returned directly by createDirectStream,
// so take the offset ranges before applying any transformation
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Run an operation on each partition of the RDD
rdd.map(s =>("id:" + s.key(),">>>>:"+s.value())).foreachPartition(partitionOfRecords => {
// Kafka producer pool
val pool = createKafkaProducerPool(brobrokers, targettopic)
// Borrow a producer from the pool
val p = pool.borrowObject()
// Send every record in this partition
partitionOfRecords.foreach {message => System.out.println(message._2);p.send(message._2,Option(targettopic))}
// Return the producer to the pool when done
pool.returnObject(p)
})
})
ssc.start()
ssc.awaitTermination()
}
}
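Because enable.auto.commit is set to false, the offset ranges captured in foreachRDD above are never actually committed. A minimal sketch of committing them back to Kafka after the batch work has finished (an assumption, not part of the original code; it belongs at the end of the same foreachRDD block):
// Sketch: CanCommitOffsets is implemented by the DStream returned from createDirectStream.
// Committing only after the partition work gives at-least-once delivery to the target topic.
import org.apache.spark.streaming.kafka010.CanCommitOffsets
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)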
\sparkStreaming\sparkstreaming_kafka\src\main\scala\com\atguigu\streaming\PooledKafkaProducerAppFactory.scala
package com.atguigu.streaming
import java.util.Properties
import org.apache.commons.pool2.impl.DefaultPooledObject
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
/**
* Created by wuyufei on 06/09/2017.
*/
case class KafkaProducerProxy(brokerList: String,
producerConfig: Properties = new Properties,
defaultTopic: Option[String] = None,
producer: Option[KafkaProducer[String, String]] = None) {
type Key = String
type Val = String
require(brokerList != null && !brokerList.isEmpty, "Must set broker list")
private val p = producer getOrElse {
val props: Properties = new Properties()
props.put("bootstrap.servers", brokerList)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
new KafkaProducer[String,String](props)
}
// Wrap the value (and optional key) into a ProducerRecord
private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {
val t = topic.getOrElse(defaultTopic.getOrElse(throw new IllegalArgumentException("Must provide topic or default topic")))
require(!t.isEmpty, "Topic must not be empty")
key match {
case Some(k) => new ProducerRecord(t, k, value)
case _ => new ProducerRecord(t, value)
}
}
def send(key: Key, value: Val, topic: Option[String] = None) {
// Delegate to KafkaProducer.send() to send the message
p.send(toMessage(value, Option(key), topic))
}
def send(value: Val, topic: Option[String]) {
send(null, value, topic)
}
def send(value: Val, topic: String) {
send(null, value, Option(topic))
}
def send(value: Val) {
send(null, value, None)
}
def shutdown(): Unit = p.close()
}
abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable {
def newInstance(): KafkaProducerProxy
}
class BaseKafkaProducerFactory(brokerList: String,
config: Properties = new Properties,
defaultTopic: Option[String] = None)
extends KafkaProducerFactory(brokerList, config, defaultTopic) {
override def newInstance() = new KafkaProducerProxy(brokerList, config, defaultTopic)
}
// Extends the commons-pool base factory; the type parameter is the pooled object type
class PooledKafkaProducerAppFactory(val factory: KafkaProducerFactory)
extends BasePooledObjectFactory[KafkaProducerProxy] with Serializable {
// Called by the pool to create a new object
override def create(): KafkaProducerProxy = factory.newInstance()
// Called by the pool to wrap an object in a PooledObject
override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] = new DefaultPooledObject(obj)
// Called by the pool to destroy an object
override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = {
p.getObject.shutdown()
super.destroyObject(p)
}
}
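A minimal usage sketch of the factory outside Spark (an assumption; the broker address and topic are placeholders): borrow a producer from the pool, send one message on the default topic, and return it.
object PoolUsageSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical broker and topic, for illustration only
    val pool = createKafkaProducerPool("localhost:9092", "target1")
    val producer = pool.borrowObject()
    try producer.send("hello world")    // uses the pool's default topic
    finally pool.returnObject(producer) // always hand the producer back
    pool.close()                        // destroys any remaining pooled producers
  }
}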
\sparkStreaming\sparkstreaming_queueRdd\pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sparkStreaming</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sparkstreaming_queueRdd</artifactId>
<dependencies>
</dependencies>
<build>
<finalName>queueRdd</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.streaming.QueueRdd</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
\sparkStreaming\sparkstreaming_queueRdd\src\main\scala\com\atguigu\streaming\QueueRdd.scala
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object QueueRdd {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("QueueRdd")
val ssc = new StreamingContext(conf, Seconds(1))
// Create the queue through which RDDs can be pushed to a QueueInputDStream
val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
// Create the QueueInputDStream and use it to do some processing
val inputStream = ssc.queueStream(rddQueue)
// Process the RDDs in the queue
val mappedStream = inputStream.map(x => (x % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
// Print the results
reducedStream.print()
// Start the computation
ssc.start()
// Create and push some RDDs into the queue
for (i <- 1 to 30) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
// The StreamingContext could also be stopped programmatically here
//ssc.stop()
}
ssc.awaitTermination()
}
}
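mutable.SynchronizedQueue is deprecated as of Scala 2.11; a small alternative sketch (an assumption, not part of the original code) uses a plain mutable.Queue, which ssc.queueStream also accepts, and synchronizes the enqueue explicitly:
// Sketch: plain mutable.Queue with explicit synchronization instead of SynchronizedQueue.
val plainQueue = new mutable.Queue[RDD[Int]]()
val inputStream2 = ssc.queueStream(plainQueue)
inputStream2.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
ssc.start()
for (i <- 1 to 30) {
  plainQueue.synchronized {
    plainQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
  }
  Thread.sleep(2000)
}
ssc.awaitTermination()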
\sparkStreaming\sparkstreaming_statefulWordCount\pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sparkStreaming</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sparkstreaming_statefulWordCount</artifactId>
<dependencies>
</dependencies>
<build>
<finalName>statefulwordcount</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.streaming.WorldCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
\sparkStreaming\sparkstreaming_statefulWordCount\src\main\scala\com\atguigu\streaming\WorldCount.scala
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by wuyufei on 06/09/2017.
*/
object WorldCount {
def main(args: Array[String]) {
// Create a SparkConf
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
// Create a StreamingContext
val ssc = new StreamingContext(conf, Seconds(3))
// Set a checkpoint directory (required by updateStateByKey)
ssc.checkpoint(".")
// Read the lines sent to port 9999 on master01
val lines = ssc.socketTextStream("master01", 9999)
// Split each line into words on spaces, giving a DStream[String]
val words = lines.flatMap(_.split(" "))
//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Map each word to a tuple (word, 1)
val pairs = words.map(word => (word, 1))
// Define the update function: values holds this batch's values for a key, state is the previous state kept by the framework
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
// Count of this word in the current batch
val currentCount = values.foldLeft(0)(_ + _)
// Previously saved total for this word
val previousCount = state.getOrElse(0)
// Return the new running total
Some(currentCount + previousCount)
}
// updateStateByKey takes the state type as a type parameter and the update function as its argument
val stateDstream = pairs.updateStateByKey[Int](updateFunc)
// Output
stateDstream.print()
stateDstream.saveAsTextFiles("hdfs://master01:9000/statful/","abc")
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
//ssc.stop()
}
}
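Since Spark 1.6, mapWithState is a more scalable alternative to updateStateByKey because it only touches keys that appear in the current batch. A sketch of the equivalent word count (an assumption, not part of the original code; it would replace the updateStateByKey block inside main):
import org.apache.spark.streaming.{State, StateSpec}
// The mapping function gets the word, its count contribution from the current record (if any),
// and the running state; it updates the state and emits (word, runningTotal).
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}
val stateWithMap = pairs.mapWithState(StateSpec.function(mappingFunc))
stateWithMap.print()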
\sparkStreaming\sparkstreaming_windowWordCount\pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sparkStreaming</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sparkstreaming_windowWordCount</artifactId>
<dependencies>
</dependencies>
<build>
<finalName>windowwordcount</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.streaming.WorldCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
\sparkStreaming\sparkstreaming_windowWordCount\src\main\resources\log4j.properties
#
# Copyright (c) 2017. WuYufei All rights reserved.
#
log4j.rootLogger=warn,stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%t] %-c(line:%L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=spark.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%t] %-c(line:%L) : %m%n
\sparkStreaming\sparkstreaming_windowWordCount\src\main\scala\com\atguigu\streaming\WorldCount.scala
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by wuyufei on 06/09/2017.
*/
object WorldCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
// batch interval = 2s
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("./checkpoint")
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("master01", 9000)
// Split each line into words
val words = lines.flatMap(_.split(" "))
//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
//val wordCounts = pairs.reduceByKey((a:Int,b:Int) => (a + b))
// Window length 12s (12/2 = 6 batches), slide interval 6s (6/2 = 3 batches); both must be multiples of the batch interval
//val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6))
val wordCounts2 = pairs.reduceByKeyAndWindow(_ + _,_ - _ ,Seconds(12), Seconds(6))
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts2.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
//ssc.stop()
}
}
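With the inverse-reduce form, keys whose count drops to 0 stay in the window state; a sketch (an assumption, not part of the original code) of the overload that also takes a filter function to drop them:
// Sketch: same window (12s) and slide (6s), but entries whose count falls to 0 are removed.
// The number of partitions (2) is an arbitrary illustrative value.
val wordCounts3 = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,         // add counts entering the window
  (a: Int, b: Int) => a - b,         // subtract counts leaving the window
  Seconds(12),                       // window length
  Seconds(6),                        // slide interval
  2,                                 // numPartitions
  (kv: (String, Int)) => kv._2 > 0   // keep only words still present in the window
)
wordCounts3.print()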