6NER实战-(1)数据预处理

2019-10-08  本文已影响0人  弟弟们的哥哥

本文以命名实体识别NER数据预处理为例


将训练集中每句话变成4个list:


Batch:


构建模型:


Bi-LSTM


1. 加载数据集

ef load_sentences(path):
    """
    加载数据集,每一行至少包含一个汉字和一个标记
    句子和句子之间是以空格进行分割
    最后返回句子集合
    :param path:
    :return:
    """
    # 存放数据集
    sentences = []
    # 临时存放每一个句子
    sentence = []
    for line in codecs.open(path, 'r', encoding='utf-8'):
        # 去掉两边空格
        line = line.strip()
        # 首先判断是不是空,如果是则表示句子和句子之间的分割点
        if not line:
            if len(sentence) > 0:
                sentences.append(sentence)
                # 清空sentence表示一句话完结
                sentence = []
        else:
            if line[0] == " ":
                continue
            else:
                word = line.split()
                assert len(word) >= 2
                sentence.append(word)
    # 循环走完,要判断一下,防止最后一个句子没有进入到句子集合中
    if len(sentence) > 0:
        sentences.append(sentence)
    return sentences
image.png

2. 更新数据集编码

def update_tag_scheme(sentences, tag_scheme):
    """
    更新为指定编码
    :param sentences:
    :param tag_scheme:
    :return:
    """
    for i, s in enumerate(sentences):
        tags = [w[-1] for w in s]
        if not data_utils.check_bio(tags):
            s_str = "\n".join(" ".join(w) for w in s)
            raise Exception("输入的句子应为BIO编码,请检查输入句子%i:\n%s" % (i, s_str))

        if tag_scheme == "BIO":
            for word, new_tag in zip(s, tags):
                word[-1] = new_tag

        if tag_scheme == "BIOES":
            new_tags = data_utils.bio_to_bioes(tags)
            for word, new_tag in zip(s, new_tags):
                word[-1] = new_tag
        else:
            raise Exception("非法目标编码")

data_utils中的check_bio(tags),,首先确定是不是合法的BIO

def check_bio(tags):
    """
    检测输入的tags是否是bio编码
    如果不是bio编码
    那么错误的类型
    (1)编码不在BIO中
    (2)第一个编码是I
    (3)当前编码不是B,前一个编码不是O
    :param tags:
    :return:
    """
    for i, tag in enumerate(tags):
        if tag == 'O':
            continue
        tag_list = tag.split("-")
        if len(tag_list) != 2 or tag_list[0] not in set(['B', 'I']):
            # 非法编码
            return False
        if tag_list[0] == 'B':
            continue
        elif i == 0 or tags[i - 1] == 'O':
            # 如果第一个位置不是B或者当前编码不是B并且前一个编码0,则全部转换成B
            tags[i] = 'B' + tag[1:]
        elif tags[i - 1][1:] == tag[1:]:
            # 如果当前编码的后面类型编码与tags中的前一个编码中后面类型编码相同则跳过
            continue
        else:
            # 如果编码类型不一致,则重新从B开始编码
            tags[i] = 'B' + tag[1:]
    return True

把BIO转换成BIOES

def bio_to_bioes(tags):
    """
    把bio编码转换成bioes编码
    返回新的tags
    :param tags:
    :return:
    """
    new_tags = []
    for i, tag in enumerate(tags):
        if tag == 'O':
            # 直接保留,不变化
            new_tags.append(tag)
        elif tag.split('-')[0] == 'B':
            # 如果tag是以B开头,那么我们就要做下面的判断
            # 首先,如果当前tag不是最后一个,并且紧跟着的后一个是I
            if (i + 1) < len(tags) and tags[i + 1].split('-')[0] == 'I':
                # 直接保留
                new_tags.append(tag)
            else:
                # 如果是最后一个或者紧跟着的后一个不是I,那么表示单字,需要把B换成S表示单字
                new_tags.append(tag.replace('B-', 'S-'))
        elif tag.split('-')[0] == 'I':
            # 如果tag是以I开头,那么我们需要进行下面的判断
            # 首先,如果当前tag不是最后一个,并且紧跟着的一个是I
            if (i + 1) < len(tags) and tags[i + 1].split('-')[0] == 'I':
                # 直接保留
                new_tags.append(tag)
            else:
                # 如果是最后一个,或者后一个不是I开头的,那么就表示一个词的结尾,就把I换成E表示一个词结尾
                new_tags.append(tag.replace('I-', 'E-'))

        else:
            raise Exception('非法编码')
    return new_tags

