TensorFlow2 不同层次的建模方法

2020-09-10  本文已影响0人  水之心

准备数据:

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import tensorflow as tf
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

# 正负样本数量
n_positive, n_negative = 2000, 2000

#生成正样本, 小圆环分布
r_p = 5.0 + tf.random.truncated_normal([n_positive, 1], 0.0, 1.0)
theta_p = tf.random.uniform([n_positive, 1], 0.0, 2*np.pi)
Xp = tf.concat([r_p*tf.cos(theta_p), r_p*tf.sin(theta_p)], axis=1)
Yp = tf.ones_like(r_p)

#生成负样本, 大圆环分布
r_n = 8.0 + tf.random.truncated_normal([n_negative, 1], 0.0, 1.0)
theta_n = tf.random.uniform([n_negative, 1], 0.0, 2*np.pi)
Xn = tf.concat([r_n*tf.cos(theta_n), r_n*tf.sin(theta_n)], axis=1)
Yn = tf.zeros_like(r_n)

# 汇总样本
X = tf.concat([Xp, Xn], axis=0)
Y = tf.concat([Yp, Yn], axis=0)


# 可视化
plt.figure(figsize=(6, 6))
plt.scatter(Xp[:, 0].numpy(), Xp[:, 1].numpy(), c="r")
plt.scatter(Xn[:, 0].numpy(), Xn[:, 1].numpy(), c="g")
plt.legend(["positive", "negative"])
plt.show()

效果:

1 低阶 API

构建数据管道迭代器:

def data_iter(features, labels, batch_size=8):
    num_examples = len(features)
    indices = list(range(num_examples))
    np.random.shuffle(indices)  #样本的读取顺序是随机的
    for i in range(0, num_examples, batch_size):
        indexs = indices[i: min(i + batch_size, num_examples)]
        yield tf.gather(X,indexs), tf.gather(Y,indexs)

# 测试数据管道效果   
batch_size = 10
(features,labels) = next(data_iter(X,Y,batch_size))
print(features)
print(labels)

输出:

tf.Tensor(
[[ 7.63374    -2.4995987 ]
 [ 3.5261695   2.8121672 ]
 [ 6.926017    0.43988833]
 [ 9.417864   -0.8176333 ]
 [ 8.488339   -3.2209659 ]
 [ 3.745992   -0.43044332]
 [ 4.44868     6.7732778 ]
 [ 2.578557    2.2084076 ]
 [-1.4551258  -4.423701  ]
 [ 8.7067995  -1.698387  ]], shape=(10, 2), dtype=float32)
tf.Tensor(
[[0.]
 [1.]
 [0.]
 [0.]
 [0.]
 [1.]
 [0.]
 [1.]
 [1.]
 [0.]], shape=(10, 1), dtype=float32)

此处范例我们利用 tf.Module 来组织模型变量:

class DNNModel(tf.Module):
    def __init__(self, name=None):
        super().__init__(name=name)
        self.w1 = tf.Variable(
            tf.random.truncated_normal([2, 4]), dtype=tf.float32)
        self.b1 = tf.Variable(tf.zeros([1, 4]), dtype=tf.float32)
        self.w2 = tf.Variable(
            tf.random.truncated_normal([4, 8]), dtype=tf.float32)
        self.b2 = tf.Variable(tf.zeros([1, 8]), dtype=tf.float32)
        self.w3 = tf.Variable(
            tf.random.truncated_normal([8, 1]), dtype=tf.float32)
        self.b3 = tf.Variable(tf.zeros([1, 1]), dtype=tf.float32)

    # 正向传播
    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 2], dtype=tf.float32)])
    def __call__(self, x):
        x = tf.nn.relu(x@self.w1 + self.b1)
        x = tf.nn.relu(x@self.w2 + self.b2)
        y = tf.nn.sigmoid(x@self.w3 + self.b3)
        return y

    # 损失函数(二元交叉熵)
    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 1], dtype=tf.float32),
                                  tf.TensorSpec(shape=[None, 1], dtype=tf.float32)])
    def loss_func(self, y_true, y_pred):
        # 将预测值限制在 1e-7 以上, 1 - 1e-7 以下,避免log(0)错误
        eps = 1e-7
        y_pred = tf.clip_by_value(y_pred, eps, 1.0-eps)
        bce = - y_true*tf.math.log(y_pred) - (1-y_true)*tf.math.log(1-y_pred)
        return tf.reduce_mean(bce)

    # 评估指标(准确率)
    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 1], dtype=tf.float32),
                                  tf.TensorSpec(shape=[None, 1], dtype=tf.float32)])
    def metric_func(self, y_true, y_pred):
        y_pred = tf.where(y_pred > 0.5, tf.ones_like(y_pred, dtype=tf.float32),
                          tf.zeros_like(y_pred, dtype=tf.float32))
        acc = tf.reduce_mean(1-tf.abs(y_true-y_pred))
        return acc


