2018-06-29-tensorflow-queue-队列理解
在Tensorflow中,队列是一类非常特殊的元素,其涉及到的内容也十分广泛,使用的方法也是多种多样.但是在本文中仅仅介绍一下使用队列进行并行的输入的方法.不过需要首先记住一点,使用队列的方法进行输入的本质是从队列取数据和加数据的过程是并行的,并且加数据的过程是可以采用多线程的.本文不同于其他的教程,将会以使用queue实现mini-batch数据输入作为讲述的核心.主要参考了这篇官网的教程
基本思路
Mini-batch的数据输入方式是很熟悉的套路.但是怎么生成这样的mini-batch是难点.并且生成完1个epoch的所有mini-batch之后,将会回过头去重新生成一遍所有的mini-batch.我们分析一下这样的思路,可以发现如果采用队列的方式对生成的batch进行存储.每次取出来一个队列元素(一个batch),顺手在把这个batch 加回到队列中,并且利用这个batch进行模型的训练.训练结束了之后将会再取一个batch ,循环往复.这样就可以保证有源源不断的batch可以取出来.
实现方法之队列简介
image.png 上面的这个图,就是最最最简单的一种队列啦.但是呢麻雀虽小,五脏俱全.这么一小段代码,就涵盖了我们用的到的几个op.tf.FIFOQueue()
可以创建一个最简单的先进先出的队列.q.enqueue_many()
可以n入队多个元素.q.dequeue()
可以出队一个元素.q.enqueue()
则很明显可以入队一个元素。
QueueRunner和Coordinator的使用
好像利用上一小节的内容就可以实现我们上述的生成mini-batch的过程啦,但是这样的话我这篇总结就太短啦~立即推:还没完!
其实确实,上一部分的内容已经足够。想想,只需要首先把所有的batch都放入到队列当中,然后每次出队一个元素,在放入队尾就行了。但是呢,Tensorflow比较贴心。给我们提供了另外一个工具QueueRunner.它的作用是用来帮我们往队列里填数据,没有了就填没有了就填。而且能够多线程的填。用法如下:
def simple_shuffle_batch(source, capacity, batch_size=10):
# Create a random shuffle queue.
queue = tf.RandomShuffleQueue(capacity=capacity,
min_after_dequeue=int(0.9*capacity),
shapes=source.shape, dtypes=source.dtype)
# Create an op to enqueue one item.
enqueue = queue.enqueue(source)
# Create a queue runner that, when started, will launch 4 threads applying
# that enqueue op.
num_threads = 4
qr = tf.train.QueueRunner(queue, [enqueue] * num_threads)
# Register the queue runner so it can be found and started by
# <a href="../../api_docs/python/tf/train/start_queue_runners"><code>tf.train.start_queue_runners</code></a> later (the threads are not launched yet).
tf.train.add_queue_runner(qr)
# Create an op to dequeue a batch
return queue.dequeue_many(batch_size)
从代码里可以看出来,使用queuerunner需要三个步骤
1.需要创建一个queuerunner,即qr = tf.train.QueueRunner(queue, [enqueue] * num_threads)
2.并且添加到训练中去tf.train.add_queue_runner(qr)
。
3.代码中其实还少一步,就是要使用tf.train.start_queue_runners()
这个函数来启动我们的自动化小助手queue runner。否则它不会干活帮我们填充数据的。
还没完!Tensorflow还提供了一个东西叫Coordinator
这个东西呢,只有在我们使用多线程版本的queue runner的时候才有用,它的作用是负责统一规划。等到所有的线程都结束了,就做个记录,说好啦可以收工啦,可以汇合数据啦。用法也是极其简单的。
# Using Python's threading library.
import threading
# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):
while not coord.should_stop():
...do something...
if ...some condition...:
coord.request_stop()
# Main thread: create a coordinator.
coord = tf.train.Coordinator()
# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]
# Start the threads and wait for all of them to stop.
for t in threads:
t.start()
coord.join(threads)
Supervisor的使用
有了上面这些东西呢,就可以比较完美的实现我们的mini-batch的数据的生成过程了。但是还需要多提一个看起来不相关的话题。就是如果你恰好在看官网的教程,我们可以发现在序列的rnn教程的读取数据的代码里和我上文讲的过程是有出入的。主要集中在以下两点:
1.该代码没有把每一个batch 放进queue当中,而是呀把epoch的编号放进了queue当中,使用的是tf.train.range_input_producer()
这样一个方法,本质是建立了一个fifo队列。这里首先需要解释一下,在rnn解决序列问题的时候,epoch并不是我们理解的整个文本,而是一个unrolling的rnn的时间跨度。比如说我们的rnn模型使用了10个cell作为中间层,那么一个epoch就是由10个mini-batch组成的。理解了这个,我们也就很容易理解代码中的做法,代码把所有epoch编号,每次出队一个编号,这个编号重新入队之后,再根据编号去找对应的batch们,然后利用这些batch进行训练。和我前面提到的直接将batch存到queue中的想法,多了一个查找的过程。
2.另外一个重要区别是代码里没有用到start_queue_runners
和coordinator
。是我说错了吗!不!是它没有直接用,可以观察到,它使用了supervisor这么个东西。如果仔细研究过,就会知道这是一个用来长期进行训练的可以取代普通session的东东。它的主要作用是模型训练了一段时间(通常是半分钟)之后,就会存储一下。这样的话如果训练的时间很长,断电啦,或者咋的啦。就会自动的从之前存储的模型中读取信息,实现了‘断点续传’。这个supervisor还顺手把我们的queue_runners给启动啦,并且自动的管理了多线程。真是多才多艺的小监督员。