Reading and Writing JDBC Data with Spark SQL

2018-09-26  白面葫芦娃92

1. Reading a JDBC data source with Spark SQL in IDEA
First, look at the data in MySQL:

mysql> use test;

mysql> create table emp(empno int, ename varchar(100),job varchar(100),mgr int, hiredate varchar(100), sal double, comm double, deptno int);

mysql> load data infile '/usr/local/mysql/data/emp.txt' replace into table emp fields terminated by'\t';

mysql> create table dept(deptno int, dname varchar(100),loc varchar(100));   

mysql> load data infile '/usr/local/mysql/data/dept.txt' replace into table dept fields terminated by'\t';      

mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| dept           |
| emp            |
| user           |
+----------------+
3 rows in set (0.00 sec)

mysql> select * from emp;
+-------+--------+-----------+------+------------+-------+------+--------+
| empno | ename  | job       | mgr  | hiredate   | sal   | comm | deptno |
+-------+--------+-----------+------+------------+-------+------+--------+
|  7369 | SMITH  | CLERK     | 7902 | 1980-12-17 |   800 |    0 |     20 |
|  7499 | ALLEN  | SALESMAN  | 7698 | 1981-2-20  |  1600 |  300 |     30 |
|  7521 | WARD   | SALESMAN  | 7698 | 1981-2-22  |  1250 |  500 |     30 |
|  7566 | JONES  | MANAGER   | 7839 | 1981-4-2   |  2975 |    0 |     20 |
|  7654 | MARTIN | SALESMAN  | 7698 | 1981-9-28  |  1250 | 1400 |     30 |
|  7698 | BLAKE  | MANAGER   | 7839 | 1981-5-1   |  2850 |    0 |     30 |
|  7782 | CLARK  | MANAGER   | 7839 | 1981-6-9   |  2450 |    0 |     10 |
|  7788 | SCOTT  | ANALYST   | 7566 | 1987-4-19  |  3000 |    0 |     20 |
|  7839 | KING   | PRESIDENT |    0 | 1981-11-17 |  5000 |    0 |     10 |
|  7844 | TURNER | SALESMAN  | 7698 | 1981-9-8   |  1500 |    0 |     30 |
|  7876 | ADAMS  | CLERK     | 7788 | 1987-5-23  |  1100 |    0 |     20 |
|  7900 | JAMES  | CLERK     | 7698 | 1981-12-3  |   950 |    0 |     30 |
|  7902 | FORD   | ANALYST   | 7566 | 1981-12-3  |  3000 |    0 |     20 |
|  7934 | MILLER | CLERK     | 7782 | 1982-1-23  |  1300 |    0 |     10 |
|  8888 | HIVE   | PROGRAM   | 7839 | 1988-1-23  | 10300 |    0 |   NULL |
+-------+--------+-----------+------+------------+-------+------+--------+
15 rows in set (0.00 sec)

The code in IDEA is as follows:

import org.apache.spark.sql.SparkSession

object ExtDSApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("ExtDSApp")
      .master("local[2]")
      .getOrCreate()

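    // Read the MySQL table test.emp through the JDBC data source.
    // Keep the DataFrame in emp and call show() separately; chaining .show()
    // onto the assignment would bind emp to Unit instead of the DataFrame.
    val emp = spark.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://hadoop000:3306/test?user=root&password=123456",
      "dbtable" -> "emp",
      "driver" -> "com.mysql.jdbc.Driver"
    )).load()
    emp.show()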

    spark.stop()
  }
}

Running it fails with:

Exception in thread "main" java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
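The JDBC driver class cannot be found, so the driver JAR must be added to the project.

Alternatively, add the dependency directly to the pom file: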

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>

After adding the JAR, running again fails with:

Exception in thread "main" java.sql.SQLException: null,  message from server: "Host '192.168.137.1' is not allowed to connect to this MySQL server"

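The client host has no permission to access the database, so access must be granted. The host in the GRANT statement has to match the client address that the MySQL server reports in the error: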

mysql> grant all privileges on test.* to root@'192.168.137.251' identified by '123456';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)

Run it again; the read succeeds:

+-----+------+---------+----+----------+-------+------+------+
|empno| ename|      job| mgr|  hiredate|    sal|  comm|deptno|
+-----+------+---------+----+----------+-------+------+------+
| 7369| SMITH|    CLERK|7902|1980-12-17|  800.0|   0.0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20| 1600.0| 300.0|    30|
| 7521|  WARD| SALESMAN|7698| 1981-2-22| 1250.0| 500.0|    30|
| 7566| JONES|  MANAGER|7839|  1981-4-2| 2975.0|   0.0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28| 1250.0|1400.0|    30|
| 7698| BLAKE|  MANAGER|7839|  1981-5-1| 2850.0|   0.0|    30|
| 7782| CLARK|  MANAGER|7839|  1981-6-9| 2450.0|   0.0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987-4-19| 3000.0|   0.0|    20|
| 7839|  KING|PRESIDENT|   0|1981-11-17| 5000.0|   0.0|    10|
| 7844|TURNER| SALESMAN|7698|  1981-9-8| 1500.0|   0.0|    30|
| 7876| ADAMS|    CLERK|7788| 1987-5-23| 1100.0|   0.0|    20|
| 7900| JAMES|    CLERK|7698| 1981-12-3|  950.0|   0.0|    30|
| 7902|  FORD|  ANALYST|7566| 1981-12-3| 3000.0|   0.0|    20|
| 7934|MILLER|    CLERK|7782| 1982-1-23| 1300.0|   0.0|    10|
| 8888|  HIVE|  PROGRAM|7839| 1988-1-23|10300.0|   0.0|  null|
+-----+------+---------+----+----------+-------+------+------+
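
The same read can also be written with `spark.read.jdbc`, passing the credentials and driver class in a `java.util.Properties` object instead of embedding them in the URL. The following is only a sketch under the assumptions of this post (host `hadoop000`, database `test`, user `root`); the object name `JdbcReadSketch` and its helper methods are made up for illustration.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcReadSketch {
  // Connection URL from the example above; adjust for your environment.
  val url = "jdbc:mysql://hadoop000:3306/test"

  // Hypothetical helper: keeps user, password and driver out of the URL.
  def connectionProperties(user: String, password: String): Properties = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    props
  }

  // Load the emp table as a DataFrame.
  def readEmp(spark: SparkSession): DataFrame =
    spark.read.jdbc(url, "emp", connectionProperties("root", "123456"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JdbcReadSketch")
      .master("local[2]")
      .getOrCreate()
    try readEmp(spark).show()
    finally spark.stop()
  }
}
```

Running `main` still needs the MySQL driver on the classpath and a reachable MySQL server, exactly as in the steps above.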

2. Testing with spark-shell

[hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar 
scala> val empDF = spark.read.format("jdbc").options(Map("url"->"jdbc:mysql://hadoop000:3306/test?user=root&password=123456","dbtable"->"emp","driver"->"com.mysql.jdbc.Driver")).load()
empDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]

scala> empDF.show
+-----+------+---------+----+----------+-------+------+------+
|empno| ename|      job| mgr|  hiredate|    sal|  comm|deptno|
+-----+------+---------+----+----------+-------+------+------+
| 7369| SMITH|    CLERK|7902|1980-12-17|  800.0|   0.0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20| 1600.0| 300.0|    30|
| 7521|  WARD| SALESMAN|7698| 1981-2-22| 1250.0| 500.0|    30|
| 7566| JONES|  MANAGER|7839|  1981-4-2| 2975.0|   0.0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28| 1250.0|1400.0|    30|
| 7698| BLAKE|  MANAGER|7839|  1981-5-1| 2850.0|   0.0|    30|
| 7782| CLARK|  MANAGER|7839|  1981-6-9| 2450.0|   0.0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987-4-19| 3000.0|   0.0|    20|
| 7839|  KING|PRESIDENT|   0|1981-11-17| 5000.0|   0.0|    10|
| 7844|TURNER| SALESMAN|7698|  1981-9-8| 1500.0|   0.0|    30|
| 7876| ADAMS|    CLERK|7788| 1987-5-23| 1100.0|   0.0|    20|
| 7900| JAMES|    CLERK|7698| 1981-12-3|  950.0|   0.0|    30|
| 7902|  FORD|  ANALYST|7566| 1981-12-3| 3000.0|   0.0|    20|
| 7934|MILLER|    CLERK|7782| 1982-1-23| 1300.0|   0.0|    10|
| 8888|  HIVE|  PROGRAM|7839| 1988-1-23|10300.0|   0.0|  null|
+-----+------+---------+----+----------+-------+------+------+
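
The `dbtable` option accepts anything that is valid in a SQL FROM clause, so a parenthesised subquery can be used in place of a table name to let MySQL filter rows before they reach Spark. A minimal sketch, assuming the same connection settings as above; the helper `asDbtable`, the alias `emp_sales` and the chosen filter are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object JdbcSubquerySketch {
  // Hypothetical helper: wrap a SELECT so it can be passed as "dbtable".
  // MySQL requires an alias for a derived table.
  def asDbtable(query: String, alias: String): String = s"($query) AS $alias"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JdbcSubquerySketch")
      .master("local[2]")
      .getOrCreate()

    // Only the employees of department 30 are fetched from MySQL.
    val salesDF = spark.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://hadoop000:3306/test?user=root&password=123456",
      "dbtable" -> asDbtable("SELECT empno, ename, sal FROM emp WHERE deptno = 30", "emp_sales"),
      "driver" -> "com.mysql.jdbc.Driver"
    )).load()

    salesDF.show()
    spark.stop()
  }
}
```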

3. Joining Spark SQL/Hive data with JDBC data
The data in Spark SQL is as follows:

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|     dept|      false|
| default|      emp|      false|
+--------+---------+-----------+

scala> spark.sql("select * from dept").show
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPREATIONS|  BOSTON|
+------+----------+--------+

Next, join it with the emp table from MySQL (the MySQL emp table was already loaded as empDF above):

scala> val deptDF = spark.table("dept")
deptDF: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field]

scala> deptDF.show
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPREATIONS|  BOSTON|
+------+----------+--------+

scala> empDF.join(deptDF, empDF.col("deptno")===deptDF.col("deptno")).show
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
|empno| ename|      job| mgr|  hiredate|   sal|  comm|deptno|deptno|     dname|     loc|
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0|    30|    30|     SALES| CHICAGO|
| 7521|  WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|    30|    30|     SALES| CHICAGO|
| 7566| JONES|  MANAGER|7839|  1981-4-2|2975.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0|    30|    30|     SALES| CHICAGO|
| 7698| BLAKE|  MANAGER|7839|  1981-5-1|2850.0|   0.0|    30|    30|     SALES| CHICAGO|
| 7782| CLARK|  MANAGER|7839|  1981-6-9|2450.0|   0.0|    10|    10|ACCOUNTING|NEW YORK|
| 7788| SCOTT|  ANALYST|7566| 1987-4-19|3000.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
| 7839|  KING|PRESIDENT|   0|1981-11-17|5000.0|   0.0|    10|    10|ACCOUNTING|NEW YORK|
| 7844|TURNER| SALESMAN|7698|  1981-9-8|1500.0|   0.0|    30|    30|     SALES| CHICAGO|
| 7876| ADAMS|    CLERK|7788| 1987-5-23|1100.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
| 7900| JAMES|    CLERK|7698| 1981-12-3| 950.0|   0.0|    30|    30|     SALES| CHICAGO|
| 7902|  FORD|  ANALYST|7566| 1981-12-3|3000.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
| 7934|MILLER|    CLERK|7782| 1982-1-23|1300.0|   0.0|    10|    10|ACCOUNTING|NEW YORK|
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
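
The join output above has 14 rows rather than 15: the default join type is an inner join, so the HIVE row, whose deptno is null, has no match in dept and is dropped. The sketch below shows the difference between an inner and a left outer join. It uses small in-memory stand-ins for `empDF` and `deptDF` (a subset of the rows shown above) so that it runs without MySQL or Hive; the object and method names are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object LeftJoinSketch {
  // Returns (inner join result, left outer join result).
  def joinBoth(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._

    // In-memory stand-in for empDF; None becomes a null deptno.
    val empDF = Seq[(Int, String, Option[Int])](
      (7369, "SMITH", Some(20)),
      (7499, "ALLEN", Some(30)),
      (8888, "HIVE", None)
    ).toDF("empno", "ename", "deptno")

    // In-memory stand-in for deptDF.
    val deptDF = Seq(
      (10, "ACCOUNTING"),
      (20, "RESEARCH"),
      (30, "SALES")
    ).toDF("deptno", "dname")

    // Joining on Seq("deptno") keeps a single deptno column in the result.
    val inner = empDF.join(deptDF, Seq("deptno"))               // drops the null-deptno row
    val left  = empDF.join(deptDF, Seq("deptno"), "left_outer") // keeps it, dname is null
    (inner, left)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("LeftJoinSketch")
      .master("local[2]")
      .getOrCreate()
    val (inner, left) = joinBoth(spark)
    inner.show()
    left.show()
    spark.stop()
  }
}
```

Passing the join column as `Seq("deptno")` instead of an `===` expression also avoids the duplicated deptno column visible in the output above.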

4. Writing from Spark SQL to MySQL

scala> val empDF = spark.table("emp");
empDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]

scala> empDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()

Check MySQL:

mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| dept           |
| emp            |
| emp_sparksql   |
| user           |
+----------------+
4 rows in set (0.00 sec)

Writing a second time fails because the table already exists:

scala> empDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()
org.apache.spark.sql.AnalysisException: Table or view 'test.emp_sparksql' already exists. SaveMode: ErrorIfExists.;

The default save mode is ErrorIfExists, so a mode must be specified explicitly:

scala> empDF.write.mode("overwrite").format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()

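mode("append") is also available for adding rows to the existing table instead of replacing it.
A table written this way differs from the original emp table: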

mysql> desc emp_sparksql;
+----------+---------+------+-----+---------+-------+
| Field    | Type    | Null | Key | Default | Extra |
+----------+---------+------+-----+---------+-------+
| empno    | int(11) | YES  |     | NULL    |       |
| ename    | text    | YES  |     | NULL    |       |
| job      | text    | YES  |     | NULL    |       |
| mgr      | int(11) | YES  |     | NULL    |       |
| hiredate | text    | YES  |     | NULL    |       |
| salary   | double  | YES  |     | NULL    |       |
| comm     | double  | YES  |     | NULL    |       |
| deptno   | int(11) | YES  |     | NULL    |       |
+----------+---------+------+-----+---------+-------+
8 rows in set (0.00 sec)

mysql> desc emp;
+----------+--------------+------+-----+---------+-------+
| Field    | Type         | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| empno    | int(11)      | YES  |     | NULL    |       |
| ename    | varchar(100) | YES  |     | NULL    |       |
| job      | varchar(100) | YES  |     | NULL    |       |
| mgr      | int(11)      | YES  |     | NULL    |       |
| hiredate | varchar(100) | YES  |     | NULL    |       |
| sal      | double       | YES  |     | NULL    |       |
| comm     | double       | YES  |     | NULL    |       |
| deptno   | int(11)      | YES  |     | NULL    |       |
+----------+--------------+------+-----+---------+-------+
8 rows in set (0.00 sec)

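The data types have changed: the string columns were created as text instead of varchar(100). (The column names come from the DataFrame's schema, which is why the salary column is named salary here rather than sal.) The createTableColumnTypes option can be added to specify the column types used when the table is created: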

scala> empDF.write.mode("overwrite").format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp1_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").option("createTableColumnTypes", "ename varchar(100),job varchar(100), hiredate varchar(100)").save()

mysql> desc emp1_sparksql;
+----------+--------------+------+-----+---------+-------+
| Field    | Type         | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| empno    | int(11)      | YES  |     | NULL    |       |
| ename    | varchar(100) | YES  |     | NULL    |       |
| job      | varchar(100) | YES  |     | NULL    |       |
| mgr      | int(11)      | YES  |     | NULL    |       |
| hiredate | varchar(100) | YES  |     | NULL    |       |
| salary   | double       | YES  |     | NULL    |       |
| comm     | double       | YES  |     | NULL    |       |
| deptno   | int(11)      | YES  |     | NULL    |       |
+----------+--------------+------+-----+---------+-------+
8 rows in set (0.00 sec)
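
The write steps in this section can be collected into one small helper. This is a sketch, assuming the same MySQL connection settings as above: it uses the typed `SaveMode` values instead of mode strings and `DataFrameWriter.jdbc` with a `Properties` object. The object name `JdbcWriteSketch` and the helper `columnTypes` are made up for illustration.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

object JdbcWriteSketch {
  // Hypothetical helper: build a createTableColumnTypes value that gives
  // every listed column the same SQL type.
  def columnTypes(columns: Seq[String], sqlType: String): String =
    columns.map(c => s"$c $sqlType").mkString(", ")

  // Write df to test.emp1_sparksql with the given save mode.
  def writeEmp(df: DataFrame, mode: SaveMode): Unit = {
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    df.write
      .mode(mode) // SaveMode.Overwrite, SaveMode.Append, SaveMode.Ignore or SaveMode.ErrorIfExists
      .option("createTableColumnTypes",
        columnTypes(Seq("ename", "job", "hiredate"), "varchar(100)"))
      .jdbc("jdbc:mysql://hadoop000:3306", "test.emp1_sparksql", props)
  }
}
```

In spark-shell this could be called as `JdbcWriteSketch.writeEmp(empDF, SaveMode.Overwrite)`. The createTableColumnTypes option only matters when Spark creates the table.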

Appendix: source code of the table function:

/**
   * Returns the specified table/view as a `DataFrame`.
   *
   * @param tableName is either a qualified or unqualified name that designates a table or view.
   *                  If a database is specified, it identifies the table/view from the database.
   *                  Otherwise, it first attempts to find a temporary view with the given name
   *                  and then match the table/view from the current database.
   *                  Note that, the global temporary view database is also valid here.
   * @since 2.0.0
   */
  def table(tableName: String): DataFrame = {
    table(sessionState.sqlParser.parseTableIdentifier(tableName))
  }
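
The lookup described in this doc comment can be tried out without Hive or MySQL. The sketch below registers a temporary view and a global temporary view from in-memory data and reads both back with `spark.table`. The view names `dept_view` and `dept_global` are made up for illustration, and `global_temp` is assumed to be the global temporary view database, which is Spark's default name for it.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableLookupSketch {
  // Registers two views and returns them as looked up by spark.table.
  def lookup(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._

    val deptDF = Seq((10, "ACCOUNTING"), (20, "RESEARCH")).toDF("deptno", "dname")

    // Session-scoped temporary view, resolved by its unqualified name.
    deptDF.createOrReplaceTempView("dept_view")
    // Global temporary view, resolved through the global_temp database.
    deptDF.createOrReplaceGlobalTempView("dept_global")

    (spark.table("dept_view"), spark.table("global_temp.dept_global"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TableLookupSketch")
      .master("local[2]")
      .getOrCreate()
    val (local, global) = lookup(spark)
    local.show()
    global.show()
    spark.stop()
  }
}
```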

5. Reading JDBC data with SQL

[hadoop@hadoop000 bin]$ ./spark-sql --master local[2] --driver-class-path ~/software/mysql-connector-java-5.1.27.jar
spark-sql> CREATE TEMPORARY VIEW emp_mysql USING org.apache.spark.sql.jdbc OPTIONS (url "jdbc:mysql://hadoop000:3306/", dbtable "test.emp", user 'root', password '123456');
Time taken: 0.517 seconds
18/09/26 02:53:17 INFO thriftserver.SparkSQLCLIDriver: Time taken: 0.517 seconds
spark-sql> show tables;
default dept    false
default emp     false
        emp_mysql       true
Time taken: 0.142 seconds, Fetched 3 row(s)
spark-sql> select * from emp_mysql;
7369    SMITH   CLERK   7902    1980-12-17      800.0   0.0     20
7499    ALLEN   SALESMAN        7698    1981-2-20       1600.0  300.0   30
7521    WARD    SALESMAN        7698    1981-2-22       1250.0  500.0   30
7566    JONES   MANAGER 7839    1981-4-2        2975.0  0.0     20
7654    MARTIN  SALESMAN        7698    1981-9-28       1250.0  1400.0  30
7698    BLAKE   MANAGER 7839    1981-5-1        2850.0  0.0     30
7782    CLARK   MANAGER 7839    1981-6-9        2450.0  0.0     10
7788    SCOTT   ANALYST 7566    1987-4-19       3000.0  0.0     20
7839    KING    PRESIDENT       0       1981-11-17      5000.0  0.0     10
7844    TURNER  SALESMAN        7698    1981-9-8        1500.0  0.0     30
7876    ADAMS   CLERK   7788    1987-5-23       1100.0  0.0     20
7900    JAMES   CLERK   7698    1981-12-3       950.0   0.0     30
7902    FORD    ANALYST 7566    1981-12-3       3000.0  0.0     20
7934    MILLER  CLERK   7782    1982-1-23       1300.0  0.0     10
8888    HIVE    PROGRAM 7839    1988-1-23       10300.0 0.0     NULL
Time taken: 2.844 seconds, Fetched 15 row(s)