数据挖掘

第七章:概率和朴素贝叶斯

2018-10-24  本文已影响9人  无赖宵小

近邻算法又称为被动学习算法。这种算法只是将训练集的数据保存起来,在收到测试数据时才会进行计算。

贝叶斯算法则是一种主动学习算法,它会根据训练集构建起一个模型,并用这个模型来对新的记录进行分类,因此速度会快很多。

贝叶斯算法的两个优点:能够给出分类结果的置信度;以及它是一种主动学习算法。

概率

用 P( h ) 来表示事件 h 发生的概率;

用 P( h | D ) 来表示 D 条件下事件 h 发生的概率。

P( h ) 表示事件 h 发生的概率,称为 h 的先验概率;

P( h | d ) 称为后验概率,表示在观察了数据集 d 之后,h 事件发生的概率是多少。

贝叶斯法则

贝叶斯法则描述了 P( h )、P( h | D )、P( D )、以及P( D | h )这四个概率之间的关系:

在数据挖掘中,我们通常会使用这个公式去判断不同事件之间的关系。

例:我们要为一家销售电子产品的公司发送宣传邮件,共有笔记本、台式机、平板电脑三种产品。我们需要根据目标用户的类型来分别派送这三种宣传邮件。有一位居住在 88005 地区的女士,她的女儿在读大学,并居住在家中,而且她还会参加瑜伽课程,那我们应该派发哪种邮件?

我们用D来表示这位客户的特征:

因此我们需要计算以下三个概率:

选择概率最大的结果。

最大后验估计

如果我们有 h1, h2, ... hn,它们相当于不同的类别

在计算出以上这些概率后,选取最大的结果,就能用作分类了。这种方法叫最大后验估计,记为hMAP

H 表示所有的时间,所以 h ∈ H 表示“对于集合中的每一个事件”。整个公式的含义就是:对于集合中的每一个事件,计算出 P( h | D) 的值,并取最大的结果。

对于所有的事件,公式的分母都是 P( D ) ,因此即便只计算 P( D | h )P( h ),也可以判断出最大的结果。

例:已知这种癌症在美国的感染率是 0.8%。血液检验的结果有阳性和阴性两种,且存在准确性的问题:如果这个人患有癌症,则有 98% 的几率测出阳性;如果他没有癌症,会有 97% 的几率测出阴性。Ann 到医院做了血液检测,呈阳性。

描述语言的公式表示:

美国有 0.8% 的人患有这种癌症:P( 癌症 ) = 0.008

99.2% 的人没有患有这种癌症:P( ┐癌症 ) = 0.992

对于患有癌症的人,他的血液检测结果返回阳性的概率是 98%:P( 阳性 | 癌症 ) = 0.98

对于患有癌症的人,检测结果返回阴性的概率是 2%:P( 阴性 | 癌症 ) = 0.02

对于没有癌症的人,返回阴性的概率是 97%:P( 阴性 | ┐癌症 ) = 0.97

对于没有癌症的人,返回阳性的概率是 3%:P( 阳性 | ┐癌症 ) = 0.03

贝叶斯法则计算:

P( 阳性 | 癌症 )P( 癌症 ) = 0.98 * 0.008 = 0.0078

P( 阳性 | ┐癌症 )P( ┐癌症 ) = 0.03 * 0.992 = 0.0298

分类结果是她不会患有癌症


朴素贝叶斯

用到不止一个前提条件时,计算这样的概率只需将各个条件概率相乘即可:

P( 买绿茶 | 88005 & 买有机食品 ) = P( 88005 | 买绿茶 )P( 买有机食品 | 买绿茶 )P( 买绿茶 ) = 0.6 * 0.8 * 0.5 = 0.24

P( ┐买绿茶 | 88005 & 买有机食品 ) = P( 88005 | ┐买绿茶 )P( 买有机食品 | ┐买绿茶 )P( ┐买绿茶 ) = 0.4 * 0.25 * 0.5 = 0.05

P( i500 | 健康、中等水平、热情一般、适应) = P( 健康 | i500 )P( 中等水平 | i500 )P( 热情一般 | i500 )P( 适应 | i500 )

