Collaborative Filtering using Apache Spark

Collaborative filtering can be implemented quite naturally with Apache Spark. In the simplest terms, collaborative filtering is a technique for recommending similar entities by studying how users have rated (or otherwise interacted with) the entities in a collection; for example, recommending similar cities or similar movies.

In this blog post, I will present a very simplified Apache Spark Python program to demonstrate one such use case: recommending similar cities from user ratings.

Sample data set

Below is a small sample from the dataset, which is in a file named city.data

200  222  5  876042340

210  40   3  891035994

224  29   3  888104457

200  114  3  878977899

The first column is the user ID of the individual who rated the city, the second column is the city ID, the third column is the rating, and the fourth column is a timestamp (which this program does not use).

The city.name file contains the list of city IDs and their corresponding city names, separated by the pipe character (|) that the program below expects.

Example

456|Chicago
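
To make the two file formats concrete, here is a minimal standalone Python sketch that parses one record from each file. The file names and delimiters match the program below; everything else is just illustration.

# Minimal parsing sketch (Python 2, matching the program below)

# city.data: whitespace-separated userID, cityID, rating, timestamp
with open("city.data") as f:
    for line in f:
        fields = line.split()
        userID, cityID, rating = int(fields[0]), int(fields[1]), float(fields[2])
        print userID, cityID, rating   # e.g. 200 222 5.0

# city.name: pipe-separated cityID and city name
with open("city.name") as f:
    for line in f:
        cityID, cityName = line.strip().split('|', 1)
        print cityID, cityName         # e.g. 456 Chicago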


The Apache Spark Python Program

 


# *****************************************
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt
# ******************************************

# Build a dictionary mapping city IDs to city names from city.name
def dicCityName():
    cityNames = {}
    with open("city.name") as f:
        for line in f:
            fields = line.split('|')
            cityNames[int(fields[0])] = fields[1].strip().decode('ascii', 'ignore')
    return cityNames

# *****************************************************

# Turn userID => ((city1, rating1), (city2, rating2))
# into (city1, city2) => (rating1, rating2)
def makePairs((user, ratings)):
    (city1, rating1) = ratings[0]
    (city2, rating2) = ratings[1]
    return ((city1, city2), (rating1, rating2))

# *******************************************************

# Keep only one copy of each pair (and drop self-pairs such as
# (222, 222)) by requiring city1 < city2
def filterDuplicates((userID, ratings)):
    (city1, rating1) = ratings[0]
    (city2, rating2) = ratings[1]
    return city1 < city2

# ********************************************************

# Compute the cosine similarity of two cities' rating vectors, plus the
# number of co-rating pairs that went into it
def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_mm = sum_nn = sum_mn = 0
    for ratingM, ratingN in ratingPairs:
        sum_mm += ratingM * ratingM
        sum_nn += ratingN * ratingN
        sum_mn += ratingM * ratingN
        numPairs += 1

    numerator = sum_mn
    denominator = sqrt(sum_mm) * sqrt(sum_nn)

    score = 0
    if (denominator):
        score = numerator / float(denominator)

    return (score, numPairs)

# ***************************************************
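
# For reference, the score above is the cosine similarity of the two
# cities' rating vectors:
#
#     score = sum(m_i * n_i) / (sqrt(sum(m_i^2)) * sqrt(sum(n_i^2)))
#
# Worked example with two hypothetical rating pairs [(5, 5), (3, 5)]:
#     sum_mm = 25 + 9 = 34,  sum_nn = 25 + 25 = 50,  sum_mn = 25 + 15 = 40
#     score  = 40 / (sqrt(34) * sqrt(50)) ~= 0.97,  numPairs = 2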

# Main program starts here
conf = SparkConf().setMaster("local[*]").setAppName("SimilarCities")
sc = SparkContext(conf = conf)

# *****************************************************

# First create a dictionary of city IDs and their corresponding city names
print "\nLoading city names..."
nameDict = dicCityName()

# ***************************************************

data = sc.textFile("file:///SparkCourse/city.data")

# Map ratings to key / value pairs: user ID => (city ID, rating)
ratings = data.map(lambda l: l.split()).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

# The content of the ratings RDD will be as below:
# 200, (222, 5.0)
# 210, (40,  3.0)
# 224, (29,  3.0)
# 200, (114, 3.0)

# ***************************************************************

# Emit every city pair rated together by the same user.
# The self-join finds every combination: in effect, a per-user
# Cartesian product of that user's ratings.
joinedRatings = ratings.join(ratings)

# At this point our RDD consists of userID => ((cityID, rating), (cityID, rating))
# The content of the joinedRatings RDD for user 200 will be:
# 200, ((222, 5.0), (222, 5.0))
# 200, ((222, 5.0), (114, 3.0))
# 200, ((114, 3.0), (222, 5.0))
# 200, ((114, 3.0), (114, 3.0))

# ******************************************************

# Filter out duplicate and self pairs: keeping only combinations where
# city1 < city2 leaves exactly one copy of each distinct pair
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

# The output of the uniqueJoinedRatings RDD for user 200 will be just:
# 200, ((114, 3.0), (222, 5.0))

# ******************************************************

# Now key by (city1, city2) pairs
cityPairs = uniqueJoinedRatings.map(makePairs)

# The content of the cityPairs RDD would be:
# (114, 222), (3.0, 5.0)

# ***********************************************************

# We now have (city1, city2) => (rating1, rating2)
# Collect all ratings for each city pair across all users
cityPairRatings = cityPairs.groupByKey()

# The content of cityPairRatings would be:
# (114, 222), [(3.0, 5.0), ...]
# and so on
# We now have (city1, city2) => [(rating1, rating2), (rating1, rating2), ...]

# We can now compute similarities using the cosine similarity logic above
cityPairSimilarities = cityPairRatings.mapValues(computeCosineSimilarity).cache()

# *********************************************************************

# Save the results if desired
#cityPairSimilarities.sortByKey().saveAsTextFile("city-sims")

# Extract similarities for the city we care about that have a good
# similarity score and a minimum number of rating pairs
if (len(sys.argv) > 1):
    scoreThreshold = 0.97
    coOccurrenceThreshold = 50

    cityID = int(sys.argv[1])

    # Apply the thresholds here: keep pairs involving the requested city
    # whose score and co-rating count are both "good"
    filteredResults = cityPairSimilarities.filter(lambda((pair, sim)): \
        (pair[0] == cityID or pair[1] == cityID) \
        and sim[0] > scoreThreshold and sim[1] > coOccurrenceThreshold)

    # Sort by the quality of the score
    results = filteredResults.map(lambda((pair, sim)): (sim, pair)).sortByKey(ascending = False).take(5)
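
    # (A hypothetical alternative sketch: takeOrdered avoids fully sorting
    # the RDD when only the top few results are needed.
    #     results = filteredResults.map(lambda((pair, sim)): (sim, pair)) \
    #         .takeOrdered(5, key = lambda (sim, pair): (-sim[0], -sim[1]))
    # )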

    # ****************************************************************

    print "Top 5 similar cities for " + nameDict[cityID]
    for result in results:
        (sim, pair) = result
        # Display the city in the pair that isn't the one we asked about
        similarCityID = pair[0]
        if (similarCityID == cityID):
            similarCityID = pair[1]
        print nameDict[similarCityID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1])
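
One performance note on the program above: groupByKey ships every individual rating pair across the network before computeCosineSimilarity runs. On a large dataset it can be cheaper to accumulate the three running sums during the shuffle itself. The sketch below is one possible way to do that with aggregateByKey; it is not part of the original program, and it assumes the same cityPairs RDD built above.

# Hypothetical alternative to the groupByKey + mapValues steps above:
# accumulate (sum_mm, sum_nn, sum_mn, count) per city pair during the shuffle.
from math import sqrt

def addRating(acc, pair):
    (m, n) = pair
    return (acc[0] + m * m, acc[1] + n * n, acc[2] + m * n, acc[3] + 1)

def mergeSums(a, b):
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2], a[3] + b[3])

sums = cityPairs.aggregateByKey((0.0, 0.0, 0.0, 0), addRating, mergeSums)

def toScore(s):
    denominator = sqrt(s[0]) * sqrt(s[1])
    return ((s[2] / denominator) if denominator else 0.0, s[3])

# Yields the same (score, numPairs) values as cityPairSimilarities above
cityPairSimilarities = sums.mapValues(toScore).cache()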


Sample output of the script

Call the program with a single argument: the city ID for which you want to find similar cities. (The recommendation lookup only runs when an argument is supplied.)

Example

spark-submit program.py 2334

Sample output

Dallas   score: 0.98   strength: 200

Plano   score: 0.97   strength: 199


References: Inspired by the excellent training materials produced by Frank Kane (http://frank-kane.com/).


Author: Abhik Roy

 