3. 构建字典映射

def word_mapping(sentences):
    """
    构建字典
    :param sentences:
    :return:
    """
    word_list = [[x[0] for x in s] for s in sentences]  # 得到所有的字
    dico = data_utils.create_dico(word_list)
    dico['<PAD>'] = 10000001
    dico['<UNK>'] = 10000000
    word_to_id, id_to_word = data_utils.create_mapping(dico)
    return dico, word_to_id, id_to_word 

Create_dico用来统计词频,这里也可以引入collections.Counter()来计算词频

def create_dico(item_list):
    """
    对于item_list中的每一个items,统计items中item在item_list中的次数
    item:出现的次数
    :param item_list:
    :return:
    """
    assert type(item_list) is list
    dico = {}
    for items in item_list:
        for item in items:
            if item not in dico: #第一次出现,标记为1
                dico[item] = 1
            else:
                dico[item] += 1
    return dico

根据词频来创建映射的方法create_mapping()

def create_mapping(dico):
    """
    创建item to id, id_to_item
    item的排序按词典中出现的次数
    :param dico:
    :return:
    """
    sorted_items = sorted(dico.items(), key=lambda x: (-x[1], x[0]))
    id_to_item = {i: v[0] for i, v in enumerate(sorted_items)}
    item_to_id = {v: k for k, v in id_to_item.items()}
    return item_to_id, id_to_item

4. 构造tag映射

def tag_mapping(sentences):
    """
    构建标签字典
    :param sentences:
    :return:
    """
    tag_list = [[x[1] for x in s] for s in sentences]
    dico = data_utils.create_dico(tag_list)
    tag_to_id, id_to_tag = data_utils.create_mapping(dico)
    return dico, tag_to_id, id_to_tag

5.prepare dataset

def prepare_dataset(sentences, word_to_id, tag_to_id, train=True):
    """
    数据预处理,返回list其实包含
    -word_list
    -word_id_list
    -word char indexs
    -tag_id_list
    :param sentences:
    :param word_to_id:
    :param tag_to_id:
    :param train:
    :return:
    """
    none_index = tag_to_id['O']

    data = []
    for s in sentences:
        word_list = [w[0] for w in s]  # 集中所有的字
        word_id_list = [word_to_id[w if w in word_to_id else '<UNK>'] for w in word_list]  # 得到所有的字对应的id
        segs = data_utils.get_seg_features("".join(word_list))
        if train:
            tag_id_list = [tag_to_id[w[-1]] for w in s]
        else:
            tag_id_list = [none_index for w in s]
        # 此时对于一个句子则得到4个特征列表
        data.append([word_list, word_id_list, segs, tag_id_list])

    return data

6. jieba来分词,或者用nltk.word_tokenize(sentence)方法来分词

def get_seg_features(words):
    """
    利用jieba分词
    采用类似bioes的编码,0表示单个字成词, 1表示一个词的开始, 2表示一个词的中间,3表示一个词的结尾
    :param words:
    :return:
    """
    seg_features = []

    word_list = list(jieba.cut(words))

    for word in word_list:
        if len(word) == 1:
            seg_features.append(0)
        else:
            temp = [2] * len(word)
            temp[0] = 1
            temp[-1] = 3
            seg_features.extend(temp)
    return seg_features

7. 批量输入batchManager

class BatchManager(object):
    def __init__(self, data, batch_size):
        self.batch_data = self.sort_and_pad(data, batch_size)
        self.len_data = len(self.batch_data)

    def sort_and_pad(self, data, batch_size):
        num_batch = int(math.ceil(len(data) / batch_size))
        sorted_data = sorted(data, key=lambda x: len(x[0]))  # 按照长度对数据进行排序操作
        batch_data = list()
        for i in range(num_batch):
            # 按照批次进行填充,所以每个批次的数据长度是一样的
            batch_data.append(self.pad_data(sorted_data[i * batch_size: (i + 1) * batch_size]))
        return batch_data

