structured streaming window官方例子

2019-02-02  本文已影响0人  会飞的蜗牛66666

窗口聚合操作示例(聚合之后使用了 orderBy 排序,因此只能用 complete 模式)
package com.ky.service

import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

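/**
 * Windowed word count over a socket text stream using Structured Streaming.
 * NOTE(review): the original doc comment and object declaration were lost in extraction;
 * the object name below is taken from the usage message and appName — confirm.
 */
object StructuredNetworkWordCountWindowed {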

/**
 * Counts words received over a socket, grouped by sliding time window and word,
 * and prints the complete result table to the console after every trigger.
 *
 * @param args `<hostname> <port> <window duration in seconds> [<slide duration in seconds>]`.
 *             When the slide duration is omitted it equals the window duration.
 */
def main(args: Array[String]): Unit = {
  if (args.length < 3) {
    System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
      " <window duration in seconds> [<slide duration in seconds>]")
    System.exit(1)
  }

  val host = args(0)
  val port = args(1).toInt
  val windowSize = args(2).toInt
  val slideSize = if (args.length == 3) windowSize else args(3).toInt
  if (slideSize > windowSize) {
    System.err.println("<slide duration> must be less than or equal to <window duration>")
    // Stop here instead of continuing with a window specification already reported as invalid.
    System.exit(1)
  }

  val windowDuration = s"$windowSize seconds"
  val slideDuration = s"$slideSize seconds"

  val spark = SparkSession
    .builder
    .appName("StructuredNetworkWordCountWindowed")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame representing the stream of input lines from connection to host:port
  val lines = spark.readStream
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", value = true) // each input row also carries a timestamp
    .load()

  // Split the lines into comma-separated words, retaining timestamps
  val words = lines.as[(String, Timestamp)].flatMap { case (text, timestamp) =>
    text.split(",").map(word => (word, timestamp))
  }.toDF("word", "timestamp")

  // Group the data by window and word and compute the count of each group.
  // The window length and the slide step come from the command-line arguments.
  val windowedCounts = words
    .groupBy(window($"timestamp", windowDuration, slideDuration), $"word")
    .count()
    .orderBy("window")

  // Start running the query that prints the windowed word counts to the console.
  // The aggregated result is sorted, so the "complete" output mode is used.
  // "truncate" = "false" keeps the console sink from shortening long column values.
  val query = windowedCounts.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()

  query.awaitTermination()
}
}

非聚合操作只能用append模式
package com.ky.service

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

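/**
 * Non-aggregating windowed example over a socket text stream using Structured Streaming.
 * NOTE(review): the original doc comment and object declaration were lost in extraction;
 * the object name below is a placeholder — replace it with the original name.
 */
object StructuredNetworkWindowedAppend {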

/**
 * Reads lines from a socket, selects a `window` column derived from each line's
 * timestamp together with the line's `value`, and prints the rows to the console
 * without aggregating them.
 *
 * @param args `<hostname> <port> <window duration in seconds> [<slide duration in seconds>
 *             [<trigger interval in seconds>]]`. The slide duration defaults to the window
 *             duration; the trigger interval defaults to 0 seconds.
 */
def main(args: Array[String]): Unit = {
  if (args.length < 3) {
    System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
      " <window duration in seconds> [<slide duration in seconds>" +
      " [<trigger interval in seconds>]]")
    System.exit(1)
  }

  val host = args(0)
  val port = args(1)
  val windowSize = args(2).toInt
  val slideSize = if (args.length == 3) windowSize else args(3).toInt
  // The fifth argument is optional: reading args(4) unconditionally would fail with an
  // ArrayIndexOutOfBoundsException for any invocation with fewer than five arguments.
  // An interval of 0 lets each micro-batch start as soon as the previous one completes.
  val triggerTime = if (args.length > 4) args(4).toInt else 0
  if (slideSize > windowSize) {
    System.err.println("<slide duration> must be less than or equal to <window duration>")
    // Stop here instead of continuing with a window specification already reported as invalid.
    System.exit(1)
  }

  // The `$` is required so the numeric values are interpolated into the duration strings.
  val windowDuration = s"$windowSize seconds"
  val slideDuration = s"$slideSize seconds"

  val spark = SparkSession
    .builder()
    .appName(s"${this.getClass.getSimpleName}")
    .master("local")
    .getOrCreate()
  spark.sparkContext.setLogLevel(logLevel = "error")

  import spark.implicits._

  val lines = spark.readStream
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", value = true) // each input row also carries a timestamp
    .load()

  val windowedLines: DataFrame =
    lines.select(window($"timestamp", windowDuration, slideDuration), $"value")

  // No aggregation is performed here (only a select), so the "complete" output mode
  // is not available for this query; "append" is used instead.
  val query = windowedLines.writeStream
    .outputMode("append")
    .format("console")
    .trigger(Trigger.ProcessingTime(s"$triggerTime seconds"))
    .option("truncate", "false")
    .start()

  query.awaitTermination()
}
}

上一篇 下一篇

猜你喜欢

热点阅读