Spark Development: An Introduction to Spark SQL
2021-04-23
诺之林
This article builds on Spark Architecture
Background
- Many early big data developers transitioned from Web development, where SQL is an essential skill
- Spark SQL provides the DataFrame API to simplify RDD development
Definition
- DataFrame = a distributed dataset built on top of RDDs
- DataFrame = RDD + Schema
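The "RDD + Schema" idea can be sketched in plain Scala without Spark: a case class plays the role of the schema, turning an untyped text row into a record with named, typed fields (the People class mirrors the example below; the sample line is illustrative):

```scala
// A case class plays the role of the schema: named, typed fields per row
case class People(name: String, age: Long)

// An untyped "row", as a line read from a text file
val row = "Michael, 29"

// Applying the schema: parse the raw line into a typed record
val fields = row.split(",")
val person = People(fields(0), fields(1).trim.toLong)

println(s"${person.name} is ${person.age}")
```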
Features
Integrating RDDs and SQL
cat /opt/services/spark/examples/src/main/resources/people.txt
# Michael, 29
# Andy, 30
# Justin, 19
/opt/services/spark/bin/spark-shell
case class People(name: String, age: Long)
val rdd = sc.textFile("/opt/services/spark/examples/src/main/resources/people.txt")
val mapRDD = rdd.map(_.split(",")).map(attributes => People(attributes(0), attributes(1).trim.toLong))
val filterRDD = mapRDD.filter(_.age > 20)
filterRDD.foreach(p => println(s"${p.name} ${p.age}"))
For more on Scala string interpolation, see Scala String Interpolation
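The RDD pipeline above is ordinary functional transformation; the same map/filter chain can be reproduced on a plain Scala collection (a sketch using the same three sample lines as people.txt, no Spark required):

```scala
case class People(name: String, age: Long)

// The same three lines as people.txt
val lines = List("Michael, 29", "Andy, 30", "Justin, 19")

// Same pipeline as the RDD version: split each line, map to People, filter by age
val over20 = lines
  .map(_.split(","))
  .map(attrs => People(attrs(0), attrs(1).trim.toLong))
  .filter(_.age > 20)

over20.foreach(p => println(s"${p.name} ${p.age}"))
```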
case class People(name: String, age: Long)
val rdd = sc.textFile("/opt/services/spark/examples/src/main/resources/people.txt")
// import spark.implicits._ (needed for toDF; imported automatically in spark-shell)
val df = rdd.map(_.split(",")).map(attributes => People(attributes(0), attributes(1).trim.toLong)).toDF()
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age > 20").show()
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
+-------+---+
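Besides SQL text, the same query can be expressed with the DataFrame DSL; a sketch to run in the same spark-shell session, assuming the df defined above:

```scala
// Equivalent query using the DataFrame API instead of SQL text
// ($"age" column syntax needs spark.implicits._, pre-imported in spark-shell)
df.filter($"age" > 20).select("name", "age").show()
```

Both forms go through the same Catalyst optimizer, so they produce the same execution plan.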
Unified Data Access
cat /opt/services/spark/examples/src/main/resources/people.json
# {"name":"Michael"}
# {"name":"Andy", "age":30}
# {"name":"Justin", "age":19}
/opt/services/spark/bin/spark-shell
val df = spark.read.json("/opt/services/spark/examples/src/main/resources/people.json")
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age > 20").show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
Standard Data Connectivity
docker run --name spark-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.17
docker exec -it spark-mysql /bin/bash
mysql -uroot -p123456
CREATE DATABASE IF NOT EXISTS db_spark DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
USE db_spark;
CREATE TABLE users (
  id int(10) unsigned NOT NULL AUTO_INCREMENT,
  name varchar(20) DEFAULT NULL COMMENT 'username',
  PRIMARY KEY (`id`)
);
INSERT INTO users VALUES (1, 'XiaoWang');
INSERT INTO users VALUES (2, 'XiaoMing');
# cd /opt/services
# wget https://mirror.tuna.tsinghua.edu.cn/mysql/downloads/Connector-J/mysql-connector-java-5.1.49.tar.gz
# tar xf mysql-connector-java-5.1.49.tar.gz
/opt/services/spark/bin/spark-shell --jars /opt/services/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar
# :paste
val df = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/db_spark")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "users")
.load()
# Ctrl + D
df.createOrReplaceTempView("users")
val sqlDF = spark.sql("SELECT * FROM users WHERE name = 'XiaoMing'")
sqlDF.show()
+---+--------+
| id| name|
+---+--------+
| 2|XiaoMing|
+---+--------+