训练用 Python 代码:

class Classifier:
    def __init__(self, bucketPrefix, testBucketNumber, dataFormat):
        """
        bucketPrefix 分桶数据集文件前缀
        testBucketNumber 测试桶编号
        dataFormat 数据格式,形如:attr attr attr attr class
        """
        total = 0
        classes = {}
        counts = {}
        # 从文件中读取数据
        self.format = dataFormat.strip().split('\t')
        self.prior = {}
        self.conditional = {}

        # 遍历十个桶
        for i in range(1, 11):
            # 跳过测试桶
            if i != testBucketNumber:
                filename = "%s-%02i" % (bucketPrefix, i)
                f = open(filename)
                lines = f.readlines()
                f.close()

                for line in lines:
                    fields = line.strip().split('\t')
                    ignore = []
                    vector = []

                    for i in range(len(fields)):
                        if self.format[i] == 'num':
                            vector.append(float(fields[i]))
                        elif self.format[i] == 'attr':
                            vector.append(fields[i])
                        elif self.format[i] == 'comment':
                            ignore.append(fields[i])
                        elif self.format[i] == 'class':
                            category = fields[i]

                    # 处理该条记录
                    total += 1
                    classes.setdefault(category, 0)
                    counts.setdefault(category, {})
                    classes[category] += 1
                    # 处理各个属性
                    col = 0
                    for columnValue in vector:
                        col += 1
                        counts[category].setdefault(col, {})
                        counts[category][col].setdefault(columnValue, 0)
                        counts[category][col][columnValue] += 1

        # 计数结束,开始计算概率
        # 计算先验概率P(h)
        for (category, count) in classes.items():
            self.prior[category] = count / total

        # 计算条件概率P(h|D)
        for (category, columns) in counts.items():
            self.conditional.setdefault(category, {})
            for (col, valueCounts) in columns.items():
                self.conditional[category].setdefault(col, {})
                for (attrValue, count) in valueCounts.items():
                    self.conditional[category][col][attrValue] = (count / classes[category])        
        self.tmp = counts

分类用 Python 代码:

def classify(self, itemVector):
    """返回itemVector所属类别"""
    results = []

    for (category, prior) in self.prior.items():
        prob = prior
        col = 1
        for attrValue in itemVector:
            if not attrValue in self.conditional[category][col]:
                # 属性不存在,返回0概率
                prob = 0
            else:
                prob = prob * self.conditional[category][col][attrValue]
            col += 1
        results.append((prob, category))

    # 返回概率最高的结果
    return(max(results)[1])

概率估计

使用朴素贝叶斯计算得到的概率其实是真实概率的一种估计,而真是概率是对全量数据做统计得到的。

大部分情况下,这种估计都是接近于真实概率的。但但真是概率非常小时,这种抽样统计的方法就会有问题了。

在朴素贝叶斯中,概率为 0 的影响是很大的,甚至会不顾其他概率的大小。此外,抽样统计的另一个问题是会低估真实概率。

上式中的 nc 可能为 0,解决方法是将公式变为:

决定常数 m 的方法有很多,我们这里使用值的类别作为 m ,比如投票有赞成和否决两种类别,所以 m 就为 2。p 则是先验概率,比如赞成和否决的概率分别是 0.5,那么 p 就是 0.5。


数值型数据

朴素贝叶斯算法使用的是分类型数据,在贝叶斯方法中,我们会对事物进行计数,这种计数则是可以度量的。

方法一:区分类别

我们可以划定几个范围作为分类,如:

划分类别后,就可以应用朴素贝叶斯算法了。

方法二:高斯分布

高斯分布

标准差是用来衡量数据的离散程度的,如果所有数据都接近于平均值,那标准差也会比较小。

总体标准差和样本标准差

对所有的数据进行统计,得到的便是总体标准差。

无法获取总体的数据,只能选取一部分样本,这时计算得到的就是样本标准差。

标准差

高斯分布

68% 的数据会标准差为 1 的范围内,95% 的数据会落在标准差为 2 的范围内:

用希腊字母 μ (读“谬”)来表示平均值,σ (读“西格玛”)来表示标准差。

例( e 是自然常数,约等于 2.718 ):

代码实现

import math
def pdf(mean, ssd, x):
    """概率密度函数,计算P(x|y)"""
    ePart = math.pow(math.e, -(x - mean) ** 2 / (2 * ssd ** 2))
    return (1.0 / (math.sqrt(2 * math.pi) * ssd)) * ePart

算法对比

贝叶斯方法的优点:

实现简单(只需计数即可)

需要的训练集较少

运算效率

贝叶斯方法的缺点:

无法学习特征之间的相互影响

kNN算法的优点:

实现也比较简单

不需要按特定形式准备数据

需要大量内存保存训练集数据

kNN算法的用途:

处理训练集较大的情况,包括推荐系统、蛋白质分析、图片分类等。


# -*- coding:utf-8 -*-

'''
Created on 2018年11月28日

@author: KingSley
'''

import math

class Classifier:
    
    def __init__(self, bucketPrefix, testBucketNumber, dataFormat):
        """
        bucketPrefix 分桶数据集文件前缀
        testBucketNumber 测试桶编号
        dataFormat 数据格式,形如:attr attr attr class
        """
        
        total = 0
        classes = {}
        # 对分类型数据进行计数
        counts = {}
        # 对数值型数据进行求和
        # 使用下面两个变量来计算每个分类各个特征的平均值和样本标准差
        totals = {}
        numericValues = {}
        
        # 从文件中读取文件

        self.format = dataFormat.strip().split('\t')
        self.prior = {}
        self.conditional = {}
        # 用 1-10 来标记桶
        for i in range(1, 11):
            # 判断该桶时候包含在训练集中
            if i != testBucketNumber:
                filename = "%s-%02i" % (bucketPrefix, i)
                f = open(filename)
                lines = f.readlines()
                f.close()
                for line in lines:
                    fields = line.strip().split('\t')
                    ignore = []
                    vector = []
                    nums = []
                    for i in range(len(fields)):
                        if self.format[i] == 'num':
                            nums.append(float(fields[i]))
                        elif self.format[i] == 'attr':
                            vector.append(fields[i])                           
                        elif self.format[i] == 'comment':
                            ignore.append(fields[i])
                        elif self.format[i] == 'class':
                            category = fields[i]
                    # 处理该条记录
                    total += 1
                    classes.setdefault(category, 0)
                    counts.setdefault(category, {})
                    totals.setdefault(category, {})
                    numericValues.setdefault(category, {})
                    classes[category] += 1
                    # 处理分类型数据
                    col = 0
                    for columnValue in vector:
                        col += 1
                        counts[category].setdefault(col, {})
                        counts[category][col].setdefault(columnValue, 0)
                        counts[category][col][columnValue] += 1
                    # 处理数值型数据
                    col = 0
                    for columnValue in nums:
                        col += 1
                        totals[category].setdefault(col, 0)
                        #totals[category][col].setdefault(columnValue, 0)
                        totals[category][col] += columnValue
                        numericValues[category].setdefault(col, [])
                        numericValues[category][col].append(columnValue)
        
        # 计数结束,开始计算概率
        
        # 计算先验概率 P(h)
        for (category, count) in classes.items():
            self.prior[category] = count / total
            
        # 计算条件概率 P(h|D)
        for (category, columns) in counts.items():
            self.conditional.setdefault(category, {})
            for (col, valueCounts) in columns.items():
                self.conditional[category].setdefault(col, {})
                for (attrValue, count) in valueCounts.items():
                    self.conditional[category][col][attrValue] = (count / classes[category])
        self.tmp =  counts
        
        # 计算平均值和样本标准差
        self.means = {}
        self.totals = totals
        for (category, columns) in totals.items():
            self.means.setdefault(category, {})
            for (col, cTotal) in columns.items():
                self.means[category][col] = cTotal / classes[category]

        self.ssd = {}
        for (category, columns) in numericValues.items():
            
            self.ssd.setdefault(category, {})
            for (col, values) in columns.items():
                SumOfSquareDifferences = 0
                theMean = self.means[category][col]
                for value in values:
                    SumOfSquareDifferences += (value - theMean)**2
                columns[col] = 0
                self.ssd[category][col] = math.sqrt(SumOfSquareDifferences / (classes[category]  - 1))
    
    def testBucket(self, bucketPrefix, bucketNumber):
        """读取 bucketPrefix - bucketNumber 所指定的文件作为测试集"""
        
        filename = "%s-%02i" % (bucketPrefix, bucketNumber)
        f = open(filename)
        lines = f.readlines()
        totals = {}
        f.close()
        loc = 1
        for line in lines:
            loc += 1
            data = line.strip().split('\t')
            vector = []
            numV = []
            classInColumn = -1
            for i in range(len(self.format)):
                if self.format[i] == 'num':
                    numV.append(float(data[i]))
                elif self.format[i] == 'attr':
                    vector.append(data[i])
                elif self.format[i] == 'class':
                    classInColumn = i
            theRealClass = data[classInColumn]
            classifiedAs = self.classify(vector, numV)
            totals.setdefault(theRealClass, {})
            totals[theRealClass].setdefault(classifiedAs, 0)
            totals[theRealClass][classifiedAs] += 1
        return totals
    
    def classify(self, itemVector, numVector):
        """返回 itemVector 所属类别"""
        results = []
        sqrt2pi = math.sqrt(2 * math.pi)
        for (category, prior) in self.prior.items():
            prob = prior
            col = 1
            for attrValue in itemVector:
                if not attrValue in self.conditional[category][col]:
                    # 属性不存在,返回 0 概率
                    prob = 0
                else:
                    prob = prob * self.conditional[category][col][attrValue]
                col += 1
            col = 1
            for x in  numVector:
                mean = self.means[category][col]
                ssd = self.ssd[category][col]
                ePart = math.pow(math.e, -(x - mean)**2/(2*ssd**2))
                prob = prob * ((1.0 / (sqrt2pi*ssd)) * ePart)
                col += 1
            results.append((prob, category))
        # 返回概率最高的值
        return(max(results)[1])
    
