Language Models in Natural Language Processing
2019-05-11 Mr_Relu
Environment:
Anaconda + Python 3.7
The complete code and data are available on GitHub; feel free to fork: GitHub link
Statement: creating this took effort; no copying or reprinting without authorization.
1. Implement uni-gram and bi-gram language models in Python, with smoothing.
2. Compute the PPL (perplexity) of the sentences in test_LM.txt and compare the uni-gram and bi-gram models.
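Both scripts score the test set with perplexity (PPL): 2 raised to the negative average base-2 log probability of the test tokens, which is exactly what the test() functions below accumulate. A minimal standalone sketch (the helper name perplexity is illustrative, not part of the scripts):

import math

def perplexity(log2_probs):
    # PPL = 2 ** (-(1/N) * sum(log2 P(w_i)))
    return 2 ** (-sum(log2_probs) / len(log2_probs))

# e.g. three tokens with probabilities 0.5, 0.25, 0.125 give PPL = 4.0
print(perplexity([math.log(p, 2) for p in (0.5, 0.25, 0.125)]))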
uni-gram code:
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 11 08:10:22 2019
@author: Mr.relu
"""
from textblob import TextBlob
from textblob import Word
from collections import defaultdict
import re
import math
# import joblib  # uncomment together with the dump/load lines below
#                # (the old sklearn.externals.joblib import is deprecated)

dictionary = set()            # vocabulary of the training corpus
postings = defaultdict(dict)  # per-term frequency and probability
total_num = 0                 # total token count of the training corpus
num_dic = 0                   # vocabulary size

def main():
    global postings, total_num, num_dic, dictionary
    get_dic()
    cal_probability()
    num_dic = len(dictionary)
    # joblib.dump(postings, 'uni_gram_postings.pkl')
    # postings = joblib.load('uni_gram_postings.pkl')
    print("Total number of train words:" + str(total_num))
    print("number of dictionary:" + str(num_dic))
    ppl = test()
    print("the test PPL score is:" + str(round(ppl, 5)))
def get_dic():
    global dictionary, total_num, postings
    f = open('train_LM.txt', 'r', encoding='utf-8', errors='ignore')
    lines = f.readlines()
    f.close()
    for line in lines:
        terms = line_token(line)
        d_tnum = len(terms)  # token count of this line after preprocessing
        unique_terms = set(terms)
        dictionary = dictionary.union(unique_terms)  # merge into the global vocabulary
        total_num += d_tnum
        for term in unique_terms:
            c_term = terms.count(term)
            if term in postings:
                postings[term][0] += c_term
            else:
                postings[term][0] = c_term
def line_token(document):
    document = document.lower()
    document = document.replace('__eou__', '')  # drop the __eou__ utterance delimiter
    # \W matches anything but [a-zA-Z0-9_], \d matches digits, \s{2,} matches runs
    # of whitespace, so only letters and underscores survive
    document = re.sub(r"\W|\d|\s{2,}", " ", document)
    terms = TextBlob(document).words.singularize()
    result = []
    for word in terms:
        expected_str = Word(word)
        expected_str = expected_str.lemmatize("v")  # reduce verbs to their base form
        result.append(expected_str)
    return result
def cal_probability():
    global postings, total_num
    for term in postings:
        postings[term][1] = postings[term][0] / total_num  # MLE unigram probability

def get_pw_of_absent(newTerm):
    # Add-one probability for a word that never occurred in the training corpus
    global total_num, num_dic
    return 1 / (total_num + num_dic)
def test():
    global postings
    log_PT = 0
    f = open('test_LM.txt', 'r', encoding='utf-8', errors='ignore')
    document = f.read()
    f.close()
    test_wNum = 0
    words = line_token(document)
    for expected_str in words:
        test_wNum += 1
        if expected_str in postings:
            log_PT += math.log(postings[expected_str][1], 2)
        else:
            # add-one smoothing for unseen words
            print("one not in posting!!!")
            temp = get_pw_of_absent(expected_str)  # fixed: was get_pw_of_absent(str)
            log_PT += math.log(temp, 2)
    print("log_PT:" + str(log_PT))
    print("test_num:" + str(test_wNum))
    PPL = pow(2, -log_PT / test_wNum)
    return PPL
"""
# document=re.sub(r"\W|\d|\s{2,}"," ",document)#保留字母下划线
# document = document.replace('__eou__','\t') #将__eou__替换为分割符号
# sentences = document.split('\t')
#
# for sen in sentences:
# terms=TextBlob(sen).words.singularize()
#
# for word in terms:
# expected_str = Word(word)
# expected_str = expected_str.lemmatize("v")#还原动词原型
# #加 1 法平滑
# if expected_str in postings:
# log_PT += math.log(postings[expected_str][1])
# else:
# update_postings_dic(expected_str)
# log_PT += math.log(postings[expected_str][1])
# print(sen)
"""
if __name__ == "__main__":
    main()
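Note that this script smooths only the unseen test words; words seen in training keep their unsmoothed MLE probabilities, so the resulting distribution no longer sums exactly to one. A fully consistent add-one (Laplace) unigram estimate would apply the same formula to every word; a sketch under that assumption (count, total, and vocab_size are illustrative names):

def laplace_unigram_prob(count, total, vocab_size):
    # P(w) = (c(w) + 1) / (N + |V|), for seen and unseen words alike;
    # with count = 0 this matches get_pw_of_absent's 1/(total_num + num_dic)
    return (count + 1) / (total + vocab_size)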
bi-gram code:
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 11 18:52:22 2019
@author: Mr.relu
"""
from textblob import TextBlob
from textblob import Word
from collections import defaultdict
import re
import math

postings = defaultdict(dict)      # bigram counts and conditional probabilities
pre_postings = defaultdict(dict)  # counts of each history word w_{i-1}
V = 0  # vocabulary size of the corpus

def main():
    global postings, pre_postings, V
    get_dic()
    V = len(pre_postings) - 1  # history types minus the start marker "s**s"
    print("V:" + str(V))
    cal_probability()
    ppl = test()
    print("the test PPL score is:" + str(round(ppl, 5)))
def get_dic():
    global postings, pre_postings
    f = open('train_LM.txt', 'r', encoding='utf-8', errors='ignore')
    document = f.read()
    f.close()
    document = document.lower()
    document = re.sub(r"\W|\d|\s{2,}", " ", document)  # keep letters and underscores
    document = re.sub(r"\s{2,}", " ", document)        # collapse leftover whitespace
    document = document.replace('__eou__', '\t')       # turn __eou__ into a sentence separator
    sentences = document.split('\t')
    if sentences[-1] == "":
        sentences.pop()
        print("pop one ...")
    for sen in sentences:
        terms = TextBlob(sen).words.singularize()
        result = []
        for word in terms:
            expected_str = Word(word)
            expected_str = expected_str.lemmatize("v")  # reduce verbs to their base form
            result.append(expected_str)
        result.insert(0, "s**s")  # sentence-start marker
        result.append("e**e")     # sentence-end marker
        i = 1
        while i < len(result):
            str1 = result[i - 1]
            strr = str1 + '$' + result[i]  # bigram key "w_{i-1}$w_i"
            if strr in postings:
                postings[strr][0] += 1
            else:
                postings[strr][0] = 1
            if str1 in pre_postings:
                pre_postings[str1] += 1
            else:
                pre_postings[str1] = 1
            i += 1
def cal_probability():
    global postings, pre_postings, V
    for term in postings:
        inde = term.split('$')[0]  # the history word w_{i-1}
        postings[term][1] = postings[term][0] / pre_postings[inde]  # MLE P(w_i | w_{i-1})
def get_pw_of_absent(newTerm):
    # Probability for a bigram that never occurred in training (add-one style)
    global pre_postings, V
    tem = newTerm.split('$')[0]  # fixed: was newTerm.slipt('$')[0]
    pw = 0.0
    if tem in pre_postings:
        # One alternative replaces |V| with the number of distinct words observed
        # after w_{i-1} (generally far smaller than |V|), plus one for the new
        # continuation. That would give unseen bigrams a fairly large probability,
        # and for consistency the probabilities of the already-seen bigrams would
        # have to be recomputed as well. To keep things simple this code skips
        # that renormalization and just adds |V| to the denominator, so a new
        # bigram gets a small probability; this effectively trusts the training
        # corpus and treats unseen events as rare.
        # pw = 1/(pre_postings[tem] + <number of distinct continuations> + 1)
        pw = 1 / (pre_postings[tem] + V)
    else:
        # unseen history: fall back to a uniform 1/|V| estimate
        pw = 1 / V
    return pw
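# A sketch of that alternative, assuming get_dic() also collected the distinct
# continuations of each history in a hypothetical extra structure:
#   continuations = defaultdict(set)  # filled via continuations[str1].add(result[i])
# then, for an unseen bigram whose history was seen:
#   pw = 1 / (pre_postings[tem] + len(continuations[tem]) + 1)
# with the seen-bigram probabilities renormalized over the same denominator.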
def test():
    global postings
    log_PT = 0
    f = open('test_LM.txt', 'r', encoding='utf-8', errors='ignore')
    document = f.read()
    f.close()
    test_wNum = 0
    document = document.lower()
    document = re.sub(r"\W|\d|\s{2,}", " ", document)  # keep letters and underscores
    document = re.sub(r"\s{2,}", " ", document)
    document = document.replace('__eou__', '\t')  # turn __eou__ into a sentence separator
    sentences = document.split('\t')
    if sentences[-1] == "":
        sentences.pop()
        print("pop one ...")
    for sen in sentences:
        terms = TextBlob(sen).words.singularize()
        result = []
        for word in terms:
            expected_str = Word(word)
            expected_str = expected_str.lemmatize("v")
            result.append(expected_str)
        result.insert(0, "s**s")
        result.append("e**e")
        i = 1
        while i < len(result):
            strr = result[i - 1] + '$' + result[i]
            test_wNum += 1
            if strr in postings:
                log_PT += math.log(postings[strr][1], 2)
            else:
                print("one not in posting!!!")
                temp = get_pw_of_absent(strr)
                log_PT += math.log(temp, 2)
            i += 1
    print("log_PT:" + str(log_PT))
    print("test_num:" + str(test_wNum))
    PPL = pow(2, -log_PT / test_wNum)
    return PPL
if __name__ == "__main__":
    main()
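For reference, the bigram model scores a sentence as a chain of conditional probabilities over the padded token sequence, which is what test() accumulates bigram by bigram. A minimal sketch of that decomposition (the bigram_prob callback is illustrative, standing in for the postings[...][1] lookup with its get_pw_of_absent fallback):

import math

def sentence_log2_prob(tokens, bigram_prob):
    # log2 P(s) = sum over i of log2 P(w_i | w_{i-1}), using the same
    # "s**s"/"e**e" markers the scripts insert around every sentence
    padded = ["s**s"] + tokens + ["e**e"]
    return sum(math.log(bigram_prob(padded[i - 1], padded[i]), 2)
               for i in range(1, len(padded)))

Comparing the two models on the same test set then reduces to comparing the PPL each script prints; the lower value indicates the better fit.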