203、Spark 2.0之Structured Streami

2019-02-12  本文已影响0人  ZFH__ZJ

创建流式的dataset和dataframe

流式dataframe可以通过DataStreamReader接口来创建,DataStreamReader对象是通过SparkSession的readStream()方法返回的。与创建静态dataframe的read()方法类似,我们可以指定数据源的一些配置信息,比如data format、schema、option等。spark 2.0中初步提供了一些内置的source支持。

  1. file source
    以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。文件必须是被移动到目录中的,比如用mv命令。
  2. socket source
    从socket连接中读取文本内容。driver是负责监听请求的server socket。socket source只能被用来进行测试。

代码

val socketDF = spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()

socketDF.isStreaming    
socketDF.printSchema 

val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
    .readStream
    .option("sep", ";")
    .schema(userSchema)      
    .csv("/path/to/directory")    

上面的例子都是产生untyped类型的dataframe,这就意味着在编译时是无法检查其schema的,只有在计算被提交并运行时才会进行检查。一些操作,比如map、flatMap等,需要在编译时就知道具体的类型。为了使用一些typed类型的操作,我们可以将dataframe转换为typed类型的dataset,比如df.as[String]。

上一篇下一篇

猜你喜欢

热点阅读