程序员

TF 2 Estimators

2021-01-17  本文已影响0人  数科每日

本文是对官方文档 的学习笔记。


本文 tf.estimator , 一个high-level TensorFlow API. Estimators 包括以下几个组件:

TensorFlow实现了几个预制的 Estimators 。不过 TF2 也支持自定义Estimators ,但自定义Estimators主要是作为向后兼容性的度量。自定义Estimators不应用于新代码。所有Estimators(预制或自定义的)都是基于tf.estimator.Estimator类的类。

有关快速示例,请尝试 Estimators 教程。有关API设计的概述,请查看白皮书

预备

pip install -q -U tensorflow_datasets

import tempfile
import os

import tensorflow as tf
import tensorflow_datasets as tfds

优势

tf.keras.Model 相似,估计器是模型级别的抽象。 [tf.estimator](https://www.tensorflow.org/api_docs/python/tf/estimator提供了一些仍在为tf.keras开发的功能。这些是:

Estimator 能力

Estimator 有如下好处

使用Estimators编写应用程序时,必须将数据输入管道与模型分开。这种分离简化了使用不同数据集。

使用预置 Estimator

预制的Estimator 使您可以在比基本TensorFlow API更高的概念层次上工作。无需再担心创建计算图或会话,因为Estimators会处理所有“管道”。此外,预制的Estimators使开发者仅需进行最少的代码更改即可尝试不同的模型架构。例如,tf.estimator.DNNClassifier是一个预制的Estimator类,它基于密集的前馈神经网络训练分类模型。

依赖于预制Estimator的TensorFlow程序通常包括以下四个步骤:

1. 开发Input 函数

例如,可以创建一个函数来导入训练集,而另一个函数来导入测试集。Estimator 希望将其输入格式化为一对对象:

input_fn应该返回一个tf.data.Dataset,以以上格式产生对。

例如,以下代码从泰坦尼克号数据集的train.csv文件构建tf.data.Dataset:

def train_input_fn():
  titanic_file = tf.keras.utils.get_file("train.csv", "https://storage.googleapis.com/tf-datasets/titanic/train.csv")
  titanic = tf.data.experimental.make_csv_dataset(
      titanic_file, batch_size=32,
      label_name="survived")
  titanic_batches = (
      titanic.cache().repeat().shuffle(500)
      .prefetch(tf.data.AUTOTUNE))
  return titanic_batches

input_fn在tf.Graph中执行,并且还可以直接返回包含图张量的(features_dics,labels)对,但这在诸如返回常量之类的简单情况之外容易出错。

2. 定义Feature 列

每个tf.feature_column 都会标识feature 名称,其类型以及任何输入预处理。

例如,以下代码片段创建了三个要素列。

age = tf.feature_column.numeric_column('age')
cls = tf.feature_column.categorical_column_with_vocabulary_list('class', ['First', 'Second', 'Third']) 
embark = tf.feature_column.categorical_column_with_hash_bucket('embark_town', 32)

3. 实例化相关预设 Estimator

例如,下面是一个名为LinearClassifier的预制Estimator的示例实例

model_dir = tempfile.mkdtemp()
model = tf.estimator.LinearClassifier(
    model_dir=model_dir,
    feature_columns=[embark, cls, age],
    n_classes=2
)

model_dir = tempfile.mkdtemp()
model = tf.estimator.LinearClassifier(
model_dir=model_dir,
feature_columns=[embark, cls, age],
n_classes=2
)

4. 训练,评估,预测

model = model.train(input_fn=train_input_fn, steps=100)
result = model.evaluate(train_input_fn, steps=10)
for key, value in result.items():
  print(key, ":", value)
for pred in model.predict(train_input_fn):
  for key, value in pred.items():
    print(key, ":", value)
  break

预设 Estimators 的好处

预制的估算器对最佳实践进行编码,具有以下优点:

如果不使用预制的Estimators,则必须自己实现上述功能。

定制化Estimators

无论是预制的还是定制的,每个Estimator的核心都是其模型函数model_fn,该函数可以构建用于训练,评估和预测。当您使用预制的Estimators 时,其他人已经实现了模型功能。依靠自定义Estimator时,您必须自己编写模型函数。

注意:自定义model_fn仍将以 Tensorflow 1.x样式的图形模式运行。这意味着没有eager run,也没有自动控制的依赖关系。您应该计划使用自定义model_fn从tf.estimator迁移。备用API是tf.keras和tf.distribute。如果您在训练的某些部分仍需要估算器,则可以使用tf.keras.estimator.model_to_estimator转换器从keras.Model创建估算器。

根据Keras模型创建估算器

您可以使用tf.keras.estimator.model_to_estimator将现有的Keras模型转换为Estimators。如果要现代化模型代码,这很有用,但是您的训练流程仍然需要Estimators。

实例化Keras MobileNet V2模型,并使用优化器,损失和度量对模型进行编译,以进行以下训练:

keras_mobilenet_v2 = tf.keras.applications.MobileNetV2(
    input_shape=(160, 160, 3), include_top=False)
keras_mobilenet_v2.trainable = False

estimator_model = tf.keras.Sequential([
    keras_mobilenet_v2,
    tf.keras.layers.GlobalAveragePooling2D(),
    tf.keras.layers.Dense(1)
])

# Compile the model
estimator_model.compile(
    optimizer='adam',
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
    metrics=['accuracy'])

根据已编译的Keras模型创建一个Estimator。 Keras模型的初始模型状态保留在创建的Estimator中:

est_mobilenet_v2 = tf.keras.estimator.model_to_estimator(keras_model=estimator_model)

像对待任何其他估算器一样对待派生的估算器。

IMG_SIZE = 160  # All images will be resized to 160x160

def preprocess(image, label):
  image = tf.cast(image, tf.float32)
  image = (image/127.5) - 1
  image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
  return image, label

def train_input_fn(batch_size):
  data = tfds.load('cats_vs_dogs', as_supervised=True)
  train_data = data['train']
  train_data = train_data.map(preprocess).shuffle(500).batch(batch_size)
  return train_data

要进行训练,请调用Estimator的训练功能:

est_mobilenet_v2.train(input_fn=lambda: train_input_fn(32), steps=50)

同样,要进行评估,请调用Estimator的评估函数:

est_mobilenet_v2.evaluate(input_fn=lambda: train_input_fn(32), steps=10)

有关更多详细信息,请参阅tf.keras.estimator.model_to_estimator 的文档。

使用Estimator保存基于对象的检查点

默认情况下,Estimators 使用变量名称而不是“Checkpoint”指南中描述的对象图来保存检查点。 tf.train.Checkpoint将读取基于名称的检查点,但是将模型的一部分移到Estimator的model_fn之外时,变量名称可能会更改。为了向前兼容,保存基于对象的检查点可以更轻松地在Estimator中训练模型,然后在一个模型之外使用它。

import tensorflow.compat.v1 as tf_compat

def toy_dataset():
  inputs = tf.range(10.)[:, None]
  labels = inputs * 5. + tf.range(5.)[None, :]
  return tf.data.Dataset.from_tensor_slices(
    dict(x=inputs, y=labels)).repeat().batch(2)

class Net(tf.keras.Model):
  """A simple linear model."""

  def __init__(self):
    super(Net, self).__init__()
    self.l1 = tf.keras.layers.Dense(5)

  def call(self, x):
    return self.l1(x)

def model_fn(features, labels, mode):
  net = Net()
  opt = tf.keras.optimizers.Adam(0.1)
  ckpt = tf.train.Checkpoint(step=tf_compat.train.get_global_step(),
                             optimizer=opt, net=net)
  with tf.GradientTape() as tape:
    output = net(features['x'])
    loss = tf.reduce_mean(tf.abs(output - features['y']))
  variables = net.trainable_variables
  gradients = tape.gradient(loss, variables)
  return tf.estimator.EstimatorSpec(
    mode,
    loss=loss,
    train_op=tf.group(opt.apply_gradients(zip(gradients, variables)),
                      ckpt.step.assign_add(1)),
    # Tell the Estimator to save "ckpt" in an object-based format.
    scaffold=tf_compat.train.Scaffold(saver=ckpt))

tf.keras.backend.clear_session()
est = tf.estimator.Estimator(model_fn, './tf_estimator_example/')
est.train(toy_dataset, steps=10)

然后,tf.train.Checkpoint可以从其model_dir加载估算器的检查点。

opt = tf.keras.optimizers.Adam(0.1)
net = Net()
ckpt = tf.train.Checkpoint(
  step=tf.Variable(1, dtype=tf.int64), optimizer=opt, net=net)
ckpt.restore(tf.train.latest_checkpoint('./tf_estimator_example/'))
ckpt.step.numpy()  # From est.train(..., steps=10)

保存 Estimator 模型

Estimator 通过tf.Estimator.export_saved_model导出SavedModels。

input_column = tf.feature_column.numeric_column("x")

estimator = tf.estimator.LinearClassifier(feature_columns=[input_column])

def input_fn():
  return tf.data.Dataset.from_tensor_slices(
    ({"x": [1., 2., 3., 4.]}, [1, 1, 0, 0])).repeat(200).shuffle(64).batch(16)
estimator.train(input_fn)

要保存Estimator ,您需要创建一个serving_input_receiver。此函数构建tf.Graph的一部分,该tf.Graph解析由SavedModel接收的原始数据。

tf.estimator.export 模块包含帮助构建这些接收器的函数。

以下代码基于feature_columns构建一个接收器,该接收器接受序列化的tf。协议缓冲区示例,通常与tf-serving 一起使用。

tmpdir = tempfile.mkdtemp()

serving_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
  tf.feature_column.make_parse_example_spec([input_column]))

