Exploring Broadcast variables in Apache Spark

In this blog post, we shall explore broadcast variables in Apache Spark. A broadcast variable ships a read-only copy of a piece of data to every data processing node of a Spark cluster. It is a very useful concept, as it minimizes data traffic between processing nodes. A perfect use case is a lookup table: if the lookup table is broadcast to all the data processing nodes, each node holds a full local copy, so it never needs to fetch the data from other nodes, reducing inter-node traffic.

The diagram below illustrates the broadcasting concept: a full copy of the data is sent to every data processing node of the Spark cluster.

[Figure: spark_broadcast_variable_pic1 — a full copy of the broadcast data is held on every data processing node]
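Before diving into the full program, here is a minimal sketch of the lookup-table pattern described above (the names and sample values are illustrative only, not part of the program that follows): a small dictionary is broadcast once with sc.broadcast(), and each task reads it locally through the .value attribute instead of having the dictionary shipped with every task closure.

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local").setAppName("BroadcastSketch"))

# Broadcast a small lookup dict once; every node keeps a full local copy
lookup = sc.broadcast({1: "one", 2: "two", 3: "three"})

# Tasks read the broadcast value locally via .value, with no extra fetches
rdd = sc.parallelize([1, 2, 3, 2, 1])
print(rdd.map(lambda k: lookup.value[k]).collect())
# ['one', 'two', 'three', 'two', 'one']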

Sample Code

To demonstrate the usage of broadcast variables, we shall use the publicly available data set from http://grouplens.org/datasets/movielens/. We are using the ml-100k data set for this demonstration.

///////////////////////////////////////////////////////////////////////////////////////////

MovieLens data sets were collected by the GroupLens Research Project at the University of Minnesota.

This data set consists of:

* 100,000 ratings (1-5) from 943 users on 1682 movies.

* Each user has rated at least 20 movies.

* Simple demographic info for the users (age, gender, occupation, zip)

/////////////////////////////////////////////////////////////////////////////////////////

========================================================================

CITATION: F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4, Article 19 (December 2015), 19 pages. DOI=http://dx.doi.org/10.1145/2827872

=========================================================================

Problem Description

We shall count the number of times each movie has been watched and list the movies in ascending order of that count. The required output format is:

Movie Name       Number of times watched

Below are sample lines from the u.data and u.item files that the program uses.

u.data

userid   movieid   rating   timestamp
196      242       3        881250949
186      302       3        891717742
22       377       1        878887116
244      51        2        880606923
166      346       1        886397596

u.item

1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0

2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0

The Python code, with comments

///////////////////////////////////////////////////////////////////////////////////////

# Import the Spark configuration and Spark context classes
from pyspark import SparkConf, SparkContext

# Build a dictionary mapping movie id to movie name, for example:
# 1 -> Toy Story (1995)
def dictMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Run Spark in local mode using defaults and set the application name
# to MostWatchedMovies. Create the Spark context sc.
conf = SparkConf().setMaster("local").setAppName("MostWatchedMovies")
sc = SparkContext(conf=conf)

# Broadcast the dictionary to all data processing nodes as mnDict.
mnDict = sc.broadcast(dictMovieNames())

# Read the u.data file and split each line on whitespace (the file is
# tab-delimited). A count of 1 is paired with each movie id.
# Sample output:
# (242, 1)
# (302, 1)
# (377, 1)
# (242, 1)
# reduceByKey then adds up the occurrences of each movie id.
# Sample output:
# (242, 2)
# (302, 1)
# (377, 1)
# Finally the key-value pairs are swapped and sorted by the number of
# occurrences of each movie id.
# Sample output:
# (2, 242)
# (1, 302)
# (1, 377)
lines = sc.textFile("file:///abhik/ml-100k/u.data")
movies = lines.map(lambda x: (int(x.split()[1]), 1))
movieCounts = movies.reduceByKey(lambda x, y: x + y)
flipped = movieCounts.map(lambda pair: (pair[1], pair[0]))
sortedMovies = flipped.sortByKey()

# Look up the movie name for each movie id in the broadcast mnDict
# dictionary, producing the sortedMoviesWithNames RDD.
sortedMoviesWithNames = sortedMovies.map(lambda pair: (mnDict.value[pair[1]], pair[0]))

# Collect the final result and print it to standard output
finalresults = sortedMoviesWithNames.collect()
for result in finalresults:
    print(result)

//////////////////////////////////////////////////////////////////////////////////////
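As a design note, the flip-and-sort steps above can be collapsed: RDD.sortBy takes a key function, so the (movie id, count) pairs can be sorted by count directly, without swapping keys and values first. A sketch, reusing the names from the program above:

# Alternative to the flipped/sortByKey steps: sort (movie id, count) pairs
# by the count directly, then substitute names from the broadcast dictionary.
sortedMoviesWithNames = (movieCounts
    .sortBy(lambda pair: pair[1])
    .map(lambda pair: (mnDict.value[pair[0]], pair[1])))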

Below is a sample of the output produced when the program is run:

("Bram Stoker's Dracula (1992)", 120)
('Deer Hunter, The (1978)', 120)
('Cinema Paradiso (1988)', 121)
('Harold and Maude (1971)', 121)
('Carrie (1976)', 121)
('My Left Foot (1989)', 121)
('Magnificent Seven, The (1954)', 121)
('Killing Fields, The (1984)', 121)
('Bonnie and Clyde (1967)', 122)
('Dumbo (1941)', 123)
('Henry V (1989)', 124)
('Under Siege (1992)', 124)
('Chasing Amy (1997)', 124)
('Alien: Resurrection (1997)', 124)
('Great Escape, The (1963)', 124)
('Ben-Hur (1959)', 124)
('Amistad (1997)', 124)
('Cold Comfort Farm (1995)', 125)
('12 Angry Men (1957)', 125)
('Lost Highway (1997)', 125)
('My Fair Lady (1964)', 125)
('Jackie Brown (1997)', 126)
('James and the Giant Peach (1996)', 126)
('Stargate (1994)', 127)

Author: Abhik Roy

 
