斯坦福机器学习课程

CS20si 第9课: 输入流程与风格迁移

2018-08-02  本文已影响7人  tech0ne

第9课: 输入流程与风格迁移

CS20si课程资料和代码Github地址

在看完GANs后,课程回到TensorFlow的正题上来。

队列(Queue)和协调器(Coordinator)

我们简要提到过队列但是从没有详细讨论它,在TensorFlow文档中,队列被描述为:“在计算图中实现异步计算的重要对象”。

如果你做过任何深度学习的项目,可能就不需要我来说服你为什么需要异步编程。在输入管道中,多个线程可以帮助我们减轻读取数据阶段的瓶颈,因为读取数据需要等很长时间。例如用队列来准备训练模型所需的数据,我们需要:

TensorFlow的Session对象被设计为支持多线程的,所以多个线程可以简单的用同一个Session并行的执行运算。然而,实现一个Python程序像上面描述那样驾驭线程并不那么容易。所有线程必须能够一起停止,异常必须被捕获并报出,而且停止时队列必须被合理的关闭。

文档上看起来像是在使用队列时多线程是可选的,但是实际上不用多线程,你的队列很可能会阻塞然后使程序崩溃。幸运的是,TensorFlow提供了两个类帮助使用多线程队列:tf.Coordinatortf.QueueRunner,这两个类需要一起使用。Coordinator类帮助多个线程一起停止并报告异常,QueueRunner类用来创建一定数量的线程推送tensors到一个队列中。

这里有两个主要的队列类:tf.FIFOQueuetf.RandomShuffleQueue。FIFOQueue创建一个先入先出的队列,而RandomShuffleQueue创建随机出队的队列。这两个队列支持enqueueenqueue_manydequeue的操作。一种常见的做法是在读取数据时enqueue多个样本,然后一个个的dequeue它们,dequeue_many是不允许的。如果你需要为mini-batch训练读取多个样本时,可以使用tf.train.batch或者tf.train.shuffle_batch

import tensorflow as tf

q = tf.FIFOQueue(3, "float")
init = q.enqueue_many(([0., 0., 0.],))

x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])

with tf.Session() as sess:
    init.run()
    q_inc.run()
    q_inc.run()
    q_inc.run()
    q_inc.run()
    print(sess.run(x))
    print(sess.run(x))
    print(sess.run(x))

tf.PaddingFIFOQueue是支持批量大小可变的FIFOQueue,它还支持dequeue_many。有时候你需要灌入不同大小batch,例如NLP中的序列到序列模型,大多数时候你希望以一句话作为一个batch,但是句子的长度是不同的。除此之外还有tf.PriorityQueue,它的进队和出队参考另一个参数:权重(priority)。

我不知道为什么只有PaddingFIFOQueue支持dequeue_many,但是从TensorFlow的GitHub上报告的问题来看似乎是因为其它队列在使用'dequeue_many'时出现了很多问题所以被禁止了。

你可以不使用参数创建一个队列,例如参数:min_after_dequeue
(出队列后队列中元素的最小数量)、bounded capacity
(队列中的最大元素个数)和队列中元素的形状(如果形状为None,
元素可以是任何形状)。然而在实践中很少单独使用队列,而总是和string_input_producer一起使用,因此我们将简要地过一下这一节,而在string_input_producer中详细介绍。

tf ​. ​RandomShuffleQueue ​( ​capacity ​, ​ min_after_dequeue ​, ​ dtypes ​, ​ shapes ​= ​None ​, ​ names ​= ​None ​,
seed ​= ​None ​, ​ shared_name ​= ​None ​, ​ name ​= ​'random_shuffle_queue')

在课程GitHub中的09_queue_example.py中可以看到这样一个列子。