estimator_base_path = os.path.join(tmpdir, 'from_estimator')
estimator_path = estimator.export_saved_model(estimator_base_path, serving_input_fn)

还可以从python加载并运行该模型:

imported = tf.saved_model.load(estimator_path)

def predict(x):
  example = tf.train.Example()
  example.features.feature["x"].float_list.value.extend([x])
  return imported.signatures["predict"](
    examples=tf.constant([example.SerializeToString()]))

print(predict(1.5))
print(predict(3.5))

tf.estimator.export.build_raw_serving_input_receiver_fn 允许您创建采用raw Tensor而不是 [tf.train.Example (https://www.tensorflow.org/api_docs/python/tf/train/Example)

将 tf.distribute.Strategy与Estimator结合使用(有限支持)

tf.estimator是一个分布式Training TensorFlow API,最初支持异步参数服务器方法。 tf.estimator现在支持tf.distribute.Strategy。如果您使用的是tf.estimator,则只需更改很少的代码即可更改为分布式训练。这样,Estimator用户现在可以在多个GPU和多个工作程序上进行同步分布式训练,以及使用TPU。但是,Estimator中的这种支持是有限的。请查看下面的“现在支持什么”部分以了解更多详细信息。

在Estimator中使用tf.distribute.Strategy与Keras情况稍有不同。现在,无需使用strategy.scope,而是将策略对象传递到Estimate的RunConfig中。

可以参考distributed training guide
以获取更多信息。

这是一段代码片段,其中使用预制的Estimator LinearRegressor和MirroredStrategy进行了:

mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)

在这里,您使用预制的Estimator,但是相同的代码也可以与自定义Estimator一起使用。 train_distribute确定训练的分配方式,而eval_distribute确定评估的分配方式。这与Keras的另一个区别是,您在训练和评估中都使用相同的策略。

现在,您可以使用输入函数来训练和评估此估算器:

def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)

