204. Spark 2.0 Structured Streaming

2019-02-12  ZFH__ZJ

Basic Operations: Selection, Projection, Aggregation

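Most of the common DataFrame/Dataset operations can also be applied to a streaming DataFrame/Dataset. These range from untyped, SQL-like operations (select, where, groupBy) to typed, RDD-like operations (map, filter, flatMap). A few operations are not supported; they are listed at the end of this article.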

import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.expressions.scalalang.typed   // typed aggregators (Spark 2.x; deprecated since 3.0)
import spark.implicits._                                   // encoders for .as[...] and map; assumes a SparkSession named spark

// `type` is a reserved word in Scala, so the field is named deviceType
case class DeviceData(device: String, deviceType: String, signal: Double, time: Timestamp)

val df: DataFrame = ... // streaming DataFrame with IoT device data, schema { device: string, deviceType: string, signal: double, time: timestamp }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IoT device data

// Select the devices whose signal is greater than 10
df.where("signal > 10").select("device")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
ds.groupByKey(_.deviceType).agg(typed.avg[DeviceData](_.signal))    // using typed API
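The running count and running average are incremental: each trigger folds the newly arrived rows into state kept from earlier triggers. As a rough illustration only, here is a plain-Scala model of that idea without Spark. `Reading`, `RunningStats` and `update` are hypothetical names, each micro-batch is assumed to be a `Seq`, and this is not how Spark actually stores its state.

```scala
// Hypothetical plain-Scala model of a running aggregation; not Spark's internal state store.
case class Reading(device: String, deviceType: String, signal: Double)

// Per-key state carried between micro-batches: number of updates and the sum of signals.
case class RunningStats(count: Long, sum: Double) {
  def avg: Double = sum / count
}

// Fold one micro-batch into the previous state and return the new state.
def update(state: Map[String, RunningStats], batch: Seq[Reading]): Map[String, RunningStats] =
  batch.foldLeft(state) { (acc, r) =>
    val old = acc.getOrElse(r.deviceType, RunningStats(0L, 0.0))
    acc.updated(r.deviceType, RunningStats(old.count + 1, old.sum + r.signal))
  }

val batch1 = Seq(Reading("d1", "sensor", 8.0), Reading("d2", "sensor", 12.0))
val batch2 = Seq(Reading("d3", "camera", 20.0), Reading("d1", "sensor", 16.0))

val afterBatch1 = update(Map.empty, batch1)
val afterBatch2 = update(afterBatch1, batch2)

// Each trigger reports the aggregate over everything seen so far, not only the new batch.
afterBatch2.toSeq.sortBy(_._1).foreach { case (t, s) =>
  println(s"$t: count=${s.count}, avg=${s.avg}")
}
```

After the second batch the "sensor" average covers all three sensor readings, which is what "running" means here.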

Sliding Windows on Event Time

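import org.apache.spark.sql.functions.window   // the window() grouping function
import spark.implicits._                         // enables the $"col" syntax; assumes a SparkSession named spark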

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
[Figure: windowed grouped aggregation (structured-streaming-window.png)]
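With a 10-minute window sliding every 5 minutes, each event falls into two overlapping windows; for example, an event at 12:07 is counted in both 12:00-12:10 and 12:05-12:15. The sketch below computes that assignment in plain Scala. It assumes windows are aligned to multiples of the slide (a start offset of 0) and uses minutes since midnight instead of real timestamps; `TimeWindow` and `windowsFor` are hypothetical helpers, not Spark APIs.

```scala
// Hypothetical helper (not a Spark API): which sliding windows contain an event.
// Assumes windows start at multiples of `slide`; times are minutes since midnight.
case class TimeWindow(start: Int, end: Int) { // half-open interval [start, end)
  override def toString: String =
    f"[${start / 60}%02d:${start % 60}%02d, ${end / 60}%02d:${end % 60}%02d)"
}

def windowsFor(eventMinute: Int, length: Int, slide: Int): List[TimeWindow] = {
  val lastStart = eventMinute - Math.floorMod(eventMinute, slide) // latest aligned start <= event
  Iterator
    .iterate(lastStart)(_ - slide)                    // walk back one slide at a time
    .takeWhile(start => start + length > eventMinute) // keep windows that still cover the event
    .map(start => TimeWindow(start, start + length))
    .toList
    .reverse                                          // earliest window first
}

// 12:07 with window($"timestamp", "10 minutes", "5 minutes")-style semantics
println(windowsFor(12 * 60 + 7, length = 10, slide = 5).mkString(" "))
// prints [12:00, 12:10) [12:05, 12:15)
```

Setting `slide` equal to `length` turns this into tumbling windows, where every event lands in exactly one window.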

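Late data is handled naturally: because the aggregation is keyed on the event time carried by each record, a record that arrives late is still counted in the window its timestamp falls into, and Spark updates the aggregate of that older window.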


[Figure: late data in a windowed aggregation (structured-streaming-late-data.png)]
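The late-data behaviour follows from keying the counts on event time rather than arrival time. In the hypothetical model below (plain Scala, minutes since midnight, 10-minute windows sliding every 5 minutes), a "dog" generated at 12:04 but processed in a later batch still increments the 12:00-12:10 window. `Event`, `windowStarts` and `addBatch` are illustrative names only, not Spark code.

```scala
// Hypothetical model (not Spark code): word counts keyed by (window start, word),
// 10-minute windows sliding every 5 minutes, times in minutes since midnight.
case class Event(eventMinute: Int, word: String)

// Start times of every window [start, start + length) that contains minute t.
def windowStarts(t: Int, length: Int = 10, slide: Int = 5): List[Int] =
  Iterator.iterate(t - Math.floorMod(t, slide))(_ - slide).takeWhile(_ + length > t).toList

// Add one micro-batch to the counts; the window is chosen by event time, not arrival time.
def addBatch(counts: Map[(Int, String), Int], batch: Seq[Event]): Map[(Int, String), Int] =
  batch.foldLeft(counts) { (acc, e) =>
    windowStarts(e.eventMinute).foldLeft(acc) { (m, start) =>
      val key = (start, e.word)
      m.updated(key, m.getOrElse(key, 0) + 1)
    }
  }

// Batch processed around 12:10: two on-time events.
val at1210 = addBatch(Map.empty, Seq(Event(12 * 60 + 2, "cat"), Event(12 * 60 + 7, "dog")))
// Batch processed around 12:15: a new "owl" plus a late "dog" generated at 12:04.
val at1215 = addBatch(at1210, Seq(Event(12 * 60 + 11, "owl"), Event(12 * 60 + 4, "dog")))

// The late event updated the already-reported 12:00-12:10 window.
val before = at1210((720, "dog"))
val after = at1215((720, "dog"))
println(s"dog in [12:00, 12:10): $before -> $after")
```

Because any old window can still be updated, this simple model keeps the state of every window forever; it makes no attempt to drop old state.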

Join Operations

Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame.

val staticDf = spark.read. ...
val streamingDf = spark.readStream. ... 

streamingDf.join(staticDf, "type")                        // inner equi-join with a static DF
streamingDf.join(staticDf, Seq("type"), "left_outer")    // left outer join with a static DF (streaming side on the left)
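A stream-static join can be pictured as joining each micro-batch of the stream against the same static table. The plain-Scala sketch below models an inner join and a left outer join (streaming side on the left) on a device-type key. `StreamRow`, `staticTable` and the location strings are made up for illustration, and the static side is assumed to have at most one row per key.

```scala
// Hypothetical model of a stream-static equi-join on the device type (names are illustrative).
case class StreamRow(device: String, deviceType: String)

// Static side, assumed to have at most one row per key.
val staticTable: Map[String, String] = Map("sensor" -> "Building A", "camera" -> "Building B")

// Inner join: stream rows without a matching static row are dropped.
def innerJoin(batch: Seq[StreamRow]): Seq[(StreamRow, String)] =
  batch.flatMap(r => staticTable.get(r.deviceType).map(location => (r, location)))

// Left outer join, streaming side on the left: unmatched stream rows are kept with None.
def leftOuterJoin(batch: Seq[StreamRow]): Seq[(StreamRow, Option[String])] =
  batch.map(r => (r, staticTable.get(r.deviceType)))

// Every micro-batch is joined against the same static table.
val microBatch = Seq(StreamRow("d1", "sensor"), StreamRow("d2", "thermostat"))
println(innerJoin(microBatch))
println(leftOuterJoin(microBatch))
```

Roughly speaking, this also suggests why some outer joins are restricted: emitting the unmatched rows of the static side would require knowing that no future stream row will ever match them, which an unbounded stream cannot guarantee.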

Unsupported Operations

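  1. Chained aggregations (several aggregations one after another) on a streaming DataFrame
  2. limit and take (taking the first N rows)
  3. distinct
  4. sort
    supported only after an aggregation, and only in Complete output mode
  5. Outer joins between a streaming DataFrame and a static DataFrame (only conditionally supported)
    full outer join is not supported
    left outer join is not supported when the streaming DataFrame is on the right side
    right outer join is not supported when the streaming DataFrame is on the left side
  6. Joins between two streaming DataFrames are not supported in Spark 2.0
  7. count()
    a single count cannot be returned from a streaming DataFrame; use groupBy().count() instead, which returns a streaming DataFrame holding a running count
  8. foreach()
    use df.writeStream.foreach(...) instead
  9. show()
    use the console output sink instead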
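Per the Spark 2.0 programming guide, outer joins between a stream and a static DataFrame are conditionally supported: full outer is never supported, left outer is not supported with the stream on the right, and right outer is not supported with the stream on the left. The hypothetical helper below restates only that rule (item 5) as a decision function; `JoinKind` and `streamStaticJoinSupported` are illustrative names, not Spark APIs.

```scala
// Hypothetical restatement of the stream-static outer-join rule (Spark 2.0); not a Spark API.
sealed trait JoinKind
case object Inner extends JoinKind
case object LeftOuter extends JoinKind
case object RightOuter extends JoinKind
case object FullOuter extends JoinKind

// streamOnLeft is true when the streaming DataFrame is the left input of the join.
def streamStaticJoinSupported(kind: JoinKind, streamOnLeft: Boolean): Boolean =
  kind match {
    case Inner      => true
    case FullOuter  => false
    case LeftOuter  => streamOnLeft  // unsupported when the stream is on the right
    case RightOuter => !streamOnLeft // unsupported when the stream is on the left
  }

for (kind <- Seq(Inner, LeftOuter, RightOuter, FullOuter); left <- Seq(true, false)) {
  val side = if (left) "left" else "right"
  println(s"$kind with the stream on the $side: supported = ${streamStaticJoinSupported(kind, left)}")
}
```

In every supported outer case the preserved side is the streaming side, which matches the intuition that only stream rows can safely be emitted without a match.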