python3 implements user based collaborative filtering

  • 2020-10-31 21:51:52
  • OfStack

The example of this paper shares the specific code of python3 implementation based on user collaborative filtering for your reference. The specific content is as follows

Without further ado, just look at the code.


#!/usr/bin/python3 
# -*- coding: utf-8 -*- 
#20170916 Collaborative filtering of film recommendation basis  
# Dictionary and other format data processing and writing directly to the file  
 
 
##from numpy import * 
import time 
from math import sqrt 
##from texttable import Texttable 
 
 
class CF: 
 
 def __init__(self, movies, ratings, k=5, n=20): 
  self.movies = movies#[MovieID,Title,Genres] 
  (self.train_data,self.test_data) = (ratings[0], ratings[1])#[UserID::MovieID::Rating::Timestamp] 
  #  Number of neighbors  
  self.k = k 
  #  The number of recommended  
  self.n = n 
  #  User ratings of movies  
  #  The data format {'UserID The user ID':[(MovieID The movie ID,Rating User ratings of movies )]} 
  self.userDict = {} 
  #  The user who rates a movie  
  #  Data Format: {'MovieID The movie ID':[UserID, The user ID]} 
  # {'1',[1,2,3..],...} 
  self.ItemUser = {} 
  #  Neighborhood information  
  self.neighbors = [] 
  #  Recommended list  
  self.recommandList = []# contains dist And the movie id 
  self.recommand = [] # The training set tests the set of intersection, and only movies id 
  # Users have commented on the movie information  
  self.train_user = [] 
  self.test_user = [] 
  # List of recommendations for users, including only movieid 
  self.train_rec =[] 
  self.test_rec = [] 
  #test Set of movie score prediction data , 
  self.forecast = {}# before k The score set of the nearest neighbors  
  self.score = {}# The final weighted average score set { "The movie id ": Prediction score } 
  # Recall rate and accuracy  
  self.pre = [0.0,0.0] 
  self.z = [0.0, 0.0] 
 ''''' 
 userDict Data Format:  
 '3': [('3421', 0.8), ('1641', 0.4), ('648', 0.6), ('1394', 0.8), ('3534', 0.6), ('104', 0.8), 
 ('2735', 0.8), ('1210', 0.8), ('1431', 0.6), ('3868', 0.6), ('1079', 1.0), ('2997', 0.6), 
 ('1615', 1.0), ('1291', 0.8), ('1259', 1.0), ('653', 0.8), ('2167', 1.0), ('1580', 0.6), 
 ('3619', 0.4), ('260', 1.0), ('2858', 0.8), ('3114', 0.6), ('1049', 0.8), ('1261', 0.2), 
 ('552', 0.8), ('480', 0.8), ('1265', 0.4), ('1266', 1.0), ('733', 1.0), ('1196', 0.8), 
 ('590', 0.8), ('2355', 1.0), ('1197', 1.0), ('1198', 1.0), ('1378', 1.0), ('593', 0.6), 
 ('1379', 0.8), ('3552', 1.0), ('1304', 1.0), ('1270', 0.6), ('2470', 0.8), ('3168', 0.8), 
 ('2617', 0.4), ('1961', 0.8), ('3671', 1.0), ('2006', 0.8), ('2871', 0.8), ('2115', 0.8), 
 ('1968', 0.8), ('1136', 1.0), ('2081', 0.8)]} 
 ItemUser Data Format:  
 {'42': ['8'], '2746': ['10'], '2797': ['1'], '2987': ['5'], '1653': ['5', '8', '9'], 
 '194': ['5'], '3500': ['8', '10'], '3753': ['6', '7'], '1610': ['2', '5', '7'], 
 '1022': ['1', '10'], '1244': ['2'], '25': ['8', '9'] 
 ''' 
  
#  will ratings convert userDict and ItemUser 
 def formatRate(self,train_or_test): 
  self.userDict = {} 
  self.ItemUser = {} 
  for i in train_or_test:#[UserID,MovieID,Rating,Timestamp] 
   #  The highest score is 5  Divided by the 5  Perform data retrieval 1 the  
##   temp = (i[1], float(i[2]) / 5) 
   temp = (i[1], float(i[2])) 
