presto架构和原理分析
架构
架构Presto查询引擎是一个Master-Slave的架构,由一个Coordinator节点,一个Discovery Server节点,多个Worker节点组成,
Discovery Server通常内嵌于Coordinator节点中。
Coordinator负责解析SQL语句,生成执行计划,分发执行任务给Worker节点执行。
Worker节点负责实际执行查询任务。
Worker节点启动后向Discovery Server服务注册,Coordinator从Discovery Server获得可以正常工作的Worker节点。如果配置了Hive Connector,需要配置一个Hive MetaStore服务为Presto提供Hive元信息,Worker节点与HDFS交互读取数据。
- 主从架构
- Coordinator
- Worker
- Discovery Server
执行过程
提交查询
用户使用Presto Cli提交一个查询语句后,Cli使用HTTP协议与Coordinator通信,Coordinator收到查询请求后调用SqlParser解析SQL语句得到Statement对象,并将Statement封装成一个QueryStarter对象放入线程池中等待执行。
SQL编译过程
sql编译过程逻辑执行计划
逻辑执行计划虚线就是Presto对逻辑执行计划的切分点
逻辑计划Plan生成的SubPlan分为四个部分,每一个SubPlan都会提交到一个或者多个Worker节点上执行
查询执行流程
查询执行流程-
1.Cli通过HTTP协议提交SQL查询之后,查询请求封装成一个SqlQueryExecution对象交给Coordinator的SqlQueryManager#queryExecutor线程池去执行
-
2.每个SqlQueryExecution线程(图中Q-X线程)启动后对查询请求的SQL进行语法解析和优化并最终生成多个Stage的SqlStageExecution任务,每个SqlStageExecution任务仍然交给同样的线程池去执行
-
3.每个SqlStageExecution线程(图中S-X线程)启动后每个Stage的任务按PlanDistribution属性构造一个或者多个RemoteTask通过HTTP协议分配给远端的Worker节点执行
-
4.Worker节点接收到RemoteTask请求之后,启动一个SqlTaskExecution线程(图中T-X线程)将这个任务的每个Split包装成一个PrioritizedSplitRunner任务(图中SR-X)交给Worker节点的TaskExecutor#executor线程池去执行
物理执行计划
物理执行计划SubPlan的几个属性
planDistribution 分发方式
3种不同的PlanDistribution方式:
Source表示这个SubPlan是数据源,Source类型的任务会按照数据源大小确定分配多少个节点进行执行;
Fixed表示这个SubPlan会分配固定的节点数进行执行(Config配置中的query.initial-hash-partitions参数配置,默认是8);
None表示这个SubPlan只分配到一个节点进行执行。
在下面的执行计划中,SubPlan1和SubPlan0 PlanDistribution=Source,这两个SubPlan都是提供数据源的节点,SubPlan1所有节点的读取数据都会发向SubPlan0的每一个节点;SubPlan2分配8个节点执行最终的聚合操作;SubPlan3只负责输出最后计算完成的数据。
outputPartitioning
OutputPartitioning属性只有两个值HASH和NONE,
表示这个SubPlan的输出是否按照partitionBy的key值对数据进行Shuffle。在下面的执行计划中只有SubPlan0的OutputPartitioning=HASH,所以SubPlan2接收到的数据是按照rank字段Partition后的数据
partitionBy
低延时查询核心技术
完全基于内存的并行计算
上述查询执行流程,均放在内存线程中执行
内存中的数据模型
数据模型Presto中处理的最小数据单元是一个Page对象,Page对象的数据结构如下图所示。
一个Page对象包含多个Block对象,每个Block对象是一个字节数组,存储一个字段的若干行。多个Block横切的一行是真实的一行数据。一个Page最大1MB,最多16*1024行数据。
流水线计算
流水线模型
流水线模型左侧是任务的执行流程图
Worker节点将最细粒度的任务封装成一个PrioritizedSplitRunner对象,放入pending split优先级队列中。每个
Worker节点启动一定数目的线程进行计算,线程数task.shard.max-threads=availableProcessors() * 4,在config中配置。
每个空闲的线程从队列中取出一个PrioritizedSplitRunner对象执行,如果执行完成一个周期,超过最大执行时间1秒钟,判断任务是否执行完成,如果完成,从allSplits队列中删除,如果没有,则放回pendingSplits队列中。
每个任务的执行流程如下图右侧,依次遍历所有Operator,尝试从上一个Operator取一个Page对象,如果取得的Page不为空,交给下一个Operator执行。
本地化计算
Presto在选择Source任务计算节点的时候,对于每一个Split,按下面的策略选择一些minCandidates
优先选择与Split同一个Host的Worker节点
如果节点不够优先选择与Split同一个Rack的Worker节点
如果节点还不够随机选择其他Rack的节点
对于所有Candidate节点,选择assignedSplits最少的节点。
动态编译执行计划
小心使用内存和数据结构
类BlinkDB的近似查询
引入了一些近似查询函数approx_avg、approx_distinct、approx_percentile
GC控制
参考链接
更多文章参见 微信号: life_361