自然语言处理之语言模型(language model)

2019-05-11  本文已影响0人  Mr_Relu

编程环境:

anaconda + python3.7
完整代码及数据已经更新至GitHub,欢迎fork~GitHub链接


声明:创作不易,未经授权不得复制转载
Statement: Do not copy or repost without authorization.


1:用 python 编程实践语言模型 (uni-gram 和 bi-gram),加入平滑技术。

2:计算 test_LM.txt 中句子的 PPL(困惑度,perplexity),对比 uni-gram 和 bi-gram 语言模型效果。

uni-gram code:

# -*- coding: utf-8 -*-
"""
Created on Mon Mar 11 08:10:22 2019

@author: Mr.relu
"""
from textblob import TextBlob
from textblob import Word
from collections import defaultdict
from functools import reduce 
import re
import math
from sklearn.externals import joblib


dictionary = set()              # vocabulary: the distinct tokens seen in the training file

postings = defaultdict(dict)    # per-token statistics; get_dic() keeps the raw count under key 0

total_num = 0                   # total number of training tokens (with repetitions)

num_dic = 0                     # number of distinct tokens; main() sets it to len(dictionary)


def main():
    """Train the uni-gram model and print its perplexity on the test file.

    The counts are collected by get_dic(), turned into probabilities by
    cal_probability(), and the vocabulary size is stored in the global
    ``num_dic``. The corpus statistics are printed, followed by the PPL
    returned by test(), rounded to five decimal places.
    """
    global postings, total_num, num_dic, dictionary

    get_dic()
    cal_probability()
    # get_pw_of_absent() reads num_dic, so it must be set before test() runs.
    num_dic = len(dictionary)

    print(f"Total number of train words:{total_num}")
    print(f"number of dictionary:{num_dic}")

    perplexity = test()

    print(f"the test PPL score is:{round(perplexity, 5)}")


