Data Mining

Chapter 6: Optimizing the Nearest Neighbor Algorithm

2018-10-24  无赖宵小

The kNN Algorithm

k-nearest neighbors (kNN): examine the k records closest to the new record, rather than looking at only the single closest one.

Each neighbor gets a vote, and the program assigns the new record to the class that receives the most votes. For example, using three neighbors (k = 3), if two of the records belong to gymnastics and one belongs to marathon, we classify x as gymnastics.

So when deciding the class of a record we can use this voting method, and if two classes receive the same number of votes, we simply pick one at random.
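A minimal sketch of this voting step (the labels below are made up for illustration): tally the class labels of the k nearest neighbors and break ties randomly.

import random
from collections import Counter

def majority_vote(neighbor_labels):
    """Return the most frequent label; ties are broken at random."""
    counts = Counter(neighbor_labels)    # e.g. Counter({'gymnastics': 2, 'marathon': 1})
    top = max(counts.values())
    winners = [label for label, n in counts.items() if n == top]
    return random.choice(winners)

print(majority_vote(['gymnastics', 'marathon', 'gymnastics']))   # gymnastics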

But when we need to predict a numeric value rather than a class, we can compute the **distance-weighted average of the k nearest neighbors**.

Example:

When computing the average we want closer users to have more influence, so we take the inverse of each distance. With neighbor distances of 5, 10, and 15, the inverses are 1/5 = 0.2, 1/10 = 0.1, and 1/15 ≈ 0.067.

Dividing each inverse distance by the sum of the inverses (0.2 + 0.1 + 0.067 = 0.367) gives the rating weights: 0.2 / 0.367 ≈ 0.545, 0.1 / 0.367 ≈ 0.272, and 0.067 / 0.367 ≈ 0.183.

The weighted average of the neighbors' ratings is then the predicted value for the new record.
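A small worked sketch of this computation (only the distances 5, 10, and 15 are implied by the inverse-distance sum above; the three neighbor ratings of 4, 5, and 5 are made-up values for illustration):

distances = [5, 10, 15]          # distances to the 3 nearest neighbors
ratings   = [4, 5, 5]            # hypothetical ratings given by those neighbors

inverses = [1.0 / d for d in distances]       # [0.2, 0.1, 0.0667]
total = sum(inverses)                         # ~0.367
weights = [inv / total for inv in inverses]   # [~0.545, ~0.272, ~0.183]

prediction = sum(w * r for (w, r) in zip(weights, ratings))
print(round(prediction, 2))                   # ~4.45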

Implementing the kNN Algorithm

def knn(self, itemVector):
    """Use the kNN algorithm to classify itemVector"""
    # use heapq.nsmallest to get the k nearest neighbors
    neighbors = heapq.nsmallest(self.k,
                                [(self.manhattan(itemVector, item[1]), item)
                                 for item in self.data])
    # each neighbor gets a vote
    results = {}
    for neighbor in neighbors:
        theClass = neighbor[1][0]
        results.setdefault(theClass, 0)
        results[theClass] += 1
    resultList = sorted([(i[1], i[0]) for i in results.items()], reverse=True)
    # get the class with the most votes
    maxVotes = resultList[0][0]
    possibleAnswers = [i[1] for i in resultList if i[0] == maxVotes]
    # if several classes tie, pick one at random
    answer = random.choice(possibleAnswers)
    return answer
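The method relies on heapq.nsmallest, which returns the k smallest items of an iterable; a quick illustration with made-up distance/label pairs:

import heapq

pairs = [(12, 'a'), (3, 'b'), (7, 'c'), (5, 'd')]
print(heapq.nsmallest(3, pairs))   # [(3, 'b'), (5, 'd'), (7, 'c')]

Below is the complete program: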

# -*- coding:utf-8 -*-

'''
Created on 2018-11-27

@author: KingSley
'''

import heapq
import random

class Classifier:
    def __init__(self, bucketPrefix, testBucketNumber, dataFormat, k):
        """该分类器程序将从 bucketPrefix 指定的一系列文件中读取数据,
        并留出 testBucketNumber 指定的桶来做测试集,其余的做训练集。
        dataFormat 用来表示数据的格式,如:
        "class num num num num num comment"
        """
        
        self.medianAndDeviation = []
        self.k = k 
        
        # read the data from the files

        self.format = dataFormat.strip().split('\t')
        self.data = []
        # buckets are numbered 1 through 10
        for i in range(1, 11):
            # check whether this bucket belongs to the training set
            if i != testBucketNumber:
                filename = "%s-%02i" % (bucketPrefix, i)
                f = open(filename)
                lines = f.readlines()
                f.close()
                for line in lines[1:]:
                    fields = line.strip().split('\t')
                    ignore = []
                    vector = []
                    for j in range(len(fields)):
                        if self.format[j] == 'num':
                            vector.append(float(fields[j]))
                        elif self.format[j] == 'comment':
                            ignore.append(fields[j])
                        elif self.format[j] == 'class':
                            classification = fields[j]
                    self.data.append((classification, vector, ignore))
        self.rawData = list(self.data)
        # get the length of the feature vectors
        self.vlen = len(self.data[0][1])
        # normalize each column of the data
        for i in range(self.vlen):
            self.normalizeColumn(i)
            
    def getMedian(self, alist):
        """Return the median of alist"""
        if alist == []:
            return []
        blist = sorted(alist)
        length = len(alist)
        if length % 2 == 1:
            # odd number of elements: return the middle one
            return blist[int(((length + 1) / 2) - 1)]
        else:
            # even number of elements: return the mean of the middle two
            v1 = blist[int(length / 2)]
            v2 = blist[(int(length / 2) - 1)]
            return (v1 + v2) / 2.0
        
    def getAbsoluteStandardDeviation(self, alist, median):
        """Compute the absolute standard deviation: the mean absolute
        difference between each value and the median"""
        total = 0
        for item in alist:
            total += abs(item - median)
        return total / len(alist)
    
    def normalizeColumn(self, columnNumber):
        """Normalize column columnNumber of self.data using the modified
        standard score: (value - median) / absolute standard deviation"""
        # extract the values of this column into a list
        col = [v[1][columnNumber] for v in self.data]
        median = self.getMedian(col)
        asd = self.getAbsoluteStandardDeviation(col, median)
        # print("Median: %f   ASD = %f" % (median, asd))
        self.medianAndDeviation.append((median, asd))
        for v in self.data:
            v[1][columnNumber] = (v[1][columnNumber] - median) / asd

    def normalizeVector(self, v):
        """对每列的中位数和绝对偏差,计算标准化向量 v"""
        vector = list(v)
        for i in range(len(vector)):
            (median, asd) = self.medianAndDeviation[i]
            vector[i] = (vector[i] - median) / asd
        return vector
    
    def testBucket(self, bucketPrefix, bucketNumber):
        """读取 bucketPrefix - bucketNumber 所指定的文件作为测试集"""
        
        filename = "%s-%02i" % (bucketPrefix, bucketNumber)
        f = open(filename)
        lines = f.readlines()
        totals = {}
        f.close()
        for line in lines:
            data = line.strip().split('\t')
            vector = []
            classInColumn = -1
            for i in range(len(self.format)):
                if self.format[i] == 'num':
                    vector.append(float(data[i]))
                elif self.format[i] == 'class':
                    classInColumn = i
            theRealClass = data[classInColumn]
            classifiedAs = self.classify(vector)
            totals.setdefault(theRealClass, {})
            totals[theRealClass].setdefault(classifiedAs, 0)
            totals[theRealClass][classifiedAs] += 1
        return totals
        
    def manhattan(self, vector1, vector2):
        """计算曼哈顿距离"""
        return sum(map(lambda v1, v2: abs(v1 - v2), vector1, vector2))
    
    def nearestNeighbor(self, itemVector):
        """返回 itemVector 的邻近"""
        return min([(self.manhattan(itemVector, item[1]), item) for item in self.data])
    
    def knn(self, itemVector):
        """使用 knn 算法判断 itemVector 所属类别"""
        # 使用 heapq.nsmallest 来获得 k 个近邻
        neighbors = heapq.nsmallest(self.k, [(self.manhattan(itemVector, item[1]), item) for item in self.data])
        # 每个近邻都有投票权
        results = {}
        for neighbor in neighbors:
            theClass = neighbor[1][0]
            results.setdefault(theClass, 0)
            results[theClass] += 1
        resultList = sorted([(i[1], i[0]) for i in results.items()], reverse=True)
        # 获取得票最高的分类
        maxVectes = resultList[0][0]
        possibleAnswers = [i[1] for i in resultList if i[0] == maxVectes]
        # 若得票相等则随机选取一个
        answer = random.choice(possibleAnswers)
        return(answer)
    
    def classify(self, itemVector):
        """预测 itemVector 的分类"""
        return self.knn(self.normalizeVector(itemVector))
    
def tenfold(bucketPrefix, dataFormat, k):
    results = {}
    for i in range(1, 11):
        c = Classifier(bucketPrefix, i, dataFormat, k)
        t = c.testBucket(bucketPrefix, i)
        for (key, value) in t.items():
            results.setdefault(key, {})
            for (ckey, cvalue) in value.items():
                results[key].setdefault(ckey, 0)
                results[key][ckey] += cvalue
                
    # print the results
    categories = list(results.keys())
    categories.sort()
    print(   "\n       Classified as: ")
    header =    "        "
    subheader = "      +"
    for category in categories:
        header += category + "   "
        subheader += "----+"
    print (header)
    print (subheader)
    total = 0.0
    correct = 0.0
    for category in categories:
        row = category + "    |"
        for c2 in categories:
            if c2 in results[category]:
                count = results[category][c2]
            else:
                count = 0
            row += " %2i |" % count
            total += count
            if c2 == category:
                correct += count
        print(row)
    print(subheader)
    print("\n%5.3f percent correct" %((correct * 100) / total))
    print("total of %i instances" % total)
   
print("SMALL DATA SET") 
tenfold("pimaSmall\pimaSmall","num\tnum\tnum\tnum\tnum\tnum\tnum\tnum\tclass", 3)
print("\n\nLARGE DATA SET")
tenfold("pima\pima","num\tnum\tnum\tnum\tnum\tnum\tnum\tnum\tclass", 3)

Original author: Ron Zacharski (CC BY-NC 3.0)

Original text: http://guidetodatamining.com/

Translation reference: @egrcc, https://github.com/egrcc/guidetodatamining
