本文实例为大家分享了python3实现基于用户协同过滤的具体代码,供大家参考,具体内容如下
废话不多说,直接看代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
|
#!/usr/bin/python3 # -*- coding: utf-8 -*- #20170916号协同过滤电影推荐基稿 #字典等格式数据处理及直接写入文件 ##from numpy import * import time from math import sqrt ##from texttable import Texttable class CF: def __init__( self , movies, ratings, k = 5 , n = 20 ): self .movies = movies #[MovieID,Title,Genres] ( self .train_data, self .test_data) = (ratings[ 0 ], ratings[ 1 ]) #[UserID::MovieID::Rating::Timestamp] # 邻居个数 self .k = k # 推荐个数 self .n = n # 用户对电影的评分 # 数据格式{'UserID用户ID':[(MovieID电影ID,Rating用户对电影的评星)]} self .userDict = {} # 对某电影评分的用户 # 数据格式:{'MovieID电影ID':[UserID,用户ID]} # {'1',[1,2,3..],...} self .ItemUser = {} # 邻居的信息 self .neighbors = [] # 推荐列表 self .recommandList = [] #包含dist和电影id self .recommand = [] #训练集合测试集的交集,且仅有电影id #用户评过电影信息 self .train_user = [] self .test_user = [] #给用户的推荐列表,仅含movieid self .train_rec = [] self .test_rec = [] #test中的电影评分预测数据集合, self .forecast = {} #前k个近邻的评分集合 self .score = {} #最终加权平均后的评分集合{“电影id”:预测评分} #召回率和准确率 self .pre = [ 0.0 , 0.0 ] self .z = [ 0.0 , 0.0 ] ''''' userDict数据格式: '3': [('3421', 0.8), ('1641', 0.4), ('648', 0.6), ('1394', 0.8), ('3534', 0.6), ('104', 0.8), ('2735', 0.8), ('1210', 0.8), ('1431', 0.6), ('3868', 0.6), ('1079', 1.0), ('2997', 0.6), ('1615', 1.0), ('1291', 0.8), ('1259', 1.0), ('653', 0.8), ('2167', 1.0), ('1580', 0.6), ('3619', 0.4), ('260', 1.0), ('2858', 0.8), ('3114', 0.6), ('1049', 0.8), ('1261', 0.2), ('552', 0.8), ('480', 0.8), ('1265', 0.4), ('1266', 1.0), ('733', 1.0), ('1196', 0.8), ('590', 0.8), ('2355', 1.0), ('1197', 1.0), ('1198', 1.0), ('1378', 1.0), ('593', 0.6), ('1379', 0.8), ('3552', 1.0), ('1304', 1.0), ('1270', 0.6), ('2470', 0.8), ('3168', 0.8), ('2617', 0.4), ('1961', 0.8), ('3671', 1.0), ('2006', 0.8), ('2871', 0.8), ('2115', 0.8), ('1968', 0.8), ('1136', 1.0), ('2081', 0.8)]} ItemUser数据格式: {'42': ['8'], '2746': ['10'], '2797': ['1'], '2987': ['5'], '1653': ['5', '8', '9'], '194': ['5'], '3500': ['8', '10'], '3753': ['6', '7'], '1610': ['2', '5', '7'], '1022': ['1', '10'], '1244': ['2'], '25': ['8', '9'] ''' # 将ratings转换为userDict和ItemUser def formatRate( self ,train_or_test): self .userDict = {} self .ItemUser = {} for i in train_or_test: #[UserID,MovieID,Rating,Timestamp] # 评分最高为5 除以5 进行数据归一化 ## temp = (i[1], float(i[2]) / 5) temp = (i[ 1 ], float (i[ 2 ])) ## temp = (i[1], i[2]) # 计算userDict {'用户id':[(电影id,评分),(2,5)...],'2':[...]...}一个观众对每一部电影的评分集合 if (i[ 0 ] in self .userDict): self .userDict[i[ 0 ]].append(temp) else : self .userDict[i[ 0 ]] = [temp] # 计算ItemUser {'电影id',[用户id..],...}同一部电影的观众集合 if (i[ 1 ] in self .ItemUser): self .ItemUser[i[ 1 ]].append(i[ 0 ]) else : self .ItemUser[i[ 1 ]] = [i[ 0 ]] # 格式化userDict数据 def formatuserDict( self , userId, p): #userID为待查询目标,p为近邻对象 user = {} #user数据格式为:电影id:[userID的评分,近邻用户的评分] for i in self .userDict[userId]: #i为userDict数据中的每个括号同81行 user[i[ 0 ]] = [i[ 1 ], 0 ] for j in self .userDict[p]: if (j[ 0 ] not in user): user[j[ 0 ]] = [ 0 , j[ 1 ]] #说明目标用户和近邻用户没有同时对一部电影评分 else : user[j[ 0 ]][ 1 ] = j[ 1 ] #说明两者对同一部电影都有评分 return user # 计算余弦距离 def getCost( self , userId, p): # 获取用户userId和p评分电影的并集 # {'电影ID':[userId的评分,p的评分]} 没有评分为0 user = self .formatuserDict(userId, p) x = 0.0 y = 0.0 z = 0.0 for k, v in user.items(): #k是键,v是值 x + = float (v[ 0 ]) * float (v[ 0 ]) y + = float (v[ 1 ]) * float (v[ 1 ]) z + = float (v[ 0 ]) * float (v[ 1 ]) if (z = = 0.0 ): return 0 return z / sqrt(x * y) #计算皮尔逊相似度 ## def getCost(self, userId, p): ## # 获取用户userId和l评分电影的并集 ## # {'电影ID':[userId的评分,l的评分]} 没有评分为0 ## user = self.formatuserDict(userId, p) ## sumxsq = 0.0 ## sumysq = 0.0 ## sumxy = 0.0 ## sumx = 0.0 ## sumy = 0.0 ## n = len(user) ## for k, v in user.items(): ## sumx +=float(v[0]) ## sumy +=float(v[1]) ## sumxsq += float(v[0]) * float(v[0]) ## sumysq += float(v[1]) * float(v[1]) ## sumxy += float(v[0]) * float(v[1]) ## up = sumxy -sumx*sumy/n ## down = sqrt((sumxsq - pow(sumxsq,2)/n)*(sumysq - pow(sumysq,2)/n)) ## if(down == 0.0): ## return 0 ## return up/down # 找到某用户的相邻用户 def getNearestNeighbor( self , userId): neighbors = [] self .neighbors = [] # 获取userId评分的电影都有那些用户也评过分 for i in self .userDict[userId]: #i为userDict数据中的每个括号同95行#user数据格式为:电影id:[userID的评分,近邻用户的评分] for j in self .ItemUser[i[ 0 ]]: #i[0]为电影编号,j为看同一部电影的每位用户 if (j ! = userId and j not in neighbors): neighbors.append(j) # 计算这些用户与userId的相似度并排序 for i in neighbors: #i为用户id dist = self .getCost(userId, i) self .neighbors.append([dist, i]) # 排序默认是升序,reverse=True表示降序 self .neighbors.sort(reverse = True ) self .neighbors = self .neighbors[: self .k] #切片操作,取前k个 ## print('neighbors',len(neighbors)) # 获取推荐列表 def getrecommandList( self , userId): self .recommandList = [] # 建立推荐字典 recommandDict = {} for neighbor in self .neighbors: #这里的neighbor数据格式为[[dist,用户id],[],....] movies = self .userDict[neighbor[ 1 ]] #movies数据格式为[(电影id,评分),(),。。。。] for movie in movies: if (movie[ 0 ] in recommandDict): recommandDict[movie[ 0 ]] + = neighbor[ 0 ] ####???? else : recommandDict[movie[ 0 ]] = neighbor[ 0 ] # 建立推荐列表 for key in recommandDict: #recommandDict数据格式{电影id:累计dist,。。。} self .recommandList.append([recommandDict[key], key]) #recommandList数据格式【【累计dist,电影id】,【】,。。。。】 self .recommandList.sort(reverse = True ) ## print(len(self.recommandList)) self .recommandList = self .recommandList[: self .n] ## print(len(self.recommandList)) # 推荐的准确率 def getPrecision( self , userId): ## print("开始!!!") #先运算test_data,这样最终self.neighbors等保留的是后来计算train_data后的数据(不交换位置的话就得在gR函数中增加参数保留各自的neighbor) ( self .test_user, self .test_rec) = self .getRecommand( self .test_data,userId) #测试集的用户userId所评价的电影和给该用户推荐的电影列表 ( self .train_user, self .train_rec) = self .getRecommand( self .train_data,userId) #训练集的用户userId所评价的所有电影集合(self.train_user)和给该用户推荐的电影列表(self.train_rec) #西安电大的张海朋:基于协同过滤的电影推荐系统的构建(2015)中的准确率召回率计算 for i in self .test_rec: if i in self .train_rec: self .recommand.append(i) self .pre[ 0 ] = len ( self .recommand) / len ( self .train_rec) self .z[ 0 ] = len ( self .recommand) / len ( self .test_rec) #北京交大黄宇:基于协同过滤的推荐系统设计与实现(2015)中的准、召计算 self .recommand = [] #这里没有归零的话,下面计算初始recommand不为空 for i in self .train_rec: if i in self .test_user: self .recommand.append(i) self .pre[ 1 ] = len ( self .recommand) / len ( self .train_rec) self .z[ 1 ] = len ( self .recommand) / len ( self .test_user) ## print(self.train_rec,self.test_rec,"20",len(self.train_rec),len(self.train_rec)) #对同一用户分别通过训练集和测试集处理 def getRecommand( self ,train_or_test,userId): self .formatRate(train_or_test) self .getNearestNeighbor(userId) self .getrecommandList(userId) user = [i[ 0 ] for i in self .userDict[userId]] #用户userId评分的所有电影集合 recommand = [i[ 1 ] for i in self .recommandList] #推荐列表仅有电影id的集合,区别于recommandList(还含有dist) ## print("userid该用户已通过训练集测试集处理") return (user,recommand) #对test的电影进行评分预测 def foreCast( self ): self .forecast = {} #?????前面变量统一定义初始化后,函数内部是否需要该初始化???? same_movie_id = [] neighbors_id = [i[ 1 ] for i in self .neighbors] #近邻用户数据仅含用户id的集合 for i in self .test_user: #i为电影id,即在test里的i有被推荐到 if i in self .train_rec: same_movie_id.append(i) for j in self .ItemUser[i]: #j为用户id,即寻找近邻用户的评分和相似度 if j in neighbors_id: user = [i[ 0 ] for i in self .userDict[j]] #self.userDict[userId]数据格式:数据格式为[(电影id,评分),(),。。。。];这里的userid应为近邻用户p a = self .neighbors[neighbors_id.index(j)] #找到该近邻用户的数据【dist,用户id】 b = self .userDict[j][user.index(i)] #找到该近邻用户的数据【电影id,用户id】 c = [a[ 0 ], b[ 1 ], a[ 1 ]] if (i in self .forecast): self .forecast[i].append(c) else : self .forecast[i] = [c] #数据格式:字典{“电影id”:【dist,评分,用户id】【】}{'589': [[0.22655856915174025, 0.6, '419'], [0.36264561173211646, 1.0, '1349']。。。} ## print(same_movie_id) #每个近邻用户的评分加权平均计算得预测评分 self .score = {} if same_movie_id : #在test里的电影是否有在推荐列表里,如果为空不做判断,下面的处理会报错 for movieid in same_movie_id: total_d = 0 total_down = 0 for d in self .forecast[movieid]: #此时的d已经是最里层的列表了【】;self.forecast[movieid]的数据格式[[]] total_d + = d[ 0 ] * d[ 1 ] total_down + = d[ 0 ] self .score[movieid] = [ round (total_d / total_down, 3 )] #加权平均后取3位小数的精度 #在test里但是推荐没有的电影id,这里先按零计算 for i in self .test_user: if i not in movieid: self .score[i] = [ 0 ] else : for i in self .test_user: self .score[i] = [ 0 ] ## return self.score #计算平均绝对误差MAE def cal_Mae( self ,userId): self .formatRate( self .test_data) ## print(self.userDict) for item in self .userDict[userId]: if item[ 0 ] in self .score: self .score[item[ 0 ]].append(item[ 1 ]) #self.score数据格式[[预测分,实际分]] ## #过渡代码 ## for i in self.score: ## pass return self .score # 基于用户的推荐 # 根据对电影的评分计算用户之间的相似度 ## def recommendByUser(self, userId): ## print("亲,请稍等片刻,系统正在快马加鞭为你运作中") #人机交互辅助解读, ## self.getPrecision(self,userId) # 获取数据 def readFile(filename): files = open (filename, "r" , encoding = "utf-8" ) data = [] for line in files.readlines(): item = line.strip().split( "::" ) data.append(item) return data files.close() def load_dict_from_file(filepath): _dict = {} try : with open (filepath, 'r' ,encoding = "utf -8" ) as dict_file: for line in dict_file.readlines(): (key, value) = line.strip().split( ':' ) _dict[key] = value except IOError as ioerr: print ( "文件 %s 不存在" % (filepath)) return _dict def save_dict_to_file(_dict, filepath): try : with open (filepath, 'w' ,encoding = "utf - 8" ) as dict_file: for (key,value) in _dict.items(): dict_file.write( '%s:%s\n' % (key, value)) except IOError as ioerr: print ( "文件 %s 无法创建" % (filepath)) def writeFile(data,filename): with open (filename, 'w' , encoding = "utf-8" )as f: f.write(data) # -------------------------开始------------------------------- def start3(): start1 = time.clock() movies = readFile( "D:/d/movies.dat" ) ratings = [readFile( "D:/d/201709train.txt" ),readFile( "D:/d/201709test.txt" )] demo = CF(movies, ratings, k = 20 ) userId = '1000' demo.getPrecision(userId) ## print(demo.foreCast()) demo.foreCast() print (demo.cal_Mae(userId)) ## demo.recommendByUser(ID) #上一句只能实现固定用户查询,这句可以实现“想查哪个查哪个”,后期可以加个循环,挨个查,查到你不想查 print ( "处理的数据为%d条" % ( len (ratings[ 0 ]) + len (ratings[ 1 ]))) ## print("____---",len(ratings[0]),len(ratings[1])) ## print("准确率: %.2f %%" % (demo.pre * 100)) ## print("召回率: %.2f %%" % (demo.z * 100)) print (demo.pre) print (demo.z) end1 = time.clock() print ( "耗费时间: %f s" % (end1 - start1)) def start1(): start1 = time.clock() movies = readFile( "D:/d/movies.dat" ) ratings = [readFile( "D:/d/201709train.txt" ),readFile( "D:/d/201709test.txt" )] demo = CF(movies, ratings, k = 20 ) demo.formatRate(ratings[ 0 ]) writeFile( str (demo.userDict), "D:/d/dd/userDict.txt" ) writeFile( str (demo.ItemUser), "D:/d/dd/ItemUser.txt" ) ## save_dict_to_file(demo.userDict,"D:/d/dd/userDict.txt") ## save_dict_to_file(demo.ItemUser,"D:/d/dd/ItemUser.txt") print ( "处理结束" ) ## with open("D:/d/dd/userDict.txt",'r',encoding = 'utf-8') as f: ## diction = f.read() ## i = 0 ## for j in eval(diction): ## print(j) ## i += 1 ## if i == 4: ## break def start2(): start1 = time.clock() movies = readFile( "D:/d/movies.dat" ) ratings = [readFile( "D:/d/201709train.txt" ),readFile( "D:/d/201709test.txt" )] demo = CF(movies, ratings, k = 20 ) demo.formatRate_toMovie(ratings[ 0 ]) writeFile( str (demo.movieDict), "D:/d/dd/movieDict.txt" ) ## writeFile(str(demo.userDict),"D:/d/dd/userDict.txt") ## writeFile(str(demo.ItemUser), "D:/d/dd/ItemUser.txt") ## save_dict_to_file(demo.userDict,"D:/d/dd/userDict.txt") ## save_dict_to_file(demo.ItemUser,"D:/d/dd/ItemUser.txt") print ( "处理结束" ) if __name__ = = '__main__' : start1() |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/qqzhuimengren/article/details/78345891