##   temp = (i[1], i[2]) 
   #  To calculate userDict {' The user id':[( The movie id, score ),(2,5)...],'2':[...]...}1 An audience to each 1 A collection of movie scores  
   if(i[0] in self.userDict): 
    self.userDict[i[0]].append(temp) 
   else: 
    self.userDict[i[0]] = [temp] 
   #  To calculate ItemUser {' The movie id',[ The user id..],...} with 1 The audience for a movie  
   if(i[1] in self.ItemUser): 
    self.ItemUser[i[1]].append(i[0]) 
   else: 
    self.ItemUser[i[1]] = [i[0]]   
 
 #  formatting userDict data  
 def formatuserDict(self, userId, p):#userID Is the target to be queried, p Nearest neighbor object  
  user = {} 
  #user The data format is: movie id : [userID The score of the nearest neighbor user ] 
  for i in self.userDict[userId]:#i for userDict Each parenthesis in the data is the same 81 line  
   user[i[0]] = [i[1], 0] 
  for j in self.userDict[p]: 
   if(j[0] not in user): 
    user[j[0]] = [0, j[1]]# Indicates that the target user and the neighbor user are not paired simultaneously 1 Movie score  
   else: 
    user[j[0]][1] = j[1]# Show that the two are identical 1 Every movie gets a score  
  return user 
  
   
 
 #  Calculate the cosine distance  
 def getCost(self, userId, p): 
  #  Get the user userId and p Grade the union of movies  
  # {' The movie ID' : [userId The score, p The score ]}  Not graded as 0 
  user = self.formatuserDict(userId, p) 
  x = 0.0 
  y = 0.0 
  z = 0.0 
  for k, v in user.items():#k Is the key, v Is the value  
   x += float(v[0]) * float(v[0]) 
   y += float(v[1]) * float(v[1]) 
   z += float(v[0]) * float(v[1]) 
  if(z == 0.0): 
   return 0 
  return z / sqrt(x * y) 
 # Calculate Pearson similarity  
##  def getCost(self, userId, p): 
##   #  Get the user userId and l Grade the union of movies  
##   # {' The movie ID' : [userId The score, l The score ]}  Not graded as 0 
##   user = self.formatuserDict(userId, p) 
##   sumxsq = 0.0 
##   sumysq = 0.0 
##   sumxy = 0.0 
##   sumx = 0.0 
##   sumy = 0.0 
##   n = len(user) 
##   for k, v in user.items(): 
##    sumx +=float(v[0]) 
##    sumy +=float(v[1]) 
##    sumxsq += float(v[0]) * float(v[0]) 
##    sumysq += float(v[1]) * float(v[1]) 
##    sumxy += float(v[0]) * float(v[1]) 
##   up = sumxy -sumx*sumy/n 
##   down = sqrt((sumxsq - pow(sumxsq,2)/n)*(sumysq - pow(sumysq,2)/n)) 
##   if(down == 0.0): 
##    return 0 
##   return up/down 
 
#  Locate an adjacent user of a user  
 def getNearestNeighbor(self, userId): 
  neighbors = [] 
  self.neighbors = [] 
  #  To obtain userId Every movie that is rated has those users overrating it  
  for i in self.userDict[userId]:#i for userDict Each parenthesis in the data is the same 95 line #user The data format is: movie id : [userID The score of the nearest neighbor user ] 
   for j in self.ItemUser[i[0]]:#i[0] Is the movie number, j To see with 1 Every user of a movie  
    if(j != userId and j not in neighbors): 
     neighbors.append(j) 
  #  Calculate the relationship between these users and userId And sort  
  for i in neighbors:#i For the user id 
   dist = self.getCost(userId, i) 
   self.neighbors.append([dist, i]) 
  #  Sort is ascending by default, reverse=True According to descending order  
  self.neighbors.sort(reverse=True) 
  self.neighbors = self.neighbors[:self.k]# Slice operation, before taking k a  
##  print('neighbors',len(neighbors)) 
 
  #  Get the recommendation list  
 def getrecommandList(self, userId): 
  self.recommandList = [] 
  #  Build a recommendation dictionary  
  recommandDict = {} 
  for neighbor in self.neighbors:# Here, neighbor Data format is [[dist , the user id],[],....] 
   movies = self.userDict[neighbor[1]]#movies Data format is [( The movie id To score ),() . ] 
   for movie in movies: 
    if(movie[0] in recommandDict): 
     recommandDict[movie[0]] += neighbor[0]#### ????  
    else: 
     recommandDict[movie[0]] = neighbor[0] 
 
  #  Create a recommendation list  
  for key in recommandDict:#recommandDict The data format { The movie id : the cumulative dist . } 
   self.recommandList.append([recommandDict[key], key])#recommandList Data format [[cumulative dist , the film id [/ b] [/ b] [/ b]  】   
  self.recommandList.sort(reverse=True) 
##  print(len(self.recommandList)) 
  self.recommandList = self.recommandList[:self.n] 