N_SAMPLES ​= ​ ​1000
NUM_THREADS ​= ​ 4
# Generating some simple data
# create 1000 random samples, each is a 1D array from the normal distribution (10, 1)
data ​= ​ ​10 ​ ​* ​ np ​. ​random ​. ​randn ​( ​N_SAMPLES ​, ​ ​4 ​) ​ ​+ ​ ​1
# create 1000 random labels of 0 and 1
target ​= ​ np ​. ​random ​. ​randint ​( ​0 ​, ​ ​2 ​, ​ size ​= ​N_SAMPLES ​)
queue ​= ​ tf ​. ​FIFOQueue ​( ​capacity ​= ​50 ​, ​ dtypes ​=[ ​tf ​. ​float32 ​, ​ tf ​. ​int32 ​], ​ shapes ​=[[ ​4 ​], ​ ​[]])
enqueue_op ​= ​ queue ​. ​enqueue_many ​([ ​data ​, ​ target ​])
dequeue_op ​= ​ queue ​. ​dequeue ​()
# create NUM_THREADS to do enqueue
qr ​= ​ tf ​. ​train ​. ​QueueRunner ​( ​queue ​, ​ ​[ ​enqueue_op ​] ​ ​* ​ NUM_THREADS)
with ​ tf ​. ​Session ​() ​ ​as ​ sess:
    # Create a coordinator, launch the queue runner threads.
    coord ​= ​ tf ​. ​train ​. ​Coordinator ​()
    enqueue_threads ​= ​ qr ​. ​create_threads ​( ​sess ​, ​ coord ​= ​coord ​, ​ start ​= ​True)
    for ​ step ​in ​ xrange ​( ​100 ​): ​ ​# do to 100 iterations
        if ​ coord ​. ​should_stop ​():
            break
        data_batch ​, ​ label_batch ​= ​ sess ​. ​run ​( ​dequeue_op)
    coord ​. ​request_stop ​()
    coord ​. ​join ​( ​enqueue_threads)

在TensorFlow队列中你也可以不使用tf.Coordinator,但是可以使用它来管理你创建的任何线程。例如你使用Python线程包创建线程做一些事,你仍然可以使用tf.Coordinator来管理这些线程。(译者:在1.8版中为tf.train.Coordinator)

import threading
# thread body: loop until the coordinator indicates a stop was requested.
# if some condition becomes true, ask the coordinator to stop.
def ​ my_loop ​( ​coord ​):
    while ​ ​not ​ coord ​. ​should_stop ​():
        ... ​do ​ something ​...
        if ​ ​... ​some condition ​...:
            coord ​. ​request_stop ​()
# main code: create a coordinator.
coord ​= ​ tf ​. ​Coordinator ​()
# create 10 threads that run 'my_loop()'
# you can also create threads using QueueRunner as the example above
threads ​= ​ ​[ ​threading ​. ​Thread ​( ​target ​= ​my_loop ​, ​ args ​=( ​coord ​,)) ​ ​for ​ _ ​in ​ xrange ​( ​10 ​)]
# start the threads and wait for all of them to stop.
for ​ t ​in ​ threads ​:
    t ​. ​start ​()
coord ​. ​join ​( ​threads)

数据读取器(Data Reader)

我们已经学习了3种TensorFlow读取数据的方法,第一种是从常量(Constant)中读取,第二种是用feed_dict读取,第三种也是最常用的做法是用DataReader直接从存储中读取数据。

TensorFlow为常见的数据类型内建了一些Reader,最通用的一个是TextLineReader,它每次读取一行。除此之外还有读定长数据的Reader,读取整个文件的Reader,读取TFRecord类型数据(下面要讲到)的Reader。

tf ​. ​TextLineReader
Outputs ​ the lines of a file delimited ​by ​ newlines
E ​. ​g ​. ​ text files ​, ​ CSV files
tf ​. ​FixedLengthRecordReader
Outputs ​ the entire file ​when ​ all files have same ​fixed ​ lengths
E ​. ​g ​. ​ each MNIST file has ​28 ​ x ​28 ​ pixels ​, ​ CIFAR ​- ​10 ​ ​32 ​ x ​32 ​ x 3
tf ​. ​WholeFileReader
Outputs ​ the entire file content. This is useful when each file contains a sample
tf ​. ​TFRecordReader
Reads ​ samples ​from ​ ​TensorFlow ​' ​s own binary format ​( ​TFRecord)
tf ​. ​ReaderBase
Allows ​ you to create your own readers

要使用Data Reader,我们首先要建立一个队列并用tf.train.string_input_producer获得所有你要读取的文件名。

filename_queue ​= ​ tf ​. ​train ​. ​string_input_producer ​([ ​"heart.csv" ​])
reader ​= ​ tf ​. ​TextLineReader ​(skip_header_lines=1)
# it means you choose to skip the first line for every file in the queue

你可以将Reader想象成你每调用一次只返回一个值的运算 - 类似于Python的生成器(generator)。所以当你调用reader.read()的时候它返回一个键值对,其中的键是能够标识文件和数据的字符串。

key ​, ​ value ​= ​ reader ​. ​read ​( ​filename_queue)

例如上面的语句可能返回:

key ​ ​= ​ data ​/ ​heart ​. ​csv ​:2
value ​ ​= ​ ​144 ​, ​0.01 ​, ​4.41 ​, ​28.61 ​, ​Absent ​, ​55 ​, ​28.87 ​, ​2.06 ​, ​63 ​,1

