Running the Official Structured Streaming Example and Fixing the Errors
2019-12-05
lei_charles
-
Example code
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Listens for text arriving on a network port and runs a WordCount over it.
 */
object StructuredStreamingDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setIfMissing("spark.master", "local[4]")
      .setAppName("Structured Network Count")
      .set("fs.defaultFS", "file://D:/temp/defaultFS/")

    // Step 1: create the SparkSession entry point and import spark.implicits._,
    // which allows Scala objects to be implicitly converted to DataFrames.
    val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()
    import spark.implicits._

    // Step 2: create the stream. Read streaming data from a socket;
    // the host and port used here are 192.168.1.101:9999.
    val lines: DataFrame = spark.readStream.format("socket")
      .option("host", "192.168.1.101")
      .option("port", "9999")
      .load()

    // Step 3: count the words. lines is a DataFrame; as[String] gives it a type,
    // converting it to a Dataset[String], and the word count runs on that Dataset.
    val words: Dataset[String] = lines.as[String].flatMap(_.split(" "))
    val wordcount: DataFrame = words.groupBy("value").count()

    // Step 4: create the query handle, define how results are printed, and start.
    // writeStream is used with Complete output mode, printing the full result to the console.
    val query: StreamingQuery = wordcount.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()

    // Call awaitTermination so the program does not exit while data is being processed.
    query.awaitTermination()
  }
}
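The example only reads from the socket, so something has to be listening on the configured host and port and writing lines of text before the query receives anything. A netcat-style tool is the usual choice. As an alternative, here is a minimal sketch of such a feeder in plain Scala. The names SocketLineFeeder and serveOnce are hypothetical and not part of Spark, and the sketch assumes it runs on the machine the example connects to.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of Spark: a tiny stand-in for a netcat-style server.
object SocketLineFeeder {

  /** Waits for one client on `server`, sends every line terminated by '\n',
    * then closes the client connection. */
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val client = server.accept() // blocks until a client (e.g. the streaming query) connects
    try {
      val out = new PrintWriter(
        new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8))
      lines.foreach(line => out.print(line + "\n"))
      out.flush()
    } finally {
      client.close()
    }
  }
}
```

For instance, calling SocketLineFeeder.serveOnce(new ServerSocket(9999), Seq("cat dog", "dog dog")) before starting the query would deliver two lines and then close the connection. The sample lines are made up for illustration.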
-
Run result
...
Connected to the target VM, address: '127.0.0.1:64497', transport: 'socket'
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/12/06 10:41:31 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  dog|    3|
|  cat|    1|
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  dog|    3|
|  cat|    2|
|  owl|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  dog|    4|
|  cat|    2|
|  owl|    2|
+-----+-----+
...
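Because the query uses Complete output mode, each batch prints the whole result table with counts accumulated since the query started, not only the rows for newly arrived words. The following plain-Scala sketch (no Spark involved) imitates that behaviour. The object name CompleteModeSketch and the input lines are hypothetical. The lines were chosen only so that the totals match the three batches in the run result. The actual text typed into the socket is not recorded in the output.

```scala
// Plain-Scala illustration of Complete-mode semantics; not Spark code.
object CompleteModeSketch {

  /** Splits each line on single spaces and adds the words to the running totals. */
  def update(totals: Map[String, Long], lines: Seq[String]): Map[String, Long] =
    lines.flatMap(_.split(" ")).foldLeft(totals) { (acc, word) =>
      acc.updated(word, acc.getOrElse(word, 0L) + 1L)
    }

  /** Returns the full table of totals after each batch, one map per batch. */
  def tables(batches: Seq[Seq[String]]): Seq[Map[String, Long]] =
    batches.scanLeft(Map.empty[String, Long])(update).tail

  def main(args: Array[String]): Unit = {
    // Hypothetical input: each inner Seq stands for the lines arriving in one batch.
    val batches = Seq(Seq("cat dog", "dog dog"), Seq("owl cat"), Seq("dog", "owl"))
    tables(batches).zipWithIndex.foreach { case (table, i) =>
      println(s"Batch: $i -> $table")
    }
  }
}
```

Each map returned by tables contains every word seen so far, which is why dog and cat reappear in Batch 1 and Batch 2 even when no new occurrence arrived in that batch.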
-
Errors encountered and fixes
Error log:
Connected to the target VM, address: '127.0.0.1:64189', transport: 'socket'
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/12/06 10:36:54 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
Exception in thread "main" java.lang.IllegalArgumentException: Pathname /C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets from C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets is not a valid DFS filename.
	at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:196)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:221)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
	at com.cloudera.StructuredStreamingDemo$.main(StructuredStreamingDemo.scala:40)
	at com.cloudera.StructuredStreamingDemo.main(StructuredStreamingDemo.scala)
Disconnected from the target VM, address: '127.0.0.1:64189', transport: 'socket'
Process finished with exit code 1
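The "is not a valid DFS filename" message comes from a path-name check in the HDFS client. The sketch below is a simplified approximation of that check, written from the general HDFS naming rules rather than copied from Hadoop's source. The names DfsNameSketch and looksLikeValidDfsName are hypothetical. It shows why a Windows path with a drive letter is rejected: the component C: contains a colon.

```scala
// Simplified approximation of the HDFS path-name rules; not Hadoop's actual code.
object DfsNameSketch {

  /** True if `path` is absolute and no component is ".", "..", contains ':',
    * or is empty in the middle of the path (i.e. a "//"). */
  def looksLikeValidDfsName(path: String): Boolean =
    path.startsWith("/") && {
      val components = path.split("/", -1) // -1 keeps trailing empty components
      components.zipWithIndex.forall { case (c, i) =>
        val emptyInMiddle = c.isEmpty && i != 0 && i != components.length - 1
        !(c == "." || c == ".." || c.contains(":") || emptyInMiddle)
      }
    }
}
```

Under these assumptions, a shortened form of the path from the log such as /C:/Users/admin/AppData/Local/Temp/offsets fails the check, while a path without a drive letter such as /tmp/checkpoint/offsets passes.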
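Solution:
The stack trace shows the query's temporary offsets directory, a local Windows path, being checked through DistributedFileSystem. That happens because fs.defaultFS points at HDFS, and HDFS rejects the path.
- Remove the core-site.xml configuration file, or comment out the fs.defaultFS property in that file:
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://cdh01:8020</value>
</property>
- Add the following to the SparkConf in the code: set("fs.defaultFS","file://D:/temp/defaultFS/")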
- Remove