Structured Streaming window 官方例子
窗口聚合示例:由于聚合后使用了orderBy排序,输出模式只能用complete模式(未排序的聚合还可以使用update模式)
package com.ky.service
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
/**
 * Windowed word count over a socket text stream, based on the official Spark
 * Structured Streaming example.
 *
 * Usage: StructuredNetworkWordCountWindowed <hostname> <port>
 *        <window duration in seconds> [<slide duration in seconds>]
 *
 * When the slide duration is omitted it defaults to the window duration,
 * which yields non-overlapping (tumbling) windows.
 *
 * @author xwj
 * @version 1.0 (2019/2/2)
 */
object WindowExample {

  /**
   * Parses the command-line arguments, builds the windowed word-count query
   * and blocks until the streaming query terminates.
   *
   * @param args hostname, port, window duration (seconds) and an optional
   *             slide duration (seconds)
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
        " <window duration in seconds> [<slide duration in seconds>]")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val windowSize = args(2).toInt
    val slideSize = if (args.length == 3) windowSize else args(3).toInt
    if (slideSize > windowSize) {
      System.err.println("<slide duration> must be less than or equal to <window duration>")
      // Abort here instead of starting a query with an invalid window specification.
      System.exit(1)
    }
    val windowDuration = s"$windowSize seconds"
    val slideDuration = s"$slideSize seconds"

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCountWindowed")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", value = true) // attach a timestamp column to every received line
      .load()

    // Split the lines into comma-separated words, retaining timestamps
    val words = lines.as[(String, Timestamp)].flatMap { case (text, ts) =>
      text.split(",").map(word => (word, ts))
    }.toDF("word", "timestamp")

    // Group the data by (sliding window, word) and compute the count of each group.
    // The window length and slide step come from the command-line arguments.
    val windowedCounts = words.groupBy(
      window($"timestamp", windowDuration, slideDuration), $"word"
    ).count().orderBy("window")

    // Start running the query that prints the windowed word counts to the console.
    // "complete" output mode is required because the aggregated result is sorted;
    // "truncate" = false only keeps the console sink from shortening wide columns.
    val query = windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
非聚合操作不支持complete模式,一般使用append模式
package com.ky.service
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Non-aggregating windowed stream example: tags every line received from a
 * socket with the sliding window(s) its timestamp falls into and prints the
 * rows to the console in "append" output mode.
 *
 * Usage: StructuredNetworkWordCountWindowed <hostname> <port>
 *        <window duration in seconds> [<slide duration in seconds>
 *        [<trigger interval in seconds>]]
 *
 * @author xwj
 * @version 1.0 (2019/2/2)
 */
object StructuredNetworkWordCountWindowed {

  /**
   * Parses the command-line arguments, starts the streaming query and blocks
   * until it terminates.
   *
   * @param args hostname, port, window duration (seconds), optional slide
   *             duration (seconds, defaults to the window duration) and
   *             optional processing-time trigger interval (seconds; when
   *             omitted the writer's default trigger is used)
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
        " <window duration in seconds> [<slide duration in seconds>" +
        " [<trigger interval in seconds>]]")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1)
    val windowSize = args(2).toInt
    val slideSize = if (args.length == 3) windowSize else args(3).toInt
    // The trigger interval is optional; reading args(4) unconditionally would
    // throw ArrayIndexOutOfBoundsException when only 3 or 4 arguments are given.
    val triggerTime: Option[Int] = if (args.length > 4) Some(args(4).toInt) else None
    if (slideSize > windowSize) {
      System.err.println("<slide duration> must be less than or equal to <window duration>")
      // Abort here instead of starting a query with an invalid window specification.
      System.exit(1)
    }
    val windowDuration = s"$windowSize seconds"
    val slideDuration = s"$slideSize seconds"

    val spark = SparkSession
      .builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master("local")
      .getOrCreate()
    spark.sparkContext.setLogLevel(logLevel = "error")
    import spark.implicits._

    // Stream of input lines from host:port, each with a timestamp column attached
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", value = true)
      .load()

    // Plain projection (no aggregation): each line paired with its window(s)
    val wordCounts: DataFrame =
      lines.select(window($"timestamp", windowDuration, slideDuration), $"value")

    // This query performs no aggregation, so it is written in "append" mode;
    // "complete" mode is rejected for queries without aggregation.
    val writer = wordCounts.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")

    // Apply the processing-time trigger only when an interval was supplied.
    val query = triggerTime
      .fold(writer)(seconds => writer.trigger(Trigger.ProcessingTime(s"$seconds seconds")))
      .start()

    query.awaitTermination()
  }
}