Machine Learning & Recommendation & NLP & DL

自然语言处理N天-Day0802从自然语言处理角度看HMM和CR

2019-02-17  本文已影响6人  我的昵称违规了
新建 Microsoft PowerPoint 演示文稿 (2).jpg

说明:本文依据《中文自然语言处理入门实战》完成。目前网上有不少转载的课程,我是从GitChat上购买。

近几年在自然语言处理领域中,HMM(隐马尔可夫模型)和 CRF(条件随机场)算法常常被用于分词、句法分析、命名实体识别、词性标注等。
由于两者之间有很大的共同点,所以在很多应用上往往是重叠的,但在命名实体、句法分析等领域 CRF 似乎更胜一筹。通常来说如果做自然语言处理,这两个模型应该都要了解,下面我们来看看本文的内容。

第八课 从自然语言处理角度看HMM和CRF

基于HMM训练中文分词器

1.模型介绍

HMM模型是由一个“五元组”构成的集合。
1.StatusSet:状态值集合,状态值集合为 (B, M, E, S),其中 B 为词的首个字,M 为词中间的字,E 为词语中最后一个字,S 为单个字,B、M、E、S 每个状态代表的是该字在词语中的位置。
2.ObservedSet:观察值集合,观察值集合就是所有语料的汉字,甚至包括标点符号所组成的集合。
3.TransProbMatrix:转移概率矩阵,状态转移概率矩阵的含义就是从状态 X 转移到状态 Y 的概率,是一个4×4的矩阵,即 {B,E,M,S}×{B,E,M,S}。
4.EmitProbMatrix:发射概率矩阵,发射概率矩阵的每个元素都是一个条件概率,代表 P(Observed[i]|Status[j]) 概率。
5.InitStatus:初始状态分布,初始状态概率分布表示句子的第一个字属于 {B,E,M,S} 这四种状态的概率。
使用HMM进行分词,要解决的问题就是参数已知的情况下,如何求解状态值序列。

2.语料准备

本次训练使用的预料 syj_trainCorpus_utf8.txt 是我爬取的短文本处理生成的。整个语料大小 264M,包含1116903条数据,UTF-8 编码,词与词之间用空格隔开,用来训练分词模型。
教程给的是CSDN地址,很没劲又设置了1分的付费下载,搞得必须去充值才行,于是就在淘宝付费代下,以下为百度云盘地址

链接:https://pan.baidu.com/s/1eyTPOD7kuDdcIVDLVl6OlA
提取码:qf5k

3.实现

import pickle
import json

STATES = {'B', 'M', 'E', 'S'}
EPS = 0.0001
seg_stop_words = {" ", ",", "。", "“", "”", '“', "?", "!", ":", "《", "》", "、", ";", "·", "‘ ", "’", "──", ",", ".", "?",
                  "!", "`", "~", "@", "#", "$", "%", "^", "&", "*", "(", ")", "-", "_", "+", "=", "[", "]", "{", "}",
                  '"', "'", "<", ">", "\\", "|" "\r", "\n", "\t"}