def get_dic():
    """Accumulate uni-gram counts from ``train_LM.txt`` into the module globals.

    Every line of the training file is tokenised with line_token(). For each
    line the function:

    * adds the line's tokens to the global ``dictionary`` set,
    * adds the number of tokens on the line to the global ``total_num``,
    * adds each token's number of occurrences to ``postings[token][0]``.

    Returns:
        None. The results are left in ``dictionary``, ``total_num`` and
        ``postings``.
    """
    global dictionary, total_num, postings

    # ``with`` closes the handle even if reading or tokenising raises.
    with open('train_LM.txt', 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            terms = line_token(line)
            total_num += len(terms)

            # In-place update avoids copying the whole vocabulary per line.
            dictionary.update(terms)

            # A single pass over the tokens replaces calling list.count()
            # once per distinct term, which was quadratic in the line length.
            for term in terms:
                counts = postings[term]
                counts[0] = counts.get(0, 0) + 1

       
    
    
def line_token(document):
    """Normalise a piece of text and return its lemmatised tokens.

    The text is lower-cased and the ``__eou__`` markers are removed. Every
    non-word character and every digit is then replaced by a space, so only
    letters and underscores survive. The remaining words are singularised
    with TextBlob and each one is lemmatised as a verb.

    Returns:
        A list with the lemmatised form of every word, in text order.
    """
    text = document.lower().replace('__eou__', '')
    text = re.sub(r"\W|\d|\s{2,}", " ", text)
    singular_words = TextBlob(text).words.singularize()
    # Lemmatise as a verb ("v") to map inflected verbs to their base form.
    return [Word(token).lemmatize("v") for token in singular_words]
  

def cal_probability():
    """Store every term's relative frequency under ``postings[term][1]``.

    The value is the raw count ``postings[term][0]`` divided by the global
    ``total_num``. No smoothing is applied at this stage.
    """
    for counts in postings.values():
        counts[1] = counts[0] / total_num
        



def get_pw_of_absent(newTerm):
    """Return the add-one probability assigned to a word absent from training.

    The argument itself is not inspected: every unseen word receives
    ``1 / (total_num + num_dic)``, computed from the module-level totals.
    """
    denominator = total_num + num_dic
    return 1 / denominator
    


def test():
    """Compute the uni-gram perplexity of ``test_LM.txt``.

    The whole test file is tokenised with line_token(). For each token the
    base-2 log of its probability is added to a running sum: tokens present
    in ``postings`` use the stored probability ``postings[token][1]``, all
    other tokens use the value returned by get_pw_of_absent(). The running
    sum and the token count are printed before returning.

    Returns:
        The perplexity ``2 ** (-log2_sum / token_count)``.

    Raises:
        ZeroDivisionError: if the test file yields no tokens.
    """
    global postings
    log_PT = 0

    # ``with`` closes the handle even if reading raises.
    with open('test_LM.txt', 'r', encoding='utf-8', errors='ignore') as f:
        document = f.read()

    test_wNum = 0
    words = line_token(document)
    for expected_str in words:
        test_wNum += 1
        if expected_str in postings:
            log_PT += math.log(postings[expected_str][1], 2)
        else:
            # Unseen word: fall back to the add-one estimate.
            print("one not in posting!!!")
            # Pass the unseen token itself; the original passed the builtin
            # ``str`` type by mistake.
            temp = get_pw_of_absent(expected_str)
            log_PT += math.log(temp, 2)

    print("log_PT:" + str(log_PT))
    print("test_num:" + str(test_wNum))
    PPL = pow(2, -log_PT / test_wNum)
    return PPL
"""
#    document=re.sub(r"\W|\d|\s{2,}"," ",document)#保留字母下划线
#    document = document.replace('__eou__','\t') #将__eou__替换为分割符号
#    sentences = document.split('\t')
#    
#    for sen in sentences:
#        terms=TextBlob(sen).words.singularize()
#        
#        for word in terms:
#            expected_str = Word(word)
#            expected_str = expected_str.lemmatize("v")#还原动词原型
#            #加 1 法平滑
#            if expected_str in postings:
#                log_PT += math.log(postings[expected_str][1])
#            else:
#                update_postings_dic(expected_str)
#                log_PT += math.log(postings[expected_str][1])
#        print(sen)
"""  
# Run the uni-gram training and evaluation only when executed as a script.
if __name__ == "__main__":
    main()

bi-gram code:

# -*- coding: utf-8 -*-
"""
Created on Mon Mar 11 18:52:22 2019

@author: Mr.relu
"""
from textblob import TextBlob
from textblob import Word
from collections import defaultdict
import re
import math

# bigram key "previous$current" -> {0: raw count, 1: conditional probability}
postings = defaultdict(dict)

# History word -> number of bigrams that start with it. The values are plain
# integer counters, so the default factory is ``int``; with ``dict`` a
# default-created entry could not be incremented.
pre_postings = defaultdict(int)

V = 0                           # vocabulary size of the training corpus; set in main()

def main():
    """Train the bi-gram model and print its perplexity on the test file.

    get_dic() collects the bigram and history counts, the global ``V`` is
    derived from the number of distinct history words, cal_probability()
    turns the counts into conditional probabilities, and the PPL returned by
    test() is printed rounded to five decimal places.
    """
    global postings, pre_postings, V

    get_dic()

    # One is subtracted from the number of distinct history words,
    # presumably to exclude the "s**s" sentence-start marker.
    V = len(pre_postings) - 1
    print(f"V:{V}")

    cal_probability()
    print("get here~~~")

    perplexity = test()
    print(f"the test PPL score is:{round(perplexity, 5)}")


def get_dic():
    """Count bigrams and their history words from ``train_LM.txt``.

    The training text is lower-cased, non-word characters and digits are
    replaced by spaces, and the text is split into sentences at every
    ``__eou__`` marker. Each sentence is singularised and verb-lemmatised
    with TextBlob, then wrapped in the boundary markers ``s**s`` and
    ``e**e``. For every adjacent pair (previous, current):

    * ``postings["previous$current"][0]`` is incremented, and
    * ``pre_postings[previous]`` is incremented.

    Returns:
        None. The counts are left in ``postings`` and ``pre_postings``.
    """
    global postings, pre_postings

    # ``with`` closes the handle even if reading raises.
    with open('train_LM.txt', 'r', encoding='utf-8', errors='ignore') as f:
        document = f.read()

    document = document.lower()
    # Letters and underscores survive, so "__eou__" is still intact here.
    document = re.sub(r"\W|\d|\s{2,}", " ", document)
    document = re.sub(r"\s{2,}", " ", document)
    # Turn the end-of-sentence marker into a separator and split on it.
    document = document.replace('__eou__', '\t')
    sentences = document.split('\t')

    # NOTE(review): only an exactly-empty tail is dropped. A tail holding a
    # single space (e.g. from a trailing newline) is kept and counted as an
    # empty sentence -- confirm whether that is intended.
    if sentences[-1] == "":
        sentences.pop()
        print("pop one ...")

    for sen in sentences:
        # Build [start marker, lemmatised words..., end marker] directly
        # instead of inserting at the front of the list afterwards.
        result = ["s**s"]
        result.extend(
            Word(word).lemmatize("v")
            for word in TextBlob(sen).words.singularize()
        )
        result.append("e**e")

        # Walk over adjacent pairs (previous, current).
        for str1, current in zip(result, result[1:]):
            strr = str1 + '$' + current

            stats = postings[strr]
            stats[0] = stats.get(0, 0) + 1

            # .get() keeps this independent of the default factory.
            pre_postings[str1] = pre_postings.get(str1, 0) + 1
    
#    print(pre_postings)
    
    

def cal_probability():
    """Store each bigram's conditional probability in ``postings[bigram][1]``.

    A bigram key has the form "previous$current". The probability is the
    bigram count ``postings[bigram][0]`` divided by ``pre_postings[previous]``,
    the number of bigrams that start with the same history word.
    """
    for bigram, stats in postings.items():
        history = bigram.partition('$')[0]
        stats[1] = stats[0] / pre_postings[history]
        


def get_pw_of_absent(newTerm):
    """Return the smoothed probability of a bigram never seen in training.

    Args:
        newTerm: The unseen bigram, formatted as "previous$current".

    Returns:
        ``1 / (pre_postings[previous] + V)`` when ``previous`` occurred as a
        history word in training, otherwise ``1 / V``.

    Raises:
        ZeroDivisionError: if the relevant denominator is zero.
    """
    global pre_postings, V

    # Fixed typo: the original called the non-existent ``slipt`` method,
    # which raised AttributeError for every unseen bigram.
    tem = newTerm.split('$')[0]

    if tem in pre_postings:
        # Design note: an alternative would replace |V| with the number of
        # distinct words observed after the history word (usually far
        # smaller than |V|), plus one for the new word. That could give the
        # new bigram a relatively large probability and would require
        # recomputing the probabilities of the other continuations. To keep
        # things simple this is not done: |V| is added to the denominator so
        # the unseen bigram gets a small probability, treating the training
        # corpus as reliable and unseen bigrams as rare events.
        return 1 / (pre_postings[tem] + V)

    # The history word itself was never seen: fall back to a uniform 1/|V|.
    return 1 / V

    


def test():
    """Compute the bi-gram perplexity of ``test_LM.txt``.

    The test text goes through the same preprocessing as the training text:
    lower-casing, replacing non-word characters and digits with spaces,
    splitting into sentences at ``__eou__``, singularising and
    verb-lemmatising the words, and wrapping each sentence in the ``s**s``
    and ``e**e`` markers. For every adjacent pair the base-2 log of its
    probability is added to a running sum: bigrams present in ``postings``
    use ``postings[bigram][1]``, all others use get_pw_of_absent(). The
    running sum and the bigram count are printed before returning.

    Returns:
        The perplexity ``2 ** (-log2_sum / bigram_count)``.

    Raises:
        ZeroDivisionError: if the test file yields no bigrams.
    """
    global postings

    log_PT = 0

    # ``with`` closes the handle even if reading raises.
    with open('test_LM.txt', 'r', encoding='utf-8', errors='ignore') as f:
        document = f.read()

    test_wNum = 0

    document = document.lower()
    # Letters and underscores survive, so "__eou__" is still intact here.
    document = re.sub(r"\W|\d|\s{2,}", " ", document)
    document = re.sub(r"\s{2,}", " ", document)
    # Turn the end-of-sentence marker into a separator and split on it.
    document = document.replace('__eou__', '\t')
    sentences = document.split('\t')

    # NOTE(review): only an exactly-empty tail is dropped; a tail holding a
    # single space is kept and scored as an empty sentence -- confirm.
    if sentences[-1] == "":
        sentences.pop()
        print("pop one ...")

    for sen in sentences:
        # Build [start marker, lemmatised words..., end marker] directly.
        result = ["s**s"]
        result.extend(
            Word(word).lemmatize("v")
            for word in TextBlob(sen).words.singularize()
        )
        result.append("e**e")

        # Score every adjacent pair (previous, current).
        for previous, current in zip(result, result[1:]):
            strr = previous + '$' + current
            test_wNum += 1
            if strr in postings:
                log_PT += math.log(postings[strr][1], 2)
            else:
                print("one not in posting!!!")
                temp = get_pw_of_absent(strr)
                log_PT += math.log(temp, 2)

    print("log_PT:" + str(log_PT))
    print("test_num:" + str(test_wNum))
    PPL = pow(2, -log_PT / test_wNum)
    return PPL
            
# Run the bi-gram training and evaluation only when executed as a script.
if __name__ == "__main__":
    main()

上一篇下一篇

猜你喜欢

热点阅读