K最近邻属于一种分类算法,他的解释最容易,近朱者赤,近墨者黑,我们想看一个人是什么样的,看他的朋友是什么样的就可以了。当然其他还牵着到,看哪方面和朋友比较接近(对象特征),怎样才算是跟朋友亲近,一起吃饭还是一起逛街算是亲近(距离函数),根据朋友的优秀不优秀如何评判目标任务优秀不优秀(分类算法),是否不同优秀程度的朋友和不同的接近程度要考虑一下(距离权重),看几个朋友合适(k值),能否以分数的形式表示优秀度(概率分布)。
K最近邻概念:
它的工作原理是:存在一个样本数据集合,也称作为训练样本集,并且样本集中每个数据都存在标签,即我们知道样本集中每一个数据与所属分类的对应关系。输入没有标签的新数据后,将新的数据的每个特征与样本集中数据对应的特征进行比较,然后算法提取样本最相似数据(最近邻)的分类标签。一般来说,我们只选择样本数据集中前k个最相似的数据,这就是k-近邻算法中k的出处,通常k是不大于20的整数。最后,选择k个最相似数据中出现次数最多的分类,作为新数据的分类。
今天我们使用k最近邻算法来构建白酒的价格模型。
构造数据集
构建一个葡萄酒样本数据集。白酒的价格跟等级、年代有很大的关系。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
from random import random,randint import math # 根据等级和年代对价格进行模拟 def wineprice(rating,age): peak_age = rating - 50 # 根据等级计算价格 price = rating / 2 if age>peak_age: # 经过“峰值年”,后续5年里其品质将会变差 price = price * ( 5 - (age - peak_age) / 2 ) else : # 价格在接近“峰值年”时会增加到原值的5倍 price = price * ( 5 * ((age + 1 ) / peak_age)) if price< 0 : price = 0 return price # 生成一批模式数据代表样本数据集 def wineset1(): rows = [] for i in range ( 300 ): # 随机生成年代和等级 rating = random() * 50 + 50 age = random() * 50 # 得到一个参考价格 price = wineprice(rating,age) # 添加一些噪音 price * = (random() * 0.2 + 0.9 ) # 加入数据集 rows.append({ 'input' :(rating,age), 'result' :price}) return rows |
数据间的距离
使用k最近邻,首先要知道那些最近邻,也就要求知道数据间的距离。我们使用欧几里得距离作为数据间的距离。
1
2
3
4
5
6
|
# 使用欧几里得距离,定义距离 def euclidean(v1,v2): d = 0.0 for i in range ( len (v1)): d + = (v1[i] - v2[i]) * * 2 return math.sqrt(d) |
获取与新数据距离最近的k个样本数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# 计算给预测商品和原数据集中任一其他商品间的距离。data原数据集,vec1预测商品 def getdistances(data,vec1): distancelist = [] # 遍历数据集中的每一项 for i in range ( len (data)): vec2 = data[i][ 'input' ] # 添加距离到距离列表 distancelist.append((euclidean(vec1,vec2),i)) # 距离排序 distancelist.sort() return distancelist #返回距离列表 |
根据距离最近的k个样本数据预测新数据的属性
1、简单求均值
1
2
3
4
5
6
7
8
9
10
11
12
|
# 对距离值最小的前k个结果求平均 def knnestimate(data,vec1,k = 5 ): # 得到经过排序的距离值 dlist = getdistances(data,vec1) avg = 0.0 # 对前k项结果求平均 for i in range (k): idx = dlist[i][ 1 ] avg + = data[idx][ 'result' ] avg = avg / k return avg |
2、求加权平均
如果使用直接求均值,有可能存在前k个最近邻中,可能会存在距离很远的数据,但是他仍然属于最近的前K个数据。当存在这种情况时,对前k个样本数据直接求均值会有偏差,所以使用加权平均,为较远的节点赋予较小的权值,对较近的节点赋予较大的权值。
那么具体该怎么根据数据间距离分配权值呢?这里使用三种递减函数作为权值分配方法。
2.1、使用反函数为近邻分配权重。
1
2
|
def inverseweight(dist,num = 1.0 ,const = 0.1 ): return num / (dist + const) |
2.2、使用减法函数为近邻分配权重。当最近距离都大于const时不可用。
1
2
3
4
5
|
def subtractweight(dist,const = 1.0 ): if dist>const: return 0 else : return const - dist |
2.3、使用高斯函数为距离分配权重。
1
2
|
def gaussian(dist,sigma = 5.0 ): return math.e * * ( - dist * * 2 / ( 2 * sigma * * 2 )) |
有了权值分配方法,下面就可以计算加权平均了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# 对距离值最小的前k个结果求加权平均 def weightedknn(data,vec1,k = 5 ,weightf = gaussian): # 得到距离值 dlist = getdistances(data,vec1) avg = 0.0 totalweight = 0.0 # 得到加权平均 for i in range (k): dist = dlist[i][ 0 ] idx = dlist[i][ 1 ] weight = weightf(dist) avg + = weight * data[idx][ 'result' ] totalweight + = weight if totalweight = = 0 : return 0 avg = avg / totalweight return avg |
第一次测试
上面完成了使用k最近邻进行新数据预测的功能,下面我们进行测试。
1
2
3
4
5
6
7
8
9
10
11
|
if __name__ = = '__main__' : #只有在执行当前模块时才会运行此函数 data = wineset1() #创建第一批数据集 result = knnestimate(data,( 95.0 , 3.0 )) #根据最近邻直接求平均进行预测 print (result) result = weightedknn(data,( 95.0 , 3.0 ),weightf = inverseweight) #使用反函数做权值分配方法,进行加权平均 print (result) result = weightedknn(data, ( 95.0 , 3.0 ), weightf = subtractweight) # 使用减法函数做权值分配方法,进行加权平均 print (result) result = weightedknn(data, ( 95.0 , 3.0 ), weightf = gaussian) # 使用高斯函数做权值分配方法,进行加权平均 print (result) |
交叉验证
交叉验证是用来验证你的算法或算法参数的好坏,比如上面的加权分配算法我们有三种方式,究竟哪个更好呢?我们可以使用交叉验证进行查看。
随机选择样本数据集中95%作为训练集,5%作为新数据,对新数据进行预测并与已知结果进行比较,查看算法效果。
要实现交叉验证,要实现将样本数据集划分为训练集和新数据两个子集的功能。
1
2
3
4
5
6
7
8
9
10
|
# 划分数据。test测试数据集占的比例。其他数据集为训练数据 def dividedata(data,test = 0.05 ): trainset = [] testset = [] for row in data: if random()<test: testset.append(row) else : trainset.append(row) return trainset,testset |
还要能应用算法,计算预测结果与真实结果之间的误差度。
1
2
3
4
5
6
7
8
9
|
# 使用数据集对使用算法进行预测的结果的误差进行统计,一次判断算法好坏。algf为算法函数,trainset为训练数据集,testset为预测数据集 def testalgorithm(algf,trainset,testset): error = 0.0 for row in testset: guess = algf(trainset,row[ 'input' ]) #这一步要和样本数据的格式保持一致,列表内个元素为一个字典,每个字典包含input和result两个属性。而且函数参数是列表和元组 error + = (row[ 'result' ] - guess) * * 2 #print row['result'],guess #print error/len(testset) return error / len (testset) |
有了数据拆分和算法性能误差统计函数。我们就可以在原始数据集上进行多次这样的实验,统计平均误差。
1
2
3
4
5
6
7
|
# 将数据拆分和误差统计合并在一起。对数据集进行多次划分,并验证算法好坏 def crossvalidate(algf,data,trials = 100 ,test = 0.1 ): error = 0.0 for i in range (trials): trainset,testset = dividedata(data,test) error + = testalgorithm(algf,trainset,testset) return error / trials |
交叉验证测试
1
2
3
4
5
6
7
8
9
10
11
12
13
|
if __name__ = = '__main__' : #只有在执行当前模块时才会运行此函数 data = wineset1() #创建第一批数据集 print (data) error = crossvalidate(knnestimate,data) #对直接求均值算法进行评估 print ( '平均误差:' + str (error)) def knn3(d,v): return knnestimate(d,v,k = 3 ) #定义一个函数指针。参数为d列表,v元组 error = crossvalidate(knn3, data) #对直接求均值算法进行评估 print ( '平均误差:' + str (error)) def knninverse(d,v): return weightedknn(d,v,weightf = inverseweight) #定义一个函数指针。参数为d列表,v元组 error = crossvalidate(knninverse, data) #对使用反函数做权值分配方法进行评估 print ( '平均误差:' + str (error)) |
不同类型、值域的变量、无用变量
在样本数据的各个属性中可能并不是取值范围相同的同类型的数据,比如上面的酒的属性可能包含档次(0-100),酒的年限(0-50),酒的容量(三种容量375.0ml、750.0ml、1500.0ml),甚至在我们获取的样本数据中还有可能包含无用的数据,比如酒生产的流水线号(1-20之间的整数)。在计算样本距离时,取值范围大的属性的变化会严重影响取值范围小的属性的变化,以至于结果会忽略取值范围小的属性。而且无用属性的变化也会增加数据之间的距离。
所以就要对样本数据的属性进行缩放到合适的范围,并要能删除无效属性。
构造新的数据集
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# 构建新数据集,模拟不同类型变量的问题 def wineset2(): rows = [] for i in range ( 300 ): rating = random() * 50 + 50 #酒的档次 age = random() * 50 #酒的年限 aisle = float (randint( 1 , 20 )) #酒的通道号(无关属性) bottlesize = [ 375.0 , 750.0 , 1500.0 ][randint( 0 , 2 )] #酒的容量 price = wineprice(rating,age) #酒的价格 price * = (bottlesize / 750 ) price * = (random() * 0.2 + 0.9 ) rows.append({ 'input' :(rating,age,aisle,bottlesize), 'result' :price}) return rows |
实现按比例对属性的取值进行缩放的功能
1
2
3
4
5
6
7
|
# 按比例对属性进行缩放,scale为各属性的值的缩放比例。 def rescale(data,scale): scaleddata = [] for row in data: scaled = [scale[i] * row[ 'input' ][i] for i in range ( len (scale))] scaleddata.append({ 'input' :scaled, 'result' :row[ 'result' ]}) return scaleddata |
那就剩下最后最后一个问题,究竟各个属性缩放多少呢。这是一个优化问题,我们可以通过优化技术寻找最优化解。而需要优化的成本函数,就是通过缩放以后进行预测的结果与真实结果之间的误差值。误差值越小越好。误差值的计算同前面交叉验证时使用的相同crossvalidate函数
下面构建成本函数
1
2
3
4
5
6
7
8
|
# 生成成本函数。闭包 def createcostfunction(algf,data): def costf(scale): sdata = rescale(data,scale) return crossvalidate(algf,sdata,trials = 10 ) return costf weightdomain = [( 0 , 10 )] * 4 #将缩放比例这个题解的取值范围设置为0-10,可以自己设定,用于优化算法 |
优化技术的可以参看https://www.zzvips.com/article/127282.html
测试代码
1
2
3
4
5
6
7
8
|
if __name__ = = '__main__' : #只有在执行当前模块时才会运行此函数 #========缩放比例优化=== data = wineset2() # 创建第2批数据集 print (data) import optimization costf = createcostfunction(knnestimate,data) #创建成本函数 result = optimization.annealingoptimize(weightdomain,costf,step = 2 ) #使用退火算法寻找最优解 print (result) |
不对称分布
对于样本数据集包含多种分布情况时,输出结果我们也希望不唯一。我们可以使用概率结果进行表示,输出每种结果的值和出现的概率。
比如葡萄酒有可能是从折扣店购买的,而样本数据集中没有记录这一特性。所以样本数据中价格存在两种形式的分布。
构造数据集
1
2
3
4
5
6
7
|
def wineset3(): rows = wineset1() for row in rows: if random()< 0.5 : # 葡萄酒是从折扣店购买的 row[ 'result' ] * = 0.6 return rows |
如果以结果概率的形式存在,我们要能够计算指定范围的概率值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
# 计算概率。data样本数据集,vec1预测数据,low,high结果范围,weightf为根据距离进行权值分配的函数 def probguess(data,vec1,low,high,k = 5 ,weightf = gaussian): dlist = getdistances(data,vec1) #获取距离列表 nweight = 0.0 tweight = 0.0 for i in range (k): dist = dlist[i][ 0 ] #距离 idx = dlist[i][ 1 ] #索引号 weight = weightf(dist) #权值 v = data[idx][ 'result' ] #真实结果 # 当前数据点位于指定范围内么? if v> = low and v< = high: nweight + = weight #指定范围的权值之和 tweight + = weight #总的权值之和 if tweight = = 0 : return 0 # 概率等于位于指定范围内的权重值除以所有权重值 return nweight / tweight |
对于多种输出、以概率和值的形式表示的结果,我们可以使用累积概率分布图或概率密度图的形式表现。
绘制累积概率分布图
1
2
3
4
5
6
7
8
|
from pylab import * # 绘制累积概率分布图。data样本数据集,vec1预测数据,high取值最高点,k近邻范围,weightf权值分配 def cumulativegraph(data,vec1,high,k = 5 ,weightf = gaussian): t1 = arange( 0.0 ,high, 0.1 ) cprob = array([probguess(data,vec1, 0 ,v,k,weightf) for v in t1]) #预测产生的不同结果的概率 plot(t1,cprob) show() |
绘制概率密度图
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# 绘制概率密度图 def probabilitygraph(data,vec1,high,k = 5 ,weightf = gaussian,ss = 5.0 ): # 建立一个代表价格的值域范围 t1 = arange( 0.0 ,high, 0.1 ) # 得到整个值域范围内的所有概率 probs = [probguess(data,vec1,v,v + 0.1 ,k,weightf) for v in t1] # 通过加上近邻概率的高斯计算结果,对概率值做平滑处理 smoothed = [] for i in range ( len (probs)): sv = 0.0 for j in range ( 0 , len (probs)): dist = abs (i - j) * 0.1 weight = gaussian(dist,sigma = ss) sv + = weight * probs[j] smoothed.append(sv) smoothed = array(smoothed) plot(t1,smoothed) show() |
测试代码
1
2
3
4
5
6
|
if __name__ = = '__main__' : #只有在执行当前模块时才会运行此函数 data = wineset3() # 创建第3批数据集 print (data) cumulativegraph(data,( 1 , 1 ), 120 ) #绘制累积概率密度 probabilitygraph(data,( 1 , 1 ), 6 ) #绘制概率密度图 |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://blog.csdn.net/luanpeng825485697/article/details/78796773