语言模型

2022-09-30  本文已影响0人  小黄不头秃

(一)语言模型

给定一个文本序列x_1, x_2, ..., x_T,语言模型的目标就是估计联合概率密度p(x_1, ..., x_T)

他的应用包括:

(1)使用计数来建模

假设有一个长度为2的序列,我们来求他的联合概率密度。
p(x,x') = p(x)p(x'|x)= \frac{n(x)}{n} * \frac{n(x,x')}{n(x)} = \frac{n(x,x')}{n}
(这里n是总词数,n(x)、n(x,x'),是单个单词和连续单词对出现的次数。)

同理也可以拓展到序列为3的情况:
p(x, x', x'') = \frac{n(x,x',x'')}{n}

缺点:当序列非常长的时候,可能这个长序列出现的次数不足为1.

可以使用马尔可夫假设来缓解这个问题。


(2)马尔科夫假设(N元语法)

假设和预测值前面的n个元素相关,可以处理较长序列。

(二)代码实现

N元语法的实现和常用数据集。

import random 
import torch 
import re
from d2l import torch as d2l
def read_time_machine():
    """将时间机器科幻小说加载到文本行列表中"""
    with open("../data/timemachine.txt",'r') as f:
        lines = f.readlines()
    return [re.sub('[^A-Za-z]+',' ', line).strip().lower() for line in lines] # strip去掉左右两边的特殊字符串

tokens = d2l.tokenize(read_time_machine()) # 二维数组,一行一行读取内容
corpus = [token for line in tokens for token in line] # 一维数组,小说中的每一个单词
vocab = d2l.Vocab(corpus) # 统计每个词出现的次数,及映射关系
print(vocab.token_freqs[:10])
# 可视化词频
freqs = [freq for token, freq in vocab.token_freqs]
d2l.plot(freqs, xlabel='token: x', ylabel='frequency: n(x)',
         xscale='log', yscale='log')
# 其他的词元组合,比如二元组合,三元语法等
# 二元
bigram_tokens = [pair for pair in zip(corpus[:-1],corpus[1:])] # 优雅
bigram_vocab = d2l.Vocab(bigram_tokens)
print(bigram_vocab.token_freqs[:10])

# 三元
trigram_tokens = [triple for triple in zip(corpus[:-2], corpus[1:-1], corpus[2:])]
trigram_vocab = d2l.Vocab(trigram_tokens)
print(trigram_vocab.token_freqs[:10])

bigram_freqs = [freq for token, freq in bigram_vocab.token_freqs]
trigram_freqs = [freq for token, freq in trigram_vocab.token_freqs]
d2l.plot([freqs, bigram_freqs, trigram_freqs], xlabel='token: x',
         ylabel='frequency: n(x)', xscale='log', yscale='log',
         legend=['unigram', 'bigram', 'trigram'])
# 随机的生成一个小批量数据的特征和标签以供读取。在随机采样中,每个样本都是在原始的长序列上任意捕获的子序列
def seq_data_iter_random(corpus, batch_size, num_steps):
    """随机生成一个小批量的子序列"""
    corpus = corpus[random.randint(0, num_steps-1):] # 随机选取起始点
    # 计算可以切分为多少个子序列。
    num_subseqs = (len(corpus)-1) // num_steps # 要给最后一个标签预留一个预测的位置,所以需要-1
    # 获取每个子序列开始的下标
    initial_indices = list(range(0, num_subseqs*num_steps, num_steps))
    random.shuffle(initial_indices)

    def data(pos):
        return corpus[pos:pos + num_steps]

    # 可以划分多少个batch
    num_batches = num_subseqs // batch_size
    for i in range(0, batch_size*num_batches, batch_size):
        # 在这里,initial_indices包含子序列的随机起始索引
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        X = [data(j) for j in initial_indices_per_batch] # [-1, num_steps]
        Y = [data(j + 1) for j in initial_indices_per_batch] # [-1, num_steps]
        yield torch.tensor(X), torch.tensor(Y)

my_seq = list(range(35))
for X, Y in seq_data_iter_random(my_seq, batch_size=2, num_steps=5):
    print('X: ', X, '\nY:', Y)
# 另一种方法是保证小批量里面的子序列在原始数据上也是相邻的
def seq_data_iter_sequential(corpus, batch_size, num_steps):  #@save
    """使用顺序分区生成一个小批量子序列"""
    # 从随机偏移量开始划分序列
    offset = random.randint(0, num_steps)
    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
    Xs = torch.tensor(corpus[offset: offset + num_tokens])
    Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
    Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
    num_batches = Xs.shape[1] // num_steps
    for i in range(0, num_steps * num_batches, num_steps):
        X = Xs[:, i: i + num_steps]
        Y = Ys[:, i: i + num_steps]
        yield X, Y
for X, Y in seq_data_iter_sequential(my_seq, batch_size=2, num_steps=5):
    print('X: ', X, '\nY:', Y)
# 将上面两种采样方式封装成一个类
class SeqDataLoader:  #@save
    """加载序列数据的迭代器"""
    def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
        if use_random_iter:
            self.data_iter_fn = d2l.seq_data_iter_random
        else:
            self.data_iter_fn = d2l.seq_data_iter_sequential
        self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
        self.batch_size, self.num_steps = batch_size, num_steps

    def __iter__(self):
        return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
def load_data_time_machine(batch_size, num_steps,use_random_iter=False, max_tokens=10000):
    """返回时光机器数据集的迭代器和词表"""
    data_iter = SeqDataLoader(batch_size, num_steps, use_random_iter, max_tokens)
    return data_iter, data_iter.vocab
上一篇 下一篇

猜你喜欢

热点阅读