class HMM_Model:
    def __init__(self):
        self.trans_mat = {}  # trans_mat:状态转移矩阵,trans_mat[state1][state2] 表示训练集中由 state1 转移到 state2 的次数。
        self.emit_mat = {}  # emit_mat:观测矩阵,emit_mat[state][char] 表示训练集中单字 char 被标注为 state 的次数。
        self.init_vec = {}  # init_vec:初始状态分布向量,init_vec[state] 表示状态 state 在训练集中出现的次数。
        self.state_count = {}  # state_count:状态统计向量,state_count[state]表示状态 state 出现的次数。
        self.states = {}
        self.inited = False

    # 初始化第一个方法中的数据结构
    def setup(self):
        for state in self.states:
            # build trans_mat
            self.trans_mat[state] = {}
            for target in self.states:
                self.trans_mat[state][target] = 0.0
            self.emit_mat[state] = {}
            self.init_vec[state] = 0
            self.state_count[state] = 0
        self.inited = True

    # filename 指定模型名称,默认模型名称为 hmm.json,这里提供两种格式的保存类型,JSON 或者 pickle 格式,通过参数 code 来决定,code 的值为 code='json' 或者 code = 'pickle',默认为 code='json'
    def save(self, filename="hmm.json", code='json'):
        fw = open(filename, 'w', encoding='utf-8')
        data = {
            "trans_mat": self.trans_mat,
            "emit_mat": self.emit_mat,
            "init_vec": self.init_vec,
            "state_count": self.state_count
        }
        if code == "json":
            txt = json.dumps(data)
            txt = txt.encode('utf-8').decode('unicode-escape')
            fw.write(txt)
        elif code == "pickle":
            pickle.dump(data, fw)
        fw.close()

    # filename 指定模型名称,默认模型名称为 hmm.json,这里提供两种格式的保存类型,JSON 或者 pickle 格式,通过参数 code 来决定,code 的值为 code='json' 或者 code = 'pickle',默认为 code='json'
    def load(self, filename="hmm.json", code="json"):
        fr = open(filename, 'r', encoding='utf-8')
        if code == "json":
            txt = fr.read()
            model = json.loads(txt)
        elif code == "pickle":
            model = pickle.load(fr)
        self.trans_mat = model["trans_mat"]
        self.emit_mat = model["emit_mat"]
        self.init_vec = model["init_vec"]
        self.state_count = model["state_count"]
        self.inited = True
        fr.close()

    # 用来训练模型,因为使用的标注数据集, 因此可以使用更简单的监督学习算法,训练函数输入观测序列和状态序列进行训练, 依次更新各矩阵数据。类中维护的模型参数均为频数而非频率, 这样的设计使得模型可以进行在线训练,使得模型随时都可以接受新的训练数据继续训练,不会丢失前次训练的结果
    def do_train(self, observes, states):
        if not self.inited:
            self.setup()
        for i in range(len(states)):
            if i == 0:
                self.init_vec[states[0]] += 1
                self.state_count[states[0]] += 1
            else:
                self.trans_mat[states[i - 1]][states[i]] += 1
                self.state_count[states[i]] += 1
                if observes[i] not in self.emit_mat[states[i]]:
                    self.emit_mat[states[i]][observes[i]] = 1
                else:
                    self.emit_mat[states[i]][observes[i]] += 1

    # 在进行预测前,需将数据结构的频数转换为频率
    def get_prob(self):
        init_vec = {}
        trans_mat = {}
        emit_mat = {}
        default = max(self.state_count.values())
        for key in self.init_vec:
            if self.state_count[key] != 0:
                init_vec[key] = float(self.init_vec[key]) / self.state_count[key]
            else:
                init_vec[key] = float(self.init_vec[key]) / default

        for key1 in self.trans_mat:
            trans_mat[key1] = {}
            for key2 in self.trans_mat[key1]:
                if self.state_count[key1] != 0:
                    trans_mat[key1][key2] = float(self.trans_mat[key1][key2]) / self.state_count[key1]
                else:
                    trans_mat[key1][key2] = float(self.trans_mat[key1][key2]) / default

        for key1 in self.emit_mat:
            emit_mat[key1] = {}
            for key2 in self.emit_mat[key1]:
                if self.state_count[key1] != 0:
                    emit_mat[key1][key2] = float(self.emit_mat[key1][key2]) / self.state_count[key1]
                else:
                    emit_mat[key1][key2] = float(self.emit_mat[key1][key2]) / default
        return init_vec, trans_mat, emit_mat

    # 采用Viterbi(维特比)算法求得最优路径
    def do_predict(self, sequence):
        tab = [{}]
        path = {}
        init_vec, trans_mat, emit_mat = self.get_prob()

        # 初始化
        for state in self.states:
            tab[0][state] = init_vec[state] * emit_mat[state].get(sequence[0], EPS)
            path[state] = [state]

        # 创建动态搜索表
        for t in range(1, len(sequence)):
            tab.append({})
            new_path = {}
            for state1 in self.states:
                items = []
                for state2 in self.states:
                    if tab[t - 1][state2] == 0:
                        continue
                    prob = tab[t - 1][state2] * trans_mat[state2].get(state1, EPS) * emit_mat[state1].get(sequence[t],
                                                                                                          EPS)
                    items.append((prob, state2))
                best = max(items)
                tab[t][state1] = best[0]
                new_path[state1] = path[best[1]] + [state1]
            path = new_path

        # 搜索最有路径
        prob, state = max([(tab[len(sequence) - 1][state], state) for state in self.states])
        return path[state]


def get_tags(src):
    tags = []
    if len(src) == 1:
        tags = ['S']
    elif len(src) == 2:
        tags = ['B', 'E']
    else:
        m_num = len(src) - 2
        tags.append('B')
        tags.extend(['M'] * m_num)
        tags.append('E')
    return tags


