
Taming Apache Spark with Python – Programming examples Part 2

This blog is a continuation of Taming Apache Spark with Python – Programming examples Part 1.

Building on Part 1, we shall explore some more programming principles using an example. The Apache Spark architecture diagram is provided throughout the series to make it easier to follow the progressive programming examples.

[Figure: Apache Spark architecture diagram]

Program Flow

Apache Spark provides different APIs for programming: we can use Python, Scala or Java. Here we use a Python IDE to develop a Spark program in the Python programming language.

Spark has an onion-layer architecture. When the Python program is run (developed using the Enthought Canopy IDE in this example), it runs on the Spark core engine, which is written in Scala. Scala in turn runs on the Java Virtual Machine, so the program is ultimately executed on the JVM.

Programming Principles explored

  1. mapValues
  2. reduceByKey
  3. Integer Casting

 

Test Data

We shall use the test data below to explain the programming principles. The data has been generated randomly and gives the number of violent crimes for each county. The counties are represented by county ids, which are in the second column of the data set. The third column gives the number of crimes, while the first column is simply a record identifier.

0,33,385
1,26,2
2,55,221
3,40,465
4,68,21
5,59,318
6,37,220
7,25,307
8,40,380
9,27,181
10,53,191
11,57,372
12,54,253
13,56,444
14,33,49
15,40,49
16,22,323
17,33,13
18,45,455
19,40,246
20,67,220
21,19,268
22,40,72
23,51,271
24,25,1

Problem Statement

We will write a Python Spark program that will provide the average number of violent crimes for each given county.

The Python Program

———————————————————————————————

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("CrimebyCounty")
sc = SparkContext(conf = conf)

———————————————————————————————-

The first step is to import the Spark configuration (SparkConf) and the SparkContext into the Python program. (The collections package, which similar examples sometimes import to order the final output, is purely a Python feature rather than a Spark one; it is not needed in the code shown here.)

A SparkContext is the main entry point for Spark functionality. It represents a connection to the Spark cluster (or to your local machine when running in local mode) and is used to create the RDDs, accumulators and broadcast variables on the cluster.

RDDs (Resilient Distributed Datasets)

An RDD is a distributed collection of data. RDDs are immutable and resilient, and can be held either in memory or in persistent storage such as HDFS. New RDDs can be created from existing ones through transformations, and many different types of actions can be applied to them. RDDs use lazy evaluation, which means no data is processed until an action is performed on them. This lets Spark choose an optimum execution plan based on all the transformations that have been applied. RDDs are also fault tolerant: if a node in the cluster fails, the partitions it held can be recomputed on another node, so the computation carries on without the program terminating.
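To make lazy evaluation concrete, here is a minimal sketch (assuming the SparkContext sc created in the code above and the same CSV file used in this example). The transformations only record lineage; nothing is read or computed until the count() action is called.

lines = sc.textFile("file:///SparkCourse/countycrimes.csv")   # nothing is read yet
nonEmpty = lines.filter(lambda line: len(line) > 0)           # still no work done
# Only when an action such as count() is called does Spark build an
# execution plan and actually process the data.
print nonEmpty.count()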

We then create a SparkContext called sc, providing the application name CrimebyCounty. setMaster("local") tells Spark to run the program in local mode. SparkConf() is the Spark configuration, and we are taking all the default configuration values here.

———————————————————————————-

def parseLine(line):
    fields = line.split(',')
    county = int(fields[1])
    crime = int(fields[2])
    return (county, crime)

lines = sc.textFile("file:///SparkCourse/countycrimes.csv")
rdd = lines.map(parseLine)

——————————————————————————–

We first define a function named parseLine, in the usual way we would in most other programming languages. The function takes a line of input (from the lines RDD, which we define next), splits it on ',' since the file is a CSV, and casts fields 1 and 2 to integers. The value of field 1 is assigned to county, the value of field 2 is assigned to crime, and the function returns the (county, crime) pair.

For example, for the sample comma-separated values

0,33,385
1,26,2

The parseLine function returns

(33, 385)

(26, 2)

The lines RDD reads the file countycrimes.csv, which contains the test data, and the map call feeds each line to the parseLine function. The output of the parseLine function forms another RDD, which we have called rdd.
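Because parseLine is ordinary Python, it can be sanity-checked on a couple of sample rows without involving Spark at all. A minimal sketch:

def parseLine(line):
    fields = line.split(',')
    county = int(fields[1])
    crime = int(fields[2])
    return (county, crime)

# Feed the two sample rows by hand and check the (county, crime) pairs.
print parseLine("0,33,385")   # (33, 385)
print parseLine("1,26,2")     # (26, 2)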

—————————————————————————————————————————-

totalsByCounty = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

averagesByCounty = totalsByCounty.mapValues(lambda x: x[0] / x[1])

————————————————————————————————————————-

Next, the mapValues function is applied, which passes only the values (not the keys) of each pair to the supplied function.

rdd.mapValues(lambda x: (x, 1))

The lambda function pairs each value x with a count of 1, turning it into the tuple (x, 1).

For a sample data feed of

(33, 385)

(26, 2)

(33, 49)

(33, 13)

The output produced by mapValues is

(33, (385, 1))

(26, (2, 1))

(33, (49, 1))

(33, (13, 1))

As you can see above, the keys 33, 26 and 33 are passed through untouched; only the values go through the lambda, because we used mapValues rather than map.
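The same behaviour can be illustrated in plain Python, which makes it clear that only the value side of each pair is transformed. A minimal sketch over the sample pairs:

pairs = [(33, 385), (26, 2), (33, 49), (33, 13)]
# Equivalent of mapValues(lambda x: (x, 1)): keys pass through untouched,
# each value is paired with a count of 1.
mapped = [(key, (value, 1)) for (key, value) in pairs]
print mapped   # [(33, (385, 1)), (26, (2, 1)), (33, (49, 1)), (33, (13, 1))]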

reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

reduceByKey is a transformation that combines the values for each key using the supplied function; here it adds the crime totals and the record counts element-wise for each county.

For sample data

(33, (385, 1))

(33, (49, 1))

(33, (13, 1))

The output of reduceByKey is

(33, (447, 3))
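The same reduction can be written in plain Python to show how the (crime, count) tuples for each key are folded together. A minimal sketch:

from collections import defaultdict

mapped = [(33, (385, 1)), (26, (2, 1)), (33, (49, 1)), (33, (13, 1))]
# Equivalent of reduceByKey: for each key, add the crime totals and the
# record counts element-wise.
totals = defaultdict(lambda: (0, 0))
for key, (crime, count) in mapped:
    totals[key] = (totals[key][0] + crime, totals[key][1] + count)
print sorted(totals.items())   # [(26, (2, 1)), (33, (447, 3))]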

averagesByCounty = totalsByCounty.mapValues(lambda x: x[0] / x[1])

Finally, averagesByCounty gives the average number of violent crimes for each county by dividing each total by its count.

(33, (447, 3))

Produces

(33, 149)
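Putting the three steps together on a small in-memory sample (a minimal sketch, assuming the SparkContext sc created earlier) reproduces the figures worked through above:

sample = sc.parallelize([(33, 385), (26, 2), (33, 49), (33, 13)])
totals = sample.mapValues(lambda x: (x, 1)) \
               .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
# 447 / 3 = 149 under Python 2 integer division.
averages = totals.mapValues(lambda x: x[0] / x[1])
print averages.collect()   # e.g. [(26, 2), (33, 149)] (order may vary)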

——————————————————

results = averagesByCounty.collect()
for result in results:
    print result

——————————————————–

The above lines of code simply collect the results and print them out.
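If an ordered listing is preferred, the collected results can be sorted by county id before printing. A minimal sketch:

# sorted() orders the (county, average) tuples by county id.
for result in sorted(results):
    print result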

Below is the final result set

(33, 149)

(67, 220)

(68, 21)

(37, 220)

(51, 271)

(40, 242)

(54, 253)

(45, 455)

(59, 318)

(19, 268)

(25, 154)

(53, 191)

(22, 323)

(55, 221)

(56, 444)

(57, 372)

(26, 2)

(27, 181)

The code, put together, looks like this:

———————————————————————————

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("CrimebyCounty")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    county = int(fields[1])
    crime = int(fields[2])
    return (county, crime)

lines = sc.textFile("file:///SparkCourse/countycrimes.csv")
rdd = lines.map(parseLine)

totalsByCounty = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByCounty = totalsByCounty.mapValues(lambda x: x[0] / x[1])

results = averagesByCounty.collect()
for result in results:
    print result

——————————————————————————————————-

Conclusion: In this blog we built on Part 1 of the series and explored mapValues and reduceByKey. Progressively more advanced concepts will be covered in upcoming posts in the series.