表示数据​144 ​, ​0.01 ​, ​4.41 ​, ​28.61 ​, ​Absent ​, ​55 ​, ​28.87 ​, ​2.06 ​, ​63 ​,1是文件data/heart.csv的第2航。

tf.train.string_input_producer在后台创建了一个FIFOQueue,所以要用队列,我们需要同时使用tf.Coordinatortf.QueueRunner

filename_queue ​= ​ tf ​. ​train ​. ​string_input_producer ​( ​filenames)
reader ​= ​ tf ​. ​TextLineReader ​( ​skip_header_lines ​= ​1 ​) ​ ​# skip the first line in the file
key ​, ​ value ​= ​ reader ​. ​read ​( ​filename_queue)
with ​ tf ​. ​Session ​() ​ ​as ​ sess:
    coord ​= ​ tf ​. ​train ​. ​Coordinator ​()
    threads ​= ​ tf ​. ​train ​. ​start_queue_runners ​( ​coord ​= ​coord)
    ​print ​ sess ​. ​run ​( ​key) # data ​/ ​heart ​. ​csv ​:2
    ​print ​ sess ​. ​run ​( ​value) # ​144 ​, ​0.01 ​, ​4.41 ​, ​28.61 ​, ​Absent ​, ​55 ​, ​28.87 ​, ​2.06 ​, ​63 ​,1
    coord ​. ​request_stop ​()
    coord ​. ​join ​( ​threads)

我们获得的是value是字符串tensor,接下来用TensorFlow的CSV解码器将value转换为向量。

content ​= ​ tf . decode_csv ​( ​value ​, ​ record_defaults ​= ​record_defaults ​)

上面的record_defaults需要我们自己建立,它表示两点内容:

定义所有列的类型和初始值。

record_defaults ​= ​ ​[[ ​1.0 ​] ​ ​for ​ _ ​in ​ range ​( ​N_FEATURES ​)] # define all features to be floats
record_defaults ​[ ​4 ​] ​ ​= ​ ​[ ​''] # make the fifth feature string
record_defaults ​. ​append ​([ ​1 ​])
content ​= ​ tf ​. ​decode_csv ​( ​value ​, ​ record_defaults ​= ​record_defaults ​)

你可以在灌数据之前做所有你想做的数据预处理。例如我们的数据有8个浮点值,1个字符串和1个整数值,我们将把字符串转换为浮点数,然后将9个特征转换为一个tensor以便灌入模型。

# convert the 5th column (present/absent) to the binary value 0 and 1
condition ​= ​ tf ​. ​equal ​( ​content ​[ ​4 ​], ​ tf ​. ​constant ​( ​'Present' ​))
content ​[ ​4 ​] ​ ​= ​ tf ​. ​select ​( ​condition ​, ​ tf ​. ​constant ​( ​1.0 ​), ​ tf ​. ​constant ​( ​0.0 ​))
# pack all 9 features into a tensor
features ​= ​ tf ​. ​pack ​( ​content ​[: ​N_FEATURES ​])
# assign the last column to label
label ​= ​ content ​[- ​1]

于是每一次Reader从CSV文件中读取一行,就会被转换为特征向量和标签。

但是你一般不想灌入一个单独的样本到模型中,而是希望灌入一个batch数据。你可以用tf.train.batch或者tf.train.shuffle_batch做这个。

# minimum number elements in the queue after a dequeue, used to ensure
# that the samples are sufficiently mixed
# I think 10 times the BATCH_SIZE is sufficient
min_after_dequeue ​= ​ ​10 ​ ​* ​ BATCH_SIZE
# the maximum number of elements in the queue
capacity ​= ​ ​20 ​ ​* ​ BATCH_SIZE
# shuffle the data to generate BATCH_SIZE sample pairs
data_batch ​, ​ label_batch ​= ​ tf ​. ​train ​. ​shuffle_batch ​([ ​features ​, ​ label ​], ​ batch_size ​= ​BATCH_SIZE ​,
                                        capacity ​= ​capacity ​, ​ min_after_dequeue ​= ​min_after_dequeue)

这样就做完了,你可以简单的像在以前的模型中使用input_placeholderlabel_placeholder中那样使用data_batchlabel_batch,除非你不需要通过feed_dict灌数据。完整的代码在课程GitHub中的05_csv_reader.py中。

TFRecord

