数据仓库数据化运营之路数据挖掘

Hive SQL 解析及应用

2017-03-23  本文已影响4173人  haitaoyao

SQL 已经成为各家"数据公司"必不可少的数据查询语言. Hive 在其中的地位也更是显而易见, 大多数批处理任务还是在使用 Hive SQL 开发. 从Table 级别看, 一个 Hive SQL 文件, 包含了如下信息:

那么, 给定一个 Hive SQL 文件, 如何获取到这些信息呢? 别查了, 没有 API 可以一下搞出来, 我们要站在巨人的肩膀上开发了.

熟悉 Hive 的人都知道, Hive 是将 SQL 翻译成 MapReduce 任务执行, 细节可以参考美团技术博客文章: Hive SQL的编译过程, 写的非常好. 关于 SQL 解析的需求, 我们仅仅需要知道 Hive 是借助于 antlr 开发的 SQL 解析逻辑. 如果想拆解一个 SQL 文件, 就从 SQL -> ASTree 这个地方入手.

获取 ASTree

Hive 中的 hive-exec 模块是包含 SQL 解析模块的. 因此项目的 pom.xml 中加上.

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <classifier>core</classifier>
</dependency>

需要注意的是, <classifier>core</classifier> 必不可少, 因为 2.1.1 版本的 hive-exec 默认打包是将所有依赖塞进一个 fat jar 中, 一个 jar 35 MB 大小, 我们仅仅想拆个 SQL, 用不了这么多...

当然, 作为一个有洁癖的 Java 程序员, 不用的 jar 包全部 exclude 掉是一个好习惯, 如果你跟我一样, 可以参考这个 Gist (别怕, 直接 copy 过来就好)

    import org.apache.hadoop.hive.ql.parse.ParseDriver;
    // 创建 ParseDriver
    ParseDriver pd = new ParseDriver();
    ASTNode tree = pd.parse(sql);
    if (LOGGER.isDebugEnabled()) {
      // 这里可以将 ASTree 打印出来, 用于调试
      LOGGER.debug("start to analyze sql: {}, ASTNode: {}", sql, tree.dump());
    }
    while ((tree.getToken() == null) && (tree.getChildCount() > 0)) {
      tree = (ASTNode) tree.getChild(0);
    }

使用 Visitor 模式遍历 ASTree

Vistor 模式 是 ASTree 遍历时经常使用的方法.

// 这里初始化我们的 Visitor 逻辑
final NodeProcessor processor = new OurNodeProcessor();

Map<Rule, NodeProcessor> rules = Maps.newLinkedHashMap();
Dispatcher disp = new DefaultRuleDispatcher(processor, rules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
final List<Node> topNodes = Lists.newArrayList(tree);

// 发车, 开始遍历
ogw.startWalking(topNodes, null);

关键的时刻到来了, 终于要收工回家了:

// OurProcessor.java 
@Override
  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
    ASTNode pt = (ASTNode) nd;

    switch (pt.getToken().getType()) {
      case HiveParser.TOK_TAB:
        // 这里是 insert 语句
        final String insertedTable = BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0));
        break;
      case HiveParser.TOK_TABREF:
        // 这里是 FROM 语句后面的内容
        final String selectedTable = getTableName(pt);
        break;
      case HiveParser.TOK_DROPTABLE:
        // 这里是 DROP 语句
        final String droppedTable = getTableName(pt);
        break;
      case HiveParser.TOK_CREATETABLE:
        // 这里是 CREATE 语句
        final String createdTable = getTableName(pt);
        break;
      // 将上面获取到的各种类型的 Table Name 存储起来就 OK 了
  }
  return null;
}

完了么? 然而并没有. 还记得"大明湖畔"的 CTE(Common Table Expression)
所谓 CTE 就是说 SQL 可以写成这个模式:

WITH data1 AS (SELECT id, age FROM table1 WHERE id > 0),
data2 AS (SELECT id, name FROM table2 WHERE id > 0)

-- 通过 WITH 语句, 将复杂 SQL 拆解, 提高易读性
SELECT a.*, b.* FROM data1 JOIN data2 on a.id = b.id

上述 SQL 扔到刚才的代码中, 解析的 selected tables 会包含 data1data2, 显然这两个不是真正查询过的表, 因此, 在 OurProcessor.java 中, 需要添加 HiveParser.TOK_CTE 的解析, 并在 HiveParser.TOK_TABREF 中将解析到的 CTE 别名剔除.

想收工回家? 哪儿那么简单, 需求才刚刚开始

来看一个 Hive SQL 文件长什么样儿:

-- test_job.sql
-- set mapreduce 参数
set hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.dynamic.partition.mode=nostrick;
set hive.groupby.skewindata=true;

-- set 变量
set hivevar:ds=2016-01-01;
set hivevar:appid='app1';

-- 在临时库创建临时 Table, 存储结果
use tmpdb;
DROP TABLE IF EXISTS tmp_users;
CREATE TABBLE tmp_users AS SELECT a.*, b.* FROM userdb.users a JOIN userdb.user_data b on a.id = b.id WHERE a.update_time >= '${ds}' AND appid=${appid}

-- 切换到 prod db
USE etldb;
INSERT INTO daily_users SELECT * FROM tmp_users;

总结一下, 包含如下功能(坑):

因此, SQL 文件解析的步骤如下:

  1. 切 SQL 语句. 使用 ; 将文件中的语句切成独立的 SQL
  2. 识别 SET 语句
  1. 识别 USE 语句, 保存当前所在 database 的 context. 当遇到直接使用 Table 名而不是 db.表名 的时候添加当前 database 的名称
  2. 识别正式的 SQL 语句, 根据当前 context 中存储的变量替换 SQL 类似 ${variable_name} 字符串
  3. 执行 SQL 分析流程, 将结果保存

好了, 这下没问题了, 上述 SQL 文件, 我输出如下结果

{
  "createdTables": [
    "tmpdb.tmp_users"
  ],
  "droppedTables": [
    "tmpdb.tmp_users"
  ],
  "inputTables": [
    "userdb.users",
    "userdb.user_data"
  ],
  "insertedTables": [
    "etldb.daily_users"
  ]
}

不光埋头拉车, 也要抬头看路: SQL 解析后用来干嘛

回到实际问题, 随着业务的发展, ETL 过程也越来越复杂, 动不动几十个上百个 SQL 文件, 之间会有错综复杂的依赖关系, 如何维护执行关系, 保证任务按顺序执行成为越来越头疼的事情.

比如下图是一个 DAG 的一部分, 仅仅是一部分

一个 DAG 的一部分
顿时感觉需要一个新的职位: DAG 维护工程师, 专门负责维护任务 DAG, 简称 DagOps.

但是我们思考, 在同一个时间维度下(例如都是每天的任务):

回到上述 test_job.sql 文件, 我们仅仅需要保证 test_job.sql 必须在写入 userdb.usersuserdb.user_data 的任务后面执行, 并且需要读取etldb.daily_users 的任务必须在 test_job.sql 完成后面执行.

那么无论有多少个 SQL 文件, 仅仅需要调用我们的 SQL 分析服务, 就可以构建一个 DAG 自动生成程序, 自动编排 SQL 文件的执行顺序.

从此, DagOps 职位成为了江湖中的传说....

除了消灭 DagOps, 还能不能再进一步?

我们进一步想, 既然所有的 Hive SQL 代码都在上线前被分析过了, 那分析过程的"下脚料"能否利用一下?
回忆一下, 是不是有如下场景:

我们已经基于 Table 知道了数据任务之间的依赖关系, 进一步说, 也就知道 ETL 的 Table 之间的依赖关系, 如果关联上代码信息, 展现给用户呢?

因此我们开发了数据血缘关系功能, 如下图:


数据血缘关系

数据的来龙去脉一目了然. 并且在 adhoc 查询工具中, 你写的每个 SQL 都会被分析, 画出一个类似上面的血缘图, 清晰知道自己的 SQL 语句查询的数据的来龙去脉.

总结

我们"拆"了 SQL,

还能攒点儿什么东西呢? 要再想一想.

上一篇 下一篇

猜你喜欢

热点阅读