
flamy graph source code analysis

2017-08-29  Choooooper

The HQL execution process

When Hive executes a user-supplied HQL statement, it goes through the following steps (a minimal sketch of step 1 follows the list):

  1. Syntax parsing

Antlr defines the SQL grammar; lexical and syntactic analysis turn the SQL into an abstract syntax tree (AST Tree).

  2. Semantic analysis

The AST Tree is traversed and abstracted into QueryBlocks, the basic building blocks of a query.
A logical execution plan is generated by traversing the QueryBlocks and translating them into an OperatorTree.

  3. Logical plan optimization

Logical optimizers transform the OperatorTree, merging unnecessary ReduceSinkOperators to reduce the amount of shuffled data.

  4. Physical plan generation

The OperatorTree is traversed and translated into MapReduce tasks.

  5. Physical plan optimization

Physical optimizers transform the MapReduce tasks and produce the final execution plan.
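
As a quick illustration of step 1, the sketch below calls Hive's ParseDriver directly and dumps the AST of a query. This is a minimal sketch, assuming hive-exec is on the classpath; the query and the AstDump object are made up for the example.

import org.apache.hadoop.hive.ql.parse.{ASTNode, ParseDriver}

object AstDump {
  def main(args: Array[String]): Unit = {
    val pd = new ParseDriver
    // step 1: lexical + syntactic analysis, producing the AST
    val tree: ASTNode = pd.parse("SELECT a.name FROM db.t a WHERE a.id = 1")
    // print the tree in ANTLR's nested-parentheses form
    println(tree.toStringTree())
  }
}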

How Hive SQL is compiled

For how Hive SQL is compiled into MapReduce, see https://tech.meituan.com/hive-sql-to-mapreduce.html

Using the Hive API to get an HQL statement's execution plan and lineage

See http://lxw1234.com/archives/2015/09/476.htm

At the end of this article there is also a small example of extracting the lineage of an HQL statement.

flamy hive graph

flamy is a Hive management tool; one of the features it provides is drawing a dependency graph of the tables referenced by a given set of HQL files.

flamy-demo provides usage examples for flamy; here we only look at the graph-related features.

cd flamy-demo
tree conf
-----------------
conf
├── flamy.properties
└── log4j2.properties

In the conf folder, flamy.properties is flamy's configuration file. It configures where the local HQL files are stored, as well as the Hive connection properties.

cat flamy.properties
---------------------
### Flaminem project

flamy.model.dir.paths = model

flamy.variables.path = ${flamy.model.dir.paths}/VARIABLES.properties

flamy.env.model.hive.presets.path = ${flamy.model.dir.paths}/model_PRESETS.hql

flamy.env.local.hive.presets.path = ${flamy.model.dir.paths}/model_PRESETS.hql
flamy.env.local.hive.meta.fetcher.type = client
flamy.env.local.hive.metastore.uri = "thrift://localhost:9083"
flamy.env.local.hive.server.uri   = "localhost:10000"

The flamy.model.dir.paths property configures the model path; in this demo, flamy parses the HQL files under it to generate the corresponding graph.

tree model
-----------------------
model
├── model_PRESETS.hql
├── nasa
│   ├── facts.db
│   │   └── http_status
│   │       └── CREATE.hql
│   ├── nasa_access.db
│   │   ├── daily_logs
│   │   │   ├── CREATE.hql
│   │   │   └── POPULATE.hql
│   │   ├── daily_url_error_rates
│   │   │   ├── CREATE.hql
│   │   │   └── POPULATE.hql
│   │   ├── daily_urls
│   │   │   ├── CREATE.hql
│   │   │   └── POPULATE.hql
│   │   └── daily_urls_with_error
│   │       ├── CREATE.hql
│   │       └── POPULATE.hql
│   └── nasa_access_import.db
│       ├── daily_logs
│       │   ├── CREATE.hql
│       │   └── POPULATE.hql
│       └── raw_data
│           └── CREATE.hql
└── VARIABLES.properties

For installing and configuring flamy, refer to the flamy documentation.

Run demo.sh to start the demo shell and type show graph; flamy then renders the dependency graph between the HQL files under the model folder.

Next, let's see how flamy implements this feature.

Source analysis of flamy's graph generation

First, import the flamy source code into IDEA.

Since our entry point above was demo.sh, let's first look at what demo.sh does.

cat demo.sh
------------------------
#!/bin/bash

if [[ ! -f ${FLAMY_HOME}/bin/flamy ]]; then
  echo 'Could not find ${FLAMY_HOME}/bin/flamy executable. Please make sure the environment variable FLAMY_HOME is correctly set.'
fi

$FLAMY_HOME/bin/flamy --config-file conf/flamy.properties shell

As you can see, it simply passes the config file to the $FLAMY_HOME/bin/flamy script, so let's see what flamy does next.

cat $FLAMY_HOME/bin/flamy
-----------------------
#!/bin/bash
# Reference: http://stackoverflow.com/questions/59895/can-a-bash-script-tell-what-directory-its-stored-in
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

ARGS=("$@")
FLAMY_JAVA_OPTIONS="-XX:MaxPermSize=512M -XX:+CMSClassUnloadingEnabled"
RUN="java ${FLAMY_JAVA_OPTIONS} ${FLAMY_EXTRA_JAVA_OPTIONS} -cp $DIR/../lib/*:$DIR/../conf:$DIR/../additional_jars/* com.flaminem.flamy.Launcher"

exec ${RUN} "${ARGS[@]}"

The flamy script sets the JVM options flamy needs, puts the dependency jars on the classpath, and then runs the main class com.flaminem.flamy.Launcher, passing all the arguments to it. So our source code entry point is com.flaminem.flamy.Launcher.

Launcher

The main function of object Launcher is as follows:

def main(args: Array[String]): Unit = {
    // initialize the output subsystem
    FlamyOutput.init()
    val returnStatus =
      try {
        // create a Launcher in shell mode and launch it
        new Launcher(args, withShell = true).launch()
      }
      finally{
        FlamyOutput.shutdown()
      }
    System.exit(returnStatus.exitCode)
  }

The function creates a new Launcher instance, passes it the arguments, and launches it.

def launch(globalOptions: FlamyGlobalOptions = new FlamyGlobalOptions()): ReturnStatus = {
    /* This is necessary to avoid collisions with other running instances of flamy */
    try {
        // other code
        unsafeLaunch(globalOptions)
    }
    // other code
  }

The Launcher.launch() method calls unsafeLaunch, and unsafeLaunch dispatches to different handling depending on the command.

private def unsafeLaunch(rootGlobalOptions: FlamyGlobalOptions): ReturnStatus = {
    if(opts.optException.isDefined) {
      handleException(opts.optException.get)
    }

    /* This "if" cannot be moved inside the "match case" because opts.subcommands is not defined when optError is defined */
    if(opts.optError.isDefined) {
      opts.optError.get
    }
    else {
      opts.subcommands match {
        // other cases

        case (command: FlamySubcommand) :: subCommands =>
          val res: ReturnStatus = command.doCommand(rootGlobalOptions.overrideWith(opts.globalOptions), subCommands)
          stats = command.stats.orNull
          res
    }
    }
  }

For a command of type FlamySubcommand, doCommand is called directly to execute it.

Now let's see which command types flamy defines.

class Commands(args: Seq[String]) extends Options(args) {
    val show = new commands.Show
    val diff = new commands.Diff
    val describe = new commands.Describe
    val check = new commands.Check
    val run = new commands.Run
    val push = new commands.Push
    val repair = new commands.Repair
    val count = new commands.Count
    val waitForPartition = new commands.WaitForPartition
    val gatherInfo = new commands.GatherInfo
  }

Since show graph is the command that displays the HQL dependency graph, we only look at commands.Show.

Looking at com.flaminem.flamy.commands.Show, it defines a few subcommands of its own:

val conf =  new Subcommand("conf") ...
val schemas = new Subcommand("schemas") ...
val tables = new Subcommand("tables") ...
val partitions = new Subcommand("partitions") ...
val graph = new ShowGraph
val select = new Subcommand("select") ...

Show's doCommand method is as follows:

 override def doCommand(globalOptions: FlamyGlobalOptions, subCommands: List[ScallopConf]): ReturnStatus = {
    subCommands match {
      case  (command: FlamySubcommand)::Nil => command.doCommand(globalOptions, Nil)
      // other cases
    }
  }

It likewise just calls the subcommand's doCommand, so next we look at what ShowGraph.doCommand does.

override def doCommand(globalOptions: FlamyGlobalOptions, subCommands: List[ScallopConf]): ReturnStatus = {
    // initialize the flamy context
    val context = new FlamyContext(globalOptions)
    // process the HQL files and render the graph
    showGraph(context)
    ReturnSuccess
  }

So the actual graph handling happens in the showGraph method:

private def showGraph(context: FlamyContext): Unit = {
    val model: Model =
      if(complete()){
        Model.getCompleteModel(context, Nil)
      }
      else{
        Model.getIncompleteModel(context, Nil)
      }
    val g: TableGraph = TableGraph(model).applySkipViews.applyFilter

    val graphDir: String = FlamyGlobalContext.RUN_DIR.getProperty + "/result"
    context.getLocalFileSystem.fileSystem.mkdirs(new Path(graphDir))

    val graphPath = makeGraphPath(graphDir, items())

    val lightPath = s"${graphPath}_light.png"
    val fullPath = s"$graphPath.png"

    if(schemaOnly()){
      g.export.toSchemaPng(graphPath)
      println(
        f"""graph printed at :
           |   ${FilePath(fullPath)}
           """.stripMargin
      )
      openGraph(fullPath)
    }
    else {
      g.export.toLightPng(graphPath + "_light")
      g.export.toFullPng(graphPath)
      println(
        f"""graphs printed at :
           |   ${FilePath(fullPath)}
           |   ${FilePath(lightPath)}
           """.stripMargin
      )
      openGraph(lightPath, fullPath)
    }
  }

As the code shows, showGraph has three steps:

  1. Build the model from the HQL files
  2. Build a TableGraph from the model
  3. Export the graph

Let's go through them one by one.

Building the model

Since we run without the complete option, the model is built through the else branch, so we look at Model.getIncompleteModel. The source is as follows:

def getIncompleteModel(context: FlamyContext, items:Iterable[ItemName]): IncompleteModel = {
    val index = context.getFileIndex.strictFilter(items).get
    new IncompleteModelFactory(context).generateModel(index)
  }

FlamyContext defines a FileIndex field, which is used to locate the files under the configured model.dir.paths:

private lazy val fileIndex: FileIndex = FileIndex(this)

Now let's see how FileIndex parses them.

object FileIndex {

  def apply(context: FlamyContext): FileIndex = {
    val index: Index = new Index()
    // the paths configured by flamy.model.dir.paths
    val dbPaths = context.modelDirs
    // all files under dbPaths
    val files: FileList = FileUtils.listItemFiles(dbPaths)

    val schemaFiles = new mutable.HashMap[SchemaName, SchemaFile]()
    for {
      // iterate over all .db directories
      dbDir: File <- listDatabaseDirectories(dbPaths)
    } {
      val schemaFile = new MissingSchemaFile(dbDir)
      schemaFiles += schemaFile.schemaName -> schemaFile
    }

    for {
      file <- files
      fileType <- FileType.getTypeFromFileName(file.getName)
    } {
      if (TableFile.VALID_FILE_TYPES.contains(fileType)) {
        val tableFile: TableFile = new ExistingTableFile(file, fileType)
        index.addFile(tableFile)
      }
      else if (SchemaFile.VALID_FILE_TYPES.contains(fileType)) {
        val schemaFile: SchemaFile = new ExistingSchemaFile(file)
        schemaFiles += schemaFile.schemaName -> schemaFile
      }
    }
    schemaFiles.values.foreach {
      index.addFile
    }
    new FileIndex(index)
  }

}

FileIndex simply walks the file paths under model and adds the parsed files to the index.

These are the FileTypes defined in flamy:

object FileType {
    // CREATE.hql
  case object CREATE extends FileType {
    override val filePrefix: String = "CREATE"
    override val fileExtension: String = "hql"
    override val multipleFilesAllowed: Boolean = false
  }
    // VIEW.hql
  case object VIEW extends FileType {
    override val filePrefix: String = "VIEW"
    override val fileExtension: String = "hql"
    override val multipleFilesAllowed: Boolean = false
  }
    //POPULATE.hql
  case object POPULATE extends FileType {
    override val filePrefix: String = "POPULATE"
    override val fileExtension: String = "hql"
    override val multipleFilesAllowed: Boolean = true
  }
    //TEST.hql
  case object TEST extends FileType {
    override val filePrefix: String = "TEST"
    override val fileExtension: String = "hql"
    override val multipleFilesAllowed: Boolean = true
  }
    //META.properties
  case object META extends FileType {
    override val filePrefix: String = "META"
    override val fileExtension: String = "properties"
    override val multipleFilesAllowed: Boolean = false
  }
    //CREATE_SCHEMA.hql
  case object CREATE_SCHEMA extends FileType {
    override val filePrefix: String = "CREATE_SCHEMA"
    override val fileExtension: String = "hql"
    override val multipleFilesAllowed: Boolean = false
  }
    //PRESETS.hql
  case object PRESETS extends FileType {
    override val filePrefix: String = "PRESETS"
    override val fileExtension: String = "hql"
    override val multipleFilesAllowed: Boolean = false
  }
 }

Earlier we saw that the HQL files under the model folder are of the CREATE and POPULATE types, so here they are wrapped as ExistingTableFile.

Since getIncompleteModel's items parameter is Nil, val index = context.getFileIndex.strictFilter(items).get does not filter anything out.

Then IncompleteModelFactory.generateModel(index) is called to generate the model.

// generate the model
  def generateModel(fileIndex: FileIndex): IncompleteModel = {
    logger.info("Generating model")
    mergeableTableInfoSet = MergeableTableInfoCollection()
    // get all CREATE.hql files
    val creates = fileIndex.getAllTableFilesOfType(FileType.CREATE)
    val views = fileIndex.getAllTableFilesOfType(FileType.VIEW)
    // get all POPULATE.hql files
    val populates = fileIndex.getAllTableFilesOfType(FileType.POPULATE)
    // get all META.properties files
    val metas = fileIndex.getAllTableFilesOfType(FileType.META)
    // analyze the CREATE statements
    runner.run(analyzeCreate(_: TableFile), creates)
    // analyze the POPULATE and VIEW queries
    runner.run(analyzePopulate(_: TableFile), populates ++ views)
    runner.run(analyzeMeta(_: TableFile), metas)

    val stats: FileRunner#Stats = runner.getStats
    FlamyOutput.out.info(stats.format("analyzed"))

    if (stats.getFailCount > 0) {
      throw new FlamyException("Interrupting command, some file were not validated.")
    }
    // add all tables that were not covered by the files above
    mergeableTableInfoSet ++= getMissingTables(fileIndex)

    val result = new IncompleteModel(mergeableTableInfoSet.toTableInfoCollection, fileIndex)
    logger.info("model generated")
    result
  }

generateModel first collects all the HQL files under the model folder and then analyzes each file type. Let's look at analyzeCreate first.

Take model/nasa/nasa_access_import.db/daily_logs/CREATE.hql as an example:

-- DROP TABLE IF EXISTS nasa_access_import.daily_logs ;
CREATE TABLE nasa_access_import.daily_logs(
  source_ip STRING,
  source_url STRING,
  time TIMESTAMP ,
  action STRING,
  url STRING,
  protocol STRING,
  response_code INT,
  size INT,
  line STRING
)
PARTITIONED BY (day STRING)
STORED AS ORC
;

The analyzeCreate method looks like this:

// analyze a CREATE statement
  private def analyzeCreate(tableFile: TableFile) {
    // parse the table being created
    val table: Table = CreateTableParser.parseText(tableFile.text)(context)
    checkName(table, tableFile)
    mergeableTableInfoSet += TableInfo(table)
  }

It first calls CreateTableParser.parseText to parse the CREATE statement.

The CreateTableParser object is as follows:

object CreateTableParser {

  @throws(classOf[SemanticException])
  @throws(classOf[ParseException])
  @throws(classOf[IOException])
  def parseQuery(query: String)(implicit context: FlamyContext): Table = {
    val cti: CreateTableInfo = new CreateTableInfo(context)
    cti.parse(query)
  }

  @throws(classOf[SemanticException])
  @throws(classOf[ParseException])
  @throws(classOf[IOException])
  def parseText(text: String)(implicit context: FlamyContext): Table = {
    // extract the individual statements, using a regex
    val queries: Seq[String] = QueryUtils.cleanAndSplitQuery(text)
    if (queries.size != 1) {
      throw new RuntimeException("More than 1 query parsed")
    }
    parseQuery(queries.iterator.next)
  }
}

It has only two functions:

parseText just strips comments from the given text and splits it into statements on ';' with a regular expression.
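
As an illustration only (this is not flamy's actual QueryUtils code), comment stripping and statement splitting could look roughly like this; a real implementation has to be more careful about ';' inside string literals.

object QueryCleaner {
  // hypothetical sketch: drop "--" line comments, then split the text on ';'
  def cleanAndSplit(text: String): Seq[String] = {
    val noComments = text.replaceAll("(?m)--.*$", "")
    noComments.split(";").map(_.trim).filter(_.nonEmpty).toSeq
  }

  def main(args: Array[String]): Unit = {
    val hql = "-- DROP TABLE IF EXISTS t ;\nCREATE TABLE t(a STRING);"
    cleanAndSplit(hql).foreach(println)  // prints only the CREATE statement
  }
}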

parseQuery then handles a single statement. Inside parseQuery a new CreateTableInfo object is created.

CreateTableInfo implements the org.apache.hadoop.hive.ql.lib.NodeProcessor interface, Hive's callback interface for processing nodes while walking the parse tree.

/**
 * Base class for processing operators which is no-op. The specific processors
 * can register their own context with the dispatcher.
 */
public interface NodeProcessor {

 
  Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
      Object... nodeOutputs) throws SemanticException;
}

parseQuery calls the CreateTableInfo.parse method:

@throws(classOf[ParseException])
@throws(classOf[SemanticException])
@throws(classOf[IOException])
//noinspection ScalaStyle
def parse(query: String): Table = {
    // use Hive's ParseDriver to do the syntactic analysis
    val pd: ParseDriver = new ParseDriver
    // get the abstract syntax tree
    val completeTree = pd.parse(query, ModelHiveContext.getLightContext(context).hiveContext)
    var tree: ASTNode = completeTree
    while ((tree.getToken == null) && (tree.getChildCount > 0)) {
      tree = tree.getChild(0)
    }
    table = null
    columns.clear()
    partitions.clear()
    val rules: util.Map[Rule, NodeProcessor] = new util.LinkedHashMap[Rule, NodeProcessor]
    val disp: Dispatcher = new DefaultRuleDispatcher(this, rules, null)
    val ogw: GraphWalker = new DefaultGraphWalker(disp)
    val topNodes: util.ArrayList[Node] = new util.ArrayList[Node]
    topNodes.add(tree)
    ogw.startWalking(topNodes, null)
    if (table == null) {
      throw new UnexpectedBehaviorException("Could not parse this AST:" + HiveParserUtils.drawTree(completeTree))
    }
    table.columns = columns
    table.partitions = partitions
    table
  }

The very first line of the function instantiates org.apache.hadoop.hive.ql.parse.ParseDriver, which provides parsing and syntax tree generation, so calling ParseDriver.parse returns the syntax tree of the current query.

Once we have the syntax tree, we can traverse it and process its nodes.

It then instantiates a DefaultRuleDispatcher and a DefaultGraphWalker, and finally calls DefaultGraphWalker.startWalking to traverse the tree.

public void startWalking(Collection<Node> startNodes,
      HashMap<Node, Object> nodeOutput) throws SemanticException {
    toWalk.addAll(startNodes);
    while (toWalk.size() > 0) {
      Node nd = toWalk.remove(0);
      walk(nd);
      if (nodeOutput != null) {
        nodeOutput.put(nd, retMap.get(nd));
      }
    }
  }

It calls the walk method on each node.

public void walk(Node nd) throws SemanticException {
    if (opStack.empty() || nd != opStack.peek()) {
      opStack.push(nd);
    }

    if ((nd.getChildren() == null)
        || getDispatchedList().containsAll(nd.getChildren())) {
      // all children are done or no need to walk the children
      if (!getDispatchedList().contains(nd)) {
        dispatch(nd, opStack);
      }
      opStack.pop();
      return;
    }
    // add children, self to the front of the queue in that order
    getToWalk().add(0, nd);
    getToWalk().removeAll(nd.getChildren());
    getToWalk().addAll(0, nd.getChildren());
  }

It first checks whether the current node and its children have been visited; if all the children have been dispatched but the node itself has not, dispatch is called on it. This shows that DefaultGraphWalker traverses the tree depth-first.

public void dispatch(Node nd, Stack<Node> ndStack) throws SemanticException {
    dispatchAndReturn(nd, ndStack);
 }
public <T> T dispatchAndReturn(Node nd, Stack<Node> ndStack) throws SemanticException {
    Object[] nodeOutputs = null;
    if (nd.getChildren() != null) {
      nodeOutputs = new Object[nd.getChildren().size()];
      int i = 0;
      for (Node child : nd.getChildren()) {
        nodeOutputs[i++] = retMap.get(child);
      }
    }

    Object retVal = dispatcher.dispatch(nd, ndStack, nodeOutputs);
    retMap.put(nd, retVal);
    return (T) retVal;
  } 

It first collects the outputs already computed for the current node's children into the nodeOutputs array, then calls dispatcher.dispatch. The dispatcher here is the DefaultRuleDispatcher whose default processor is our CreateTableInfo instance, so we now look at CreateTableInfo's process method.

@throws(classOf[SemanticException])
  def process(pt: Node, stack: util.Stack[Node], procCtx: NodeProcessorCtx, nodeOutputs: AnyRef*): AnyRef = {
    pt.getToken.getType match {
      case HiveParser.TOK_CREATETABLE =>
        table = HiveParserUtils.getTable(TableType.TABLE, pt.getChild(0))
      case HiveParser.TOK_TABCOLLIST =>
        pt.getParent.getType match {
          case HiveParser.TOK_CREATETABLE =>
            columns.addAll(HiveParserUtils.getColumns(pt))
          case HiveParser.TOK_TABLEPARTCOLS =>
            partitions.addAll(HiveParserUtils.getColumns(pt).map {
              new PartitionKey(_)
            })
          case _ =>
        }
      case _ =>
    }
    null
  }

As we can see, CreateTableInfo only handles three token types:

  1. TOK_CREATETABLE, for the created table
  2. TOK_TABCOLLIST, for the column list
  3. TOK_TABLEPARTCOLS, for the partition columns

Finally, the result of each visited node is stored in retMap.

So during the traversal, CreateTableInfo only records the table, columns, and partitions information.

analyzeCreate's job, then, is to analyze the table creation statement in CREATE.hql.

The analyzePopulate function analyzes the query statements in POPULATE.hql.

Let's see how it handles them.

private def analyzePopulate(tableFile: TableFile) {
    // get the dependencies between the source tables and the destination tables
    val tables: MergeableTableDependencyCollection = PopulatePreParser.parseText(tableFile.text)
    mergeableTableInfoSet ++= tables.map{TableInfo(_, tableFile)}
    tables.foreach{table => checkName(table, tableFile)}
  }

As we can see, its main job is to obtain the dependencies between the tables of each query, which it does by calling PopulatePreParser.parseText.

def parseText(text: String): MergeableTableDependencyCollection = {
    // extract all the query statements
    val queries: Seq[String] = QueryUtils.cleanAndSplitQuery(text).map{emptyStrings}
    queries.flatMap{parseQuery}.toMergeableTableDependencyCollection
  }

Again, it first extracts the individual statements and then calls parseQuery on each of them.

// process a single query statement
  private def parseQuery(query: String): MergeableTableDependencyCollection = {
    // collect the Common Table Expression (CTE) table names
    val cteTables: Set[String] = findCTEs(query).map{_.toLowerCase}.toSet
    logger.debug(s"CTE founds: $cteTables")
    // collect all destination tables
    val destTables =
      getTableNamesFromRegex(destRE,query).flatMap{ nameToTableDep(_,TableType.TABLE,Set()) }++
      getTableNamesFromRegex(viewRE,query).flatMap{ nameToTableDep(_,TableType.VIEW,Set()) }
    // collect all source tables
    val srcTables = findSourceTables(query).flatMap{ nameToTableDep(_,TableType.REF, cteTables) }
    // wire up the dependencies between them
    destTables.foreach{ t => t.tableDeps = new TableDependencyCollection(srcTables)}
    destTables.toMergeableTableDependencyCollection
  }

Clearly, flamy's parseQuery first extracts the destTables and srcTables, then establishes the relationships between them.

All of the tables are extracted by matching the query text with regular expressions, and each one is stored as a TableDependency.
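
To make the regex idea concrete, here is a hypothetical sketch; the patterns below are simplified stand-ins, not flamy's actual destRE/viewRE/source-table regexes.

object TableRefs {
  // simplified stand-ins for the destination / source table patterns
  private val destRE = "(?i)INSERT\\s+(?:OVERWRITE|INTO)\\s+TABLE\\s+([\\w.]+)".r
  private val srcRE  = "(?i)(?:FROM|JOIN)\\s+([\\w.]+)".r

  def destinations(q: String): List[String] = destRE.findAllMatchIn(q).map(_.group(1)).toList
  def sources(q: String): List[String]      = srcRE.findAllMatchIn(q).map(_.group(1)).toList

  def main(args: Array[String]): Unit = {
    val q = "INSERT OVERWRITE TABLE db.dst SELECT a.name FROM db.src1 a JOIN db.src2 b ON a.id = b.id"
    println(destinations(q))  // List(db.dst)
    println(sources(q))       // List(db.src1, db.src2)
  }
}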

A TableDependency holds all of a table's structural information together with its dependencies.

With the tables and the dependencies between them, an IncompleteModel object is instantiated and returned, which completes generateModel.

In other words, showGraph has finished obtaining the Model; the next step is to build the TableGraph from the model.

TableGraph builds the dependency graph between the tables:

def apply(model: Model): TableGraph = {
    var graph: Graph[TableName, DiEdge] = Graph[TableName, DiEdge]()

    model.tables.foreach{
      td: TableInfo =>
        graph += TableName(td.fullName)
        graph ++= td.tableDeps.map{t => TableName(t.fullName) ~> TableName(td.fullName)}
    }
    val set1: Set[TableName] = model.fileIndex.getTableNames
    val set2: Set[TableName] = graph.getNodeSeq.toSet
    assert(set1.forall{set2.contains}, s"Please report a bug: the following tables are in the fileIndex and not in the graph: ${set1.diff(set2)}")

    new TableGraph(model, graph)
  }

The last step is exporting the graph: the graph is converted into the dot language, and Graphviz then turns the dot file into a PNG image.
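
To give a feel for what that dot text looks like, here is a hypothetical sketch that emits a Graphviz digraph for a couple of table dependencies; the table names and edges are illustrative, and the output can be rendered with Graphviz (for example dot -Tpng).

object DotExport {
  // emit a Graphviz digraph where an edge src -> dst means "dst is populated from src"
  def toDot(edges: Seq[(String, String)]): String = {
    val lines = edges.map { case (src, dst) => s"""  "$src" -> "$dst";""" }
    ("digraph tables {" +: lines :+ "}").mkString("\n")
  }

  def main(args: Array[String]): Unit = {
    println(toDot(Seq(
      "nasa_access_import.daily_logs" -> "nasa_access.daily_logs",
      "nasa_access.daily_logs"        -> "nasa_access.daily_urls"
    )))
  }
}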

That concludes the execution logic of flamy's show graph command.

Getting lineage information through the Hive API

As we saw, flamy extracts table information from CREATE statements via ParseDriver; here is a simple example of doing the same thing directly.

/**
 * @(#)MyLineageInfo.java, 2017/8/28.
 * <p/>
 * Copyright 2017 Netease, Inc. All rights reserved.
 * NETEASE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */
package org.jjzhu.hive;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;

import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.jjzhu.hive.util.HiveParserUtil;

/**
 * @author 祝佳俊(hzzhujiajun@corp.netease.com)
 */
public class MyLineageInfo implements NodeProcessor {
    /**
     * Stores input tables in sql.
     */
    private TreeSet<String> inputTableList = new TreeSet<>();
    /**
     * Stores output tables in sql.
     */
    private TreeSet<String> outputTableList = new TreeSet<>();

    private Set<String> columns = new TreeSet<>();

    private Set<String> partitions = new TreeSet<>();

    public Set<String> getColumns() {
        return columns;
    }

    public TreeSet getInputTableList() {
        return inputTableList;
    }

    public TreeSet getOutputTableList() {
        return outputTableList;
    }

    public Set<String> getPartitions() {
        return partitions;
    }

    /**
     * Implements the process method for the NodeProcessor interface.
     */
    public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
                          Object... nodeOutputs) throws SemanticException {
        ASTNode pt = (ASTNode) nd;

        switch (pt.getToken().getType()) {

            case HiveParser.TOK_CREATETABLE:
            case HiveParser.TOK_TAB:
                outputTableList.add(BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0)));
                break;
            case HiveParser.TOK_TABCOLLIST:
                switch (pt.getParent().getType()){
                    case HiveParser.TOK_CREATETABLE:
                        columns.addAll(HiveParserUtil.getColumns(pt));
                        break;
                    case HiveParser.TOK_TABLEPARTCOLS:
                        partitions.addAll(HiveParserUtil.getColumns(pt));
                        break;
                }
                break;
            case HiveParser.TOK_TABREF:
                ASTNode tabTree = (ASTNode) pt.getChild(0);
                String table_name = (tabTree.getChildCount() == 1) ?
                        BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) :
                        BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) + "." + tabTree.getChild(1);
                inputTableList.add(table_name);
                break;
        }
        return null;
    }

    /**
     * parses given query and gets the lineage info.
     *
     * @param query the query to parse
     * @throws ParseException exception
     */
    public void getLineageInfo(String query) throws ParseException,
            SemanticException {

        /*
         * Get the AST tree
         */
        ParseDriver pd = new ParseDriver();
        ASTNode tree = pd.parse(query);

        while ((tree.getToken() == null) && (tree.getChildCount() > 0)) {
            tree = (ASTNode) tree.getChild(0);
        }

        /*
         * initialize Event Processor and dispatcher.
         */
        inputTableList.clear();
        outputTableList.clear();
        partitions.clear();
        columns.clear();
        // create a walker which walks the tree in a DFS manner while maintaining
        // the operator stack. The dispatcher
        // generates the plan from the operator tree
        Map<Rule, NodeProcessor> rules = new LinkedHashMap<>();

        // The dispatcher fires the processor corresponding to the closest matching
        // rule and passes the context along
        Dispatcher disp = new DefaultRuleDispatcher(this, rules, null);
        GraphWalker ogw = new DefaultGraphWalker(disp);

        // Create a list of topop nodes
        ArrayList<Node> topNodes = new ArrayList<>();
        topNodes.add(tree);
        ogw.startWalking(topNodes, null);
    }

    public static void main(String[] args) throws IOException, ParseException,
            SemanticException {
        String query = "INSERT OVERWRITE TABLE hzzhujiajun.table1 SELECT a.name FROM hzzhujiajun.table2 a join hzzhujiajun.table3 b ON (a.id = b.id)";
        String query2 = "CREATE EXTERNAL TABLE facts.http_status (\n" +
                "  code INT,\n" +
                "  status_group STRING,\n" +
                "  message STRING,\n" +
                "  description STRING\n" +
                ")\n" +
                "ROW FORMAT DELIMITED \n" +
                "FIELDS TERMINATED BY '\\t'\n" +
                "LINES TERMINATED BY '\\n'\n" +
                "STORED AS TEXTFILE\n" +
                "LOCATION \"${EXTERNAL_DATA_LOCATION}/facts.db/http_status\"\n";
        String query3 = "CREATE TABLE nasa_access.daily_logs(\n" +
                "  source_ip STRING,\n" +
                "  source_url STRING,\n" +
                "  time TIMESTAMP ,\n" +
                "  action STRING,\n" +
                "  url STRING,\n" +
                "  size INT,\n" +
                "  line STRING\n" +
                ")\n" +
                "PARTITIONED BY (day STRING)\n" +
                "STORED AS ORC";
        MyLineageInfo lep = new MyLineageInfo();
        lep.getLineageInfo(query);
        HiveParserUtil.output(lep);
        System.out.println("------------------------------------------");
        lep.getLineageInfo(query2);
        HiveParserUtil.output(lep);

        System.out.println("------------------------------------------");
        lep.getLineageInfo(query3);
        HiveParserUtil.output(lep);
    }
}

Running it produces the following output:

Input tables = [hzzhujiajun.table2, hzzhujiajun.table3]
Output tables = [hzzhujiajun.table1]
columns = []
partitions columns:[]
------------------------------------------
Input tables = []
Output tables = [facts.http_status]
columns = [code, description, message, status_group]
partitions columns:[]
------------------------------------------
Input tables = []
Output tables = [nasa_access.daily_logs]
columns = [action, line, size, source_ip, source_url, time, url]
partitions columns:[day]