Breadth First Search Algorithm (BFS) in Apache Spark

Breadth-first search (BFS) is an algorithm for traversing or searching tree or graph data structures.
In this blog post, we take a quick tour of the algorithm and see how it can be implemented in Apache Spark.
Let’s say we want to know the degree of connection between O and M.
O is connected in the first degree to P.
O is connected to V and C through P, so V and C are second-degree connections of O.
O is connected to M through P and C, hence M is a third-degree connection of O.
Refer to the diagrams below to understand how the algorithm works.

spark_bfs_pic1

Fig 1 – Shows the initial state, where we assume that none of the nodes (S, M, O, P, V, C, T) has been explored yet. This unknown distance is denoted by a value of infinity on every node.

Fig 2 – We start the algorithm at node O, giving it the value 0 to denote zero degrees of connection to itself, and coloring it green to denote that it needs to be processed.

spark_bfs_pic2

Fig 3 – We traverse from O to nodes P and S and mark them 1 to denote a single degree of connection. We color node O red to denote that its processing is complete, and color nodes P and S green to denote that they will be processed next.

Fig 4 – From node P, we traverse to nodes V and C and mark them 2 to denote a second degree of connection to node O. We do the same for node M from S. Nodes M, V and C are now marked green as the nodes to be processed next, and node P is marked red to denote that its processing is complete.

spark_bfs_pic3

Fig 5 – We traverse to node T from V, and to M from C, marking them 3 to denote a third degree of connection from O. Nodes M, V and C are now marked red; T and M are marked green.

Fig 6 – We stop processing once we reach M, and mark nodes T and M red.
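Before moving to Spark, it may help to see the same coloring scheme in ordinary Python. Below is a minimal sketch; the edge list is an assumption reconstructed from the description above (O–P, O–S, P–V, P–C, C–M, V–T).

/////////////////////////////////////////////////////////
from collections import deque

# Hypothetical adjacency list, reconstructed from the figures above.
graph = {
    'O': ['P', 'S'],
    'P': ['O', 'V', 'C'],
    'S': ['O'],
    'V': ['P', 'T'],
    'C': ['P', 'M'],
    'T': ['V'],
    'M': ['C'],
}

def degree_of_connection(start, target):
    distance = {start: 0}      # a node absent from this dict is still WHITE
    frontier = deque([start])  # the GREEN nodes, waiting to be processed
    while frontier:
        node = frontier.popleft()  # processing this node turns it RED
        if node == target:
            return distance[node]
        for neighbour in graph[node]:
            if neighbour not in distance:  # first visit: color it GREEN
                distance[neighbour] = distance[node] + 1
                frontier.append(neighbour)
    return None  # start and target are not connected

print(degree_of_connection('O', 'M'))  # 3 with the edges assumed above
/////////////////////////////////////////////////////////

The distance dictionary plays the role of the colors: a node absent from it is WHITE, a node sitting in the queue is GREEN, and a dequeued node is RED.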

The Apache Spark Implementation of BFS in Python

Test data: Below is the format of the test data. Only a few lines of the test data have been displayed to explain the connection information.

A snippet of a few lines of the graph.txt file, which provides the connection information:

//////////////////////////////////////////////////////
5980 2731 3712 1587 6084 2472 2546 6313
5981 3569 5353 4087 2653
5980 4554 1113 5544 7777 2345
2713 1000 2111
///////////////////////////////////////////////////

As you can see, each line consists of an id (the first column) followed by the ids of all the other people who share a first-degree connection with that person.
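For instance, one line of graph.txt can be split into the id and its first-degree connections like this (a small sketch using the first sample line):

/////////////////////////////////////////////////////////
line = "5980 2731 3712 1587 6084 2472 2546 6313"
fields = line.split()                       # whitespace-separated ids
node_id = int(fields[0])                    # the person: 5980
connections = [int(f) for f in fields[1:]]  # their first-degree connections
print(node_id, connections)                 # 5980 [2731, 3712, 1587, 6084, 2472, 2546, 6313]
/////////////////////////////////////////////////////////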

Names.txt file

////////////////////
19413 "Abhik"
19414 "Roger"
19415 "Kiran"
//////////////////

The file Names.txt simply maps ids to the names of the individuals. This file is not used in the program, but could later be used to look up the names of the individuals.
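As a sketch of how such a lookup could work later (the parsing assumes names never contain spaces, as in the sample above; the file path, and the SparkContext sc created in the program below, are assumptions):

/////////////////////////////////////////////////////////
def parseName(line):
    fields = line.split()
    return (int(fields[0]), fields[1].strip('"'))  # e.g. (19413, 'Abhik')

# Assumes the SparkContext `sc` created in the main program below,
# and a hypothetical location for Names.txt.
nameRdd = sc.textFile("file:///sparkcourse/Names.txt").map(parseName)
print(nameRdd.lookup(19413))  # ['Abhik']
/////////////////////////////////////////////////////////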

The Apache Spark Implementation in Python

/////////////////////////////////////////////////////////
# The basic configuration
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("DegreesOfConnection")
sc = SparkContext(conf = conf)

# The individuals we wish to find the degree of connection between:
startCharacterID = 5980   # Jorge
targetCharacterID = 1326  # Adam

# The accumulator, used to signal when we find the target character during
# the breadth-first search traversal.
# An accumulator variable is accessible to all the nodes of a Spark cluster
# and can be used as a global counter, so whenever its value gets updated,
# all the nodes in the cluster can see the new value.
hitCounter = sc.accumulator(0)

def convertToBFS(line):
    # Parse one line of the graph file into (id, (connections, distance, color)).
    fields = line.split()
    heroID = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))

    # Every node starts WHITE (unvisited) at an effectively infinite distance...
    color = 'WHITE'
    distance = 10000
    # ...except the start node, which is GREEN (to be processed) at distance 0.
    if (heroID == startCharacterID):
        color = 'GREEN'
        distance = 0

    return (heroID, (connections, distance, color))

def createStartingRdd():
    inputFile = sc.textFile("file:///sparkcourse/marvel-graph.txt")
    return inputFile.map(convertToBFS)

def bfsMap(node):
    characterID = node[0]
    data = node[1]
    connections = data[0]
    distance = data[1]
    color = data[2]

    results = []

    # If this node needs to be expanded and processed...
    if (color == 'GREEN'):
        for connection in connections:
            newCharacterID = connection
            newDistance = distance + 1
            newColor = 'GREEN'
            if (targetCharacterID == connection):
                hitCounter.add(1)

            newEntry = (newCharacterID, ([], newDistance, newColor))
            results.append(newEntry)

        # We've processed this node, so color it RED.
        color = 'RED'

    # Emit the input node so we don't lose it.
    results.append((characterID, (connections, distance, color)))
    return results

def bfsReduce(data1, data2):
    edges1 = data1[0]
    edges2 = data2[0]
    distance1 = data1[1]
    distance2 = data2[1]
    color1 = data1[2]
    color2 = data2[2]

    distance = 10000
    color = 'WHITE'
    edges = []

    # See if one is the original node with its connections.
    # If so, preserve them.
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)

    # Preserve minimum distance information.
    if (distance1 < distance):
        distance = distance1
    if (distance2 < distance):
        distance = distance2

    # Preserve darkest color information.
    # If both copies carry the same color, keep it.
    if (color1 == color2):
        color = color1
    if (color1 == 'WHITE' and (color2 == 'GREEN' or color2 == 'RED')):
        color = color2
    if (color1 == 'GREEN' and color2 == 'RED'):
        color = color2
    if (color2 == 'WHITE' and (color1 == 'GREEN' or color1 == 'RED')):
        color = color1
    if (color2 == 'GREEN' and color1 == 'RED'):
        color = color1

    return (edges, distance, color)

# The main program: convert the input lines into BFS format, then run
# iterations of map and reduce until we reach the target.
iterationRdd = createStartingRdd()

for iteration in range(0, 10):
    print("Running BFS iteration# " + str(iteration + 1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GREEN
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRdd.flatMap(bfsMap)

    # Note that the mapped.count() action here forces the RDD to be evaluated,
    # and that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    if (hitCounter.value > 0):
        print("Reached the target individual! From " + str(hitCounter.value)
              + " different direction(s).")
        break

    # The reducer combines data for each character ID, preserving the darkest
    # color and the shortest distance.
    iterationRdd = mapped.reduceByKey(bfsReduce)
////////////////////////////////////////////////////////////////////
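Since the accumulator and the forcing action are the pivot of the stopping logic, here is a toy sketch of that pattern in isolation (it assumes the same SparkContext sc as above and a hypothetical target value of 42):

/////////////////////////////////////////////////////////
found = sc.accumulator(0)

def check(x):
    if x == 42:
        found.add(1)  # executors can only add; the driver reads .value
    return x

rdd = sc.parallelize([1, 42, 7, 42])
rdd.map(check).count()  # the count() action forces evaluation of the map
print(found.value)      # 2, visible on the driver only after the action runs
/////////////////////////////////////////////////////////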

Understanding the code with an example of the data flow

To get a better understanding of the programming logic, consider the sample data below.

We will look at how the data flows through the program during the first pass of the loop for iteration in range(0, 10) in the main program.

//////////////////////////////////////////////////////
5980 2731 3712 1587 6084 2472 2546 6313
5981 3569 5353 4087 2653
5980 4554 1113 5544 7777 2345
2713 1000 2111
///////////////////////////////////////////
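To follow the data flow yourself, each intermediate RDD shown below can be printed on the driver. A small sketch (collect() is fine for a toy dataset of this size, but should be avoided on large graphs):

/////////////////////////////////////////////////////////
for entry in iterationRdd.collect():  # pulls the whole RDD to the driver
    print(entry)
/////////////////////////////////////////////////////////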

The output of the RDD called inputFile will be the same as the above sample dataset, as we are simply reading the data at that point.

The values of iterationRdd will be as below:

///////////////////////////////////////////////////////////////////
(5980, ([2731, 3712, 1587, 6084, 2472, 2546, 6313], 0, 'GREEN'))
(5981, ([3569, 5353, 4087, 2653], 10000, 'WHITE'))
(5980, ([4554, 1113, 5544, 7777, 2345], 0, 'GREEN'))
(2713, ([1000, 2111], 10000, 'WHITE'))
///////////////////////////////////////////////////////////////
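You can reproduce any one of these entries by calling convertToBFS on a raw line directly:

/////////////////////////////////////////////////////////
print(convertToBFS("5980 2731 3712 1587 6084 2472 2546 6313"))
# (5980, ([2731, 3712, 1587, 6084, 2472, 2546, 6313], 0, 'GREEN'))

print(convertToBFS("2713 1000 2111"))
# (2713, ([1000, 2111], 10000, 'WHITE'))
/////////////////////////////////////////////////////////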

The output of the mapped RDD will be as below. Each new entry emitted for a connection has the form newEntry = (newCharacterID, ([], newDistance, newColor)):

////////////////////////////////////////////////////
(2731, ([], 1, 'GREEN'))
(3712, ([], 1, 'GREEN'))
(1587, ([], 1, 'GREEN'))
(6084, ([], 1, 'GREEN'))
(2472, ([], 1, 'GREEN'))
(2546, ([], 1, 'GREEN'))
(6313, ([], 1, 'GREEN'))
(5980, ([2731, 3712, 1587, 6084, 2472, 2546, 6313], 0, 'RED'))
(4554, ([], 1, 'GREEN'))
(1113, ([], 1, 'GREEN'))
(5544, ([], 1, 'GREEN'))
(7777, ([], 1, 'GREEN'))
(2345, ([], 1, 'GREEN'))
(5980, ([4554, 1113, 5544, 7777, 2345], 0, 'RED'))
(2713, ([1000, 2111], 10000, 'WHITE'))
(5981, ([3569, 5353, 4087, 2653], 10000, 'WHITE'))
/////////////////////////////////////////////////////////////////
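Calling bfsMap on a single node shows both cases: a WHITE node passes through untouched, while a GREEN node emits one new GREEN entry per connection plus a RED copy of itself. A sketch (the second call uses a hypothetical GREEN version of node 5981 for illustration):

/////////////////////////////////////////////////////////
print(bfsMap((2713, ([1000, 2111], 10000, 'WHITE'))))
# [(2713, ([1000, 2111], 10000, 'WHITE'))]

print(bfsMap((5981, ([3569, 5353], 0, 'GREEN'))))
# [(3569, ([], 1, 'GREEN')), (5353, ([], 1, 'GREEN')),
#  (5981, ([3569, 5353], 0, 'RED'))]
/////////////////////////////////////////////////////////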

The output of mapped.reduceByKey(bfsReduce) will be as below (only a few of the reduced entries are shown):

///////////////////////////////////////////////////////
(2731, ([], 1, 'GREEN'))
(3712, ([], 1, 'GREEN'))
(5980, ([2731, 3712, 1587, 6084, 2472, 2546, 6313, 4554, 1113, 5544, 7777, 2345], 0, 'RED'))
(5981, ([3569, 5353, 4087, 2653], 10000, 'WHITE'))
///////////////////////////////////////////////////////////
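The merge of the two 5980 entries can be reproduced by calling bfsReduce directly on their value parts:

/////////////////////////////////////////////////////////
merged = bfsReduce(([2731, 3712, 1587, 6084, 2472, 2546, 6313], 0, 'RED'),
                   ([4554, 1113, 5544, 7777, 2345], 0, 'RED'))
print(merged)
# ([2731, 3712, 1587, 6084, 2472, 2546, 6313, 4554, 1113, 5544, 7777, 2345], 0, 'RED')
/////////////////////////////////////////////////////////

The two edge lists are concatenated, the minimum distance is kept, and since both copies are RED the merged node stays RED.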

Running the above program produces the below output:

//////////////////////////////////////////
Running BFS iteration# 1
Processing 2445 values.
Running BFS iteration# 2
Processing 38799 values.
Running BFS iteration# 3
Processing 2698 values.
Reached the target individual! From 1 different direction(s).
////////////////////////////////////////////////////////////////////

References: Inspired by the excellent training materials produced by Frank Kane (http://frank-kane.com/).

Author: Abhik Roy
