poolboy源码分析
github地址: https://github.com/devinus/poolboy
启动流程关键代码行
poolboy是一个gen_server, 启动过程与普通gen_server无异
- poolboy:start_link(PoolArgs, WorkArgs), PoolArgs 一般只需要填写 [{name,XX},{work_module, mworker}]即可,
WorkArgs与 mywork模块启动有关. - poolboy:init, 初始化任务等待队列 Waiting = queue:new(), 记录调用方任务状态私有ets,
基本worker数量size, 最大溢出worker数量max_overflow,启动worker监督进程,设置进程调度策略(fifo,lifo),
最后通知监督者去启动size个worker,加入到workers队列中, 至此poolboy, poolboy_sup, worker进程启动完毕.
工作流程关键代码行
-
要向worker发送一个工作任务,首先需要向poolboy获取一个空闲的worker pid,
poolboy:transaction(PoolName,Fun)
这里以一个简单的行为作为worker的任务, 例如打印调用方传来的Term,假设worker是一个gen_server进程,
则 Fun = fun(Worker,Term) -> gen_server:call(Worker,Term) end. -
poolboy:checkout(Pool, true, Timeout), 向poolboy进程申请一个Worker, 默认是阻塞获取,即如果没有空闲worker(workers数量不超过overflow则会继续申请进程,以此做到动态伸展),会一直等待,直至Timeout超时, 如果设成false, 当前poolboy无法返回worker,则马上返回full,获取失败.
-
gen_server:call(Pool, {checkout, CRef, Block}, Timeout), 向poolboy发送请求, CRef是代表调用进程的唯一标识, 用于poolboy记录并监控状态. 当调用方调度超时(未获取,会向poolboy发送{cancel_waiting,CRef}, poolboy根据CRef从monitors ets 中尝试找到 Pid 和 MRef,并作出取消操作(但这个逻辑分支执行的可能性很低,仅在poolboy返回workerpid 和 调用方等待超时发出cancel_waiting同时触发时发生).
-
成功获取Worker后,马上调用Fun(Worker), 结束后调用 poolboy:checkin(Pool, Worker), 把Worker放回workers, 放回过程中有以下判断; 如果waiting队列非空,则马上返回Worker到waiting的第一个任务; 如果waiting为空,而且溢出进程非0, 则把当前pid关闭,overflow - 1,以此做到动态收缩; 否则Worker进入wokers队列.
保障代码
-
monitors用于监控调用方状态, 如果调用方在发起调度后挂掉, poolboy会收到 {'DOWN', MRef, _, _, _} 消息, 执行后续清理工作.
-
通过observer:start(). 进程树可见, poolboy - poolboy_sup - worker1 .. workern, 而poolboy与每一个worker都有link, poolboy_sup对于worker的重启策略是 temporary, 即永不重启, 当进程停止时, poolboy会收到 {'EXIT',Pid,_Reason}消息,
执行与Pid相关的清理工作, 然后通知poolboy_sup启动新worker, 更新workers.
改进点
- workers的数量只能在一开始配置好, 后续不能直接线上调整参数对worker进行动态伸缩