使用注意力机制的seq2seq(Bahdanau 注意力)
2022-10-10 本文已影响0人
小黄不头秃
(一)使用注意力机制的seq2seq
机器翻译中,每一个生成的词可能相关于原句子中不同的词。因为两种语言的单词并不都是一一对应的,有可能一个词对应的是两个词,并且语序可能也不一样。我们想要的是神经网络能够有意识的去根据重点去预测,然而seq2seq是没法做到的,因为他往解码器中传入的是最后一个隐藏层的隐状态。然后再进行预测。
(1)加入注意力
之前我们在seq2seq的模型中解码器的输入是,最后一层的隐状态和embedding concat起来作为解码器的输入。这样其实是不够好的。我们是希望对应着翻译。
- 编码器对每次词的输出作为 key 和 value( key 和 value 是一样的)例如:某一小段词的长度是3,那么就会生成三个键值对;
- 解码器RNN对上一个词的输出是 query ;
- 注意力的输出和下一个词的词 concat 进入下一次预测;
(2)总结
- seq2seq中通过隐状态在编码器和解码器中传递消息。
- 注意力机制可以根据解码器RNN的输出来匹配到合适的编码器RNN的输出来更有效的传递信息。
(二)代码实现
import torch
from torch import nn
from d2l import torch as d2l
# 带有注意力机制的解码器基本接口
class AttentionDecoder(d2l.Decoder):
"""带有注意力机制解码器的基本接口"""
def __init__(self, **kwargs):
super(AttentionDecoder, self).__init__(**kwargs)
# @property是python的一种装饰器,是用来修饰方法的。创建只读属性
# 调用的时候不需要加(),加括号反而会报错
# 因为python没有私有属性,所以可以通过@property来设置,可以隐藏属性名,让用户使用的时候无法随意修改
@property
def attention_weights(self):
raise NotImplementedError
接下来,让我们在接下来的Seq2SeqAttentionDecoder类中
实现带有Bahdanau注意力的循环神经网络解码器。
首先,我们初始化解码器的状态,需要下面的输入:
- 编码器在所有时间步的最终层隐状态,将作为注意力的键和值;
- 上一时间步的编码器全层隐状态,将作为初始化解码器的隐状态;
- 编码器有效长度(排除在注意力池中填充词元)。
在每个解码时间步骤中,解码器上一个时间步的最终层隐状态将用作查询。
因此,注意力输出和输入嵌入都连结为循环神经网络解码器的输入。
class Seq2SeqAttentionDecoder(AttentionDecoder):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs):
super(Seq2SeqAttentionDecoder, self).__init__(**kwargs)
# 在解码器中放入attention(key_size, query_size, num_hiddens, dropout)
self.attention = d2l.AdditiveAttention(num_hiddens, num_hiddens, num_hiddens, dropout)
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers, dropout=dropout)
self.dense = nn.Linear(num_hiddens, vocab_size)
def init_state(self, enc_outputs, enc_valid_lens, *args):
# outputs的形状为(num_steps,batch_size,num_hiddens).
# 经过permute之后变成了(batch_size,num_steps,num_hiddens)
# hidden_state的形状为(num_layers,batch_size,num_hiddens)
outputs, hidden_state = enc_outputs
return (outputs.permute(1, 0, 2), hidden_state, enc_valid_lens)
def forward(self, X, state):
# enc_outputs的形状为(batch_size,num_steps,num_hiddens).
# hidden_state的形状为(num_layers,batch_size, num_hiddens) torch.Size([2, 4, 16])
enc_outputs, hidden_state, enc_valid_lens = state
# 输出X的形状为(num_steps,batch_size,embed_size)
X = self.embedding(X).permute(1, 0, 2)
outputs, self._attention_weights = [], []
# print(hidden_state.shape) # torch.Size([2, 4, 16])
for x in X:
# query是最后一层隐状态的输出
# query的形状为(batch_size,1,num_hiddens) torch.Size([4, 1, 16])
# hidden_state的形状(num_layers,batch_size,num_hiddens)
# print(hidden_state[-1].shape) # torch.Size([4, 16])
query = torch.unsqueeze(hidden_state[-1], dim=1) # torch.Size([4, 1, 16])
# context的形状为(batch_size,1,num_hiddens)
# enc_outputs的形状为(batch_size,num_steps,num_hiddens).torch.Size([4, 7, 16])
context = self.attention(query, enc_outputs, enc_outputs, enc_valid_lens) # (queries, keys, values, valid_lens)
# socres_shape = torch.Size([4, 1, 7])
# socres * value = torch.Size([4, 1, 16])
# 在特征维度上连结
x = torch.cat((context, torch.unsqueeze(x, dim=1)), dim=-1)
# 将x变形为(1,batch_size,embed_size+num_hiddens)
out, hidden_state = self.rnn(x.permute(1, 0, 2), hidden_state)
outputs.append(out)
self._attention_weights.append(self.attention.attention_weights)
# 全连接层变换后,outputs的形状为(num_steps,batch_size,vocab_size)
outputs = self.dense(torch.cat(outputs, dim=0))
return outputs.permute(1, 0, 2), [enc_outputs, hidden_state, enc_valid_lens]
@property
def attention_weights(self):
return self._attention_weights
encoder = d2l.Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hiddens=16,
num_layers=2)
encoder.eval()
decoder = Seq2SeqAttentionDecoder(vocab_size=10, embed_size=8, num_hiddens=16,
num_layers=2)
decoder.eval()
X = torch.zeros((4, 7), dtype=torch.long) # (batch_size,num_steps)
state = decoder.init_state(encoder(X), None)
output, state = decoder(X, state)
output.shape, len(state), state[0].shape, len(state[1]), state[1][0].shape
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 250, d2l.try_gpu()
train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = d2l.Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers, dropout)
decoder = Seq2SeqAttentionDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
net = d2l.EncoderDecoder(encoder, decoder)
d2l.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
translation, dec_attention_weight_seq = d2l.predict_seq2seq(
net, eng, src_vocab, tgt_vocab, num_steps, device, True)
print(f'{eng} => {translation}, ',
f'bleu {d2l.bleu(translation, fra, k=2):.3f}')
attention_weights = torch.cat([step[0][0][0] for step in dec_attention_weight_seq], 0).reshape((
1, 1, -1, num_steps))
# 加上一个包含序列结束词元
d2l.show_heatmaps(
attention_weights[:, :, :, :len(engs[-1].split()) + 1].cpu(),
xlabel='Key positions', ylabel='Query positions')