##  print(len(self.recommandList)) 
 #  Accuracy of recommendation  
 def getPrecision(self, userId): 
##  print(" Go!! ") 
# First operation test_data , so that eventually self.neighbors And so on are reserved for later calculation train_data After the data ( If you don't switch places, you have to be here gR Add parameters to the function and keep their own neighbor) 
  (self.test_user,self.test_rec) = self.getRecommand(self.test_data,userId)# The user of the test set userId Rated movies and a list of movies recommended to the user  
  (self.train_user,self.train_rec) = self.getRecommand(self.train_data,userId)# The user of the training set userId Set of all movies evaluated (self.train_user) And a list of movies recommended to the user (self.train_rec) 
# Zhang Haipeng of Xi 'an TV University: Construction of Movie Recommendation System Based on Collaborative Filtering ( 2015 ), the accuracy of recall rate calculation  
  for i in self.test_rec: 
   if i in self.train_rec: 
    self.recommand.append(i) 
  self.pre[0] = len(self.recommand)/len(self.train_rec) 
  self.z[0] = len(self.recommand)/len(self.test_rec) 
  # Huang Yu, Beijing Jiaotong University: Design and Implementation of Recommendation System Based on Collaborative Filtering ( 2015 ), call calculation  
  self.recommand = []# If there's no return to zero, let's calculate the initial recommand Don't empty  
  for i in self.train_rec: 
   if i in self.test_user: 
    self.recommand.append(i) 
  self.pre[1] = len(self.recommand)/len(self.train_rec) 
  self.z[1] = len(self.recommand)/len(self.test_user) 
##  print(self.train_rec,self.test_rec,"20",len(self.train_rec),len(self.train_rec)) 
  # The same 1 Users are processed through the training set and the test set respectively  
 def getRecommand(self,train_or_test,userId): 
  self.formatRate(train_or_test) 
  self.getNearestNeighbor(userId) 
  self.getrecommandList(userId) 
  user = [i[0] for i in self.userDict[userId]]# The user userId Score all movie sets  
  recommand = [i[1] for i in self.recommandList]# The recommended list is only movies id Student: the set of recommandList (also contains dist )  
##  print("userid The user has been processed by the training set test set ") 
  return (user,recommand) 
 # right test The ratings of the films were predicted  
 def foreCast(self): 
  self.forecast = {}# ?? Preceding system of variables 1 After defining an initialization, is it required inside the function??  
  same_movie_id = [] 
  neighbors_id = [i[1] for i in self.neighbors] # The nearest neighbor user data contains only users id A collection of   
     
  for i in self.test_user:#i For the film id In which the test In the i Have been recommended to  
   if i in self.train_rec: 
    same_movie_id.append(i) 
    for j in self.ItemUser[i]:#j For the user id, The ratings and similarity of the nearest neighbor users  
     if j in neighbors_id: 
      user = [i[0] for i in self.userDict[j]]#self.userDict[userId] Data format: Data format is [( The movie id To score ),() . ] ; Here, userid Should be a neighbor user p 
      a = self.neighbors[neighbors_id.index(j)]# Find the data of the nearest neighbor user [ dist , the user id 】  
      b = self.userDict[j][user.index(i)]# Find the nearest neighbor user's data [movie] id , the user id 】  
      c = [a[0], b[1], a[1]] 
      if (i in self.forecast): 
       self.forecast[i].append(c) 
      else: 
       self.forecast[i] = [c]# Data format: Dictionary { "The movie id ":" dist , ratings, users id  】  【  】  }{'589': [[0.22655856915174025, 0.6, '419'], [0.36264561173211646, 1.0, '1349'] . } 
##  print(same_movie_id) 
  # A weighted average of the scores of each neighbor was calculated to predict the score  
  self.score = {} 
  if same_movie_id :# in test If the movie is in the recommended list, if it is empty, the following processing will report an error  
   for movieid in same_movie_id: 
    total_d = 0 
    total_down = 0 
    for d in self.forecast[movieid]:# At this time d It is already the innermost list []; self.forecast[movieid] Data format [[]] 
     total_d += d[0]*d[1] 
     total_down += d[0] 
    self.score[movieid] = [round(total_d/total_down,3)]# Take the weighted average 3 Precision of decimal places  
   # in test But I recommend movies that are not available id, I'm going to do zero here  
   for i in self.test_user: 
    if i not in movieid: 
     self.score[i] = [0] 
  else: 
   for i in self.test_user: 
    self.score[i] = [0] 
##  return self.score 
 # Calculate the mean absolute error MAE 
 def cal_Mae(self,userId): 
  self.formatRate(self.test_data) 