def tenfold(bucketPrefix, dataFormat):
    results = {}
    for i in range(1, 11):
        c = Classifier(bucketPrefix, i, dataFormat)
        t = c.testBucket(bucketPrefix, i)
        for (key, value) in t.items():
            results.setdefault(key, {})
            for (ckey, cvalue) in value.items():
                results[key].setdefault(ckey, 0)
                results[key][ckey] += cvalue
                
    # 输出结果
    categories = list(results.keys())
    categories.sort()
    print(   "\n            Classified as: ")
    header =    "             "
    subheader = "               +"
    for category in categories:
        header += "% 10s   " % category
        subheader += "-------+"
    print (header)
    print (subheader)
    total = 0.0
    correct = 0.0
    for category in categories:
        row = " %10s    |" % category 
        for c2 in categories:
            if c2 in results[category]:
                count = results[category][c2]
            else:
                count = 0
            row += " %5i |" % count
            total += count
            if c2 == category:
                correct += count
        print(row)
    print(subheader)
    print("\n%5.3f percent correct" %((correct * 100) / total))
    print("total of %i instances" % total)
    
def pdf(mean, ssd, x):
    """概率密度函数,计算 P(x|y)"""
    ePart = math.pow(math.e, -(x-mean)**2/(2*ssd**2))
    print (ePart)
    return (1.0 / (math.sqrt(2*math.pi)*ssd)) * ePart

tenfold("pimaSmall\pimaSmall", "num\tnum\tnum\tnum\tnum\tnum\tnum\tnum\tclass")
tenfold("pima\pima", "num\tnum\tnum\tnum\tnum\tnum\tnum\tnum\tclass")

参考原文作者:Ron Zacharski CC BY-NC 3.0] https://github.com/egrcc/guidetodatamining

参考原文原文 http://guidetodatamining.com/

参考译文来自 @egrcchttps://github.com/egrcc/guidetodatamining

上一篇 下一篇

猜你喜欢

热点阅读