(2) Spark Streaming: Creating a DStream from the File System

2018-11-13  白面葫芦娃92

1. Start spark-shell

[hadoop@hadoop001 bin]$ ./spark-shell --master local[2]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop001:4040
Spark context available as 'sc' (master = local[2], app id = local-1538087287382).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc,Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@734cf881

scala> val lines = ssc.textFileStream("/streaming/input/")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@4c4215d7

scala> val words = lines.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_)
words: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@4e1104f4

scala> words.print()

scala> ssc.start()

scala> ssc.awaitTermination()
-------------------------------------------                                     
Time: 1538087585000 ms
-------------------------------------------

-------------------------------------------
Time: 1538087590000 ms
-------------------------------------------

-------------------------------------------
Time: 1538087595000 ms
-------------------------------------------

-------------------------------------------
Time: 1538087600000 ms
-------------------------------------------
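
For reference, the same word count as a standalone application might look like the sketch below (the object name is an arbitrary choice; everything else mirrors the shell session above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileSystemWordCount {
  def main(args: Array[String]): Unit = {
    // local[2] mirrors the shell invocation; a file stream needs no
    // receiver core, so even local[1] would technically work here
    val conf = new SparkConf().setMaster("local[2]").setAppName("FileSystemWordCount")
    val ssc  = new StreamingContext(conf, Seconds(5))

    // Monitor the HDFS directory; each 5-second batch contains the
    // lines of files that appeared in the directory during that interval
    val lines = ssc.textFileStream("/streaming/input/")
    val words = lines.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _)
    words.print()

    ssc.start()             // start the computation
    ssc.awaitTermination()  // block until stopped
  }
}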

2. Put a file into the HDFS path /streaming/input/

[hadoop@hadoop001 data]$ hadoop fs -ls /streaming/input/
Found 4 items
-rw-r--r--   1 hadoop supergroup         44 2018-09-28 06:24 /streaming/input/1.txt
-rw-r--r--   1 hadoop supergroup         44 2018-09-28 06:25 /streaming/input/2.txt
-rw-r--r--   1 hadoop supergroup         44 2018-09-28 06:25 /streaming/input/3.txt
[hadoop@hadoop001 data]$ hadoop fs -put ruozeinput.txt /streaming/input/10.txt

3. The streaming job prints the processing result

-------------------------------------------
Time: 1538087645000 ms
-------------------------------------------

-------------------------------------------                                     
Time: 1538087650000 ms
-------------------------------------------
(hello,4)
(welcome,1)
(world,2)

-------------------------------------------
Time: 1538087655000 ms
-------------------------------------------

Spark Streaming only reads files written into HDFS after it has started; the pre-existing 1.txt, 2.txt, and 3.txt are never picked up (see the fileStream sketch after the note below for a way to also consider existing files).

Checking the web UI, there is no receiver: a file stream is monitored from the driver, so unlike a socket stream it does not tie up a core with a receiver.

Note: "All files must be in the same data format." Every file in the monitored directory has to use the same format.
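
Both points above can be handled with the lower-level fileStream overload, which takes a path filter and a newFilesOnly flag. A minimal sketch, assuming text files and a .txt naming convention (note that even with newFilesOnly = false, only files whose modification time falls inside the stream's remember window are picked up):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Accept only *.txt files so every processed file shares one format;
// newFilesOnly = false also considers files already in the directory.
val txtLines = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/streaming/input/",
  (path: Path) => path.getName.endsWith(".txt"),
  newFilesOnly = false
).map(_._2.toString)  // TextInputFormat yields (offset, line); keep the line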

[hadoop@hadoop001 resources]$ hadoop fs -put users.parquet /streaming/input/11

Writing a Parquet file into /streaming/input/, the stream reads it back as garbled bytes:

-------------------------------------------
Time: 1538088485000 ms
-------------------------------------------
(\Hexample.avro.User
                    %name%
                          %fa+-_i+e_c-+-_%5fa+-_i+e_+++be__%a__ay<&
                                                                  +a+eDH&P
                                                                          fa+-_i+e_c-+-_<@&P&�%(fa+-_i+e_+++be__a__ay,1)
(PAR1"@A+y__aBe+,1)
(ZZ&��
      a+_-._che+a�{"+y-e":"_ec-_d","+a+e":"U_e_","+a+e_-ace":"e|a+-+e.a+_-","fie+d_":[{"+a+e":"+a+e","+y-e":"_+_i+g"},{"+a+e":"fa+-_i+e_c-+-_","+y-e":["_+_i+g","++++"]},{"+a+e":"fa+-_i+e_+++be__","+y-e":{"+y-e":"a__ay","i+e+_":"i++"}}]}-a_-+e+-+_ +e__i-+ 1.4.3�PAR1,1)
(@,1)
(0red88,,1)

-------------------------------------------
Time: 1538088490000 ms
-------------------------------------------
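
Parquet is a binary format, so reading its bytes as text lines yields the noise above. As an aside, the batch DataFrame API (via the spark session the shell already provides) decodes the same file properly; a minimal sketch:

// Parquet files carry their own schema; spark.read decodes it
// instead of treating the raw bytes as text lines.
val users = spark.read.parquet("/streaming/input/11")
users.show()
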
[hadoop@hadoop001 resources]$ hadoop fs -put people.json /streaming/input/12
[hadoop@hadoop001 resources]$ hadoop fs -put people.csv /streaming/input/16

JSON and CSV files can be read back, but the word count they produce is wrong, because the job splits on "\t" and each whole line ends up counted as a single "word". Files must all be in the same format for the word count to be correct, which is why in production files of a single format are dropped into a single directory. The output below shows the problem; a delimiter-aware variant is sketched after it.

-------------------------------------------
Time: 1538088650000 ms
-------------------------------------------
({"name":"Michael"},1)
({"name":"Andy", "age":30},1)
({"name":"Justin", "age":19},1)

-------------------------------------------
Time: 1538088655000 ms
-------------------------------------------
......
......

-------------------------------------------
Time: 1538089055000 ms
-------------------------------------------
(Jorge;30;Developer,1)
(Bob;32;Developer,1)
(name;age;job,1)

-------------------------------------------
Time: 1538089060000 ms
-------------------------------------------
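
For example, if the directory held only the semicolon-delimited CSV files shown above, splitting on ";" instead of "\t" would count the individual fields. A sketch (the header line name;age;job would still be counted unless filtered out):

// Split on the CSV delimiter instead of tab
val fields = lines.flatMap(_.split(";")).map((_, 1)).reduceByKey(_ + _)
fields.print()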
