SparkStreaming入门教程(五)输出操作Output

2018-02-25  本文已影响0人  胖滚猪学编程

Output Operations将DStream的数据推送到外部系统,如数据库或文件系统。类似于RDD的惰性求值,输出操作才会触发计算的实际执行。

前面几个都太简单,直接调用方法即可,只演示spark将数据输出插入到mysql数据库的代码

import java.sql.DriverManager
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc,Seconds(5))   
val wordcount = ssc.socketTextStream("localhost", 7799).flatMap(_.split(" ")).map(word => (word, 1))
wordcount.foreachRDD(wd => wd.foreachPartition(
      data => {
        val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
        try {
          for (row <- data) {
            println("input data is " + row._1 + "  " + row._2)
            val sql = "insert into stream(word,num) values(" + "'" + row._1 + "'," + row._2 + ")"
            conn.prepareStatement(sql).executeUpdate()
          }
        } finally {
          conn.close()
        }
      }))
      
ssc.start()
ssc.awaitTermination()
上一篇 下一篇

猜你喜欢

热点阅读