Spark Development Notes
Over the last couple of days I have been experimenting with combining Spark and Spring Boot, so that algorithm logic over larger data sets can be handed straight to Spark for processing; together with Spark SQL this keeps things close to ordinary web development logic.
Note: everything here runs under a Spring Boot setup, so Java is used as the primary development language throughout this post (a minimal Spring wiring sketch follows the dependency list below).
Dependencies
- Version properties
# spark
spring_data_hadoop_version = 2.3.0.RELEASE
spark_version = 2.3.2
scala_version = 2.11
- Gradle configuration
compile("org.springframework.data:spring-data-hadoop-boot:${spring_data_hadoop_version}")
compile("org.springframework.data:spring-data-hadoop-batch:${spring_data_hadoop_version}")
compile("org.springframework.data:spring-data-hadoop-spark:${spring_data_hadoop_version}")
compile("org.apache.spark:spark-yarn_${scala_version}:${spark_version}")
compile("org.apache.spark:spark-core_${scala_version}:${spark_version}") {
exclude group: 'com.fasterxml.jackson.module', module: "jackson-module-scala_${scala_version}"
}
compile("com.fasterxml.jackson.module:jackson-module-scala_${scala_version}:2.9.4")
compile("org.apache.spark:spark-streaming_${scala_version}:${spark_version}")
compile("org.apache.spark:spark-hive_${scala_version}:${spark_version}")
compile("org.apache.spark:spark-sql_${scala_version}:${spark_version}")
compile("org.apache.spark:spark-mllib_${scala_version}:${spark_version}")
// hbase
compile 'org.apache.phoenix:phoenix-spark:5.0.0-HBase-2.0'
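Since everything runs under Spring Boot, the SparkConf and JavaSparkContext used in the snippets below can also be wired up as Spring beans. This is only a minimal sketch under that assumption; the class and bean names are illustrative and imports are omitted, as in the other snippets.
// Hypothetical Spring configuration exposing Spark as beans
@Configuration
public class SparkConfig {
    @Bean
    public SparkConf sparkConf() {
        // same settings as in the examples below
        return new SparkConf().setAppName("sparkTest").setMaster("local[2]");
    }
    @Bean(destroyMethod = "close")
    public JavaSparkContext javaSparkContext(SparkConf sparkConf) {
        // closed automatically when the Spring context shuts down
        return new JavaSparkContext(sparkConf);
    }
}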
Spark SQL basics
- Querying a JSON file
SparkConf sparkConf = new SparkConf().setAppName("sparkTest")
.setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
// The loaded DataFrame can be thought of simply as a table
Dataset<Row> jsonDataFrame = sqlContext.read().json(SparkSQLTest.class.getResource("/data/json/students.json").getPath());
// Print the table
jsonDataFrame.show();
// Print the schema
jsonDataFrame.printSchema();
// Select columns and compute on them
jsonDataFrame.select("name").show();
jsonDataFrame.select(jsonDataFrame.col("name"), jsonDataFrame.col("score").plus(1)).show(); // add one to the score column
// Filter
jsonDataFrame.filter(jsonDataFrame.col("score").gt(80)).show();
// Group by a column and count
jsonDataFrame.groupBy("score").count().show();
sc.close();
- Running SQL queries against a JSON file
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
// The loaded DataFrame can be thought of simply as a table
Dataset<Row> jsonDataFrame = sqlContext.read().json(SparkSQLTest.class.getResource("/data/json/students.json").getPath());
jsonDataFrame.createOrReplaceTempView("student");
sqlContext.sql("select * from student").show();
sc.close();
- Querying a database table through a temporary view
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = SQLContext.getOrCreate(JavaSparkContext.toSparkContext(sc));
String sql = " (select * from t_app) as application";
DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://172.19.xxx.xxx:3306/stat");//数据库路径
reader.option("dbtable", sql);//数据表名
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "passwd123");
Dataset<Row> jdbcRDD = reader.load();
jdbcRDD.show();
jdbcRDD.createOrReplaceTempView("xxxx");
sqlContext.sql("select id, appid from xxxx").show();
sc.close();
- Text data processing
Note: in the steps below, if the functions are not written as lambdas, a serialization exception (java.io.NotSerializableException) is thrown. I have not confirmed the cause, though it is most likely the anonymous inner class capturing a reference to the enclosing, non-serializable class; a non-lambda alternative is sketched after the code block below.
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> textFileRDD = sc.textFile(SparkSQLTest.class.getResource("/data/doc.txt").getPath());
// All of the steps below are written as lambdas
JavaRDD<String> wordsRDD = textFileRDD.flatMap((FlatMapFunction<String, String>) s -> {
String[] split = s.split(" ");
return Arrays.asList(split).iterator();
});
JavaPairRDD<String, Integer> wordsPairRDD = wordsRDD.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> wordCountRDD = wordsPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
wordCountRDD.foreach((VoidFunction<Tuple2<String, Integer>>) stringIntegerTuple2 -> System.out.println(stringIntegerTuple2));
sc.close();
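For reference, the usual non-lambda workaround is a named static class instead of an anonymous inner class: an anonymous class keeps a reference to the enclosing (often non-serializable) object, while a static class does not. A minimal sketch, assuming it is declared inside the test class:
// Named static class: holds no reference to the enclosing class, so it serializes cleanly
// (Spark's FlatMapFunction already extends Serializable)
static class SplitWords implements FlatMapFunction<String, String> {
    @Override
    public Iterator<String> call(String s) {
        return Arrays.asList(s.split(" ")).iterator();
    }
}
// usage: JavaRDD<String> wordsRDD = textFileRDD.flatMap(new SplitWords());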
- Querying array data
JavaSparkContext.parallelize() turns ordinary in-memory data into a JavaRDD, which can then be processed further. Examples follow:
- Multiplying elements
Convert the List to a JavaRDD and then multiply each element (the example squares it):
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<String> result = rdd.map((Function<Integer, String>) x -> x * x + "");
System.out.println(StringUtils.join(result.collect(), ","));
- Querying person records
JavaSparkContext sc = new JavaSparkContext(sparkConf);
List<Tuple2<String, Integer>> words = new ArrayList<>();
words.add(new Tuple2<>("zhangsan", 22));
words.add(new Tuple2<>("lisi", 40));
words.add(new Tuple2<>("zhangsan", 21));
words.add(new Tuple2<>("lisi", 23));
words.add(new Tuple2<>("wangwu", 60));
// Build a JavaPairRDD from the list
JavaPairRDD<String, Integer> wordsPairRDD = sc.parallelizePairs(words);
// Convert the tuples to Row objects
JavaRDD<Row> wordRowRDD = wordsPairRDD.map(tuple -> RowFactory.create(tuple._1, tuple._2));
List<StructField> fieldList = new ArrayList<>();
fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
fieldList.add(DataTypes.createStructField("num", DataTypes.IntegerType, false));
// Create the SQL schema
StructType schema = DataTypes.createStructType(fieldList);
SQLContext sqlContext = new SQLContext(sc);
// Build the Dataset<Row>
Dataset<Row> personDF = sqlContext.createDataFrame(wordRowRDD, schema);
personDF.createOrReplaceTempView("person");
sqlContext.sql("select * from person").show();
sc.close();
In the example above, the in-memory List is converted to a JavaRDD, the elements are mapped to Row objects, a schema is attached, the SQLContext turns the result into a Dataset<Row>, and finally a temporary view is registered and queried with SQL.
- Querying data with SQL
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<Person> personList = new ArrayList<>();
personList.add(new Person("zhangsan", 22));
personList.add(new Person("lisi", 40));
personList.add(new Person("zhangsan", 21));
personList.add(new Person("lisi", 23));
personList.add(new Person("wangwu", 60));
// Build the base Encoder; the class passed to Encoders.bean() must have getters/setters, otherwise there is no schema
Encoder<Person> personEncoder = Encoders.bean(Person.class);
// Create the Dataset from the list
Dataset<Person> personDataset = sqlContext.createDataset(personList, personEncoder);
// Register a temporary view
personDataset.createOrReplaceTempView("person");
sqlContext.sql("select name,sum(num) from person group by name").show();
sc.close();
- Converting a Dataset<Person> to a Dataset<Row>
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<Person> personList = new ArrayList<>();
personList.add(new Person("zhangsan", 22));
personList.add(new Person("lisi", 40));
personList.add(new Person("zhangsan", 21));
personList.add(new Person("lisi", 23));
personList.add(new Person("wangwu", 60));
// Build the base Encoder; the class passed to Encoders.bean() must have getters/setters, otherwise there is no schema
Encoder<Person> personEncoder = Encoders.bean(Person.class);
// Create the Dataset from the list
Dataset<Person> personDataset = sqlContext.createDataset(personList, personEncoder);
// Convert Dataset<Person> to Dataset<Row>
// Note: the column order here must match the Person class fields
Dataset<Row> personDatasetRow = personDataset.toDF("name", "num");
// Register a temporary view
personDatasetRow.createOrReplaceTempView("person");
sqlContext.sql("select name,sum(num) from person group by name").show();
sc.close();
Note: the differences between JavaRDD<Object>, JavaRDD<Row>, and the Dataset variants are summarized in the table below:
Name | Description |
---|---|
JavaRDD<Object> | A data set of plain objects |
JavaRDD<Row> | A data set of Row objects |
Dataset<Object> | Carries a schema with type-safety checks, a relational model, and SQL query support |
Dataset<Row> | Carries a schema with type-safety checks, a relational model, and SQL query support |
Note: the schema of a Dataset<Object> matches the field names of the object's class.
Note: a Dataset can be converted to a JavaRDD directly with Dataset.toJavaRDD(); since a Dataset has a stricter structure than a JavaRDD, converting in that direction is easy, while the reverse conversion requires supplying the schema information again (a sketch follows).
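A minimal sketch of both directions, reusing personDatasetRow and sqlContext from the example above:
// Dataset -> JavaRDD: a direct call, no extra information needed
JavaRDD<Row> rowRDD = personDatasetRow.toJavaRDD();
// JavaRDD<Row> -> Dataset<Row>: the schema has to be supplied again
Dataset<Row> restored = sqlContext.createDataFrame(rowRDD, personDatasetRow.schema());
restored.show();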
Note: the class passed to Encoders.bean() must have getter/setter methods, otherwise no schema information is produced (an assumed minimal Person bean is sketched below).
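The Person bean itself is not shown in these notes; an assumed minimal shape matching the two-argument usages above is sketched here (the HBase example later also carries an id field):
// Assumed minimal Person bean; Encoders.bean() relies on the getters/setters below
public class Person implements Serializable {
    private String name;
    private int num;
    public Person() { } // no-arg constructor for bean encoding
    public Person(String name, int num) {
        this.name = name;
        this.num = num;
    }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getNum() { return num; }
    public void setNum(int num) { this.num = num; }
}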
- Importing data
- Loading data automatically via JDBC
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
Map<String, String> cassConfigMap = new HashMap<>();
// Query the data over JDBC
cassConfigMap.put("url", "jdbc:mysql://172.xxx.xxx.xxx:3306/sctest");
cassConfigMap.put("driver", "com.mysql.jdbc.Driver");
cassConfigMap.put("dbtable", "(select * from person) as people");
cassConfigMap.put("user", "user");
cassConfigMap.put("password", "password");
cassConfigMap.put("upperBound", "100");
cassConfigMap.put("lowerBound", "0");
cassConfigMap.put("numPartitions", "1");
cassConfigMap.put("partitionColumn", "id");
Dataset<Row> scheduleDs = sqlContext.read().format("jdbc").options(cassConfigMap).load();
scheduleDs.createOrReplaceTempView("people");
scheduleDs.show();
sc.close();
- Loading data with JdbcRDD
// TODO Using JdbcRDD here fails
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
// XXX Note: JdbcRDD requires lowerBound and upperBound, and both must be used in the SQL;
// otherwise an error of the form "where upperBound > lowerBound" is raised
JdbcRDD<Person> jdbcRDD = new JdbcRDD<>(sc.sc(),
new DBConnection("com.mysql.jdbc.Driver",
"jdbc:mysql://172.xxx.xxx.xxx:3306/sctest", "user", "password"),
"select * from person where ID >= ? AND ID <= ?", 1, 100, 1,
new ResultObjectMapper<>(Person.class), ClassManifestFactory$.MODULE$.fromClass(Person.class));
long count = jdbcRDD.count();
assert count > 0;
log.debug("=================" + count);
Person[] result = (Person[])jdbcRDD.collect();
log.debug("======" + result);
sc.close();
Note: as shown above, all of these options are validated by JdbcRelationProvider, which requires the partitionColumn, lowerBound, upperBound, and numPartitions options to be supplied (a partitioned read is sketched below).
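A sketch of a genuinely partitioned read with those options (the connection details and bounds are illustrative); Spark splits the id range into numPartitions sub-queries, one per partition:
Dataset<Row> partitioned = sqlContext.read().format("jdbc")
        .option("url", "jdbc:mysql://172.xxx.xxx.xxx:3306/sctest")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", "person")
        .option("user", "user")
        .option("password", "password")
        .option("partitionColumn", "id") // column used to split the reads
        .option("lowerBound", "0")
        .option("upperBound", "100")
        .option("numPartitions", "4")    // four parallel sub-queries
        .load();
partitioned.show();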
Note: the DataSource class defines a backwardCompatibilityMap to keep data-source format names working after sources were moved around, as shown below:
/** A map to maintain backward compatibility in case we move data sources around. */
private val backwardCompatibilityMap: Map[String, String] = {
val jdbc = classOf[JdbcRelationProvider].getCanonicalName
val json = classOf[JsonFileFormat].getCanonicalName
val parquet = classOf[ParquetFileFormat].getCanonicalName
val csv = classOf[CSVFileFormat].getCanonicalName
val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
val nativeOrc = classOf[OrcFileFormat].getCanonicalName
Map(
"org.apache.spark.sql.jdbc" -> jdbc,
"org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
"org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
"org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
"org.apache.spark.sql.json" -> json,
"org.apache.spark.sql.json.DefaultSource" -> json,
"org.apache.spark.sql.execution.datasources.json" -> json,
"org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
"org.apache.spark.sql.parquet" -> parquet,
"org.apache.spark.sql.parquet.DefaultSource" -> parquet,
"org.apache.spark.sql.execution.datasources.parquet" -> parquet,
"org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
"org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
"org.apache.spark.sql.hive.orc" -> orc,
"org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
"org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
"org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
"org.apache.spark.ml.source.libsvm" -> libsvm,
"com.databricks.spark.csv" -> csv
)
}
As shown above, this covers the basic built-in import formats; the legacy names keep working as format strings (sketched below).
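A small sketch using one of the aliases from the map above:
// "org.apache.spark.sql.json" is mapped to JsonFileFormat by backwardCompatibilityMap,
// so this is equivalent to read().json(path)
Dataset<Row> legacyJson = sqlContext.read()
        .format("org.apache.spark.sql.json")
        .load(SparkSQLTest.class.getResource("/data/json/students.json").getPath());
legacyJson.show();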
- Exporting data to files
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<Person> personList = new ArrayList<>();
personList.add(new Person("zhangsan", 22));
personList.add(new Person("lisi", 40));
personList.add(new Person("zhangsan", 21));
personList.add(new Person("lisi", 23));
personList.add(new Person("wangwu", 60));
// Build the base Encoder
Encoder<Person> personEncoder = Encoders.bean(Person.class);
// Create the Dataset from the list
Dataset<Person> personDataset = sqlContext.createDataset(personList, personEncoder);
DataFrameWriter<Person> writer = personDataset.write();
// Save as CSV
writer.csv("./person.csv");
// Save as JSON
writer.json("./person.json");
// With no format set, save() defaults to Parquet; a fresh writer is used here,
// since the writer above has already had its format switched to JSON by the json() call
personDataset.write().save("./person");
sc.close();
Note: DataFrameWriter output behaves like Hadoop storage: the data is written out per partition, one part file each (as sketched below).
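So the exports above end up as directories of part files, one per partition; if a single file is wanted, the data can be coalesced to one partition first. A minimal sketch:
// Reduce to a single partition so that only one part file is written
personDataset.coalesce(1).write().csv("./person_single.csv");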
Note: SQLContext also exposes methods to import json, parquet, csv, etc. files as Dataset<Row> objects (see the sketch below).
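For example, the files written above can be loaded back through the same read() entry point (a sketch, with paths matching the export example):
Dataset<Row> fromCsv = sqlContext.read().csv("./person.csv");
Dataset<Row> fromJson = sqlContext.read().json("./person.json");
Dataset<Row> fromParquet = sqlContext.read().parquet("./person");
fromParquet.show();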
- Exporting data to a database
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<Person> personList = new ArrayList<>();
personList.add(new Person("zhangsan", 22));
personList.add(new Person("lisi", 40));
personList.add(new Person("zhangsan", 21));
personList.add(new Person("lisi", 23));
personList.add(new Person("wangwu", 60));
// Build the base Encoder
Encoder<Person> personEncoder = Encoders.bean(Person.class);
// Create the Dataset from the list
Dataset<Person> personDataset = sqlContext.createDataset(personList, personEncoder);
DataFrameWriter<Person> writer = personDataset.write();
String url = "jdbc:mysql://172.19.xxx.xxx:3306/sctest";
Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "passwd123");
properties.put("driver", "com.mysql.jdbc.Driver");
// Creates the table schema automatically
writer.jdbc(url, "person", properties);
String url1 = "jdbc:mysql://172.19.xxx.xxx:3306/sctest";
Properties properties1 = new Properties();
properties1.put("user", "root");
properties1.put("password", "passwd123");
properties1.put("driver", "com.mysql.jdbc.Driver");
// Append data to the existing table
writer.mode("append").jdbc(url1, "person", properties1);
sc.close();
- HBase read/write
Before any of the operations below, the table structure needs to be created in HBase through Phoenix:
$ sqlline.py am.xxx.com
0: jdbc:phoenix:am.xxx.com> create table t_person (id BIGINT NOT NULL PRIMARY KEY, name VARCHAR, num INTEGER);
0: jdbc:phoenix:am.xxx.com> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | |
| | | T_PERSON | TABLE | | | | | | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-+
- Reading from HBase
Dataset<Row> jdbc = sparkSession.read().format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("url", "jdbc:phoenix:am.xxx.com,an1.xxx.com,an2.xxx.com:2181:/hbase-unsecure")
.option("fetchsize", "500")
.option("dbtable", "t_person").load();
jdbc.show(5);
- Writing to HBase
List<Person> personList = new ArrayList<>();
personList.add(new Person(1, "zhangsan", 22));
personList.add(new Person(2, "lisi", 40));
personList.add(new Person(3, "zhangsan", 21));
personList.add(new Person(4, "lisi", 23));
personList.add(new Person(5, "wangwu", 60));
// Create the Dataset from the list
Dataset<Person> personDataset = sparkSession.createDataset(personList, Encoders.bean(Person.class));
personDataset.printSchema();
// Write out
personDataset.write()
.format("org.apache.phoenix.spark")
.mode(SaveMode.Overwrite)
.option("table", "t_person")
.option("zkUrl", "am.xxx.com,an1.xxx.com,an2.xxx.com:2181:/hbase-unsecure")
.save();
- Custom UDF
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<Weather> weatherList = new ArrayList<>();
weatherList.add(new Weather("jinan", 22));
weatherList.add(new Weather("lijiang", 25));
weatherList.add(new Weather("hainan", 31));
weatherList.add(new Weather("shangtou", 23));
weatherList.add(new Weather("beijing", 60));
// Build the base Encoder
Encoder<Weather> weatherEncoder = Encoders.bean(Weather.class);
// Create the Dataset from the list
Dataset<Weather> weatherDataset = sqlContext.createDataset(weatherList, weatherEncoder);
// Convert Dataset<Weather> to Dataset<Row>
// Note: the column order here must match the Weather class fields
Dataset<Row> weatherDatasetRow = weatherDataset.toDF("city", "degree");
// Register a temporary view
weatherDatasetRow.createOrReplaceTempView("weather");
sqlContext.udf().register("CTOF", (UDF1<Integer, Double>) degreesCelsius -> ((degreesCelsius * 9.0 / 5.0) + 32.0), DataTypes.DoubleType);
sqlContext.sql("select city, CTOF(degree) from weather").show();
sc.close();
Note: if no save mode is set on the writer, it creates the table structure automatically (and fails if the table already exists); if a mode is set, it behaves according to that mode (the SaveMode enum form is sketched below).
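A minimal sketch of setting the mode explicitly with the SaveMode enum (same semantics as the string form used earlier), reusing the writer, url, and properties from the database export example:
// Overwrite drops and recreates the table, Append adds rows,
// ErrorIfExists (the default) fails if the table exists, Ignore silently skips the write
writer.mode(SaveMode.Overwrite).jdbc(url, "person", properties);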