
Spark SQL: Writing a Spark DataFrame to TiDB

2022-06-23  xiaogp

Abstract: Writing a Spark DataFrame into TiDB with Spark SQL, covering a full overwrite and an upsert (update if present, insert if not).

Dependency preparation

The MySQL connector driver mysql-connector-java is required, and the upsert operation needs an additional third-party dependency, spark-sql-datasource. Its Spark artifacts are excluded to avoid clashing with the cluster's own Spark version:

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.36</version>
        </dependency>

        <dependency>
            <groupId>com.dounine</groupId>
            <artifactId>spark-sql-datasource</artifactId>
            <version>1.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_2.11</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.11</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
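
For sbt users, the equivalent would look roughly like this (a sketch; same coordinates and exclusions as the Maven snippet above):

libraryDependencies ++= Seq(
  "mysql" % "mysql-connector-java" % "5.1.36",
  // exclude the bundled Spark artifacts so the cluster's Spark jars are used
  ("com.dounine" % "spark-sql-datasource" % "1.0.1")
    .exclude("org.apache.spark", "spark-sql_2.11")
    .exclude("org.apache.spark", "spark-core_2.11")
)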

Code notes

First, create a TiDB table with a primary key:

CREATE TABLE `test` (
  `a` int(11) NOT NULL,
  `b` int(11) DEFAULT NULL,
  PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(1) Full overwrite (SaveMode.Overwrite)

Attach the connector jar and test it in the shell:

spark2-shell --jars mysql-connector-java-5.1.36.jar

The sample code is as follows. "truncate" -> "true" must be specified; otherwise Spark drops the table and auto-creates a new one, which leads to problems such as wrong column types or a missing primary key.

import org.apache.spark.sql.{SaveMode, SparkSession}

object test {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("yarn").appName("test").enableHiveSupport().getOrCreate()
    import spark.implicits._
    // two test rows; column a is the primary key
    val df = Seq((1, 9), (2, 3)).toDF("a", "b")

    // "truncate" -> "true" makes Overwrite empty the existing table
    // instead of dropping and recreating it, preserving schema and primary key
    df.write.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://172.20.3.78:4000/fin_operation",
      "dbtable" -> "test",
      "user" -> "username",
      "password" -> "password",
      "driver" -> "com.mysql.jdbc.Driver",
      "truncate" -> "true"))
      .mode(SaveMode.Overwrite).save()
  }
}
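
After the overwrite, the write can be verified by reading the table back through the plain jdbc source. A minimal sketch, reusing the same connection options as above (in spark2-shell the session already exists as spark, so this can be pasted directly):

// read the test table back to confirm the schema and rows survived the overwrite
val check = spark.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://172.20.3.78:4000/fin_operation",
  "dbtable" -> "test",
  "user" -> "username",
  "password" -> "password",
  "driver" -> "com.mysql.jdbc.Driver")).load()
check.show()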

(2) Update if present, insert if not (upsert)

Attach both jars and test in the shell:

spark2-shell --jars mysql-connector-java-5.1.36.jar,spark-sql-datasource-1.0.1.jar

The sample code is as follows; apart from the custom format and the savemode option, the parameters are no different from the ordinary jdbc source.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.jdbc2.JDBCSaveMode

object test {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("yarn").appName("test").enableHiveSupport().getOrCreate()
    import spark.implicits._
    val df = Seq((1, 9), (2, 3)).toDF("a", "b")
    // the jdbc2 data source adds an Update save mode that upserts on the primary key
    df.write.format("org.apache.spark.sql.execution.datasources.jdbc2").options(
        Map(
          "savemode" -> JDBCSaveMode.Update.toString,
          "driver" -> "com.mysql.jdbc.Driver",
          "url" -> "jdbc:mysql://172.20.3.78:4000/fin_operation",
          "user" -> "username",
          "password" -> "password",
          "dbtable" -> "test",
          "useSSL" -> "false",
          "showSql" -> "false"
        )
      ).save()
  }
}
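
Connectors like this typically implement the Update mode by generating MySQL's INSERT ... ON DUPLICATE KEY UPDATE statement, which TiDB also supports; the primary key on column a is what the conflict detection keys on. A minimal hand-rolled JDBC sketch of the same effect, assuming that is indeed the generated statement (connection details as above):

import java.sql.DriverManager

object UpsertSketch {
  def main(args: Array[String]): Unit = {
    // plain JDBC connection to TiDB over the MySQL protocol
    val conn = DriverManager.getConnection(
      "jdbc:mysql://172.20.3.78:4000/fin_operation", "username", "password")
    try {
      // rows that conflict on the primary key `a` update `b` instead of failing
      val stmt = conn.prepareStatement(
        "INSERT INTO test (a, b) VALUES (?, ?) ON DUPLICATE KEY UPDATE b = VALUES(b)")
      for ((a, b) <- Seq((1, 9), (2, 3))) {
        stmt.setInt(1, a)
        stmt.setInt(2, b)
        stmt.addBatch()
      }
      stmt.executeBatch()
    } finally conn.close()
  }
}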

The data to be inserted is:

scala> df.show()
+---+---+
|  a|  b|
+---+---+
|  1|  9|
|  2|  3|
+---+---+

The result in TiDB after the insert:

MySQL [fin_operation]> select * from test;
+---+------+
| a | b    |
+---+------+
| 1 |    9 |
| 2 |    3 |
+---+------+

Next, construct two more rows, one whose primary key conflicts with an existing row and one with a new, non-conflicting id, to test whether the upsert works:

val df = Seq((1, 21), (3, 4)).toDF("a", "b")

Run the write again; the result in TiDB is:

MySQL [fin_operation]> select * from test;
+---+------+
| a | b    |
+---+------+
| 1 |   21 |
| 2 |    3 |
| 3 |    4 |
+---+------+
3 rows in set (0.00 sec)

It works: the row with the conflicting key was updated (b went from 9 to 21) and the new row was inserted.