##  print(self.userDict) 
  for item in self.userDict[userId]: 
   if item[0] in self.score: 
    self.score[item[0]].append(item[1])#self.score The data format [[ Prediction score, actual score ]] 
##  # The transition of code  
##  for i in self.score: 
##   pass 
  return self.score 
    #  User-based recommendations  
 #  The similarity between users is calculated based on the rating of the movie  
## def recommendByUser(self, userId): 
##  print(" Dear, please wait a moment, the system is working for you ")   # Human-computer interaction assisted interpretation,  
##  self.getPrecision(self,userId) 
 
 
#  To get the data  
def readFile(filename): 
 files = open(filename, "r", encoding = "utf-8") 
 data = [] 
 for line in files.readlines(): 
  item = line.strip().split("::") 
  data.append(item) 
 return data 
 files.close() 
def load_dict_from_file(filepath): 
 _dict = {} 
 try: 
  with open(filepath, 'r',encoding = "utf -8") as dict_file: 
   for line in dict_file.readlines(): 
    (key, value) = line.strip().split(':') 
    _dict[key] = value 
 except IOError as ioerr: 
  print (" file  %s  There is no " % (filepath)) 
 return _dict 
def save_dict_to_file(_dict, filepath): 
 try: 
  with open(filepath, 'w',encoding = "utf - 8") as dict_file: 
   for (key,value) in _dict.items(): 
    dict_file.write('%s:%s\n' % (key, value)) 
 
 except IOError as ioerr: 
  print (" file  %s  Unable to create the " % (filepath)) 
def writeFile(data,filename): 
 with open(filename, 'w', encoding = "utf-8")as f: 
  f.write(data) 
 
 
# ------------------------- start ------------------------------- 
 
def start3(): 
 start1 = time.clock() 
 movies = readFile("D:/d/movies.dat") 
 ratings = [readFile("D:/d/201709train.txt"),readFile("D:/d/201709test.txt")] 
 demo = CF(movies, ratings, k=20) 
 userId = '1000' 
 demo.getPrecision(userId) 
## print(demo.foreCast()) 
 demo.foreCast() 
 print(demo.cal_Mae(userId)) 
## demo.recommendByUser(ID)  # on 1 Sentence can only achieve fixed user query, this sentence can achieve "want to check which check which", later can add a loop, one by one check, check to you do not want to check  
 print(" The data processed is %d article " % (len(ratings[0])+len(ratings[1]))) 
## print("____---",len(ratings[0]),len(ratings[1])) 
## print(" Accuracy:  %.2f %%" % (demo.pre * 100)) 
## print(" The recall rate:  %.2f %%" % (demo.z * 100)) 
 print(demo.pre) 
 print(demo.z) 
 end1 = time.clock() 
 print(" Time consuming:  %f s" % (end1 - start1)) 
def start1(): 
 start1 = time.clock() 
 movies = readFile("D:/d/movies.dat") 
 ratings = [readFile("D:/d/201709train.txt"),readFile("D:/d/201709test.txt")] 
 demo = CF(movies, ratings, k = 20) 
 demo.formatRate(ratings[0]) 
 writeFile(str(demo.userDict),"D:/d/dd/userDict.txt") 
 writeFile(str(demo.ItemUser), "D:/d/dd/ItemUser.txt") 
## save_dict_to_file(demo.userDict,"D:/d/dd/userDict.txt") 
## save_dict_to_file(demo.ItemUser,"D:/d/dd/ItemUser.txt") 
 print(" To deal with end ") 
## with open("D:/d/dd/userDict.txt",'r',encoding = 'utf-8') as f: 
##  diction = f.read() 
##  i = 0 
##  for j in eval(diction): 
##   print(j) 
##   i += 1 
##   if i == 4: 
##    break 
def start2(): 
 start1 = time.clock() 
 movies = readFile("D:/d/movies.dat") 
 ratings = [readFile("D:/d/201709train.txt"),readFile("D:/d/201709test.txt")] 
 demo = CF(movies, ratings, k = 20) 
 demo.formatRate_toMovie(ratings[0]) 
 writeFile(str(demo.movieDict),"D:/d/dd/movieDict.txt") 
## writeFile(str(demo.userDict),"D:/d/dd/userDict.txt") 
## writeFile(str(demo.ItemUser), "D:/d/dd/ItemUser.txt") 
## save_dict_to_file(demo.userDict,"D:/d/dd/userDict.txt") 
## save_dict_to_file(demo.ItemUser,"D:/d/dd/ItemUser.txt") 
 print(" To deal with end ")  
 
if __name__ == '__main__': 
 start1() 

Related articles: