Structured Streaming: Supporting Multi-Dimensional Aggregation in a Single Process

2018-07-04  SunnyMore

Unsupported Operations

There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets; some of them are described below.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

If you try any of these operations, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”. While some of them may be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting on the input stream is not supported, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently.
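As a quick, hedged illustration of both points (the rate source, the column names, and the object name UnsupportedOpsSketch are illustrative and not part of the original post): actions such as count() or show() have documented streaming replacements, while an operation such as sorting the raw input only surfaces its AnalysisException once the query is started.

import org.apache.spark.sql.SparkSession

object UnsupportedOpsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UnsupportedOpsSketch").master("local").getOrCreate()

    // The built-in rate source generates (timestamp, value) rows, handy for demos
    val stream = spark.readStream.format("rate").load()

    // stream.count() or stream.show() would fail on a streaming Dataset;
    // the documented replacements are a running aggregation plus the console sink
    val runningCount = stream.groupBy().count()
    val query = runningCount.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // Sorting the raw input is not supported: building the plan is lazy,
    // but starting the query below would throw an AnalysisException
    // val sorted = stream.sort("value")
    // sorted.writeStream.format("console").start()

    query.awaitTermination()
  }
}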

Structured Streaming does not support joins between two streams, yet in real streaming workloads we often need to aggregate the same data both per day and per hour and output both results at the same time. What Structured Streaming does allow is splitting one stream into two identical branches, grouping and aggregating each branch, and running them as two separate queries. The code is as follows:

package com.spark.sunny.structuredstreaming

import com.spark.sunny.util.UdfUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoders

case class DeviceData(deviceId : String, usage : String,duration : String,eventBeginTime : String, serviceType : String)

/**
  * <Description> <br>
  *
  * @author Sunny<br>
  * @taskId: <br>
  * @version 1.0<br>
  * @createDate 2018/06/23 11:01 <br>
  * @see com.spark.sunny.structuredstreaming <br>
  */
object StructuredHdfsDevice {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("StructuredHdfsDevice")
      .master("local")
      .getOrCreate()
    val schema = Encoders.product[DeviceData].schema
    // Read a stream of JSON files from the input directory using the DeviceData schema
    val lines = spark.readStream
      .format("json")
      .schema(schema)
      .load("C:\\Users\\yaj\\Desktop\\dashboard\\test")
    import spark.implicits._
    // Normalize the event time, then derive hour (yyyyMMddHH) and day (yyyyMMdd) buckets
    val beginTimeDevice = lines
      .withColumn("eventBeginTime", UdfUtil.fmtTimestampUdf($"eventBeginTime", lit("yyyyMMddHHmmss")))
      .withColumn("eventBeginHour", substring($"eventBeginTime", 0, 10))
      .withColumn("eventBeginDay", substring($"eventBeginTime", 0, 8))

    // Branch 1: hourly aggregation (the grouping column must exist in the schema, hence deviceId)
    val hourDevice = beginTimeDevice.groupBy($"deviceId", $"eventBeginHour", $"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")

    // Query 1: write the hourly aggregates to the console in update mode
    val queryHour = hourDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    // Branch 2: daily aggregation over the same parent stream
    val dayDevice = beginTimeDevice.groupBy($"deviceId", $"eventBeginDay", $"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")

    // Query 2: write the daily aggregates to the console in update mode
    val queryDay = dayDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    // Both queries have already been started, so these calls only block the main thread;
    // see the note below about spark.streams.awaitAnyTermination()
    queryHour.awaitTermination()
    queryDay.awaitTermination()
  }
}
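For reference, the file source expects each JSON file under the input directory to hold one record per line whose field names match the DeviceData case class, e.g. {"deviceId":"d-001","usage":"10","duration":"120","eventBeginTime":"20180623110100","serviceType":"video"} (the values here, including the exact eventBeginTime encoding that UdfUtil.fmtTimestampUdf expects, are made-up placeholders).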

One thing to watch out for: do not call awaitTermination() right after start() on the first query, because that blocks the main thread and the second branch query's start() is never reached. Instead, call start() on all queries first and then use sparkSession.streams.awaitAnyTermination(); only then are both branch queries guaranteed to start.
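A minimal sketch of the pitfall and of the recommended pattern, reusing the hourDevice and dayDevice DataFrames from the code above:

// Pitfall: blocking right after the first start() means the second start() is never reached
// val queryHour = hourDevice.writeStream.outputMode("update").format("console").start()
// queryHour.awaitTermination()
// val queryDay = dayDevice.writeStream.outputMode("update").format("console").start()

// Recommended: start every query first, then block on any of them
val queryHour = hourDevice.writeStream.outputMode("update").format("console").start()
val queryDay = dayDevice.writeStream.outputMode("update").format("console").start()
spark.streams.awaitAnyTermination()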
