TF 2 Estimators
本文是对官方文档 的学习笔记。
本文 tf.estimator
, 一个high-level TensorFlow API. Estimators 包括以下几个组件:
- 训练
- 评估
- 预测
- 部署到服务器
TensorFlow实现了几个预制的 Estimators 。不过 TF2 也支持自定义Estimators ,但自定义Estimators主要是作为向后兼容性的度量。自定义Estimators不应用于新代码。所有Estimators(预制或自定义的)都是基于tf.estimator.Estimator类的类。
有关快速示例,请尝试 Estimators 教程。有关API设计的概述,请查看白皮书。
预备
pip install -q -U tensorflow_datasets
import tempfile
import os
import tensorflow as tf
import tensorflow_datasets as tfds
优势
与tf.keras.Model
相似,估计器是模型级别的抽象。 [tf.estimator
](https://www.tensorflow.org/api_docs/python/tf/estimator提供了一些仍在为tf.keras开发的功能。这些是:
- 基于参数的训练
- 完整的TFX集成
Estimator 能力
Estimator 有如下好处
-
你可以在本地主机或分布式多服务器环境上运行基于Estimator的模型,而无需更改模型。此外,您可以在CPU,GPU或TPU上运行基于Estimator的模型,而无需重新编码模型。
-
Estimator 提供了一个安全的分布式训练循环,可控制何时何地
- 加载数据
- 处理异常
- 创建Checkpoint 并且从失败中恢复
- 保存summary 到 TensorBoard
使用Estimators编写应用程序时,必须将数据输入管道与模型分开。这种分离简化了使用不同数据集。
使用预置 Estimator
预制的Estimator 使您可以在比基本TensorFlow API更高的概念层次上工作。无需再担心创建计算图或会话,因为Estimators会处理所有“管道”。此外,预制的Estimators使开发者仅需进行最少的代码更改即可尝试不同的模型架构。例如,tf.estimator.DNNClassifier是一个预制的Estimator类,它基于密集的前馈神经网络训练分类模型。
依赖于预制Estimator的TensorFlow程序通常包括以下四个步骤:
1. 开发Input 函数
例如,可以创建一个函数来导入训练集,而另一个函数来导入测试集。Estimator 希望将其输入格式化为一对对象:
- 一个字典,其中的键是 feature 名称,值是包含相应要素数据的Tensor(或SparseTensors)
- 包含1个或者多个 lable 的Tensor
input_fn应该返回一个tf.data.Dataset,以以上格式产生对。
例如,以下代码从泰坦尼克号数据集的train.csv文件构建tf.data.Dataset:
def train_input_fn():
titanic_file = tf.keras.utils.get_file("train.csv", "https://storage.googleapis.com/tf-datasets/titanic/train.csv")
titanic = tf.data.experimental.make_csv_dataset(
titanic_file, batch_size=32,
label_name="survived")
titanic_batches = (
titanic.cache().repeat().shuffle(500)
.prefetch(tf.data.AUTOTUNE))
return titanic_batches
input_fn在tf.Graph中执行,并且还可以直接返回包含图张量的(features_dics,labels)对,但这在诸如返回常量之类的简单情况之外容易出错。
2. 定义Feature 列
每个tf.feature_column
都会标识feature 名称,其类型以及任何输入预处理。
例如,以下代码片段创建了三个要素列。
- 第一种将age 直接用作浮点输入。
- 第二种使用 class 功能作为分类输入。
- 第三个使用embark_town作为分类输入,但是使用hashing trick避免了枚举选项和设置选项的数量。
age = tf.feature_column.numeric_column('age')
cls = tf.feature_column.categorical_column_with_vocabulary_list('class', ['First', 'Second', 'Third'])
embark = tf.feature_column.categorical_column_with_hash_bucket('embark_town', 32)
3. 实例化相关预设 Estimator
例如,下面是一个名为LinearClassifier的预制Estimator的示例实例
model_dir = tempfile.mkdtemp()
model = tf.estimator.LinearClassifier(
model_dir=model_dir,
feature_columns=[embark, cls, age],
n_classes=2
)
model_dir = tempfile.mkdtemp()
model = tf.estimator.LinearClassifier(
model_dir=model_dir,
feature_columns=[embark, cls, age],
n_classes=2
)
4. 训练,评估,预测
model = model.train(input_fn=train_input_fn, steps=100)
result = model.evaluate(train_input_fn, steps=10)
for key, value in result.items():
print(key, ":", value)
for pred in model.predict(train_input_fn):
for key, value in pred.items():
print(key, ":", value)
break
预设 Estimators 的好处
预制的估算器对最佳实践进行编码,具有以下优点:
- 确定计算图的不同部分应在何处运行的最佳实践,在一台计算机或群集上实施策略。
- 事件(摘要)编写和通用摘要的最佳做法。
如果不使用预制的Estimators,则必须自己实现上述功能。
定制化Estimators
无论是预制的还是定制的,每个Estimator的核心都是其模型函数model_fn,该函数可以构建用于训练,评估和预测。当您使用预制的Estimators 时,其他人已经实现了模型功能。依靠自定义Estimator时,您必须自己编写模型函数。
注意:自定义model_fn仍将以 Tensorflow 1.x样式的图形模式运行。这意味着没有eager run,也没有自动控制的依赖关系。您应该计划使用自定义model_fn从tf.estimator迁移。备用API是tf.keras和tf.distribute。如果您在训练的某些部分仍需要估算器,则可以使用tf.keras.estimator.model_to_estimator转换器从keras.Model创建估算器。
根据Keras模型创建估算器
您可以使用tf.keras.estimator.model_to_estimator将现有的Keras模型转换为Estimators。如果要现代化模型代码,这很有用,但是您的训练流程仍然需要Estimators。
实例化Keras MobileNet V2模型,并使用优化器,损失和度量对模型进行编译,以进行以下训练:
keras_mobilenet_v2 = tf.keras.applications.MobileNetV2(
input_shape=(160, 160, 3), include_top=False)
keras_mobilenet_v2.trainable = False
estimator_model = tf.keras.Sequential([
keras_mobilenet_v2,
tf.keras.layers.GlobalAveragePooling2D(),
tf.keras.layers.Dense(1)
])
# Compile the model
estimator_model.compile(
optimizer='adam',
loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
metrics=['accuracy'])
根据已编译的Keras模型创建一个Estimator。 Keras模型的初始模型状态保留在创建的Estimator中:
est_mobilenet_v2 = tf.keras.estimator.model_to_estimator(keras_model=estimator_model)
像对待任何其他估算器一样对待派生的估算器。
IMG_SIZE = 160 # All images will be resized to 160x160
def preprocess(image, label):
image = tf.cast(image, tf.float32)
image = (image/127.5) - 1
image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
return image, label
def train_input_fn(batch_size):
data = tfds.load('cats_vs_dogs', as_supervised=True)
train_data = data['train']
train_data = train_data.map(preprocess).shuffle(500).batch(batch_size)
return train_data
要进行训练,请调用Estimator的训练功能:
est_mobilenet_v2.train(input_fn=lambda: train_input_fn(32), steps=50)
同样,要进行评估,请调用Estimator的评估函数:
est_mobilenet_v2.evaluate(input_fn=lambda: train_input_fn(32), steps=10)
有关更多详细信息,请参阅tf.keras.estimator.model_to_estimator
的文档。
使用Estimator保存基于对象的检查点
默认情况下,Estimators 使用变量名称而不是“Checkpoint”指南中描述的对象图来保存检查点。 tf.train.Checkpoint将读取基于名称的检查点,但是将模型的一部分移到Estimator的model_fn之外时,变量名称可能会更改。为了向前兼容,保存基于对象的检查点可以更轻松地在Estimator中训练模型,然后在一个模型之外使用它。
import tensorflow.compat.v1 as tf_compat
def toy_dataset():
inputs = tf.range(10.)[:, None]
labels = inputs * 5. + tf.range(5.)[None, :]
return tf.data.Dataset.from_tensor_slices(
dict(x=inputs, y=labels)).repeat().batch(2)
class Net(tf.keras.Model):
"""A simple linear model."""
def __init__(self):
super(Net, self).__init__()
self.l1 = tf.keras.layers.Dense(5)
def call(self, x):
return self.l1(x)
def model_fn(features, labels, mode):
net = Net()
opt = tf.keras.optimizers.Adam(0.1)
ckpt = tf.train.Checkpoint(step=tf_compat.train.get_global_step(),
optimizer=opt, net=net)
with tf.GradientTape() as tape:
output = net(features['x'])
loss = tf.reduce_mean(tf.abs(output - features['y']))
variables = net.trainable_variables
gradients = tape.gradient(loss, variables)
return tf.estimator.EstimatorSpec(
mode,
loss=loss,
train_op=tf.group(opt.apply_gradients(zip(gradients, variables)),
ckpt.step.assign_add(1)),
# Tell the Estimator to save "ckpt" in an object-based format.
scaffold=tf_compat.train.Scaffold(saver=ckpt))
tf.keras.backend.clear_session()
est = tf.estimator.Estimator(model_fn, './tf_estimator_example/')
est.train(toy_dataset, steps=10)
然后,tf.train.Checkpoint
可以从其model_dir加载估算器的检查点。
opt = tf.keras.optimizers.Adam(0.1)
net = Net()
ckpt = tf.train.Checkpoint(
step=tf.Variable(1, dtype=tf.int64), optimizer=opt, net=net)
ckpt.restore(tf.train.latest_checkpoint('./tf_estimator_example/'))
ckpt.step.numpy() # From est.train(..., steps=10)
保存 Estimator 模型
Estimator 通过tf.Estimator.export_saved_model导出SavedModels。
input_column = tf.feature_column.numeric_column("x")
estimator = tf.estimator.LinearClassifier(feature_columns=[input_column])
def input_fn():
return tf.data.Dataset.from_tensor_slices(
({"x": [1., 2., 3., 4.]}, [1, 1, 0, 0])).repeat(200).shuffle(64).batch(16)
estimator.train(input_fn)
要保存Estimator ,您需要创建一个serving_input_receiver。此函数构建tf.Graph
的一部分,该tf.Graph解析由SavedModel接收的原始数据。
tf.estimator.export
模块包含帮助构建这些接收器的函数。
以下代码基于feature_columns构建一个接收器,该接收器接受序列化的tf。协议缓冲区示例,通常与tf-serving 一起使用。
tmpdir = tempfile.mkdtemp()
serving_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
tf.feature_column.make_parse_example_spec([input_column]))
estimator_base_path = os.path.join(tmpdir, 'from_estimator')
estimator_path = estimator.export_saved_model(estimator_base_path, serving_input_fn)
还可以从python加载并运行该模型:
imported = tf.saved_model.load(estimator_path)
def predict(x):
example = tf.train.Example()
example.features.feature["x"].float_list.value.extend([x])
return imported.signatures["predict"](
examples=tf.constant([example.SerializeToString()]))
print(predict(1.5))
print(predict(3.5))
tf.estimator.export.build_raw_serving_input_receiver_fn
允许您创建采用raw Tensor而不是 [tf.train.Example
(https://www.tensorflow.org/api_docs/python/tf/train/Example)
将 tf.distribute.Strategy与Estimator结合使用(有限支持)
tf.estimator是一个分布式Training TensorFlow API,最初支持异步参数服务器方法。 tf.estimator现在支持tf.distribute.Strategy。如果您使用的是tf.estimator,则只需更改很少的代码即可更改为分布式训练。这样,Estimator用户现在可以在多个GPU和多个工作程序上进行同步分布式训练,以及使用TPU。但是,Estimator中的这种支持是有限的。请查看下面的“现在支持什么”部分以了解更多详细信息。
在Estimator中使用tf.distribute.Strategy与Keras情况稍有不同。现在,无需使用strategy.scope,而是将策略对象传递到Estimate的RunConfig中。
可以参考distributed training guide
以获取更多信息。
这是一段代码片段,其中使用预制的Estimator LinearRegressor和MirroredStrategy进行了:
mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
feature_columns=[tf.feature_column.numeric_column('feats')],
optimizer='SGD',
config=config)
在这里,您使用预制的Estimator,但是相同的代码也可以与自定义Estimator一起使用。 train_distribute确定训练的分配方式,而eval_distribute确定评估的分配方式。这与Keras的另一个区别是,您在训练和评估中都使用相同的策略。
现在,您可以使用输入函数来训练和评估此估算器:
def input_fn():
dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
Estimator和Keras之间要强调的另一个区别是输入处理。在Keras中,每批数据集都会自动拆分为多个副本。但是,在Estimator中,您不会执行自动批量拆分,也不会在不同的工作程序之间自动分片数据。您可以完全控制希望如何在woker和device之间分配数据,并且必须提供input_fn来指定如何分配数据。
每个worker调用一次input_fn,因此每个worker给出一个数据集。然后将来自该数据集的一批物料馈送到该工作程序上的一个副本,从而消耗1个工作程序上的N个副本的N个批次。换句话说,input_fn返回的数据集应提供PER_REPLICA_BATCH_SIZE大小的批次。步骤的全局批处理大小可以通过PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync获得。
在进行worker training时,您应该将数据拆分给各个worker ,或者在每个worker 上随机散布种子。您可以在使用Estimator进行的multi training 中查看如何执行此操作的示例。
同样,您也可以使用多worker 和参数服务器策略。代码保持不变,但是需要使用tf.estimator.train_and_evaluate,并为集群中运行的每个二进制文件设置TF_CONFIG环境变量。
现在支持什么?
除TPUStrategy外,使用所有策略进行Estimator训练的支持有限。基本的训练和评估应该可以,但是v1.train.Scaffold等许多高级功能不起作用。此集成中可能还存在许多错误,并且没有计划积极改善这种支持(重点是Keras和自定义训练循环支持)。如果有可能,您应该更喜欢将tf.distribute与这些API结合使用。
image.png以下是一些端到端示例,这些示例展示了如何在Estimator中使用各种策略:
- Multi-worker Training with Estimator tutorial教程显示了如何使用MNIST数据集上的MultiWorkerMirroredStrategy与多worker进行训练。
- 使用Kubernetes模板在tensorflow /生态系统中使用分配策略进行多worker的端到端示例。它以Keras模型开始,并使用tf.keras.estimator.model_to_estimator API将其转换为Estimator。
- 官方的ResNet50模型,可以使用MirroredStrategy或MultiWorkerMirroredStrategy进行训练。