Taming Apache Spark with Python – Programming examples Part 3

This blog is a continuation of Taming Apache Spark with Python – Programming examples Part 2.

Building on Parts 1 and 2, we shall explore some more programming principles using an example. The Apache Spark architecture diagram will be provided throughout the series to make it easier to follow the progressive programming examples.

Program Flow

Apache Spark provides APIs in several languages: we can use Python, Scala or Java. Here we are using a Python IDE to develop a Spark program in the Python programming language. Spark has an onion-layer architecture: when the Python program is run (developed using the Enthought Canopy IDE in this example), it runs on the Spark core engine, which is written in Scala. Scala in turn compiles to Java bytecode, so the program is ultimately running on the Java Virtual Machine (JVM).

Programming Principles Explored

  1. Filter

Test Data

We shall use the test data below to explain the programming principles. The data has been generated randomly and represents weather readings from different station IDs. The first column provides the station ID and the second column the date. The third column tells us whether the reading is a maximum temperature (TMAX), a minimum temperature (TMIN) or precipitation (PRCP). The fourth column gives us the reading itself; temperatures are recorded in tenths of a degree Celsius.

ITE00100554,18000101,TMAX,-73,E
ITE00100554,18000101,TMIN,-120,E
GM000010962,18000101,PRCP,4,E
EZE00100082,18000101,TMAX,-66,E
EZE00100082,18000101,TMIN,-123,E
ITE00100554,18000102,TMAX,-65,E
ITE00100554,18000102,TMIN,-100,E
GM000010962,18000102,PRCP,0,E
EZE00100082,18000102,TMAX,-69,E
EZE00100082,18000102,TMIN,-11,E
ITE00100554,18000103,TMAX,-25,E
ITE00100554,18000103,TMIN,-47,E
GM000010962,18000103,PRCP,5,E
EZE00100082,18000103,TMAX,-11,E
EZE00100082,18000103,TMIN,-74,E
ITE00100554,18000104,TMAX,0,E
ITE00100554,18000104,TMIN,-15,E
GM000010962,18000104,PRCP,0,E
EZE00100082,18000104,TMAX,-57,E
EZE00100082,18000104,TMIN,-79,E
ITE00100554,18000105,TMAX,15,E
ITE00100554,18000105,TMIN,-8,E
GM000010962,18000105,PRCP,0,E
EZE00100082,18000105,TMAX,-47,E
EZE00100082,18000105,TMIN,-33,E
ITE00100554,18000106,TMAX,20,E
ITE00100554,18000106,TMIN,21,E
GM000010962,18000106,PRCP,0,E
EZE00100082,18000106,TMAX,-46,E
EZE00100082,18000106,TMIN,-52,E
ITE00100554,18000107,TMAX,45,E
ITE00100554,18000107,TMIN,11,E
GM000010962,18000107,PRCP,0,E
EZE00100082,18000107,TMAX,-26,E
EZE00100082,18000107,TMIN,-55,E
ITE00100554,18000108,TMAX,46,E
ITE00100554,18000108,TMIN,35,E
GM000010962,18000108,PRCP,0,E
EZE00100082,18000108,TMAX,-3,E
EZE00100082,18000108,TMIN,-33,E
ITE00100554,18000109,TMAX,50,E
ITE00100554,18000109,TMIN,27,E
GM000010962,18000109,PRCP,0,E
EZE00100082,18000109,TMAX,19,E
EZE00100082,18000109,TMIN,-49,E
ITE00100554,18000110,TMAX,48,E


Problem Statement

We will write a Python Spark program that will give us the maximum recorded temperature for each station ID.

The Python Program

/////////////////////////////////////////////////////////////////////////////////////////////

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MaximumTemperatures")

sc = SparkContext(conf = conf)

//////////////////////////////////////////////////////////////////////////////////////////////

The first step is to import SparkConf and SparkContext into the Python program.

A SparkContext is the main entry point for Spark functionality. It represents a connection to the Spark cluster (or your local computer if running in local mode) and is used to create the RDDs, accumulators and broadcast variables in the cluster.
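As a quick sketch of those three building blocks (assuming the sc we create below; the values here are made up purely for illustration):

//////////////////////////////////////////////////////

# A minimal sketch using an existing SparkContext sc (illustrative values)
rdd = sc.parallelize([1, 2, 3])                  # an RDD built from a local Python list
counter = sc.accumulator(0)                      # a shared counter that tasks can add to
lookup = sc.broadcast({"TMAX": "max reading"})   # a read-only value shipped once to every node

//////////////////////////////////////////////////////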

RDDs (Resilient Distributed Datasets)

An RDD is an immutable, distributed collection of records that can be held either in memory or in persistent storage such as HDFS. RDDs can be created from other RDDs through transformations, and many different types of actions can be applied to them. An RDD is lazily evaluated, which means it does not process any data until an action is performed on it. This enables Spark to choose an optimal execution plan based on all the transformations being applied. RDDs are also fault tolerant: if a node in the cluster fails, the lost partitions can be recomputed on another node, so the computation carries on without the program terminating.
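Lazy evaluation is easy to see with a tiny, made-up example: transformations such as map only record the lineage, and nothing actually runs until an action such as collect() is called.

//////////////////////////////////////////////////////

# A small illustration of lazy evaluation (hypothetical data; assumes sc exists)
nums = sc.parallelize([1, 2, 3, 4])   # no computation happens yet
doubled = nums.map(lambda x: x * 2)   # still nothing -- only the lineage is recorded
print(doubled.collect())              # the action triggers the work: [2, 4, 6, 8]

//////////////////////////////////////////////////////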

We then create a SparkContext called sc, providing the application name MaximumTemperatures. setMaster("local") tells Spark to run the program in local mode. SparkConf() is the Spark configuration, and we are taking all default configuration values here.
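For reference, other master settings exist as well; the sketch below is not used in this program ("host" is a placeholder for a real master hostname):

//////////////////////////////////////////////////////

# For reference only -- alternative master settings (not used in this example)
SparkConf().setMaster("local[*]")            # run locally using all available cores
SparkConf().setMaster("spark://host:7077")   # connect to a standalone Spark cluster master

//////////////////////////////////////////////////////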

//////////////////////////////////////////////////////

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # Convert from tenths of a degree Celsius to degrees Fahrenheit
    temp = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temp)

///////////////////////////////////////////////////////////

We first define a function named parseLine in the usual way. The function takes a single line of input and splits it on ',' as the file is a CSV. It then assigns the first field, denoted by fields[0], to the variable stationID, the third field to the variable entryType and the fourth field to the variable temp. An arithmetic operation then converts the reading from tenths of a degree Celsius to degrees Fahrenheit. The function returns the tuple (stationID, entryType, temp).
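As a quick sanity check (plain Python, no cluster needed), feeding parseLine the first row of our test data gives:

//////////////////////////////////////////////////////

# Sanity check of parseLine on the first row of the test data
print(parseLine("ITE00100554,18000101,TMAX,-73,E"))
# ('ITE00100554', 'TMAX', 18.86)  -- i.e. -7.3 degrees Celsius in Fahrenheit,
# up to floating-point rounding

//////////////////////////////////////////////////////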

The lines RDD reads the file tempreadings.csv, which contains the test data, and each line is fed to the parseLine function through a map transformation. The output forms another RDD called parsedLines.

//////////////////////////////////////////////////////////////////////////////

lines = sc.textFile("file:///SparkCourse/tempreadings.csv")

parsedLines = lines.map(parseLine)

maxTemps = parsedLines.filter(lambda x: "TMAX" in x[1])

stationTemps = maxTemps.map(lambda x: (x[0], x[2]))

maxTemps = stationTemps.reduceByKey(lambda x, y: max(x,y))

results = maxTemps.collect()

/////////////////////////////////////////////////////////////////////////

The line

maxTemps = parsedLines.filter(lambda x: "TMAX" in x[1])

is a filter transformation, which keeps only the rows that have the value 'TMAX' in the second tuple element, denoted by entryType. This transformation creates a new RDD called maxTemps.
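While developing, a convenient way to peek at the filtered RDD is the take() action; this hypothetical inspection step is not part of the final program:

//////////////////////////////////////////////////////

# Optional inspection step while developing (not part of the final program)
print(maxTemps.take(2))
# e.g. [('ITE00100554', 'TMAX', 18.86), ('EZE00100082', 'TMAX', 20.12)]

//////////////////////////////////////////////////////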

The line

stationTemps = maxTemps.map(lambda x: (x[0], x[2]))

maps each element of the maxTemps RDD to just the stationID and temp columns, creating a new RDD of key/value pairs called stationTemps.
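In effect, the lambda drops the entry type, turning each 3-tuple into the (key, value) shape that reduceByKey expects; a plain-Python illustration (with a made-up reading):

//////////////////////////////////////////////////////

# Plain-Python illustration of what the lambda does to one element
to_pair = lambda x: (x[0], x[2])
print(to_pair(('ITE00100554', 'TMAX', 18.86)))   # ('ITE00100554', 18.86)

//////////////////////////////////////////////////////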

The lines

maxTemps = stationTemps.reduceByKey(lambda x, y: max(x,y))

results = maxTemps.collect()

take the rows (which are now (stationID, temp) key/value pairs), reduce them by key (stationID) and give us the maximum temperature reading for each unique stationID. Note that collect() is an action that brings the result back to the driver program as a plain Python list called results, not an RDD.
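Conceptually, reduceByKey applies the lambda to the values of each key, two at a time, until a single value remains; with three hypothetical readings for one station:

//////////////////////////////////////////////////////

# How the reduce lambda folds the values for one key (hypothetical readings)
f = lambda x, y: max(x, y)
print(f(f(18.86, 20.3), 27.5))   # 27.5 -- Spark may pair the values in any order,
                                 # which is why the function must be associative

//////////////////////////////////////////////////////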

The lines below

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

simply output the results in the desired format.
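For instance, the format specifier {:.2f} rounds the temperature to two decimal places (a standalone illustration):

//////////////////////////////////////////////////////

# The format expression in isolation (standalone illustration)
print("ITE00100554" + "\t{:.2f}F".format(41.0))   # ITE00100554    41.00F

//////////////////////////////////////////////////////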

Putting the code together, it looks like this:

//////////////////////////////////////////////////////////////

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MaximumTemperatures")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # Convert from tenths of a degree Celsius to degrees Fahrenheit
    temp = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temp)

lines = sc.textFile("file:///SparkCourse/tempreadings.csv")
parsedLines = lines.map(parseLine)
maxTemps = parsedLines.filter(lambda x: "TMAX" in x[1])
stationTemps = maxTemps.map(lambda x: (x[0], x[2]))
maxTemps = stationTemps.reduceByKey(lambda x, y: max(x, y))
results = maxTemps.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

////////////////////////////////////////////////////////////////////////////////

Running the script maxtemp.py with spark-submit returns the result below:

spark-submit maxtemp.py

///////////////////////////////////////

ITE00100554     41.00F

EZE00100082     35.42F

////////////////////////////////////

Author: Abhik Roy
