本地测试Spark任务
1. 背景
在Linux下安装Ambari
或者CDH
并不复杂,但考虑到环境的维护、组件(尤其是Spark)版本的变更,以及测试数据的污染等因素,希望有一种解决方案能减弱这些困扰。
之所以选择本地执行:
- 环境独享,不被他人干扰
- 使用Jmockit,实现局部自定义改造
- 结合Testng,方便单元测试用例编写与执行
- 甚至可以通过Intellij IDEA实现代码调试
2. 环境搭建
2.1 POM文件
在Intellij IDEA创建新的Maven Project,并配置pom.xml
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<version>1.40</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.14.3</version>
<scope>test</scope>
</dependency>
</dependencies>
这里使用了Spark 2.1.0版本,如果开发组件版本变更,将Maven源调整成对应版本即可。
2.2 调试
先来创建一下Sparksession
SparkSession session = SparkSession.builder()
.appName("my local spark")
.master("local[*]")
.enableHiveSupport()
.getOrCreate();
运行发现报错
18/10/15 15:43:30 ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:378)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:393)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:386)
at org.apache.hadoop.hive.conf.HiveConfConfVars.<clinit>(HiveConf.java:365)
at org.apache.hadoop.hive.conf.HiveConf.<clinit>(HiveConf.java:105)
根据错误日志的堆栈输出,将问题锁定在Shell
类的getWinUtilsPath
方法
public static final String getWinUtilsPath() {
String winUtilsPath = null;
try {
if (WINDOWS) {
winUtilsPath = getQualifiedBinPath("winutils.exe");
}
} catch (IOException var2) {
LOG.error("Failed to locate the winutils binary in the hadoop binary path", var2);
}
return winUtilsPath;
}
可以看到,在windows环境下,需要借助额外的winutils.exe
工具(用来模拟hdfs文件操作,下载地下:https://github.com/srccodes/hadoop-common-2.2.0-bin)
除此外,还需要配置hadoop.home.dir
属性,指向winutils.exe
工具所在目录
System.setProperty("hadoop.home.dir", "D:\\package\\hadoop");
2.3 HIVE配置
三个相关配置项:
- spark.sql.warehouse.dir
- 位于SQLConf.scala,覆盖HiveConf.class下的hive.metastore.warehouse.dir,默认值为当前路径下的“spark-warehouse”
- hive.metastore.uris
- 如果不填,默认在使用derby在本地创建元库(可以使用jdk自带的ij工具进行连接,但只支持单会话)
- 如果想连接远程元库,可以配置“thrift://ip:9083”
- spark.sql.shuffle.partitions
- 官方解释:The default number of partitions to use when shuffling data for joins or aggregations,它只针对spark sql的连接和聚合操作,默认值为200。本地测试的话,可以配置为1,减少文件数,从而提高处理速度。
- 注意与“spark.default.parallelism”的区别(Default number of partitions in RDDs returned by transformations like
join
,reduceByKey
, andparallelize
when not set by user.)
3. SQL文件初始化
当使用Hive时,少不了表的初始化。如果在Linux上,直接执行hive -f xxx.sql
即可实现批量创建。
然后上述方案并未提供hive指令,唯一知道可以执行sql语句的指令为SparkSession.sql方法,一次只能执行一条,且无法识别最后的;
没办法,既然没有现成的,就只好动手自己造呢。目的很明确,就是将sql文件解析成sql集合,再遍历执行即可
public class SqlReader {
public static List<String> readSql(String sqlFile) {
List<String> sqls = new ArrayList<String>();
try {
BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(
SqlReader.class.getClassLoader().getResourceAsStream(sqlFile), "utf-8"));
StringBuilder sb = new StringBuilder();
String line = null;
boolean endFlag = false;
while((line = bufferedReader.readLine()) != null) {
String tmp = line.trim();
if(tmp == "" || tmp.startsWith("--")) {
continue;
} else {
if (tmp.endsWith(";")) {
sql = tmp.substring(0, tmp.length() - 1);
endFlag = true;
}
sb.append(sql).append(" ");
}
if (endFlag) {
sqls.add(sb.toString());
sb.setLength(0);
endFlag = false;
}
}
} catch (IOException e) {
e.printStackTrace();
}
return sqls;
}
}
4. 构造测试数据
二种方案供参考:
- 使用
load data local inpath '/path/to/data' overwrite into table xxx [partition(xx='xx')]
- 当存在虚表dual(oracle叫法)时,可以使用
insert overwrite table xxx select filed1, filed2, filed3, 1, 'a' from dual
小技巧:
- 直接调整“spark.sql.warehouse.dir”下hive表的文件内容,包括修改,复制等,相当于实现update操作(虽然hive不支持update),所以对hive表数据的操作转化为本地文件的操作。
- 如果Hive表带分区,通过上一条复制分区的操作将不被识别,因为复制的分区信息并没有写入到Hive的元库中(可以通过ij连接本地derby,查看PARTITIONS表)。如果非要这么操作也不是不可,执行
msck repaire table xxx
可以修复hive表的分区信息 - 然后有一个问题,即HDFS的CRC校验,当我们修改数据文件时,校验码将不匹配,需要删除对应文件的crc文件;或者选择关闭HDFS的CRC校验,
fileSystem.setVerifyChecksum(false)
5. 结果校验
这里不细展开,可以将结果转化成Array或者String,也可以通过Hash算法计算结果再比较
需要注意的是,返回结果的顺序可能错乱,无法与预期结果依次比对