tensorflow技术解析与实战——阅读笔记

tensorflow初探七之队列-Tensorflow技术解析与

2019-01-29  本文已影响0人  欠我的都给我吐出来

队列

队列(queue)本身也是图中的一个节点,是一种有状态的节点,其他节点,如入队节点(enqueue)和出队节点(dequeue),可以修改它的内容。例如,入队节点可以把新元素插到队列末尾,出队节点可以把队列前面的元素删除。TensorFlow 中主要有两种队列,即 FIFOQueue 和 RandomShuffleQueue。

#  FIFOQueue 先进先出
q=tf.FIFOQueue(3,"float")
init=q.enqueue_many(([0.1,0.2,0.3],))

x=q.dequeue()
y=x+1
q_inc=q.enqueue([y])

with tf.Session() as sess:
    sess.run(init)
    for i in range(2):
        sess.run(q_inc)
    quelen=sess.run(q.size())
    for i in range(quelen):
        print(sess.run(q.dequeue()))

#RandomShuffleQueue
q=tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes="float")
with tf.Session() as sess:
    for i in range(0,10):
        sess.run(q.enqueue(i))
    for i in range(0,8):
        print(sess.run(q.dequeue()))

#3.0
#4.0
#8.0
#2.0
#0.0
#1.0
#7.0
#9.0

注意到RandomShuffleQueue的参数有容量和最小长度。当队列长度等于最小值,执行出队操作以及队列长度等于最大值,执行入队操作时,会有阻断情况发生。只有当队列满足要求后,才能继续执行。可以通过设置绘画在运行时的等待时间来解除阻断。

q=tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes="float")
with tf.Session() as sess:
    for i in range(0,10):
        sess.run(q.enqueue(i))
    for i in range(0,10):
        run_options = tf.RunOptions(timeout_in_ms = 10000) # 等待 10 秒
        try:
            print(sess.run(q.dequeue(), options=run_options))
        except tf.errors.DeadlineExceededError:
            print('out of range')
            break;

如果入队操作是在主线程中进行,那么当入队产生阻断时,会影响后续的读数据以及训练操作。会话中可以运行多个线程,我们使用线程管理器 QueueRunner 创建一系列的新线程进行入队操作,让主线程继续使用数据,即训练网络和读取数据是异步的,主线程在训练网络,另一个线程在将数据从硬盘读入内存。

队列管理器

q=tf.FIFOQueue(1000,'float')
counter=tf.Variable(0.0)
incre_op=tf.assign_add(counter,tf.constant(1.0))
enqueue_op=q.enqueue(counter)

qr=tf.train.QueueRunner(q,enqueue_ops=[incre_op,enqueue_op]*1)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    #启动入队操作
    enqueue_threads=qr.create_threads(sess,start=True)
    #主线程是取数据操作
    for i in range(10):
        print(sess.run(q.dequeue()))

#3.0
#11.0
#69.0
#73.0
#90.0
#97.0
#107.0
#226.0
#266.0
#274.0

输出的队列也不是我们期待的自然数列,并且线程被阻断。这是因为加 1 操作和入队操作不同步,可能加 1 操作执行了很多次之后,才会进行一次入队操作。

上述代码最后报异常,然后会话自动关闭。入队线程自顾自地执行,在需要的出队操作完成之后,程序没法结束,一直到超过队列的容量之后,会导致cancalledError。因此需要协调器来管理线程。

协调器coordinator
可以解决上面的入队线程不受控制的情况。

q=tf.FIFOQueue(1000,'float')
counter=tf.Variable(0.0)
incre_op=tf.assign_add(counter,tf.constant(1.0))
enqueue_op=q.enqueue(counter)

qr=tf.train.QueueRunner(q,enqueue_ops=[incre_op,enqueue_op]*1)
sess=tf.Session()
sess.run(tf.global_variables_initializer())
coord=tf.train.Coordinator() #协调器,协调线程间的关系可以视为一种信号量,用来做同步
enqueue_threads = qr.create_threads(sess, coord = coord,start=True)

for i in range(0,10):
    print(sess.run(q.dequeue()))
    
coord.request_stop()# 通知其他线程关闭
coord.join(enqueue_threads)# join 操作等待其他线程结束,其他所有线程关闭之后,这一函数才能返回

#3.0
#20.0
#304.0
#1118.0
#1164.0
#1242.0
#1311.0
#1320.0
#1381.0
#1387.0

这个很奇怪,并没有按照书上说的关闭线程之后再执行出队操作,就会抛出 tf.errors.OutOfRange 错误。而且

print(sess.run(q.size()))
#每次都不一样,这次是170
for i in range(0,10):
    print(sess.run(q.dequeue()))
#在此实行上面的代码,得到10个1387
上一篇 下一篇

猜你喜欢

热点阅读