Optimus论文源码阅读
代码地址
https://github.com/pengyanghua/optimus
论文思路
《Optimus: An Efficient Dynamic Resource Scheduler for Deep Learning Clusters》阅读笔记
目标
通过动态分配集群资源,减少所有任务训练总时间
结论
本文在Kubernetes上实现了Optimus,并在一个有7个CPU服务器和6个GPU服务器的深度学习集群上进行了实验,使用MXNet框架运行了9个训练任务。结果表明,Optimus在作业完成时间和完成时间方面分别比典型的集群调度器高出139%和63%。
背景知识和假设
分布式训练 (MXNet [59], Tensor- Flow [23], PaddlePaddle [17], Angel [43], Petuum [67])
- Ps(paramter server):参数服务,接受参数,更新参数,下发参数
-
W(worker):训练节点,计算梯度
image.png
ps和w数量决定了训练速度,ps和w的数量增多可以加速,但是过多可能因为通信开销导致变慢,图A是比例关系,图B是数量关系
image.png
收敛
- 实验模型并不是所有都收敛比如DNN,好在生产模型是成熟的可以更好收敛的
- 一个epoch训练周期是所有mini-batches都处理一次。
- 数十到数百个epoch,模型会收敛
-
训练是一个迭代过程,将数据集划分为chunks,每个chunk进一步划分为mini-batches。训练步骤处理一个mini-batches并更新梯度(参数值)。我们还可以在每个mini-batches结束时计算训练性能指标。当所有 mini-batch 都处理完毕后,就完成了一个epoch。通常一个模型会被训练很多个 epoch(几十到几百个)直到它收敛(性能指标稳定)。
image.png
目前的调度系统具有以下特点
1.静态的资源分配,除非手动修改配置或者重新提交任务,否则不能利用闲置资源
2.调度决策不考虑任务本身特性,导致短时间任务因为无资源饿死
3、用FIFO模式,导致很多任务等待过长
本文调度系统
用的模型
https://github.com/apache/incubator-mxnet/tree/master/example
步骤
-
任务收敛模型【估算每个模型的训练时间】
- 针对不同类型模型,对损失和迭代次数的关系进行建模,使得可以通过当前损失预测剩余迭代次数
-
第一个模型用于估计作业需要完成的步骤/时期数。在给定步数k的情况下,SGD 以 O(1/k) 的速率收敛。因此我们可以使用以下模型来近似训练损失曲线:
image.png
image.png
-
在每一步后获得更多数据点,模型拟合(预测误差)得到改善,如下所示:
image.png -
资源-速度模型【加速关键】
- 训练速度由前向损失计算时间,反向梯度计算时间,通讯时间等因素决定(同步训练和异步训练公式会不一样)
- 异步训练,一个job中多个worker不同步,每个work有训练进度PS就更新;同步训练,PS搜集到所有work更新后才更新
- ps和和w的数量可以决定上述因素
-
可以通过ps和w数量来预测训练速度
image.png
image.png
-
资源分配
- 以上两个模型结合可以决定系统中当前的任务的每个任务的ps和w数量
- 因为是np问题,使用一个启发式算法来确定ps和w数量
- 为每个任务分配一个ps和一个w作为初始化
- 计算每个任务增加一个ps或者1个w的,取一个最大收益的进行分配,贪心
- 重复上述步骤直到资源耗尽或者增加资源成为负收益
-
任务放置
- 决定好每个任务的ps和w后要确定ps和w怎么放在服务器上
- ps和w放在相同服务器上可以减少通信时间
-
定理:采用最少的服务器来放置这些 w 和 ps,使得每个服务器内部署相同数量的 w 和 ps。
image.png
源码阅读
论文步骤
1、任务收敛
针对不同类型模型,对损失和迭代次数的关系进行建模,使得可以通过当前损失预测剩余迭代次数
2、资源-速度
3、资源分配
4、任务放置
提出问题
1、如何计算时间
2、资源如何分配
3、任务如何放置
4、如何与K8S做交互
从目录开始
image.pngestimator.py:估算速度和epoch
experimentor.py:代码入口, 代码使用多线程通信
job.py: 每个任务具体调度
jobrepo.py: 三个任务仓库
ResNet-50_ImageNet
VGG-16_ImageNet
ResNext-110_Cifar10
optimus_scheduler.py:主调度器
params.py: 参数值
代码入口 experimentor.py
企业微信截图_0282de44-4ef2-473f-a1b5-0d3eb6801b52_副本.png- 清除所有job
def clear():
os.system('kubectl delete jobs --all')
- 为每个任务分配单独的线程
Timer:时间驱动
Hub:消息转发
Generator:任务随机生成和模拟提交任务,本代码用了三个任务在jobrepo.py
Progressor: 任务执行器
Statsor: 任务状态收集
UTIL_Scheduler 任务调度器,重点逻辑
主调度器optimus_scheduler.py
image.png1、在init函数中,除了初始化参数还有一句
self.msg_handler = threading.Thread(target=self._msg_handle, args=())
重点为阅读_msg_handle这个函数
企业微信截图_b91c1b1b-fbbb-44ef-bb41-2b8df5edad8d_副本.png
while循环中每次收到一条消息,按照类型进行处理
第一次收集收集数据点,第一次估算速度,为所有未完成的job分配一个ps一个worker
for job in self.uncompleted_jobs:
cpu_req = job.worker_cpu + job.ps_cpu
mem_req = job.worker_mem + job.ps_mem
bw_req = job.worker_bw + job.ps_bw
gpu_req = job.worker_gpu
suff_resr = self.__check_cluster_resource_full(cpu_req, mem_req, bw_req, gpu_req)
if suff_resr:
job.num_worker = 1
job.num_ps = 1
self.cluster_used_cpu += cpu_req
self.cluster_used_mem += mem_req
self.cluster_used_bw += bw_req
self.cluster_used_gpu += gpu_req
# compute initial utility
self.__update_util_queue(job, util_queue)
else:
continue
还有剩余的资源继续增加,看task_type
if suff_resr:
# currently no mechanism to reduce resources
if task_type == "ps":
job.num_ps += 1
elif task_type == "worker":
job.num_worker += 1
self.cluster_used_cpu += cpu_req
self.cluster_used_mem += mem_req
self.cluster_used_bw += bw_req
self.cluster_used_gpu += gpu_req
self.__update_util_queue(job, util_queue)
else:
# no enough resource
break
# how to handle not_ready_jobs
__update_util_queue
- 计算剩余时间
# compute utility
# allocate 1 ps or 1 worker each time.
# sometimes can allocate multiple ps or worker for optimization, to avoid stuck in local optimal.
end_epoch = self.estimator.est_epoch(job)
if end_epoch <= 0:
# error when estimating epoch
end_epoch = job.progress + 20
rem_epoch = end_epoch - job.progress # the rem_epoch is negative if estimated epoch return -1
est_speed = self.estimator.est_speed(job, job.num_ps, job.num_worker)
self.logger.debug("estimated speed: " + str(est_speed))
if est_speed <= 0:
self.not_ready_jobs.add(job)
return
rem_time = rem_epoch / est_speed
- 当ps+1计算时间,同理worker
# if add ps 1
est_speed = self.estimator.est_speed(job, job.num_ps + 1, job.num_worker)
if est_speed <= 0:
self.not_ready_jobs.add(job)
return
ps_rem_time = rem_epoch / est_speed
resource_reqs = (job.ps_cpu, job.ps_mem, job.ps_bw)
shares = (1.0 * job.ps_cpu / self.cluster_num_cpu, 1.0 * job.ps_mem / self.cluster_num_mem,
1.0 * job.ps_bw / self.cluster_num_bw)
dom_res = shares.index(max(shares))
ps_util = (rem_time - ps_rem_time)/ resource_reqs[dom_res]
给出ps数和工人数,预测训练速度。如果字典中已经存在,使用真正的一个
def est_speed(self, job, num_ps, num_worker):
"""Give the number of ps and the number of worker, predict the training speed.
Use the real one if already exists in the dict
"""
if (num_ps, num_worker) in job.training_speeds:
return job.training_speeds[(num_ps, num_worker)]
else:
# do training speed curve fitting here
if 'async' in job.kv_store:
if len(job.training_speeds) >= 4:
# do not need curve fitting each time, can be further optimized. future work
ps_list = []
worker_list = []
speed_list = []
for key, value in job.training_speeds.items():
(ps, worker) = key
ps_list.append(float(ps))
worker_list.append(float(worker))
speed_list.append(value)
params = self._async_speed_curve_fitting(np.array(ps_list), np.array(worker_list), np.array(speed_list))
if params is None:
self.logger.error(self.name+":: " + job.name + " " + str((num_ps, num_worker)) + " speed estimation error")
return -1
else:
[a, b, c, d] = params
est_speed = self.__async_speed_fit_func((num_ps, num_worker), a, b, c, d)
return est_speed
else:
return -1
继续回到调度器,计算好这个时间周期所有未完成job需要多少资源后开始放置
# placement
ps_placements, worker_placements = self.__place(self.uncompleted_jobs)
对任务需求资源进行排序
# sort jobs based on num_ps and num_worker
job_sort_queue = Queue.PriorityQueue()
for job in jobs:
job_sort_queue.put((job.num_ps + job.num_worker, job))
对节点进行排序
cpu_avail_queue = Queue.PriorityQueue()
# sort nodes based on available cpus, since cpu is usually the bottleneck
for i in range(len(params.NODE_LIST)):
cpu_avail_queue.put((self.node_used_cpu_list[i], i))
从大到小把任务放入从小到大的节点
for i in range(job.num_ps):
# place ps evenly
node = cand_place_nodes[i % len(cand_place_nodes)]
# check whether resource is enough to place this ps
suff_resr = self.__check_node_resource_full(node, job.ps_cpu, job.ps_mem, job.ps_bw)
if suff_resr:
ps_nodes.append(node)
# minus temporary resources
self.__deduct_resr(job, "ps", 1, node)
else:
# since node is already sorted based on resources,
# if a larger node can not place the task, the following one can not too
fit_flag = False
# add the deducted resource back
for node in ps_nodes:
self.__add_back_resr(job, "ps", 1, node)
ps_already_deduct = True
break
最后一些放不进去的节点,和剩余的资源。拆开他们的ps和work单独放置,同时计算会不会速度变慢
# have try all nodes, but still can not place, then check if we can place some tasks
# and place ps and worker alternatively
self.logger.debug("last placed job: " + job.name)
ps_nodes = []
worker_nodes = []
flag_place_ps = True
for i in range(job.num_ps + job.num_worker):
flag_no_resource = True
if flag_place_ps:
# place ps task
for node in range(len(params.NODE_LIST)):
suff_resr = self.__check_node_resource_full(node, job.ps_cpu, job.ps_mem, job.ps_bw)
if suff_resr:
ps_nodes.append(node)
self.__deduct_resr(job, "ps", 1, node)
flag_no_resource = False
break
else:
# place worker task
for node in range(len(params.NODE_LIST)):
suff_resr = self.__check_node_resource_full(node, job.worker_cpu, job.worker_mem,
job.worker_bw, job.worker_gpu)
if suff_resr:
worker_nodes.append(node)
self.__deduct_resr(job, "worker", 1, node)
flag_no_resource = False
break
任务资源和K8S的交互
创建任务
def start(self):
# start the job in k8s
self.logger.info("starting job " + self.name + "...")
# job working dir
os.system('mkdir -p ' + self.dir)
self.ps_mount_dirs = self._set_mount_dirs('ps', self.host_workdir_prefix) # ps container mount
self.worker_mount_dirs = self._set_mount_dirs('worker', self.host_workdir_prefix) # worker container mount
self.__set_batch_size()
# create job yamls
self._create()
# prepare data
self._read_data()
# start pods in k8s
subprocess.check_output("kubectl create -f " + self.yaml, shell=True)
如何获得yaml文件
# copy template file
self.jinja = self.dir + self.name + '.jinja'
os.system("cp ../templates/k8s-mxnet-template.jinja " + self.jinja)
# replace variables in jinja file
temp_file = self.jinja + '.temp'
for key, value in variables.items():
os.system('sed -e "s@\$' + key + '@' + value + '@g" "' + self.jinja + '"' + ' > ' + temp_file)
os.system('rm ' + self.jinja)
os.system('mv ' + temp_file + ' ' + self.jinja)
# generate yaml file
self.yaml = self.dir + self.name + '.yaml'
os.system("python ../templates/render-template.py " + self.jinja + " > " + self.yaml)
image.png
其它相关交互
cmd = 'kubectl get pods --selector=' + 'name=' + self.name + ',' + 'job=' + task + ' --namespace=default' + ' |grep ' + task
# get heapster cluster ip
# heapster 192.168.192.16 <none> 80/TCP 5d
cmd = "kubectl get services --namespace=kube-system | grep heapster |awk '{print $2}'"
# in case not delete all
subprocess.check_output('kubectl delete jobs --selector=name=' + self.name, shell=True)
本论文使用了K8S自带的均衡负载
The workers/parameter servers are placed in a load balancing way, according to the default behavior of Kubernetes.
用了其中两个特性 FitPredicate和PriorityFunction
candidates = []
for pod in pods:
for node in nodes:
if pod.label-selector == node.label or pod.resources.limits + node.allocated_resources < node.resources.capacity: // fit predicate
candidates.append(node)
for candidate in candidates:
select one according to priorities (e.g., least used resources) // priority function
general steps:
一般步骤:(1)过滤节点,(2)对过滤后的节点列表进行优先级排序(3)选择最适合的节点
Available Predicates
- Static predicates:
PodFitsPorts,
PodFitsResources,
NoDiskConflict,
MatchNodeSelector,
HostName - Configurable predicates:
ServiceAffinity,
LabelsPresence
Available Priority Functions
- Static priority functions:
LeastRequestedPriority,
BalancedResourceAllocation,
ServiceSpreadingPriority,
EqualPriority - Configurable priority functions:
ServiceAntiAffinity,
LabelPrerference
参考地址:https://kubernetes.io/blog/2017/03/advanced-scheduling-in-kubernetes/
参考例子ResNet-50_ImageNet
'''
ResNet-50_ImageNet
'''
def _set_resnet50_job(job):
num_ps = params.DEFAULT_NUM_PS
num_worker = params.DEFAULT_NUM_WORKER
ps_cpu = 3
ps_mem = 9
ps_bw = 0
worker_cpu = 2
worker_mem = 8
worker_gpu = 1
worker_bw = 0
job.set_ps_resources(num_ps, ps_cpu, ps_mem, ps_bw)
job.set_worker_resources(num_worker, worker_cpu, worker_mem, worker_bw, worker_gpu)
image = 'xxx'
script = '/init.sh'
# must end with /, save everything including training data, validation data,
# training log and training model into this dir
work_dir = '/mxnet/example/image-classification/data/'
host_workdir_prefix = '/data/k8s-workdir/experiment/'
job.set_container(image, script, work_dir, host_workdir_prefix)
prog = 'python train_imagenet.py --network resnet --num-layers 50 --disp-batches 5 --num-epochs 100 --data-train /data/imagenet-train.rec'
kv_store = 'dist_sync'
prog += ' --kv-store ' + kv_store
if worker_gpu > 0:
prog += " --gpus" + " " + ",".join([str(i) for i in range(int(worker_gpu))])
job.set_train(prog=prog, batch_size=32, kv_store=kv_store, scale_bs=True)
hdfs_data = ['/k8s-mxnet/imagenet/imagenet-train.rec']
data_dir = '/data/'
host_data_dir = '/data/mxnet-data/imagenet/'
job.set_data(hdfs_data=hdfs_data, data_dir=data_dir, host_data_dir=host_data_dir, data_mounted=True)
job.set_mxnet(kv_store_big_array_bound=1000 * 1000, ps_verbose='')
源码部署
JOB是如何跑到集群的
image.png由调度器计算出每个job需要用多少ps和多少worker,然后设置这个参数
set_ps_resources
set_worker_resources
放置这些ps和work,需要从环境变量中获取这些资源
基于yarn的mxnet深度学习,需要配置mxnet分布式集群环境(暂时还没有配置这个)
set_worker_placement
set_ps_placement
挂载到容器的主机上的目录
_set_mount_dirs
mount_dirs = []
if type == 'ps':
for i in xrange(self.num_ps):
postfix = self.name + '-ps-' + str(i) + '/'
mount_dir = host_workdir_prefix + postfix
mount_dirs.append(mount_dir)
cmd = 'ssh ' + self.ps_placement[i] + ' "sudo rm -rf ' + mount_dir + '; mkdir -p ' + mount_dir + '"'
os.system(cmd)
设置容器
set_container
# container description
self.image = image
self.script = script
self.work_dir = work_dir
self.host_workdir_prefix = host_workdir_prefix
self.work_volume = work_volume
如果数据不在本地主机,从HDFS中获取,数据集列表,包括训练数据和验证数据
可以下载到本地,放到相应目录,目前找到一个cifar10的数据集和训练集
'http://data.mxnet.io/data/cifar10/cifar10_val.rec'
'http://data.mxnet.io/data/cifar10/cifar10_train.rec'
set_data 设置数据
_read_data 读取HDFS数据,如果本地有数据不会执行
self.hdfs_data = hdfs_data
self.data_dir = data_dir
self.host_data_dir = host_data_dir
self.data_mounted = data_mounted
self.data_volume = data_volume
设置训练集
set_train
__set_batch_size
set_mxnet
创建任务,利用jinja模版,传入参数,生成相应的yaml文件
_create
# copy template file
self.jinja = self.dir + self.name + '.jinja'
os.system("cp ../templates/k8s-mxnet-template.jinja " + self.jinja)
# replace variables in jinja file
temp_file = self.jinja + '.temp'
for key, value in variables.items():
os.system('sed -e "s@\$' + key + '@' + value + '@g" "' + self.jinja + '"' + ' > ' + temp_file)
os.system('rm ' + self.jinja)
os.system('mv ' + temp_file + ' ' + self.jinja)
# generate yaml file
self.yaml = self.dir + self.name + '.yaml'
os.system("python ../templates/render-template.py " + self.jinja + " > " + self.yaml)
尝试运行有成功生成一些yaml文件,里面ip参数不太对,其它参数也可能有问题,所以执行不起来。
image.png
获取训练状态,主要是获取正在进行的任务和值损失列表
_read_progress_stats
get_training_progress_stats
self._read_progress_stats()
return (list(self.progress_list), list(self.val_loss_list))
获取训练速度, 打卡本地文件获取
_read_training_speed
get_training_speed
'''
cmd = 'scp ' + node + ':' + local_file + ' ' + self.dir # the new txt will replace the old one, no need to delete
os.system(cmd)
try:
with open(self.dir+speed_fn, 'r') as fh:
stb_speed = float(fh.readline().replace('\n', '').split(' ')[1])
self.speed_list[i] = float('%.3f'%(stb_speed))
except Exception as e:
print e
continue
'''
cmd = "ssh " + node + " 'cat " + local_file + "'"
获取k8s pods
__get_pods
cmd = 'kubectl get pods --selector=' + 'name=' + self.name + ',' + 'job=' + task + ' --namespace=default' + ' |grep ' + task
output = subprocess.check_output(cmd, shell=True)
获取job的各项指标
_read_metrics
get_metrics
# cpu: milli core, mem: bytes, net: bytes/second
metric_keys = ['cpu/usage_rate', 'memory/usage', 'network/tx_rate', 'network/rx_rate']
运行开始函数
start
def start(self):
# start the job in k8s
self.logger.info("starting job " + self.name + "...")
# job working dir
os.system('mkdir -p ' + self.dir)
self.ps_mount_dirs = self._set_mount_dirs('ps', self.host_workdir_prefix) # ps container mount
self.worker_mount_dirs = self._set_mount_dirs('worker', self.host_workdir_prefix) # worker container mount
self.__set_batch_size()
# create job yamls
self._create()
# prepare data
self._read_data()
# start pods in k8s
subprocess.check_output("kubectl create -f " + self.yaml, shell=True)
删除任务和删除所有任务
subprocess.check_output('kubectl delete jobs --selector=name=' + self.name, shell=True)
安装过程中遇到的问题
1、ssh nodelist配置到集群,ssh命令运行失败,堡垒机需要跳转
image.png
2、getenv没有配置完全,需要一个个配置,或者删掉一些再做尝试
image.png
image.png
image.png
3、还需要收集所有相关训练任务的数据集和训练集
结论
动态部署可以利用论文这种方法,生成配置文件,与集群和外界环境的交互可以直接在python命令加shell脚本来获取和运行。