二进制文件非常有用,虽然我遇到过很多人都不喜欢用,因为他们认为二进制文件很麻烦。如果你是他们中的一员,我希望通过这节课能够帮助你克服对二进制文件的非理性恐惧。它们能更好的利用磁盘缓存,它们能很快的迁移,它们能存储不同类型的数据。(所以你可以把图片和标签放在一个地方)

像很多机器学习框架一样,TensorFlow有自己的二进制数据格式,名叫TFRecord。TFRecord是一个序列化的tf.train.Example类型的Protobuf对象,可以用简单几行代码进行创建。下面是将一幅图片转换为TFRecord的例子。

首先,我们需要读取图片然后将它转换为二进制字节流。

def ​get_image_binary ​( ​filename ​):
    image ​= ​ ​Image ​. ​open ​( ​filename)
    image ​= ​ np ​. ​asarray ​( ​image ​, ​ np ​. ​uint8)
    shape ​= ​ np ​. ​array ​( ​image ​. ​shape ​, ​ np ​. ​int32)
    ​return ​ shape ​. ​tobytes ​(), ​ image ​. ​tobytes ​() ​ ​# convert image to raw data bytes in the array.

下一步,用tf.python_io.TFRecordWritertf.train.Feature将这些字节写入TFRecord。你需要shape信息去重建图片。

def ​write_to_tfrecord ​( ​label ​, ​ shape ​, ​ binary_image ​, ​ tfrecord_file ​):
    ​""" This example is to write a sample to TFRecord file. If you want to write
    more samples ​, ​ just ​use ​ a loop.
    ​"""
    writer ​= ​ tf ​. ​python_io ​. ​TFRecordWriter ​( ​tfrecord_file)
    ​# write label, shape, and image content to the TFRecord file
    example ​= ​ tf ​. ​train ​. ​Example ​( ​features ​= ​tf ​. ​train ​. ​Features ​( ​feature ​={
                ​'label' ​: ​ tf ​. ​train ​. ​Feature ​( ​bytes_list ​= ​tf ​. ​train ​. ​BytesList ​( ​value ​=[ ​label ​])),
                ​'shape' ​: ​ tf ​. ​train ​. ​Feature ​( ​bytes_list ​= ​tf ​. ​train ​. ​BytesList ​( ​value ​=[ ​shape ​])),
                ​'image' ​: ​tf ​. ​train ​. ​Feature ​( ​bytes_list ​= ​tf ​. ​train ​. ​BytesList ​(
                value ​=[ ​binary_image ​]))
                ​}))
    writer ​. ​write ​( ​example ​. ​SerializeToString ​())
    writer ​. ​close ​()

要读取一个TFRecord文件,可以用TFRecordReadertf.decode_raw

def ​read_from_tfrecord ​( ​filenames ​):
    tfrecord_file_queue ​= ​ tf ​. ​train ​. ​string_input_producer ​( ​filenames ​, ​ name ​= ​'queue' ​)
    reader ​= ​ tf ​. ​TFRecordReader ​()
    _ ​, ​ tfrecord_serialized ​= ​ reader ​. ​read ​( ​tfrecord_file_queue ​)
    ​# label and image are stored as bytes but could be stored as ​# int64 or float64 values in a serialized tf.Example protobuf.
    tfrecord_features ​= ​ tf ​. ​parse_single_example ​( ​tfrecord_serialized ​,
                        features ​={
                        ​'label' ​: ​ tf ​. ​FixedLenFeature ​([], ​ tf ​. ​string ​),
                        ​'shape' ​: ​ tf ​. ​FixedLenFeature ​([], ​ tf ​. ​string ​),
                        ​'image' ​: ​ tf ​. ​FixedLenFeature ​([], ​ tf ​. ​string ​),
                        ​}, ​ name ​= ​'features' ​)
    ​# image was saved as uint8, so we have to decode as uint8.
    image ​= ​ tf ​. ​decode_raw ​( ​tfrecord_features ​[ ​'image' ​], ​ tf ​. ​uint8 ​)
    shape ​= ​ tf ​. ​decode_raw ​( ​tfrecord_features ​[ ​'shape' ​], ​ tf ​. ​int32 ​)
    ​# the image tensor is flattened out, so we have to reconstruct the shape
    image ​= ​ tf ​. ​reshape ​( ​image ​, ​ shape ​)
    label ​= ​ tf ​. ​cast ​( ​tfrecord_features ​[ ​'label' ​], ​ tf ​. ​string ​)
    ​return ​ label ​, ​ shape ​, ​ image

记住这些标签、图片还原成一些tensor对象,要获得它们的值你必须使用tf.Session()计算。

风格迁移

这部分内容在第二次作业中。

上一篇 下一篇

猜你喜欢

热点阅读