tensorflow on kubernetes实战 分布式深度
写在前面
- 态度决定高度!让优秀成为一种习惯!
- 世界上没有什么事儿是加一次班解决不了的,如果有,就加两次!(- - -茂强)
为什么是tensorflow on kubernetes?
个人觉得最大的优势是:
- 租户隔离 保证不同的用户能够互不干扰
- 资源包括GPU调度 能够有效利用资源
- 扩展能力 能够很容易横向扩展
- 灵活 整个资源分配比较灵活 管理灵活
等等
kubernetes集群的搭建
本文采用的是kubeadm安装方式,这个安装方式直接自动化安装etcd等依赖的组件
首先我们介绍一下什么是kubernetes,我们先来理解一下几个概念(一下内容均来自官方中文文档,版权归其所有)
-
基本概念
kuberntes特点
kubernetes能做什么
kubernetes组件
kubernetes总体结构
master节点
node节点
什么是pod
什么是标签
什么是注解
什么是RC
什么是服务
什么是目录
以下是kubernetes常见的命令,可参考官方文档进一步学习
-
常见命里该
命令
命令
命令
命令 -
kubernetes架构图
架构图-版权归原作者所有 - 集群安装
首先准备环境包,我里边才考了一个技术博客里边的包,版权归原创所有
https://pan.baidu.com/s/1FfO2saDPkXH7wO_5XTNqpw - 环境准备
centos7
三台机器:master,node1,node2 - 解压资源包
tar -xjvf k8s_images.tar.bz2
- 安装docker
yum install docker-ce-selinux-17.03.2.ce-1.el7.centos.noarch.rpm
yum install docker-ce-17.03.2.ce-1.el7.centos.x86_64.rpm
- 检测安装情况
yum list | grep docker
- 启动docker
systemctl start docker && systemctl enable docker
- 检测docker是否启动成功
ifconfig
查看是否有docker网络
- 修改docker镜像源为阿里云
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["[https://u9ea3fz9.mirror.aliyuncs.com]
(https://u9ea3fz9.mirror.aliyuncs.com)"]
}
EOF
- 重新启动docker
sudo systemctl daemon-reload
sudo systemctl restart docker
- 关闭防火墙
systemctl stop firewalld && systemctl disable firewalld
setenforce 0
- 配置路由表参数
echo "
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
" >> /etc/sysctl.conf
sysctl -p
- 关闭交换设备
swapoff -a
- 导入镜像
docker load <./docker_images/etcd-amd64_v3.1.10.tar
docker load <./docker_images/flannel:v0.9.1-amd64.tar
docker load <./docker_images/k8s-dns-dnsmasq-nannyamd64_v1.14.7.tar
docker load <./docker_images/k8s-dns-kube-dns-amd64_1.14.7.tar
docker load <./docker_images/k8s-dns-sidecar-amd64_1.14.7.tar
docker load <./docker_images/kube-apiserver-amd64_v1.9.0.tar
docker load <./docker_images/kube-controller-manager-amd64_v1.9.0.tar
docker load <./docker_images/kube-scheduler-amd64_v1.9.0.tar
docker load < ./docker_images/kube-proxy-amd64_v1.9.0.tar
docker load <./docker_images/pause-amd64_3.0.tar
docker load < ./kubernetes-dashboard_v1.8.1.tar
- 安装安装kubelet kubeadm kubectl
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh kubernetes-cni-0.6.0-0.x86_64.rpm kubelet-1.9.9-9.x86_64.rpm kubectl-1.9.0-0.x86_64.rpm
rpm -ivh kubectl-1.9.0-0.x86_64.rpm
rpm -ivh kubeadm-1.9.0-0.x86_64.rpm
- 在master节点启动kubelet
systemctl enable kubelet && sudo systemctl start kubelet
- check一下kubelet默认的cgroup的driver和docker的是否不一样,docker默认的cgroupfs,kubelet默认为systemd,两个要保证一致
vim /etc/systemd/system/kubelet.service.d/10-kubeadm.conf
修改其中的cgroupfs-driver的值systemd为cgroupfs
- 重启kubelet
systemctl daemon-reload && systemctl restart kubelet
以上操作在所有的节点上都需要操作,等操作完成后再进行下边的内容
- 初始化master
初始化master节点kubeadm init --kubernetes-version=v1.9.0 --pod-network-cidr=10.244.0.0/16
注意:这里一定要记下
kubeadm join --token 26c210.acef208514aaf37f 10.255.164.31:6443 --discovery-token-ca-cert-hash-sha256:87ee8b74e3b937f82b5174fa64bd140071cbf9087f41f5b4bec38c22332e6137
这个命令,后续如果你想在集群中增加该节点,横向扩展的时候这个是必须的命令(记录到你的加密文件里边)
同时,注意输出的目录里边还有个提示
提示
这个是个授权的处理,你需要运行以下,否则kubecttl命令可能用不了
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config
- 创建网络
找到kube-flannel.yml文件
kubectl create -f kube-flannel.yml
然后运行
kubectl get nodes
出现节点master状态为ready表示成功
与此同时配置好环境变量:
echo "export KUBECONFIG=/etc/kubernetes/admin.conf" >> ~/.bash_profile
source ~/.bash_profile
- 加入其他节点
到此,该操作需要在node节点上进行操作,有多少个node节点都需要操作
kubeadm join --token 99f58e.60c1ad95c0ac7dcd 10.255.164.31:6443 --discovery-token-ca-cert-hash sha256:7be50b18a3697bad6a0477db525a95e4db011f9f1f89384882b53eb85968eab5
请用你刚才初始化master的那个让你保存的命令进行,不要利用以上命令
加入完成以后查看是否已经加入成功
查看所有加入的节点kubectl get nodes
下面查看以下所有的命名空间
获取所有的命名空间
如果出现所有的节点的状态都是ready表明已经成功建立 kubernetes集群了
- kubernetes-dashborad的搭建
这里我们选用官方最新的yaml文件
找到如下图部分,进行修改为如下
dashborad
其中的nodePort是链接到该dashborad的端口,type: NodePort是一种端口暴露,表示暴漏给proxy层,外界可以通过该nodePort访问到该服务
另外还要对其依赖的镜像版本做个修改,因为,前边加载到docker的kubernetes-dashborad的版本与改文件的版本有可能不一样,所以要改动一下
dashborad文件镜像修改
创建dashborad
kubectl create -f kubernetes-dashboard.yaml
然后访问https://master_ip:32666
修改上边的master_ip为你物理机的域名或者ip
通过浏览器可以访问到
这里我们采用令牌登陆模式
下边我们一块获取令牌,执行
安全访问token列表kubectl get secret -n kube-system
以红框内的controller-token为准,执行
kubectl describe secret/namespace-controller-token-bw9jn -n kube-system
记得不要直接执行,修改上边的namespace-controller-token-bw9jn,因为每个集群都不一样
这时就会拿到该token
获取token
复制以上token到浏览器,就可以登陆了。
dashborad内容展示
这样,我们的dashborad已经创建好了
到此,kubernetes集群已经搭建完成,如果是正式环境的话,还请搭建HA机制的集群,这个在中文技术文档中都有,这里不再赘述,可参考中文官方文档进行正式环境HA搭建
下边就让我们一步步的来构建tensorflow on kubernetes环境吧
tensorflow on kubernetes架构图
首先我们来了解以下整个平台的架构图是什么样子
tensorflow on kubernetes架构图
我们来解释一下,这个架构图是分布式tensorflow的实战图,其中有两个参数服务,多个worker服务,还有个shuffle和抽样的服务,shuffle就是对样根据其标签进行混排,然后对外提供batch抽样服务(可以是有放回和无放回,抽样是一门科学,详情可以参考抽样技术一书),每个batch的抽样是由每个worker去触发,worker拿到抽样的数据样本ID后就去基于kubernetes构建的分布式数据库里边提取该batchSize的样本数据,进行训练计算,由于分布式的tensorflow能够保证异步梯度下降算法,所以每次训练batch数据的时候都会基于最新的参数迭代,然而,更新参数操作就是两个参数服务做的,架构中模型(参数)的存储在NFS中,这样以来,参数服务与worker就可以共享参数了,最后说明一下,我们训练的所有数据都是存储在分布式数据库中(数据库的选型可以根据具体的场景而定)。为什么需要一个shuffle和抽样的服务,因为当数据量很大的时候,我们如果对所有的样本数据进行shuffle和抽样计算的话会浪费很大的资源,因此需要一个这样的服务专门提取数据的(id,label)来进行混排和抽样,这里如果(id, label)的数据量也很大的时候我们可以考虑基于spark 来分布式的进行shuffle和抽样,目前spark2.3已经原生支持kubernetes调度
NFS服务搭建
- 什么是NFS(来自百度百科)
NFS(Network File System)即网络文件系统,是FreeBSD支持的文件系统中的一种,它允许网络中的计算机之间通过TCP/IP网络共享资源。在NFS的应用中,本地NFS的客户端应用可以透明地读写位于远端NFS服务器上的文件,就像访问本地文件一样。 - NFS服务搭建
yum install nfs-utils rpcbind -y
mkdir -p data/nfs
vim /etc/exports
加入如下内容
/data/nfs 192.168.86.0/24(rw,no_root_squash,no_all_squash,sync)
启动
/bin/systemctl start rpcbind.service
/bin/systemctl start nfs.service
到此,NFS服务已经搭建好了
tensorflow docker镜像打包
首先我们准备DockerFile
FROM ubuntu:16.04
MAINTAINER yahengsong yahengsong@foxmail.com
RUN apt-get update \
&& apt-get install -y wget \
&& apt-get install -y lrzsz \
&& apt-get install -y unzip \
&& apt-get install -y zip \
&& apt-get install -y vim \
&& apt-get install -y gcc \
&& apt-get install -y g++ \
&& apt-get install -y automake \
&& apt-get install -y autoconf \
&& apt-get install -y libtool \
&& apt-get install -y make \
&& apt-get install -y openssl \
&& apt-get install -y libssl-dev \
&& apt-get install -y ruby \
&& apt-get install -y zlib1g \
&& apt-get install zlib1g.dev \
&& apt-get install -y bzip2 \
&& apt-get install -y libncurses5-dev \
&& apt-get install -y sqlite sqlite3 \
&& apt-get install -y libgdbm-dev \
&& apt-get install -y libpcap-dev \
&& apt-get install -y xz-utils
RUN wget https://www.python.org/ftp/python/3.6.0/Python-3.6.0.tar.xz \
&& tar -xvf Python-3.6.0.tar.xz \
&& cd Python-3.6.0 \
&& mkdir -p /usr/local/python3 \
&& ./configure --prefix=/usr/local/python3 \
&& make \
&& make install \
&& rm -rf Python-3.6.0* \
&& ln -s /usr/local/python3/bin/python3 /usr/bin/python3 \
&& ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
RUN pip install --upgrade pip \
&& pip --no-cache-dir install >https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-1.7.0-cp36-cp36m-linux_x86_64.whl
# TensorBoard
EXPOSE 6006
# IPython
EXPOSE 8888
WORKDIR /root
我们可以通过该DockerFile在阿里云上进行镜像打包
这样我们就有了自己的环境了(注意,该版本没有安装jupyter)
阿里云镜像
详情可以参考如何在阿里云上打包自己的镜像
tenssorflow on kubernetes实战
以下将是如何在kubernetes集群上部署tensorflow环境
首先部署单机CPU版本的
下面我们来看线tensorflow.yaml文件
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: tensorflow
spec:
replicas: 1
template:
metadata:
labels:
k8s-app: tensorflow
spec:
containers:
- name: tensorflow
image: registry.cn-
hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
ports:
- containerPort: 8888
resources:
limits:
cpu: 4
memory: 2Gi
requests:
cpu: 2
memory: 1Gi
---
apiVersion: v1
kind: Service
metadata:
name: jupyter-service
spec:
type: NodePort
ports:
- port: 80
targetPort: 8888
nodePort: 32001
name: tensorflow
selector:
k8s-app: tensorflow
这里我们依赖的是阿里云的tensorflow的docker镜像registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3(该版本里边有jupyter),我们向kubernetes集群申请2个CPU和1G内存,kubernetes集群给该Deployment最大的CPU限制是4核和2G内存,需要注意的是该环境暴露到外网的端口是32001
有了改文件,接下来创建环境
kubectl create -f tensorflow.yaml
这时候就可以通过,查看该环境了
kubectl get pods
到此我们可以通过http://master_ip:32001/来访问该环境的jupyter了
这时候我们可以通过如下获取token
get token
然后通过token就可以登陆了
jupyter
然后你就可以愉快的编程了
分布式tensorflow on kubernetes
如之前所介绍的分布式深度学习架构
我们首先创建一个参数服务tf-ps.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: tensorflow-ps
spec:
replicas: 1
template:
metadata:
labels:
name: tensorflow-ps
role: ps
spec:
containers:
- name: ps
image: registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
ports:
- containerPort: 2222
resources:
limits:
cpu: 4
memory: 2Gi
requests:
cpu: 2
memory: 1Gi
volumeMounts:
- mountPath: /datanfs
readOnly: false
name: nfs
volumes:
- name: nfs
nfs:
server: 你的nfs服务地址
path: "/data/nfs"
---
apiVersion: v1
kind: Service
metadata:
name: tensorflow-ps-service
labels:
name: tensorflow-ps
role: service
spec:
ports:
- port: 2222
targetPort: 2222
selector:
name: tensorflow-ps
执行
kubectl create -f tf-ps.yaml
然后两个参数节点就会被创建
下面我们创建2个worker节点tf-worker.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: tensorflow-worker
spec:
replicas: 3
template:
metadata:
labels:
name: tensorflow-worker
role: worker
spec:
containers:
- name: worker
image: registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
ports:
- containerPort: 2222
resources:
limits:
cpu: 4
memory: 2Gi
requests:
cpu: 2
memory: 1Gi
volumeMounts:
- mountPath: /datanfs
readOnly: false
name: nfs
volumes:
- name: nfs
nfs:
server: 你的nfs服务地址
path: "/data/nfs"
---
apiVersion: v1
kind: Service
metadata:
name: tensorflow-wk-service
labels:
name: tensorflow-worker
spec:
ports:
- port: 2222
targetPort: 2222
selector:
name: tensorflow-worker
执行
kubectl create -f tf-worker.yaml
这时候2个worker节点就会被创建
节点
-
训练
查看每个pod的ip用于构建集群训练代码
参数服务
然后进去每个节点环境
kubectl exec -ti tensorflow-ps-77b8d7bc89-87qgp bash
kubectl exec -ti tensorflow-worker-b7cc4dd66-94ntr bash
kubectl exec -ti tensorflow-worker-b7cc4dd66-mzqhb bash
创建以下代码(在此之前请先准备好mnist数据集的csv格式并放到nfs服务的data/nfs目录下)
from __future__ import print_function
import math
import tensorflow as tf
import collections
import sys,os, time
import numpy as np
# TensorFlow集群描述信息,ps_hosts表示参数服务节点信息,worker_hosts表示worker节点信息
tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
# TensorFlow Server模型描述信息,包括作业名称,任务编号,隐含层神经元数量,MNIST数据目录以及每次训练数据大小(默认一个批次为100个图片)
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100, "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/datanfs", "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")
FLAGS = tf.app.flags.FLAGS
#图片像素大小为28*28像素
IMAGE_PIXELS = 28
class DataSet(object):
def __init__(self,
images,
labels,
reshape=True):
"""Construct a DataSet.
one_hot arg is used only if fake_data is true. `dtype` can be either
`uint8` to leave the input as `[0, 255]`, or `float32` to rescale into
`[0, 1]`.
"""
self._num_examples = images.shape[0]
# Convert shape from [num examples, rows, columns, depth]
# to [num examples, rows*columns] (assuming depth == 1)
images = images.astype(np.float32)
images = np.multiply(images, 1.0 / 255.0)
self._images = images
self._labels = labels
self._epochs_completed = 0
self._index_in_epoch = 0
@property
def images(self):
return self._images
@property
def labels(self):
return self._labels
@property
def num_examples(self):
return self._num_examples
@property
def epochs_completed(self):
return self._epochs_completed
def next_batch(self, batch_size, fake_data=False, shuffle=True):
"""Return the next `batch_size` examples from this data set."""
start = self._index_in_epoch
# Shuffle for the first epoch
if self._epochs_completed == 0 and start == 0 and shuffle:
perm0 = np.arange(self._num_examples)
np.random.shuffle(perm0)
self._images = self.images[perm0]
self._labels = self.labels[perm0]
# Go to the next epoch
if start + batch_size > self._num_examples:
# Finished epoch
self._epochs_completed += 1
# Get the rest examples in this epoch
rest_num_examples = self._num_examples - start
images_rest_part = self._images[start:self._num_examples]
labels_rest_part = self._labels[start:self._num_examples]
# Shuffle the data
if shuffle:
perm = np.arange(self._num_examples)
np.random.shuffle(perm)
self._images = self.images[perm]
self._labels = self.labels[perm]
# Start next epoch
start = 0
self._index_in_epoch = batch_size - rest_num_examples
end = self._index_in_epoch
images_new_part = self._images[start:end]
labels_new_part = self._labels[start:end]
return np.concatenate((images_rest_part, images_new_part), axis=0) , \
np.concatenate((labels_rest_part, labels_new_part), axis=0)
else:
self._index_in_epoch += batch_size
end = self._index_in_epoch
return self._images[start:end], self._labels[start:end]
def dense_to_one_hot(labels_dense, num_classes):
"""Convert class labels from scalars to one-hot vectors."""
num_labels = labels_dense.shape[0]
index_offset = np.arange(num_labels) * num_classes
labels_one_hot = np.zeros((num_labels, num_classes))
labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
return labels_one_hot
def read_data_sets(train_dir,
reshape=True,
validation_size=2000):
trainfile = os.path.join(train_dir, "mnist_train.csv")
testfile = os.path.join(train_dir, "mnist_test.csv")
train_images = np.array([], dtype=np.uint8)
train_labels = np.array([], dtype=np.uint8)
test_images = np.array([], dtype=np.uint8)
test_labels = np.array([], dtype=np.uint8)
count = 0
with open(trainfile) as f:
for line in f.readlines():
count+= 1
line = line.strip()
line = line.split(",")
line = [int(x) for x in line]
one_rray = np.array(line[1:], dtype=np.uint8)
train_images = np.hstack((train_images, one_rray))
train_labels = np.hstack((train_labels, np.array(line[0], dtype=np.uint8)))
if count % 10000 == 0:
print(str(count))
if count == 20000:
break
train_images = train_images.reshape(20000, 28*28)
train_labels = train_labels.reshape(20000, 1)
train_labels = dense_to_one_hot(train_labels, 10)
count = 0
with open(testfile) as f:
for line in f.readlines():
count += 1
line = line.strip()
line = line.split(",")
line = [int(x) for x in line]
one_rray = np.array(line[1:], dtype=np.uint8)
test_images = np.hstack((test_images, one_rray))
test_labels = np.hstack((test_labels, np.array(line[0], dtype=np.uint8)))
if count % 10000 == 0:
print(str(count))
test_images = test_images.reshape(10000, 28*28)
test_labels = test_labels.reshape(10000, 1)
test_labels = dense_to_one_hot(test_labels, 10)
if not 0 <= validation_size <= len(train_images):
raise ValueError(
'Validation size should be between 0 and {}. Received: {}.'
.format(len(train_images), validation_size))
validation_images = train_images[:validation_size]
validation_labels = train_labels[:validation_size]
train_images = train_images[validation_size:]
train_labels = train_labels[validation_size:]
train = DataSet(train_images, train_labels, reshape=reshape)
validation = DataSet(validation_images, validation_labels, reshape=reshape)
test = DataSet(test_images, test_labels, reshape=reshape)
Datasets = collections.namedtuple('Datasets', ['train', 'validation', 'test'])
return Datasets(train=train, validation=validation, test=test)
def main(_):
#从命令行参数中读取TensorFlow集群描述信息
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# 创建TensorFlow集群描述对象
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# 为本地执行Task,创建TensorFlow本地Server对象.
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
#如果是参数服务,直接启动即可
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
#分配操作到指定的worker上执行,默认为该节点上的cpu0
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
ps_device="/job:ps/cpu:0",
cluster=cluster)):
# 定义TensorFlow隐含层参数变量,为全连接神经网络隐含层
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units], stddev=1.0 / IMAGE_PIXELS), name="hid_w")
hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
# 定义TensorFlow softmax回归层的参数变量
sm_w = tf.Variable(tf.truncated_normal([FLAGS.hidden_units, 10], stddev=1.0 / math.sqrt(FLAGS.hidden_units)), name="sm_w")
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
#定义模型输入数据变量(x为图片像素数据,y_为手写数字分类)
x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
y_ = tf.placeholder(tf.float32, [None, 10])
#定义隐含层及神经元计算模型
hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
hid = tf.nn.relu(hid_lin)
#定义softmax回归模型,及损失方程
y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
#定义全局步长,默认值为0
global_step = tf.Variable(0, name="global_step", trainable=False)
#定义训练模型,采用Adagrad梯度下降法
train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)
#定义模型精确度验证模型,统计模型精确度
correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
#对模型定期做checkpoint,通常用于模型回复
saver = tf.train.Saver()
#定义收集模型统计信息的操作
summary_op = tf.summary.merge_all()
#定义操作初始化所有模型变量
init_op = tf.initialize_all_variables()
#创建一个监管程序,用于构建模型检查点以及计算模型统计信息。
is_chief = (FLAGS.task_index == 0)
if is_chief:
print("Worker %d: Initializing session..." % FLAGS.task_index)
else:
print("Worker %d: Waiting for session to be initialized..." % FLAGS.task_index)
sv = tf.train.Supervisor(
is_chief= is_chief,
logdir="/tmp/train_logs",
init_op=init_op,
summary_op=summary_op,
saver=saver,
global_step=global_step,
save_model_secs=600)
sess_config = tf.ConfigProto(
allow_soft_placement=True,
log_device_placement=False,
device_filters=["/job:ps",
"/job:worker/task:%d" % FLAGS.task_index])
#读入MNIST训练数据集
mnist = read_data_sets(FLAGS.data_dir)
#创建TensorFlow session对象,用于执行TensorFlow图计算
with sv.managed_session(server.target, config=sess_config) as sess:
print("Worker %d: Session initialization complete." % FLAGS.task_index)
# Perform training
time_begin = time.time()
print("Training begins @ %f" % time_begin)
local_step = 0
step = 0
while not sv.should_stop() and step < 10000:
# 读入MNIST的训练数据,默认每批次为100个图片
batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
train_feed = {x: batch_xs, y_: batch_ys}
#执行分布式TensorFlow模型训练
_, step = sess.run([train_op, global_step], feed_dict=train_feed)
local_step = local_step + 1
now = time.time()
print("%f: Worker %d: training step %d done (global step: %d)" %
(now, FLAGS.task_index, local_step, step))
#每隔100步长,验证模型精度
if step % 100 == 0:
print("acc: %g" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
print("cross entropy = %g" % sess.run(loss, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
# 停止TensorFlow Session
time_end = time.time()
print("Training ends @ %f" % time_end)
training_time = time_end - time_begin
print("Training elapsed time: %f s" % training_time)
print("acc: %g" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
print("cross entropy = %g" % sess.run(loss, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
sv.stop()
if __name__ == "__main__":
tf.app.run()
然后在参数服务上执行
python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="ps" --task_index=0
则会有,其实是启动了GRPC服务
参数服务
在第一个worker节点上执行
python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=0
在第二个worker节点上执行
python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=1
这时候等数据加载完成就会有如下训练信息
第一个工作节点
第二个工作节点
两个工作节点的迭代次数合起来就是我们设置的总的迭代次数
自后的模型都会存在nfs服务中,因为只有这样参数节点和工作节点才能共享模型参数
- GPU
GPU的方案整体和以上差不多,只是在原yaml文件中增加GPU支持
模版:
apiVersion: v1
kind: ReplicationController
metadata:
name: tensorflow-worker
spec:
replicas: 1
selector:
name: tensorflow-worker
template:
metadata:
labels:
name: tensorflow-worker
role: worker
spec:
containers:
- name: worker
image: gcr.io/tensorflow/tensorflow:latest-gpu
ports:
- containerPort: 2222
env:
- name: PS_KEY
valueFrom:
configMapKeyRef:
name: tensorflow-cluster-config
key: ps
- name: WORKER_KEY
valueFrom:
configMapKeyRef:
name: tensorflow-cluster-config
key: worker
securityContext:
privileged: true
resources:
requests:
alpha.kubernetes.io/nvidia-gpu: 1
limits:
alpha.kubernetes.io/nvidia-gpu: 1
volumeMounts:
- mountPath: /dev/nvidia0
name: nvidia0
- mountPath: /dev/nvidiactl
name: nvidiactl
- mountPath: /dev/nvidia-uvm
name: nvidia-uvm
- mountPath: /datanfs
name: tfstorage
- name: libcuda-so
mountPath: /usr/lib/x86_64-linux-gnu
- name: cuda
mountPath: /usr/local/cuda-8.0
volumes:
- name: nfs
persistentVolumeClaim:
claimName: nfs-pvc
- hostPath:
path: /dev/nvidia0
name: nvidia0
- hostPath:
path: /dev/nvidiactl
name: nvidiactl
- hostPath:
path: /dev/nvidia-uvm
name: nvidia-uvm
- name: libcuda-so
hostPath:
path: /usr/lib/x86_64-linux-gnu
- name: cuda
hostPath:
path: /usr/local/cuda-8.0
到此我们的实战已经初步结束,当然不排除其中有很多细节,有很多坑要踩,这些细节和坑在这里都不一一再说了,因为太多了,没发写。
如果你们在实战过程中遇到什么问题,欢迎随时跟我沟通,我们共同成长,共同学习。
QQ:458798698
微信号:songwindwind
或者直接在简书上联系我。
有兴趣的可以关注本人github
https://github.com/songyaheng