Spark Development: An Introduction to SQL

2021-04-23  诺之林

This article builds on Spark Architecture.

Contents

Introduction

Definition

Spark SQL is Spark's module for working with structured data.

Features

Integrating RDDs with SQL

cat /opt/services/spark/examples/src/main/resources/people.txt
# Michael, 29
# Andy, 30
# Justin, 19

/opt/services/spark/bin/spark-shell
case class People(name: String, age: Long)

val rdd = sc.textFile("/opt/services/spark/examples/src/main/resources/people.txt")

val mapRDD = rdd.map(_.split(",")).map(attributes => People(attributes(0), attributes(1).trim.toInt))

val filterRDD = mapRDD.filter(_.age > 20)

filterRDD.foreach(p => println(s"${p.name} ${p.age}"))

For Scala string interpolation, see Scala String Interpolation.
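The map/filter pipeline above is ordinary Scala collection logic; the sketch below replays it locally without a Spark cluster (the `PeopleFilter` object and its method names are illustrative assumptions, not part of the original session):

```scala
// Local, Spark-free sketch of the map/filter pipeline above, on plain Scala collections.
case class People(name: String, age: Long)

object PeopleFilter {
  // Parse "name, age" lines the same way the rdd.map(_.split(",")) step does.
  def parse(lines: Seq[String]): Seq[People] =
    lines.map(_.split(",")).map(a => People(a(0), a(1).trim.toLong))

  // The filter step: keep only people older than minAge.
  def olderThan(people: Seq[People], minAge: Long): Seq[People] =
    people.filter(_.age > minAge)
}
```

With the three lines from people.txt, `olderThan(parse(lines), 20)` keeps Michael and Andy, matching the RDD output.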

case class People(name: String, age: Long)

val rdd = sc.textFile("/opt/services/spark/examples/src/main/resources/people.txt")

// import spark.implicits._
val df = rdd.map(_.split(",")).map(attributes => People(attributes(0), attributes(1).trim.toInt)).toDF()

df.createOrReplaceTempView("people")

spark.sql("SELECT * FROM people WHERE age > 20").show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
+-------+---+
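Conceptually, createOrReplaceTempView just registers the DataFrame under a name that SQL queries can resolve; calling it again with the same name swaps in the new data. A toy sketch of that name-to-data registry (the `TempViews` object and its `Row` type are illustrative assumptions, not Spark internals):

```scala
import scala.collection.mutable

// Toy model of a temp-view registry: view name -> rows (here a row is just name/age).
object TempViews {
  case class Row(name: String, age: Int)
  private val views = mutable.Map.empty[String, Seq[Row]]

  // Like createOrReplaceTempView: a second call with the same name replaces the data.
  def createOrReplace(name: String, rows: Seq[Row]): Unit = views(name) = rows

  // Stand-in for spark.sql("SELECT * FROM <view> WHERE age > <min>").
  def selectWhereAgeAbove(view: String, min: Int): Seq[Row] =
    views(view).filter(_.age > min)
}
```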

Unified Data Access

cat /opt/services/spark/examples/src/main/resources/people.json
# {"name":"Michael"}
# {"name":"Andy", "age":30}
# {"name":"Justin", "age":19}

/opt/services/spark/bin/spark-shell
val df = spark.read.json("/opt/services/spark/examples/src/main/resources/people.json")

df.createOrReplaceTempView("people")

spark.sql("SELECT * FROM people WHERE age > 20").show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
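Note that Michael's record has no age field, so Spark infers a nullable age column, and `age > 20` drops rows with a null age; that is why only Andy appears above. The same semantics can be modeled with Scala's Option (the `JsonLike` object is an illustrative sketch, not Spark's JSON reader):

```scala
// Records with an optional field, mirroring people.json where Michael has no "age" key.
case class Person(name: String, age: Option[Int])

object JsonLike {
  val records = Seq(
    Person("Michael", None),      // {"name":"Michael"}
    Person("Andy", Some(30)),     // {"name":"Andy", "age":30}
    Person("Justin", Some(19))    // {"name":"Justin", "age":19}
  )

  // WHERE age > 20: rows with a missing (null) age are filtered out, like SQL.
  def ageAbove(min: Int): Seq[Person] = records.filter(_.age.exists(_ > min))
}
```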

Standard Data Connectivity

docker run --name spark-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.17

docker exec -it spark-mysql /bin/bash

mysql -uroot -p123456
CREATE DATABASE IF NOT EXISTS db_spark DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

USE db_spark;

CREATE TABLE users (
  id int(10) unsigned NOT NULL AUTO_INCREMENT,
  name varchar(20) DEFAULT NULL COMMENT 'username',
  PRIMARY KEY (`id`)
);

INSERT INTO users VALUES (1, 'XiaoWang');

INSERT INTO users VALUES (2, 'XiaoMing');
# cd /opt/services
# wget https://mirror.tuna.tsinghua.edu.cn/mysql/downloads/Connector-J/mysql-connector-java-5.1.49.tar.gz
# tar xf mysql-connector-java-5.1.49.tar.gz
/opt/services/spark/bin/spark-shell --jars /opt/services/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar
// :paste
val df = spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/db_spark")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("user", "root")
    .option("password", "123456")
    .option("dbtable", "users")
    .load()
// Ctrl + D

df.createOrReplaceTempView("users")

val sqlDF = spark.sql("SELECT * FROM users WHERE name = 'XiaoMing'")

sqlDF.show()
+---+--------+
| id|    name|
+---+--------+
|  2|XiaoMing|
+---+--------+
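Each .option call above simply accumulates a key/value pair that the jdbc source consults at load() time. A toy sketch of that builder pattern (the `ReaderSketch` name and API are illustrative assumptions, not Spark's DataFrameReader):

```scala
// Toy model of the option-accumulating builder pattern used by spark.read above.
class ReaderSketch(val fmt: String, val opts: Map[String, String]) {
  // Each call returns a new builder with one more key/value pair recorded.
  def option(key: String, value: String): ReaderSketch =
    new ReaderSketch(fmt, opts + (key -> value))
}

object ReaderSketch {
  def format(fmt: String) = new ReaderSketch(fmt, Map.empty)
}
```

Chaining mirrors the spark-shell session: `ReaderSketch.format("jdbc").option("url", "jdbc:mysql://localhost:3306/db_spark").option("user", "root")` yields a builder holding both options.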
