用Spark将mysql数据库的表保存到本地的多种实现方式

2019-07-18  本文已影响0人  不愿透露姓名的李某某

package SparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcDataSource {

def main(args: Array[String]): Unit = {

val spark=SparkSession.builder()

.appName(this.getClass.getSimpleName)

.master("local[*]").getOrCreate()

val logs:DataFrame = spark.read.format("jdbc").options(

Map("url" ->"jdbc:mysql://localhost:3306/mysql",

"driver" ->"com.mysql.jdbc.Driver",

"dbtable" ->"help_category",

"user" ->"root",

"password" ->"root")

).load()

//    val filter: Dataset[Row] = logs.filter(r => {

//      r.getAs[Int](2) <=34

//    })

//    filter.show()

//lambda表达式

    import spark.implicits._

val lo = logs.filter($"parent_category_id" <=34)

//    val lop=logs.where($"parent_category_id"<=34)

//查询

    val res = lo.select($"help_category_id",$"name",$"parent_category_id" *10 as"new_age")

//将查询的结果设置成一张新表

   val props=new Properties()

   props.put("user","root")

 props.put("password","root")

//ignore :当该表已经存在时既不对其增加操作也不执行覆盖操作

   res.write.mode("ignore").jdbc("jdbc:mysql://localhost:3306/mysql","logs1",props)

//将文件保存到本地

//只能保存一列,且该列必须是字符串类型

//    res.write.text("D:\\out\\sql")

//将文件保存到本地且为json格式

//{"help_category_id":1,"name":"Geographic","new age":0}

//    res.write.json("D:\\out\\sql")

//将文件保存到本地且为xls工作表格式,不记录列名和表头

//    res.write.csv("D:\\out\\sql")

    res.write.parquet("D:\\out\\sql")

spark.stop()

}

}

上一篇下一篇

猜你喜欢

热点阅读