PyTorch深度学习简明实战24 - LSTM

2023-05-01  本文已影响0人  薛东弗斯
image.png
image.png
image.png
image.png

每个绿色的框为一个LSTM Cell。LSTM Cell有2个状态输入和2个状态输出:一个是hidden状态(隐藏状态),一个是单元状态(cell state)。
LSTM相邻单元格之间有2个状态连接。


image.png
image.png
import torch
import torchtext
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torchtext.vocab import GloVe

# Field definitions for the legacy torchtext API: review text is lower-cased
# and given fix_length=200 with time-major layout (batch_first=False);
# labels are treated as a single non-sequential value.
TEXT = torchtext.data.Field(lower=True, fix_length=200, batch_first=False)
LABEL = torchtext.data.Field(sequential=False)

# make splits for data
train, test = torchtext.datasets.IMDB.splits(TEXT, LABEL)

# Build the vocabulary.
# Alternative kept for reference: build it with pretrained GloVe vectors.
#TEXT.build_vocab(train, vectors=GloVe(name='6B', dim=300), 
#                 max_size=20000, min_freq=10)
TEXT.build_vocab(train, max_size=10000, min_freq=10, vectors=None)
LABEL.build_vocab(train)

# Debug helper: list vocabulary entries whose index is above 9999.
#[(k, v) for k, v in TEXT.vocab.stoi.items() if v>9999]
# Notebook-style inspection; as a script statement this expression has no effect.
TEXT.vocab.vectors
BATCHSIZE = 512
# make iterator for splits
train_iter, test_iter = torchtext.data.BucketIterator.splits(
                                      (train, test), batch_size=BATCHSIZE)

# Pull one batch for inspection; `b` is not used by the model code below.
b = next(iter(train_iter))

# Model hyper-parameters read by the model definitions below.
hidden_size = 300
embeding_dim = 100

class RNN_Encoder(nn.Module):
    """Encode a time-major sequence by unrolling a single ``nn.LSTMCell``.

    Args:
        input_dim: Size of each per-step input feature vector.
        hidden_size: Size of the hidden state and of the cell state.
    """

    def __init__(self, input_dim, hidden_size):
        super(RNN_Encoder, self).__init__()
        # Remember the size on the instance so forward() does not depend on a
        # module-level ``hidden_size`` happening to match this argument.
        self.hidden_size = hidden_size
        self.rnn = nn.LSTMCell(input_dim, hidden_size)

    def forward(self, inputs):
        """Run the cell over every time step and return the final states.

        Args:
            inputs: Tensor indexed as (seq_len, batch, input_dim); the loop
                iterates over the first dimension and the batch size is read
                from the second.

        Returns:
            Tuple ``(ht, ct)`` with the final hidden state and cell state,
            each of shape (batch, hidden_size).
        """
        bz = inputs.shape[1]
        # Allocate the initial states with the input's dtype and device so the
        # encoder works on CPU as well as on GPU.
        ht = inputs.new_zeros((bz, self.hidden_size))
        ct = inputs.new_zeros((bz, self.hidden_size))
        for word in inputs:
            ht, ct = self.rnn(word, (ht, ct))
        return ht, ct
        
class Net(nn.Module):
    """Text classifier: embedding -> RNN_Encoder -> two linear layers.

    Layer sizes come from the module-level ``embeding_dim`` and
    ``hidden_size``; the embedding table has 20002 rows and the output layer
    produces 3 logits.
    """

    def __init__(self):
        super().__init__()
        # Token ids -> dense vectors of size embeding_dim.
        self.em = nn.Embedding(20002, embeding_dim)
        # Sequence encoder returning a pair of final states.
        self.rnn = RNN_Encoder(embeding_dim, hidden_size)
        # Classification head.
        self.fc1 = nn.Linear(hidden_size, 256)
        self.fc2 = nn.Linear(256, 3)

    def forward(self, x):
        """Return the class logits for a batch of token-id sequences."""
        embedded = self.em(x)
        # The encoder returns two states; only the second one feeds the head.
        _, state = self.rnn(embedded)
        features = F.relu(self.fc1(state))
        return self.fc2(features)
        
# Instantiate the classifier defined above.
model = Net()

# Move the parameters to the GPU when one is available.
if torch.cuda.is_available():
    model.to('cuda')

# Cross-entropy over the model's logits, optimised with Adam.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