Estimator和Keras之间要强调的另一个区别是输入处理。在Keras中,每批数据集都会自动拆分为多个副本。但是,在Estimator中,您不会执行自动批量拆分,也不会在不同的工作程序之间自动分片数据。您可以完全控制希望如何在woker和device之间分配数据,并且必须提供input_fn来指定如何分配数据。

每个worker调用一次input_fn,因此每个worker给出一个数据集。然后将来自该数据集的一批物料馈送到该工作程序上的一个副本,从而消耗1个工作程序上的N个副本的N个批次。换句话说,input_fn返回的数据集应提供PER_REPLICA_BATCH_SIZE大小的批次。步骤的全局批处理大小可以通过PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync获得。

在进行worker training时,您应该将数据拆分给各个worker ,或者在每个worker 上随机散布种子。您可以在使用Estimator进行的multi training 中查看如何执行此操作的示例。

同样,您也可以使用多worker 和参数服务器策略。代码保持不变,但是需要使用tf.estimator.train_and_evaluate,并为集群中运行的每个二进制文件设置TF_CONFIG环境变量。

现在支持什么?

除TPUStrategy外,使用所有策略进行Estimator训练的支持有限。基本的训练和评估应该可以,但是v1.train.Scaffold等许多高级功能不起作用。此集成中可能还存在许多错误,并且没有计划积极改善这种支持(重点是Keras和自定义训练循环支持)。如果有可能,您应该更喜欢将tf.distribute与这些API结合使用。

image.png

以下是一些端到端示例,这些示例展示了如何在Estimator中使用各种策略:

上一篇 下一篇

猜你喜欢

热点阅读