Spark之读取MySQL数据的五种方式

2018-09-25  本文已影响56人  阿坤的博客

本文介绍了使用Spark连接Mysql的五种方式。

主要内容:

  1. 不指定查询条件
  2. 指定数据库字段的范围
  3. 根据任意字段进行分区
  4. 通过load获取,和方式二类似
  5. 加载条件查询后的数据

1.不指定查询条件

def main(args: Array[String]): Unit = {

  val spark =
  SparkSession.builder()
  .appName("MysqlSupport")
  .master("local[2]")
  .getOrCreate()

  method1(spark)
  //method2(spark)
  //method3(spark)
  //method4(spark)
  //method5(spark)
}

/**
  * 方式一:不指定查询条件
  * 所有的数据由RDD的一个分区处理,如果你这个表很大,很可能会出现OOM
  *
  * @param spark
  */
def method1(spark: SparkSession): Unit = {
  val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=root"
  val prop = new Properties()
  val df = spark.read.jdbc(url, "t_score", prop)

  println(df.count())
  println(df.rdd.partitions.size)
  df.createOrReplaceTempView("t_score")
  import spark.sql
  sql("select * from t_score where score=98").show()
}

2.指定数据库字段的范围

/**
  * 方式二:指定数据库字段的范围
  * 通过lowerBound和upperBound 指定分区的范围
  * 通过columnName 指定分区的列(只支持整形)
  * 通过numPartitions 指定分区数量 (不宜过大)
  *
  * @param spark
  */
def method2(spark: SparkSession): Unit = {
  val lowerBound = 1
  val upperBound = 100000
  val numPartitions = 5
  val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=root"
  val prop = new Properties()
  val df = spark.read.jdbc(url, "t_score", "id", lowerBound, upperBound, numPartitions, prop)

  println(df.count())
  println(df.rdd.partitions.size)
}

3.根据任意字段进行分区

/**
    * 方式三:根据任意字段进行分区
    * 通过predicates将数据根据score分为2个区
    *
    * @param spark
    */
def method3(spark: SparkSession): Unit = {
  val predicates = Array[String]("score <= 97", "score > 97 and score <= 100")
  val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=root"
  val prop = new Properties()
  val df = spark.read.jdbc(url, "t_score", predicates, prop)

  println(df.count())
  println(df.rdd.partitions.size)
  import spark.sql
  df.createOrReplaceTempView("t_score")
  sql("select * from t_score").show()
}

4.通过load获取,和方式二类似

/**
  * 方式四:通过load获取,和方式二类似
  * @param spark
  */
def method4(spark: SparkSession): Unit = {
  val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=root"
  val df = spark.read.format("jdbc").options(Map("url" -> url, "dbtable" -> "t_score")).load()

  println(df.count())
  println(df.rdd.partitions.size)
  import spark.sql
  df.createOrReplaceTempView("t_score")
  sql("select * from t_score").show()
}

5.加载条件查询后的数据

/**
  * 方式五:加载条件查询后的数据
  * @param spark
  */
def method5(spark: SparkSession): Unit = {
  val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=root"
  val df = spark.read.format("jdbc").options(Map("url" -> url, "dbtable" -> "(SELECT s.*,u.name FROM t_score s JOIN t_user u ON s.id=u.score_id) t_score")).load()

  println(df.count())
  println(df.rdd.partitions.size)
  import spark.sql
  df.createOrReplaceTempView("t_score")
  sql("select * from t_score").show()

  Thread.sleep(60 * 1000)
}

参考:
# Spark读取数据库(Mysql)的四种方式讲解

上一篇下一篇

猜你喜欢

热点阅读