tensorflow初探七之队列-Tensorflow技术解析与
2019-01-29 本文已影响0人
欠我的都给我吐出来
队列
队列(queue)本身也是图中的一个节点,是一种有状态的节点,其他节点,如入队节点(enqueue)和出队节点(dequeue),可以修改它的内容。例如,入队节点可以把新元素插到队列末尾,出队节点可以把队列前面的元素删除。TensorFlow 中主要有两种队列,即 FIFOQueue 和 RandomShuffleQueue。
- FIFOQueue 创建一个先入先出队列。例如,我们在训练一些语音、文字样本时,使用循环神经网络的网络结构,希望读入的训练样本是有序的,就要用 FIFOQueue。
- RandomShuffleQueue 创建一个随机队列,在出队列时,是以随机的顺序产生元素的。例如,我们在训练一些图像样本时,使用 CNN 的网络结构,希望可以无序地读入训练样本,就要用RandomShuffleQueue,每次随机产生一个训练样本。RandomShuffleQueue 在 TensorFlow 使用异步计算时非常重要。因为 TensorFlow 的会话是支持多线程的,我们可以在主线程里执行训练操作,使用 RandomShuffleQueue 作为训练输入,开多个线程来准备训练样本,将样本压入队列后,主线程会从队列中每次取出 mini-batch 的样本进行训练。
# FIFOQueue 先进先出
q=tf.FIFOQueue(3,"float")
init=q.enqueue_many(([0.1,0.2,0.3],))
x=q.dequeue()
y=x+1
q_inc=q.enqueue([y])
with tf.Session() as sess:
sess.run(init)
for i in range(2):
sess.run(q_inc)
quelen=sess.run(q.size())
for i in range(quelen):
print(sess.run(q.dequeue()))
#RandomShuffleQueue
q=tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes="float")
with tf.Session() as sess:
for i in range(0,10):
sess.run(q.enqueue(i))
for i in range(0,8):
print(sess.run(q.dequeue()))
#3.0
#4.0
#8.0
#2.0
#0.0
#1.0
#7.0
#9.0
注意到RandomShuffleQueue的参数有容量和最小长度。当队列长度等于最小值,执行出队操作以及队列长度等于最大值,执行入队操作时,会有阻断情况发生。只有当队列满足要求后,才能继续执行。可以通过设置绘画在运行时的等待时间来解除阻断。
q=tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes="float")
with tf.Session() as sess:
for i in range(0,10):
sess.run(q.enqueue(i))
for i in range(0,10):
run_options = tf.RunOptions(timeout_in_ms = 10000) # 等待 10 秒
try:
print(sess.run(q.dequeue(), options=run_options))
except tf.errors.DeadlineExceededError:
print('out of range')
break;
如果入队操作是在主线程中进行,那么当入队产生阻断时,会影响后续的读数据以及训练操作。会话中可以运行多个线程,我们使用线程管理器 QueueRunner 创建一系列的新线程进行入队操作,让主线程继续使用数据,即训练网络和读取数据是异步的,主线程在训练网络,另一个线程在将数据从硬盘读入内存。
队列管理器
q=tf.FIFOQueue(1000,'float')
counter=tf.Variable(0.0)
incre_op=tf.assign_add(counter,tf.constant(1.0))
enqueue_op=q.enqueue(counter)
qr=tf.train.QueueRunner(q,enqueue_ops=[incre_op,enqueue_op]*1)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
#启动入队操作
enqueue_threads=qr.create_threads(sess,start=True)
#主线程是取数据操作
for i in range(10):
print(sess.run(q.dequeue()))
#3.0
#11.0
#69.0
#73.0
#90.0
#97.0
#107.0
#226.0
#266.0
#274.0
输出的队列也不是我们期待的自然数列,并且线程被阻断。这是因为加 1 操作和入队操作不同步,可能加 1 操作执行了很多次之后,才会进行一次入队操作。
上述代码最后报异常,然后会话自动关闭。入队线程自顾自地执行,在需要的出队操作完成之后,程序没法结束,一直到超过队列的容量之后,会导致cancalledError。因此需要协调器来管理线程。
协调器coordinator
可以解决上面的入队线程不受控制的情况。
q=tf.FIFOQueue(1000,'float')
counter=tf.Variable(0.0)
incre_op=tf.assign_add(counter,tf.constant(1.0))
enqueue_op=q.enqueue(counter)
qr=tf.train.QueueRunner(q,enqueue_ops=[incre_op,enqueue_op]*1)
sess=tf.Session()
sess.run(tf.global_variables_initializer())
coord=tf.train.Coordinator() #协调器,协调线程间的关系可以视为一种信号量,用来做同步
enqueue_threads = qr.create_threads(sess, coord = coord,start=True)
for i in range(0,10):
print(sess.run(q.dequeue()))
coord.request_stop()# 通知其他线程关闭
coord.join(enqueue_threads)# join 操作等待其他线程结束,其他所有线程关闭之后,这一函数才能返回
#3.0
#20.0
#304.0
#1118.0
#1164.0
#1242.0
#1311.0
#1320.0
#1381.0
#1387.0
这个很奇怪,并没有按照书上说的关闭线程之后再执行出队操作,就会抛出 tf.errors.OutOfRange 错误。而且
print(sess.run(q.size()))
#每次都不一样,这次是170
for i in range(0,10):
print(sess.run(q.dequeue()))
#在此实行上面的代码,得到10个1387