spark||flink||scala

本地测试Spark任务

2018-10-16  本文已影响381人  halfempty

1. 背景

在Linux下安装Ambari或者CDH并不复杂,但考虑到环境的维护、组件(尤其是Spark)版本的变更,以及测试数据的污染等因素,希望有一种解决方案能减弱这些困扰。

之所以选择本地执行:

2. 环境搭建

2.1 POM文件

在Intellij IDEA创建新的Maven Project,并配置pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.jmockit</groupId>
        <artifactId>jmockit</artifactId>
        <version>1.40</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>6.14.3</version>
        <scope>test</scope>
    </dependency>
</dependencies>

这里使用了Spark 2.1.0版本,如果开发组件版本变更,将Maven源调整成对应版本即可。

2.2 调试

先来创建一下Sparksession

SparkSession session = SparkSession.builder()
                .appName("my local spark")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate();

运行发现报错

18/10/15 15:43:30 ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:378)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:393)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:386)
at org.apache.hadoop.hive.conf.HiveConfConfVars.findHadoopBinary(HiveConf.java:2327)* ​ *at org.apache.hadoop.hive.conf.HiveConfConfVars.<clinit>(HiveConf.java:365)
at org.apache.hadoop.hive.conf.HiveConf.<clinit>(HiveConf.java:105)

根据错误日志的堆栈输出,将问题锁定在Shell类的getWinUtilsPath方法

public static final String getWinUtilsPath() {
        String winUtilsPath = null;

        try {
            if (WINDOWS) {
                winUtilsPath = getQualifiedBinPath("winutils.exe");
            }
        } catch (IOException var2) {
            LOG.error("Failed to locate the winutils binary in the hadoop binary path", var2);
        }

        return winUtilsPath;
    }

可以看到,在windows环境下,需要借助额外的winutils.exe工具(用来模拟hdfs文件操作,下载地下:https://github.com/srccodes/hadoop-common-2.2.0-bin

除此外,还需要配置hadoop.home.dir属性,指向winutils.exe工具所在目录

System.setProperty("hadoop.home.dir", "D:\\package\\hadoop");

2.3 HIVE配置

三个相关配置项:

3. SQL文件初始化

当使用Hive时,少不了表的初始化。如果在Linux上,直接执行hive -f xxx.sql即可实现批量创建。

然后上述方案并未提供hive指令,唯一知道可以执行sql语句的指令为SparkSession.sql方法,一次只能执行一条,且无法识别最后的;

没办法,既然没有现成的,就只好动手自己造呢。目的很明确,就是将sql文件解析成sql集合,再遍历执行即可

public class SqlReader {

    public static List<String> readSql(String sqlFile) {
        List<String> sqls = new ArrayList<String>();
        
        try {
            BufferedReader bufferedReader = new BufferedReader(
                    new InputStreamReader(
                            SqlReader.class.getClassLoader().getResourceAsStream(sqlFile), "utf-8"));

            StringBuilder sb = new StringBuilder();
            String line = null;
            boolean endFlag = false;
            while((line = bufferedReader.readLine()) != null) {
                String tmp = line.trim();
                if(tmp == "" || tmp.startsWith("--")) {
                    continue;
                } else {
                    if (tmp.endsWith(";")) {
                        sql = tmp.substring(0, tmp.length() - 1);
                        endFlag = true;
                    }
                    sb.append(sql).append(" ");
                }

                if (endFlag) {
                    sqls.add(sb.toString());
                    sb.setLength(0);
                    endFlag = false;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return sqls;
    }
}

4. 构造测试数据

二种方案供参考:

  1. 使用load data local inpath '/path/to/data' overwrite into table xxx [partition(xx='xx')]
  2. 当存在虚表dual(oracle叫法)时,可以使用insert overwrite table xxx select filed1, filed2, filed3, 1, 'a' from dual

小技巧:

5. 结果校验

这里不细展开,可以将结果转化成Array或者String,也可以通过Hash算法计算结果再比较

需要注意的是,返回结果的顺序可能错乱,无法与预期结果依次比对

上一篇下一篇

猜你喜欢

热点阅读