Apache Drill原理持续学习

2020-02-03  本文已影响0人  分裂四人组

DrillBit介绍



UserServer+UserWorker

UserServer处理RUN_QUERY_VALUE客户端的查询请求,会将任务分派给UserWorker处理, 由worker提交工作:

  • 显然worker要在构造UserServer的时候也一起构造出来, 这样在收到任务的时候, 确保立即有工人接手这份工作;
  • UserServer的构造在ServiceEngine,而服务引擎是由DrillBit创建的.
  • UserWorker是由WorkerManager管理的, 而WorkerManager也是由DrillBit创建的.
    所以启动DrillBit服务后,参与计算的角色都已经准备好了.
public class UserWorker{
  private final WorkerBee bee;

  public QueryId submitWork(UserClientConnection connection, RunQuery query) {
    ThreadLocalRandom r = ThreadLocalRandom.current();
    // create a new queryid where the first four bytes are a growing time (each new value comes earlier in sequence).  Last 12 bytes are random.
    long time = (int) (System.currentTimeMillis()/1000);
    long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
    long p2 = r.nextLong();
    QueryId id = QueryId.newBuilder().setPart1(p1).setPart2(p2).build();
    incrementer.increment(connection.getSession());
    Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query);
    bee.addNewForeman(foreman);
    return id;

Foreman

  • Foreman是由WorkerBee(工蜂)主动创建的;
  • Foreman作为一个独立进程,不是自己启动,而是要由工人来启动;

一次Query的生命周期

Drill 大神的这篇链接打不开了:http://tnachen.wordpress.com/2013/11/05/lifetime-of-a-query-in-drill-alpha-release/
只能看下别人翻译的文章;

Root fragment 会被提交给DrillBit上面的Worker manager 。中间fragment 保存在zookeeper中,所有的leaf fragment会直接通过BitCom(RPC层次的东西,协议是Protobuf )发送给其他DrillBits。
   Worker Manager一旦接受到Root Fragment ,就会运行这个plan,并且包含一个Screen Store ,用来阻塞,并且等待返回的数据。如果该plan需要另外多个DrillBit,这些DrillBit组成一个wire,Worker Manager也同时会包含一个exchange operator,该exchange operator启动了一个Receiver用以等待wire中的数据。
  在wire中,leaf fragment被发送给其他DrillBit并且执行。这些leaf fragment也会被转换成为由physical operator 组成的DAG。每一个Physical operator都会利用一个Pull 类型的消息机制,从树的底部开始,operator会从他的parent operator中pull 记录信息,而他的parent operator 则返回一个Outcome status消息。Operator被设计成能够处理每一个可能outcome status(STOP,OK,OK_WITH_NEW_SCHEMA,NONE),因为Drill支持动态schema,也就是说Drill允许在同一个数据集中schema发生变化,所以Drill要能够处理当schema发生变化时的情况,可以参考columnnar storage(http://the-paper-trail.org/blog/columnar-storage/),Drill同时实现了他自己的内存数据结构,我们称之为ValueVector,ValueVector是一组byte集合,代表了一个column内的数据。在每一个Physical operator pull的消息中会返回一个RecordBatch,一个RecordBatch中包含一个或者多个ValueVector。(一个column会包含一个或者多个ValueVector,同时还有schema信息)。

在文章的例子中(图中),leaf fragment的顶端是这个Scan operator,该Scan operator被设置成为查询Parquet file,并且通过Parquet storage engine运行。这个Storage engine的作用就是从数据源中拉取数据,把数据转换为ValueVector,然后将这些ValueVector作为RecordBatch传递回他的child 。
  最终,所有的Leaf fragment将会接管这些batch数据,通过Sender operator 发送给中间DrillBit。
  中间fragment 一旦第一次接受到一个RecordBatch,会从HazleCast中通过RecordBatch中保留的fragment id查询相应的fragment,并且设置Receiver以及必要的physical operator来继续在DrillBit中进行处理计算。
  中间Fragment包含一个Filtering operator,在这个Filtering operator内部,一旦他接收到一个RecordBatch,他就会查找新的schema,并且将schema传递给CodeGeneration,同时还会传递一个特殊定义的filter expression,type information,借此产生一段特殊的code来完成filter 操作。通过设计成避免casting,运行轻量级的loop,以及进行prefetching,来减少方法的调用,这种方式在Hive的新vectoried query engine(通过Stinger initiative)以及impala中很普遍。
  中间fragment最终会议batch为单元,一次发送一个batch给Root DrillBit,在Root DrillBit中会由Screen operator 来接收相关数据,并且返回给client。
  DrillClient接收RecordBatch,简单讲ValueVector转换成Rows并且显示给client。
  
Query查询的入口是:Foreman线程的run方法中的queryRequest方法;客户端输入的查询,会通过RPC在Foreman上执行
protobuffer文件的定义在drill-protocol/src/main/protobuf下,比如User.proto对应了UserProtos。

关键看下run()上面的注释.

Called by execution pool to do query setup, and kick off remote execution.
Note that completion of this function is not the end of the Foreman's role in the query's lifecycle.

https://tnachen.wordpress.com/2013/11/05/lifetime-of-a-query-in-drill-...
http://yangyoupeng-cn-fujitsu-com.iteye.com/blog/1974556

Client

参考

上一篇 下一篇

猜你喜欢

热点阅读