填充数据pad_data

def pad_data(data):
    word_list = []
    word_id_list = []
    seg_list = []
    tag_id_list = []
    max_length = max([len(sentence[0]) for sentence in data])
    for line in data:
        words, word_ids, segs, tag_ids = line
        padding = [0] * (max_length - len(words))
        word_list.append(words + padding)
        word_id_list.append(word_ids + padding)
        seg_list.append(segs + padding)
        tag_id_list.append(tag_ids + padding)
    return [word_list, word_id_list, seg_list, tag_id_list]

8. shuffle随机打乱数据

# 随机得到一个批次的数据
def iter_batch(self, shuffle=False):
    if shuffle:
        random.shuffle(self.batch_data)
    for idx in range(self.len_data):
        yield self.batch_data[idx]

到这里就算最基本的数据预处理完成,想了解更细节,也可以通过matplot来看看数据集样本分布,比如train

import matplotlib.pyplot as plt
import collections
train_cnt = collections.Counter(map(len,train_data))
plt.bar(*zip(*train_cnt.items(),color='r'))
plt.xlabel('Sentence Length for Training Data')
plt.ylabel('Samples')
plt.show()
image.png

看一下各个不同命名实体识别的数量情况

def stat_entities(data):
    cnt = collections.defaultdict(int)
    for sentence in data:
        for char, tag in sentence:
            if tag.statswith("B-"):
                cnt[tag] += 1
    cnt["samples"] = len(data)
    return cnt
print(stat_entities(train_data))
image.png

同理,也可查看dev_data和 test_data

除了以上最基本的预处理,还有一些常用的,如去停用词,我们创建一个stopwords.ttxt,这里面可以放一些日常场景需要除去的词,如冠词,人称,数字等特定词,用pd.read_csv打开即可,因为stopword.txt很小,数据集如果较大用IO更快更节省内存。

# 加载停用词
stopwords = pd.read_csv('stopwords.txt', index_col=False, quoting=3, sep="\t", names=['stopword'], encoding='utf-8')
stopwords = stopwords['stopword'].values

分词和停用词相结合

# 定义分词和打标签函数preprocess_text
# 参数content_lines即为上面转换的list
# 参数sentences是定义的空list,用来储存打标签之后的数据
# 参数category 是类型标签
def preprocess_text(content_lines, sentences, category):
    for line in content_lines:
        try:
            segs = jieba.lcut(line)
            segs = [v for v in segs if not str(v).isdigit()]  # 去数字
            segs = list(filter(lambda x: x.strip(), segs))  # 去左右空格
            segs = list(filter(lambda x: len(x) > 1, segs))  # 长度为1的字符
            segs = list(filter(lambda x: x not in stopwords, segs))  # 去掉停用词
            sentences.append((" ".join(segs), category))  # 打标签
        except Exception:
            print(line)
            continue 

另外,还有转换大小写等等具体会根据不同环境来处理数据。


总结一下数据预处理:

  1. 语料清洗:把不相关的,视为噪点的内容删除。人工去重,对齐,删除和标注等。

  2. 分词(以jieba为例):

-  精确分词 jieba.cut(content, cut_all=False)
- 全模式 jieba.cut(content, cut_all=True) 所有的词都扫描出来
- 搜索引擎模式 jieba.cut_for_search(content) 在精确分词基础上再切分,提高召回率
- lcut代替cut  jieba.lcut(content) 这样做为了返回List
- 获取词性:import jieba.posseg as psg    psg.lcut(content)
  1. 词性标注:就是给每次词语打标签,这样可以让文本在后面处理时融入更多的有用的语言信息。分为基于规则和基于统计两种,基于统计如最大熵,HMM,CRF

  2. 去停用词:这个比较灵活,一般情况下标点符号,语气词,人称等都可以去掉,把常用停用词放进一个文档,用时候直接调用。但是做情感分类时候,语气词,感叹词也是应该保留的,因为这些词对语气程度,感情色彩有一定的贡献和意义。

  3. 放进字典,词和tag都转换成id,zip成新的数组。大篇幅就是处理这个玩意,这里就不啰嗦了。

上一篇下一篇

猜你喜欢

热点阅读