model = DNNModel()

测试模型结构:

batch_size = 10
features, labels = next(data_iter(X, Y, batch_size))

predictions = model(features)

loss = model.loss_func(labels, predictions)
metric = model.metric_func(labels, predictions)

tf.print("init loss:", loss)
tf.print("init metric", metric)
print(len(model.trainable_variables))

输出:

init loss: 0.889420033
init metric 0.6
6

使用 autograph 机制转换成静态图加速:

# 打印时间分割线
@tf.function
def printbar():
    today_ts = tf.timestamp() % (24*60*60)

    hour = tf.cast(today_ts//3600+8, tf.int32) % tf.constant(24)
    minite = tf.cast((today_ts % 3600)//60, tf.int32)
    second = tf.cast(tf.floor(today_ts % 60), tf.int32)

    def timeformat(m):
        if tf.strings.length(tf.strings.format("{}", m)) == 1:
            return(tf.strings.format("0{}", m))
        else:
            return(tf.strings.format("{}", m))

    timestring = tf.strings.join([timeformat(hour), timeformat(minite),
                                  timeformat(second)], separator=":")
    tf.print("=========="*8+timestring)

@tf.function
def train_step(model, features, labels):

    # 正向传播求损失
    with tf.GradientTape() as tape:
        predictions = model(features)
        loss = model.loss_func(labels, predictions)

    # 反向传播求梯度
    grads = tape.gradient(loss, model.trainable_variables)

    # 执行梯度下降
    for p, dloss_dp in zip(model.trainable_variables, grads):
        p.assign(p - 0.001*dloss_dp)

    # 计算评估指标
    metric = model.metric_func(labels, predictions)

    return loss, metric


def train_model(model, epochs):
    for epoch in tf.range(1, epochs+1):
        for features, labels in data_iter(X, Y, 100):
            loss, metric = train_step(model, features, labels)
        if epoch % 100 == 0:
            printbar()
            tf.print("epoch =", epoch, "loss = ", loss, "accuracy = ", metric)


train_model(model, epochs=600)

输出:

================================================================================16:50:26
epoch = 100 loss =  0.556310713 accuracy =  0.7
================================================================================16:50:28
epoch = 200 loss =  0.405847311 accuracy =  0.86
================================================================================16:50:30
epoch = 300 loss =  0.467671931 accuracy =  0.75
================================================================================16:50:32
epoch = 400 loss =  0.426428646 accuracy =  0.85
================================================================================16:50:34
epoch = 500 loss =  0.360130191 accuracy =  0.84
================================================================================16:50:36
epoch = 600 loss =  0.361137211 accuracy =  0.84

结果可视化:

fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))
ax1.scatter(Xp[:, 0], Xp[:, 1], c="r")
ax1.scatter(Xn[:, 0], Xn[:, 1], c="g")
ax1.legend(["positive", "negative"])
ax1.set_title("y_true")

Xp_pred = tf.boolean_mask(X, tf.squeeze(model(X) >= 0.5), axis=0)
Xn_pred = tf.boolean_mask(X, tf.squeeze(model(X) < 0.5), axis=0)

ax2.scatter(Xp_pred[:, 0], Xp_pred[:, 1], c="r")
ax2.scatter(Xn_pred[:, 0], Xn_pred[:, 1], c="g")
ax2.legend(["positive", "negative"])
ax2.set_title("y_pred")

效果:

2 中级 API

构建模型:

from tensorflow.keras import layers, losses, metrics, optimizers

class DNNModel(tf.Module):
    def __init__(self, name=None):
        super().__init__(name=name)
        self.dense1 = layers.Dense(4, activation="relu")
        self.dense2 = layers.Dense(8, activation="relu")
        self.dense3 = layers.Dense(1, activation="sigmoid")

    # 正向传播
    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 2], dtype=tf.float32)])
    def __call__(self, x):
        x = self.dense1(x)
        x = self.dense2(x)
        y = self.dense3(x)
        return y


model = DNNModel()
model.loss_func = losses.binary_crossentropy
model.metric_func = metrics.binary_accuracy
model.optimizer = optimizers.Adam(learning_rate=0.001)

测试模型结构:

# 构建输入数据管道
ds = tf.data.Dataset.from_tensor_slices((X, Y)) \
    .shuffle(buffer_size=4000).batch(100) \
    .prefetch(tf.data.experimental.AUTOTUNE)
features, labels = next(ds.as_numpy_iterator())

predictions = model(features)

loss = model.loss_func(tf.reshape(labels, [-1]), tf.reshape(predictions, [-1]))
metric = model.metric_func(tf.reshape(
    labels, [-1]), tf.reshape(predictions, [-1]))

tf.print("init loss:", loss)
tf.print("init metric", metric)

输出:

init loss: 0.930841148
init metric 0.51

训练模型:

@tf.function
def train_step(model, features, labels):
    with tf.GradientTape() as tape:
        predictions = model(features)
        loss = model.loss_func(tf.reshape(
            labels, [-1]), tf.reshape(predictions, [-1]))
    grads = tape.gradient(loss, model.trainable_variables)
    model.optimizer.apply_gradients(zip(grads, model.trainable_variables))
    metric = model.metric_func(tf.reshape(
        labels, [-1]), tf.reshape(predictions, [-1]))
    return loss, metric


def train_model(model, epochs):
    for epoch in tf.range(1, epochs+1):
        loss, metric = tf.constant(0.0), tf.constant(0.0)
        for features, labels in ds:
            loss, metric = train_step(model, features, labels)
        if epoch % 10 == 0:
            printbar()
            tf.print("epoch =", epoch, "loss = ", loss, "accuracy = ", metric)


train_model(model, epochs=60)

输出:

================================================================================17:01:42
epoch = 10 loss =  0.0938826576 accuracy =  0.96
================================================================================17:01:42
epoch = 20 loss =  0.0858769417 accuracy =  0.96
================================================================================17:01:42
epoch = 30 loss =  0.126385167 accuracy =  0.94
================================================================================17:01:43
epoch = 40 loss =  0.0790566728 accuracy =  0.96
================================================================================17:01:43
epoch = 50 loss =  0.0721534416 accuracy =  0.97
================================================================================17:01:43
epoch = 60 loss =  0.105751008 accuracy =  0.96

结果可视化:

fig, (ax1,ax2) = plt.subplots(nrows=1,ncols=2,figsize = (12,5))
ax1.scatter(Xp[:,0].numpy(),Xp[:,1].numpy(),c = "r")
ax1.scatter(Xn[:,0].numpy(),Xn[:,1].numpy(),c = "g")
ax1.legend(["positive","negative"]);
ax1.set_title("y_true");

Xp_pred = tf.boolean_mask(X,tf.squeeze(model(X)>=0.5),axis = 0)
Xn_pred = tf.boolean_mask(X,tf.squeeze(model(X)<0.5),axis = 0)

ax2.scatter(Xp_pred[:,0].numpy(),Xp_pred[:,1].numpy(),c = "r")
ax2.scatter(Xn_pred[:,0].numpy(),Xn_pred[:,1].numpy(),c = "g")
ax2.legend(["positive","negative"]);
ax2.set_title("y_pred");

效果:

3 高级 API

数据构建:

n = len(X)
ds_train = tf.data.Dataset.from_tensor_slices((X[0:n*3//4,:],Y[0:n*3//4,:])) \
     .shuffle(buffer_size = 1000).batch(20) \
     .prefetch(tf.data.experimental.AUTOTUNE) \
     .cache()

ds_valid = tf.data.Dataset.from_tensor_slices((X[n*3//4:,:],Y[n*3//4:,:])) \
     .batch(20) \
     .prefetch(tf.data.experimental.AUTOTUNE) \
     .cache()

定义模型:

from tensorflow.keras import layers, losses, metrics, optimizers, models

tf.keras.backend.clear_session()
class DNNModel(models.Model):
    def __init__(self):
        super().__init__()

    def build(self,input_shape):
        self.dense1 = layers.Dense(4,activation = "relu",name = "dense1") 
        self.dense2 = layers.Dense(8,activation = "relu",name = "dense2")
        self.dense3 = layers.Dense(1,activation = "sigmoid",name = "dense3")
        super(DNNModel,self).build(input_shape)

    # 正向传播
    @tf.function(input_signature=[tf.TensorSpec(shape = [None,2], dtype = tf.float32)])  
    def call(self,x):
        x = self.dense1(x)
        x = self.dense2(x)
        y = self.dense3(x)
        return y

model = DNNModel()
model.build(input_shape =(None,2))

model.summary()

输出:

Model: "dnn_model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
dense1 (Dense)               multiple                  12        
_________________________________________________________________
dense2 (Dense)               multiple                  40        
_________________________________________________________________
dense3 (Dense)               multiple                  9         
=================================================================
Total params: 61
Trainable params: 61
Non-trainable params: 0

训练模型:

### 自定义训练循环

optimizer = optimizers.Adam(learning_rate=0.01)
loss_func = tf.keras.losses.BinaryCrossentropy()

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_metric = tf.keras.metrics.BinaryAccuracy(name='train_accuracy')

valid_loss = tf.keras.metrics.Mean(name='valid_loss')
valid_metric = tf.keras.metrics.BinaryAccuracy(name='valid_accuracy')


@tf.function
def train_step(model, features, labels):
    with tf.GradientTape() as tape:
        predictions = model(features)
        loss = loss_func(labels, predictions)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    train_loss.update_state(loss)
    train_metric.update_state(labels, predictions)

@tf.function
def valid_step(model, features, labels):
    predictions = model(features)
    batch_loss = loss_func(labels, predictions)
    valid_loss.update_state(batch_loss)
    valid_metric.update_state(labels, predictions)


def train_model(model,ds_train,ds_valid,epochs):
    for epoch in tf.range(1,epochs+1):
        for features, labels in ds_train:
            train_step(model,features,labels)

        for features, labels in ds_valid:
            valid_step(model,features,labels)

        logs = 'Epoch={},Loss:{},Accuracy:{},Valid Loss:{},Valid Accuracy:{}'

        if  epoch%100 ==0:
            printbar()
            tf.print(tf.strings.format(logs,
            (epoch,train_loss.result(),train_metric.result(),valid_loss.result(),valid_metric.result())))

        train_loss.reset_states()
        valid_loss.reset_states()
        train_metric.reset_states()
        valid_metric.reset_states()

train_model(model,ds_train,ds_valid,1000)

输出:

================================================================================08:04:33
Epoch=100,Loss:0.100245498,Accuracy:0.956,Valid Loss:0.0823013112,Valid Accuracy:0.96
================================================================================08:04:42
Epoch=200,Loss:0.0889734551,Accuracy:0.959666669,Valid Loss:0.0959472954,Valid Accuracy:0.956
================================================================================08:04:50
Epoch=300,Loss:0.0859184787,Accuracy:0.959666669,Valid Loss:0.100719072,Valid Accuracy:0.952
================================================================================08:04:59
Epoch=400,Loss:0.0843160897,Accuracy:0.960666656,Valid Loss:0.104706556,Valid Accuracy:0.952
================================================================================08:05:08
Epoch=500,Loss:0.0835563391,Accuracy:0.961,Valid Loss:0.107334398,Valid Accuracy:0.952
================================================================================08:05:17
Epoch=600,Loss:0.0829177722,Accuracy:0.962,Valid Loss:0.109648846,Valid Accuracy:0.952
================================================================================08:05:27
Epoch=700,Loss:0.0824796259,Accuracy:0.961666644,Valid Loss:0.111882597,Valid Accuracy:0.951
================================================================================08:05:36
Epoch=800,Loss:0.0822021216,Accuracy:0.961666644,Valid Loss:0.113583572,Valid Accuracy:0.951
================================================================================08:05:45
Epoch=900,Loss:0.082076259,Accuracy:0.961666644,Valid Loss:0.115141563,Valid Accuracy:0.95
================================================================================08:05:54
Epoch=1000,Loss:0.0819547623,Accuracy:0.962,Valid Loss:0.116600387,Valid Accuracy:0.949

可视化:

fig, (ax1,ax2) = plt.subplots(nrows=1,ncols=2,figsize = (12,5))
ax1.scatter(Xp[:,0].numpy(),Xp[:,1].numpy(),c = "r")
ax1.scatter(Xn[:,0].numpy(),Xn[:,1].numpy(),c = "g")
ax1.legend(["positive","negative"]);
ax1.set_title("y_true");

Xp_pred = tf.boolean_mask(X,tf.squeeze(model(X)>=0.5),axis = 0)
Xn_pred = tf.boolean_mask(X,tf.squeeze(model(X)<0.5),axis = 0)

ax2.scatter(Xp_pred[:,0].numpy(),Xp_pred[:,1].numpy(),c = "r")
ax2.scatter(Xn_pred[:,0].numpy(),Xn_pred[:,1].numpy(),c = "g")
ax2.legend(["positive","negative"]);
ax2.set_title("y_pred");

显示:

上一篇下一篇

猜你喜欢

热点阅读