205、Spark 2.0之Structured Streami

2019-02-12  本文已影响0人  ZFH__ZJ

output操作

定义好了各种计算操作之后,就需要启动这个应用。此时就需要使用DataStreamWriter,通过spark.writeStream()方法返回。此时需要指定以下一些信息:
output sink的一些细节:数据格式、位置等。
output mode:以哪种方式将result table的数据写入sink。
query name:指定查询的标识。
trigger interval:如果不指定,那么默认就会尽可能快速地处理数据,只要之前的数据处理完,就会立即处理下一条数据。如果上一个数据还没处理完,而这一个trigger也错过了,那么会一起放入下一个trigger再处理。
checkpoint地址:对于某些sink,可以做到一次且仅一次的语义,此时需要指定一个目录,进而可以将一些元信息写入其中。一般会是类似hdfs上的容错目录。

output mode

目前仅仅支持两种output mode
append mode:仅适用于不包含聚合操作的查询。
complete mode:仅适用于包含聚合操作的查询。

output sink

目前有一些内置支持的sink
file sink:在spark 2.0中,仅仅支持parquet文件,以及append模式
foreach sink
console sink:仅供调试
memory sink:仅供调试


output sink.png

代码

val noAggDF = deviceDataDf.select("device").where("signal > 10")   

noAggDF
   .writeStream
   .format("console")
   .start()

noAggDF
   .writeStream
   .parquet("path/to/destination/directory")
   .start()
   
val aggDF = df.groupBy(“device”).count()

aggDF
   .writeStream
   .outputMode("complete")
   .format("console")
   .start()

aggDF
   .writeStream
   .queryName("aggregates")    // this query name will be the table name
   .outputMode("complete")
   .format("memory")
   .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table

foreach sink详解

使用foreach sink时,我们需要自定义ForeachWriter,并且自定义处理每条数据的业务逻辑。每次trigger发生后,根据output mode需要写入sink的数据,就会传递给ForeachWriter来进行处理。使用如下方式来定义ForeachWriter:

datasetOfString.write.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
  }
  def process(record: String) = {
    // write string to connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
  }
})

需要有如下一些注意点:

  1. ForeachWriter必须支持序列化,因为该对象会被序列化后发送到executor上去执行。
  2. open、process和close这三个方法都会给executor调用。
  3. ForeachWriter所有的初始化方法,必须创建数据库连接,开启一个事务,打开一个IO流等等,都必须在open方法中完成。必须注意,如果在ForeachWriter的构造函数中进行初始化,那么这些操作都是在driver上发生的。
  4. open中有两个参数,version和partition,可以唯一标识一批需要处理的数据。每次发生一次trigger,version就会自增长一次。partition是要处理的结果数据的分区号。因为output操作是分布式执行的,会分布在多个executor上并行执行。
  5. open可以使用version和partition来决定,是否要处理这一批数据。此时可以选择返回true或false。如果返回false,那么process不会被调用。举个例子来说,有些partition的数据可能已经被持久化了,而另外一些partiton的处理操作由于失败被重试,此时之前已经被持久化的数据可以不再次进行持久化,避免重复计算。
  6. close方法中,需要处理一些异常,以及一些资源的释放。
上一篇下一篇

猜你喜欢

热点阅读