def cut_sent(src, tags):
    word_list = []
    start = -1
    started = False

    if len(tags) != len(src):
        return None

    if tags[-1] not in {'S', 'E'}:
        if tags[-2] in {'S', 'E'}:
            tags[-1] = 'S'
        else:
            tags[-1] = 'E'

    for i in range(len(tags)):
        if tags[i] == 'S':
            if started:
                started = False
                word_list.append(src[start:i])
            word_list.append(src[i])
        elif tags[i] == 'B':
            if started:
                word_list.append(src[start:i])
            start = i
            started = True
        elif tags[i] == 'E':
            started = False
            word = src[start:i + 1]
            word_list.append(word)
        elif tags[i] == 'M':
            continue
    return word_list


class HMMSoyoger(HMM_Model):
    def __init__(self, *args, **kwargs):
        super(HMMSoyoger, self).__init__(*args, **kwargs)
        self.states = STATES
        self.data = None

    # 加载训练数据
    def read_txt(self, filename):
        self.data = open(filename, 'r', encoding="utf-8")

    # 模型训练函数
    def train(self):
        if not self.inited:
            self.setup()

        for line in self.data:
            line = line.strip()
            if not line:
                continue

                # 观测序列
            observes = []
            for i in range(len(line)):
                if line[i] == " ":
                    continue
                observes.append(line[i])

            # 状态序列
            words = line.split(" ")

            states = []
            for word in words:
                if word in seg_stop_words:
                    continue
                states.extend(get_tags(word))
            # 开始训练
            if (len(observes) >= len(states)):
                self.do_train(observes, states)
            else:
                pass

    # 模型分词预测
    def lcut(self, sentence):
        try:
            tags = self.do_predict(sentence)
            return cut_sent(sentence, tags)
        except:
            return sentence

#训练模型,首先实例化 HMMSoyoger 类,然后通过 read_txt() 方法加载语料,再通过 train() 进行在线训练,如果训练语料比较大,可能需要等待一点时间
soyoger=HMMSoyoger()
soyoger.read_txt(r'C:\Users\01\Desktop\机器学习作业\sklearn+tensorflow\corpus_data\syj_trainCorpus_utf8.txt')
soyoger.train()

print(soyoger.lcut("中国的人工智能发展进入高潮阶段。"))
print(soyoger.lcut("中文自然语言处理是人工智能技术的一个重要分支。"))
print(soyoger.lcut("关注可了解更多的教程及排版技巧。问题或建议,请公众号留言"))
print(soyoger.lcut("南京市长是个好同志"))
print(soyoger.lcut("南京市长江大桥"))

我按照教程敲下来的代码,分词结果明显有问题……两个两个分词,就这教程还说好……
['中国', '的', '人工', '智能', '发展', '进入', '高潮', '阶段', '。']
['中文', '自然', '语言', '处理', '是', '人工', '智能', '技术', '的一', '个重', '要分', '支。']
['关注', '可了', '解更', '多的', '教程', '及排', '版技', '巧。', '问题', '或建', '议,', '请公', '众号', '留言']
['南京', '市长', '是', '个好', '同志']
['南京', '市长', '江大', '桥']

试一下HanLP分词

from pyhanlp import *

print(HanLP.segment(r'你好,欢迎在Python中调用HanLP的API'))
testCases = [
    "中国的人工智能发展进入高潮阶段。",
    "中文自然语言处理是人工智能技术的一个重要分支。",
    "关注可了解更多的教程及排版技巧。问题或建议,请公众号留言",
    "南京市长是个好同志",
    "南京市长江大桥"]
for sentence in testCases: print(HanLP.segment(sentence))

#[你好/vl, ,/w, 欢迎/v, 在/p, Python/nx, 中/f, 调用/v, HanLP/nx, 的/ude1, API/nx]
# [中国/ns, 的/ude1, 人工智能/n, 发展/vn, 进入/v, 高潮/n, 阶段/n, 。/w]
# [中文/nz, 自然语言处理/nz, 是/vshi, 人工智能/n, 技术/n, 的/ude1, 一个/mq, 重要/a, 分支/n, 。/w]
# [关注/v, 可/v, 了解/v, 更多/ad, 的/ude1, 教程/n, 及/cc, 排版/vn, 技巧/n, 。/w, 问题/n, 或/c, 建议/n, ,/w, 请/v, 公众/n, 号/q, 留言/n]
# [南京/ns, 市长/nnt, 是/vshi, 个/q, 好/a, 同志/n]
# [南京市/ns, 长江大桥/nz]

结果一目了然,干嘛还要重复造轮子……
关于CRF分词,同样HanLP也有,不过需要通过调用JClass来完成对Java的底层API的调用。

上一篇下一篇

猜你喜欢

热点阅读