Running the Official Structured Streaming Example and Fixing an Error

2019-12-05  lei_charles
  1. Example code
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    /**
      * Listens for text sent to a network port and runs a WordCount on it.
      */
    object StructuredStreamingDemo {
    
      def main(args: Array[String]): Unit = {
    
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        val conf = new SparkConf()
          .setIfMissing("spark.master", "local[4]")
          .setAppName("Structured Network Count")
          // Workaround for the error described in section 3
          .set("fs.defaultFS","file://D:/temp/defaultFS/")
    
        // Step 1: create the program entry point, SparkSession, and import spark.implicits._
        // so that Scala objects can be implicitly converted to DataFrames/Datasets
        val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()
        import spark.implicits._
        
        // Step 2: create the stream. Read streaming data from a socket at 192.168.1.101:9999
        val lines: DataFrame = spark.readStream.format("socket")
          .option("host", "192.168.1.101")
          .option("port", "9999")
          .load()
    
        // Step 3: count the words. lines is a DataFrame; as[String] converts it to a typed
        // Dataset[String], which is then split into words and grouped by word.
        val words: Dataset[String] = lines.as[String].flatMap(_.split(" "))
        val wordcount: DataFrame = words.groupBy("value").count()
    
        // Step 4: create the query handle, define how results are output, and start the query.
        // writeStream with Complete output mode prints the whole result table to the console.
        val query: StreamingQuery = wordcount.writeStream
          .outputMode(OutputMode.Complete())
          .format("console")
          .start()
        // Block on awaitTermination so the program does not exit while the query is running
        query.awaitTermination()
      }
    }
    
  2. Output
    ...
    Connected to the target VM, address: '127.0.0.1:64497', transport: 'socket'
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    19/12/06 10:41:31 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +-----+-----+
    |value|count|
    +-----+-----+
    |  dog|    3|
    |  cat|    1|
    +-----+-----+
    
    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +-----+-----+
    |value|count|
    +-----+-----+
    |  dog|    3|
    |  cat|    2|
    |  owl|    1|
    +-----+-----+
    
    -------------------------------------------
    Batch: 2
    -------------------------------------------
    +-----+-----+
    |value|count|
    +-----+-----+
    |  dog|    4|
    |  cat|    2|
    |  owl|    2|
    +-----+-----+
    ...
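
Each batch above reprints every word seen so far because the query uses Complete output mode, which emits the full running result table after each trigger. The following is a minimal plain-Scala sketch of that behaviour, without Spark. The object name `CompleteModeSketch` and the input lines are hypothetical: the article does not show what was typed into the socket, so the lines were chosen only so that the running totals match the three tables above.

```scala
object CompleteModeSketch {

  // Count the words in one batch of lines, mirroring
  // flatMap(_.split(" ")) followed by groupBy("value").count()
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines.flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size.toLong }

  // Fold one batch's counts into the running totals
  def merge(state: Map[String, Long], batch: Map[String, Long]): Map[String, Long] =
    batch.foldLeft(state) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0L) + n)
    }

  // One full snapshot of the result table per batch, as Complete mode would emit
  def snapshots(batches: Seq[Seq[String]]): Seq[Map[String, Long]] =
    batches.scanLeft(Map.empty[String, Long]) { (state, lines) =>
      merge(state, countWords(lines))
    }.tail

  def main(args: Array[String]): Unit = {
    // Hypothetical input, grouped by the batch it arrives in
    val batches = Seq(
      Seq("cat dog", "dog dog"),
      Seq("owl cat"),
      Seq("dog", "owl")
    )
    snapshots(batches).zipWithIndex.foreach { case (table, i) =>
      println(s"Batch: $i -> " + table.toSeq.sortBy(_._1).mkString(", "))
    }
  }
}
```

Words that receive no new data in a batch (dog in Batch 1, cat in Batch 2) still appear in the snapshot, which is the difference from a mode that prints only changed rows.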
    
  3. Error encountered and fix

    Error log:

    Connected to the target VM, address: '127.0.0.1:64189', transport: 'socket'
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    19/12/06 10:36:54 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
    Exception in thread "main" java.lang.IllegalArgumentException: Pathname /C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets from C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets is not a valid DFS filename.
     at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:196)
     at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
     at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
     at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
     at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:221)
     at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
     at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
     at com.cloudera.StructuredStreamingDemo$.main(StructuredStreamingDemo.scala:40)
     at com.cloudera.StructuredStreamingDemo.main(StructuredStreamingDemo.scala)
    Disconnected from the target VM, address: '127.0.0.1:64189', transport: 'socket'
    
    Process finished with exit code 1
    

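    Cause: the stack trace shows the query's temporary checkpoint directory, a local Windows path, being resolved by DistributedFileSystem, because the core-site.xml picked up by the program sets fs.defaultFS to HDFS.

    Fix (either of the following):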

    1. Remove the core-site.xml configuration file, or comment out the fs.defaultFS setting in it:
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://cdh01:8020</value>
      </property>
      
    2. Add set("fs.defaultFS","file://D:/temp/defaultFS/") to the SparkConf in the code
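
A possible variant of fix 2, not verified against the setup in this article: Spark forwards any property prefixed with `spark.hadoop.` into the Hadoop configuration, so the default file system can also be overridden through that prefix. In the sketch below, the object name `LocalDefaultFsSketch` is hypothetical and the value `file:///` (the local file system) is an assumption, not a value taken from the article.

```scala
import org.apache.spark.sql.SparkSession

object LocalDefaultFsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[4]")
      .appName("Structured Network Count")
      // spark.hadoop.* keys are copied into the Hadoop Configuration;
      // "file:///" is an assumed value that selects the local file system
      .config("spark.hadoop.fs.defaultFS", "file:///")
      .getOrCreate()

    // Print the effective value to check that the override took effect
    println(spark.sparkContext.hadoopConfiguration.get("fs.defaultFS"))
    spark.stop()
  }
}
```

Printing the effective value before starting the streaming query is a quick way to tell whether a core-site.xml on the classpath is still winning.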