def _run_epoch(model, loader, criterion, device, opt=None):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    When ``opt`` is given, the model is put in training mode and updated after
    every batch; otherwise it is put in eval mode and gradients are disabled.
    Both metrics are per-sample averages over the samples actually seen, and
    ``(0.0, 0.0)`` is returned for an empty loader.
    """
    training = opt is not None
    model.train(training)
    correct = 0
    total = 0
    loss_sum = 0.0
    with torch.set_grad_enabled(training):
        for batch in loader:
            x, y = batch.text.to(device), batch.label.to(device)
            y_pred = model(x)
            loss = criterion(y_pred, y)
            if training:
                opt.zero_grad()
                loss.backward()
                opt.step()
            n = y.size(0)
            correct += (y_pred.argmax(dim=1) == y).sum().item()
            total += n
            # The loss is weighted by the batch size so that dividing by the
            # sample count yields a per-sample mean (assumes the criterion
            # returns a batch mean, as nn.CrossEntropyLoss does by default).
            loss_sum += loss.item() * n
    if total == 0:
        return 0.0, 0.0
    return loss_sum / total, correct / total


def fit(epoch, model, trainloader, testloader, criterion=None, opt=None):
    """Train ``model`` for one epoch, evaluate it, and print the metrics.

    Args:
        epoch: Epoch index, used only in the printed summary.
        model: The ``nn.Module`` to train and evaluate.
        trainloader: Iterable of batches exposing ``.text`` and ``.label``.
        testloader: Iterable of batches exposing ``.text`` and ``.label``.
        criterion: Loss callable; defaults to the module-level ``loss_fn``.
        opt: Optimizer; defaults to the module-level ``optimizer``.

    Returns:
        Tuple ``(epoch_loss, epoch_acc, epoch_test_loss, epoch_test_acc)``,
        where losses are per-sample means and accuracies are fractions.
    """
    criterion = loss_fn if criterion is None else criterion
    opt = optimizer if opt is None else opt

    # Send each batch to wherever the model's parameters live.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    epoch_loss, epoch_acc = _run_epoch(model, trainloader, criterion, device, opt)
    epoch_test_loss, epoch_test_acc = _run_epoch(model, testloader, criterion, device)

    print('epoch: ', epoch,
          'loss: ', round(epoch_loss, 3),
          'accuracy:', round(epoch_acc, 3),
          'test_loss: ', round(epoch_test_loss, 3),
          'test_accuracy:', round(epoch_test_acc, 3)
          )

    return epoch_loss, epoch_acc, epoch_test_loss, epoch_test_acc
    
# Number of passes over the training set.
epochs = 30

# Per-epoch metric history, one entry appended per epoch.
train_loss = []
train_acc = []
test_loss = []
test_acc = []

# Train and evaluate once per epoch, recording the four returned metrics.
for epoch in range(epochs):
    epoch_loss, epoch_acc, epoch_test_loss, epoch_test_acc = fit(epoch,
                                                                 model,
                                                                 train_iter,
                                                                 test_iter)
    train_loss.append(epoch_loss)
    train_acc.append(epoch_acc)
    test_loss.append(epoch_test_loss)
    test_acc.append(epoch_test_acc)
import torch
import torchtext
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch.utils import data
from torch.utils.data import DataLoader
from torchtext.data.utils import get_tokenizer

tokenizer = get_tokenizer('basic_english')             # initialise the tokenizer
# Example: tokenizer('This is a book about PyTorch.') gives
# ['this', 'is', 'a', 'book', 'about', 'pytorch', '.']

# Load the IMDB splits once and materialise them as lists, because the
# training data is traversed several times below (class set, vocabulary
# building, DataLoader).
train_iter, test_iter = torchtext.datasets.IMDB()
train_data, test_data = list(train_iter), list(test_iter)

# Distinct labels present in the training split and their count.
all_classes = {label for (label, text) in train_data}
num_class = len(all_classes)

from torchtext.data.utils import get_tokenizer            # 分词工具
from torchtext.vocab import build_vocab_from_iterator     # 创建词表工具

tokenizer = get_tokenizer('basic_english')      # initialise the tokenizer (same setting as the one created earlier in this script)

def yield_tokens(data):
    """Lazily yield the token list for the text of each ``(label, text)`` pair.

    Uses the module-level ``tokenizer``; the label of each pair is ignored.
    """
    yield from (tokenizer(text) for _label, text in data)

# Build the vocabulary from the tokenised training texts, with "<pad>" and
# "<unk>" as special tokens and a minimum frequency of 5.
vocab = build_vocab_from_iterator(yield_tokens(train_data), specials=["<pad>", "<unk>"], min_freq=5)

# Tokens missing from the vocabulary map to the index of "<unk>".
vocab.set_default_index(vocab["<unk>"])

# Raw text -> list of token ids.
text_pipeline = lambda x: vocab(tokenizer(x))

# Raw label -> 1 if it equals 'pos', else 0.
# NOTE(review): assumes the dataset yields the string labels 'pos'/'neg';
# any other label encoding would map every sample to 0 — confirm against the
# installed torchtext version.
label_pipeline = lambda x: int(x == 'pos')

# Use the GPU when available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def collate_batch(batch):
    """Turn a list of ``(label, text)`` samples into padded tensors.

    Each label goes through ``label_pipeline`` and each text through
    ``text_pipeline``; the id sequences are padded to a common length with
    ``pad_sequence``. Returns ``(labels, texts)``, both moved to ``device``.
    """
    encoded = [
        (label_pipeline(raw_label),
         torch.tensor(text_pipeline(raw_text), dtype=torch.int64))
        for raw_label, raw_text in batch
    ]
    labels = torch.tensor([lab for lab, _ in encoded])
    padded = torch.nn.utils.rnn.pad_sequence([ids for _, ids in encoded])
    return labels.to(device), padded.to(device)
    
# Batches of 16 samples, collated by collate_batch; only the training
# loader is shuffled.
train_dataloader = DataLoader(train_data, batch_size=16, shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_data, batch_size=16, shuffle=False, collate_fn=collate_batch)

# Number of entries in the vocabulary; sizes the embedding table.
vocab_size = len(vocab)

# Model hyper-parameters passed to RNN_Net below.
embeding_dim = 100
hidden_size = 128

class RNN_Net(nn.Module):
    """Text classifier: embedding -> ``nn.LSTM`` -> two linear layers.

    Args:
        vocab_size: Number of rows in the embedding table.
        embeding_dim: Size of each embedding vector (LSTM input size).
        hidden_size: LSTM hidden size.
    """

    def __init__(self, vocab_size, embeding_dim, hidden_size):
        super().__init__()
        self.em = nn.Embedding(vocab_size, embeding_dim)
        self.rnn = nn.LSTM(embeding_dim, hidden_size)
        self.fc1 = nn.Linear(hidden_size, 64)
        self.fc2 = nn.Linear(64, 2)

    def forward(self, inputs):
        """Return two logits per sequence, computed from the last LSTM output step."""
        embedded = self.em(inputs)
        outputs, _state = self.rnn(embedded)
        # Only the output at the final position along dim 0 feeds the head.
        last_step = outputs[-1]
        hidden = F.relu(self.fc1(last_step))
        return self.fc2(hidden)
        
# Build the LSTM classifier and place it on the selected device.
model = RNN_Net(vocab_size, embeding_dim, hidden_size).to(device)

# Cross-entropy loss, Adam with betas=(0.5, 0.5), and a StepLR schedule
# configured with step_size=15 and gamma=0.1.
loss_fn = nn.CrossEntropyLoss()
from torch.optim import lr_scheduler
optimizer = torch.optim.Adam(model.parameters(), betas=(0.5, 0.5), lr=0.01)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=15, gamma=0.1)

def train(dataloader):
    """Train the module-level ``model`` for one pass over ``dataloader``.

    Each batch is a ``(label, text)`` pair. Uses the module-level ``loss_fn``
    and ``optimizer``. Returns ``(mean_loss, accuracy)`` averaged per sample.
    """
    model.train()
    n_correct = 0
    n_seen = 0
    loss_sum = 0
    for label, text in dataloader:
        logits = model(text)
        loss = loss_fn(logits, label)

        # Standard update step: clear gradients, backpropagate, apply.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        with torch.no_grad():
            batch_size = label.size(0)
            hits = (logits.argmax(1) == label).sum().item()
            n_correct += hits
            n_seen += batch_size
            # Weight the batch loss by its size to get a per-sample mean.
            loss_sum += loss.item()*batch_size
    return loss_sum/n_seen, n_correct/n_seen
    
def test(dataloader):
    """Evaluate the module-level ``model`` on ``dataloader`` without gradients.

    Each batch is a ``(label, text)`` pair. Uses the module-level ``loss_fn``.
    Returns ``(mean_loss, accuracy)`` averaged per sample.
    """
    model.eval()
    n_correct = 0
    n_seen = 0
    loss_sum = 0

    with torch.no_grad():
        for label, text in dataloader:
            logits = model(text)
            batch_loss = loss_fn(logits, label)
            batch_size = label.size(0)
            n_correct += (logits.argmax(1) == label).sum().item()
            n_seen += batch_size
            # Weight the batch loss by its size to get a per-sample mean.
            loss_sum += batch_loss.item()*batch_size
    return loss_sum/n_seen, n_correct/n_seen
    
def fit(epochs, train_dl, test_dl):
    """Run ``epochs`` rounds of training and evaluation, printing each result.

    Calls the module-level ``train`` and ``test`` once per epoch, then
    advances ``exp_lr_scheduler``. Returns the per-epoch histories as
    ``(train_loss, test_loss, train_acc, test_acc)``.
    """
    template = ("epoch:{:2d}, train_loss: {:.5f}, train_acc: {:.1f}% ,"
                "test_loss: {:.5f}, test_acc: {:.1f}%")
    train_loss, train_acc = [], []
    test_loss, test_acc = [], []

    for epoch in range(epochs):
        tr_loss, tr_acc = train(train_dl)
        te_loss, te_acc = test(test_dl)

        train_loss.append(tr_loss)
        train_acc.append(tr_acc)
        test_loss.append(te_loss)
        test_acc.append(te_acc)

        # Advance the learning-rate schedule once per epoch.
        exp_lr_scheduler.step()

        summary = template.format(epoch, tr_loss, tr_acc*100, te_loss, te_acc*100)
        print(summary)
    print("Done!")

    return train_loss, test_loss, train_acc, test_acc
    
# Number of epochs to train for.
EPOCHS = 20

# Run the training yourself; without a GPU it is too slow.
train_loss, test_loss, train_acc, test_acc = fit(EPOCHS, train_dataloader, test_dataloader)

上一篇下一篇

猜你喜欢

热点阅读