June 23, 2016

Word Count application with Apache Spark and Java

05:30 Posted by Durga swaroop Perla
Apache Spark is becoming more ubiquitous by the day and has been dubbed the next big thing in the Big Data world. Spark has been replacing MapReduce thanks to its speed and scalability. In this Spark series, we will try to solve various problems using Spark and Java.
The word count program is the big data equivalent of the classic Hello World program. Its aim is to scan a text file and display the number of times each word occurs in that file. For this word count application, we will be using Apache Spark 1.6 with Java 8.

For this program, we will be running Spark in local mode, so you don't need to set up a cluster. Even Hadoop is not required for this exercise. Assuming you have Spark, Java and Maven installed properly, let's proceed.

Creating pom.xml

To compile Java programs with Maven, you will need a pom.xml file with the required dependencies. Use this pom.xml file if you don't already have one.
<?xml version="1.0" encoding="UTF-8"?>
      <!-- Spark dependency -->
Now, save this file as pom.xml and put it in the same folder as your src directory.

Input File

After creating the pom file, you will need an input file on which to run the word count program, so that it can count the number of occurrences of each word. This is the file I will be using.
It is close to midnight and something evil is lurking in the dark
Under the moonlight you see a sight that almost stops your heart
You try to scream but terror takes the sound before you make it
You start to freeze as horror looks you right between the eyes
You are paralyzed 

Java Program

Once we have the pom file ready, we can start with the code.
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;
import java.util.Arrays;

public class WordCount {
 public static void main(String[] args) {

  // Run Spark locally in a single JVM; no cluster is needed.
  SparkConf conf = new SparkConf().setMaster("local").setAppName("wordCount");
  JavaSparkContext sc = new JavaSparkContext(conf);

  // Load our input data.
  String inputFile = "file:///home/dsp/Desktop/sparkExamples/sample_testing/resources/inputFile";
  JavaRDD<String> input = sc.textFile(inputFile);

  // Split each line into a list of words.
  // (In Spark 1.6 the flatMap lambda returns an Iterable, so a List works here.)
  JavaRDD<String> words = input.flatMap(l -> Arrays.asList(l.split(" ")));

  // Transform into (word, 1) pairs and sum the counts per word.
  JavaPairRDD<String, Integer> pairs = words.mapToPair(w -> new Tuple2<>(w, 1));
  JavaPairRDD<String, Integer> counts = pairs.reduceByKey((x, y) -> x + y);

  // collect() is an action: it triggers the computation and returns the results.
  System.out.println(counts.collect());

  sc.stop();
 }
}



Once we have everything ready, it's time to execute our program and see the output.
To compile it, first execute this in the directory containing the pom file.
 mvn clean && mvn compile && mvn package 
This will take some time on the first run because Maven has to download and install the dependencies. After a successful build, it creates the target folder and a jar file named freblogg-spark-tutorial-0.0.1.jar.

Then, to execute the program, run the spark-submit script in the bin directory of your SPARK_HOME folder.
 $SPARK_HOME/bin/spark-submit --class "WordCount" target/freblogg-spark-tutorial-0.0.1.jar
Once this command is executed, your screen will be completely filled with Spark logs. If you scroll up a bit, you will see the following output, which is the part we are interested in.
 [(freeze,1), (are,1), (Under,1), (it,1), (is,2), (you,3), (takes,1), (lurking,1), (right,1), (that,1), (a,1), (You,3), (terror,1), (start,1), (dark,1), (between,1), (scream,1), (before,1), (to,3), (as,1), (in,1), (moonlight,1), (sound,1), (midnight,1), (see,1), (stops,1), (sight,1), (try,1), (something,1), (paralyzed,1), (evil,1), (It,1), (eyes,1), (make,1), (almost,1), (but,1), (and,1), (close,1), (heart,1), (looks,1), (your,1), (horror,1), (the,4)]
These are the counts of each word in the file. So, there you go. You have successfully written your first Spark application. Congratulations. You're officially a Spark programmer now!

Understanding the code

Now that we have our application set up, let's see what the program is doing, step by step.

First we have the Spark variables conf and sc. Don't worry too much about them right now. All you need to know is that every Spark program needs these two lines.
 SparkConf conf = new SparkConf().setMaster("local").setAppName("wordCount");
 JavaSparkContext sc = new JavaSparkContext(conf);
So, just copy and paste these lines into every application you work on.

Next, we read the input file into an RDD. An RDD (Resilient Distributed Dataset) is essentially a distributed collection of elements, here lines of text, that you read from various sources and can transform into whatever you want using various operations. Here we are reading the input file from our local file system. If you want to read from HDFS instead, replace file:/// with hdfs:///.
 String inputFile = "file:///home/dsp/Desktop/sparkExamples/sample_testing/resources/inputFile";
 JavaRDD<String> input = sc.textFile(inputFile);

Then we have our first transformation on the input RDD we created in the previous step.
flatMap is a built-in function that takes one input element and can produce any number of output elements, depending on the operations used inside it.
 JavaRDD<String> words = input.flatMap(l -> Arrays.asList(l.split(" ")));
Here we are splitting each line on the space character. So, the flatMap function returns all the words in the input document, and they are stored in the RDD named words. For more about flatMap, read this: Spark FlatMap and Map
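To see the difference between map and flatMap without starting Spark, here is a minimal sketch that uses plain Java 8 streams as an analogy. It is not the Spark API; the class and method names (FlatMapSketch, mapLines, flatMapLines) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy built on java.util.stream; this is NOT Spark code.
// All names in this class are hypothetical and exist only for illustration.
final class FlatMapSketch {

    // map: exactly one output element (an array of words) per input line.
    static List<String[]> mapLines(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(" "))
                .collect(Collectors.toList());
    }

    // flatMap: each line contributes zero or more words to one flat list.
    static List<String> flatMapLines(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("You are paralyzed", "It is close");
        System.out.println(mapLines(lines).size()); // prints 2 (one array per line)
        System.out.println(flatMapLines(lines));    // prints [You, are, paralyzed, It, is, close]
    }
}
```

With map you would end up with a collection of word arrays, one per line. flatMap flattens them into a single collection of words, which is what the word count needs.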

Next, we have another transformation, mapToPair, that turns each word into a tuple of the word and the number 1.
A tuple is very similar to an ordered pair in the Cartesian coordinate system. A Tuple2 looks like (x,y), where x is the key and y is the value. Similarly, a Tuple3 will be (x,y,z) and so on.
 JavaPairRDD<String, Integer> pairs = words.mapToPair(w -> new Tuple2<>(w, 1));
As an example, the word you in the input will be mapped to (you,1) by the mapToPair function. And, since the result is a pair, we have to store it in a JavaPairRDD, which supports pairs.
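The pairing step can be tried out in plain Java as well. The sketch below is only an analogy: it uses java.util.AbstractMap.SimpleEntry as a stand-in for Scala's Tuple2, and the names PairSketch and toPairs are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy for mapToPair; this is NOT Spark code.
// SimpleEntry stands in for Tuple2, and all names here are hypothetical.
final class PairSketch {

    // Turn every word into a (word, 1) pair, keeping duplicates.
    static List<SimpleEntry<String, Integer>> toPairs(List<String> words) {
        return words.stream()
                .map(w -> new SimpleEntry<>(w, 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // SimpleEntry prints as key=value rather than (key,value).
        System.out.println(toPairs(Arrays.asList("you", "see", "you"))); // prints [you=1, see=1, you=1]
    }
}
```

Note that nothing is counted yet at this stage: a word that occurs twice simply produces two separate pairs.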

Then we do the final transformation on the pairs, which adds up the individual counts of each word.
 JavaPairRDD<String, Integer> counts = pairs.reduceByKey((x, y) -> x + y);
The reduceByKey method groups all the tuples with the same key and combines their values using the function we pass in. The word 'you' occurs three times, so we have (you,1) three times. Now, (you,1), (you,1), (you,1) will become (you,3) because of the sum we are doing inside the function. The same happens for the other words.
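What reduceByKey does per key can be imitated on a single machine with Map.merge. This is a hedged, plain-Java sketch of the idea, not how Spark implements it; ReduceByKeySketch and countWords are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy for reduceByKey; this is NOT Spark code.
// All names in this class are hypothetical and exist only for illustration.
final class ReduceByKeySketch {

    static Map<String, Integer> countWords(List<String> words) {
        // LinkedHashMap keeps first-seen order so the printed result is predictable.
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : words) {
            // Each word arrives as (word, 1); values with the same key are
            // combined with the same reducer as in the article: (x, y) -> x + y.
            counts.merge(word, 1, (x, y) -> x + y);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("you", "see", "you", "make", "you");
        System.out.println(countWords(words)); // prints {you=3, see=1, make=1}
    }
}
```

In Spark the same reducer is applied across partitions and machines, which is why the output order of the real program is not predictable.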

Finally, we perform an action on the RDD, which is where the actual computation of all the above steps takes place. Transformations are lazy: they only describe what should be done, and nothing runs until an action is called. collect() returns all the elements in the RDD as a list, and we print that list using System.out.println, giving us the output we want.
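Java 8 streams behave in a comparable way, which makes the "nothing runs until an action" idea easy to observe locally. The following is a small sketch under that analogy, not Spark code; LazySketch and demo are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy for lazy transformations and actions; this is NOT Spark code.
// All names in this class are hypothetical and exist only for illustration.
final class LazySketch {

    // Returns {lambda calls before the terminal operation, lambda calls after it}.
    static int[] demo(List<String> lines) {
        AtomicInteger calls = new AtomicInteger();

        // Intermediate operation (like a Spark transformation): only recorded, not run.
        Stream<String> upper = lines.stream().map(line -> {
            calls.incrementAndGet();
            return line.toUpperCase();
        });
        int before = calls.get();

        // Terminal operation (like a Spark action): this is what triggers the work.
        List<String> result = upper.collect(Collectors.toList());
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = demo(Arrays.asList("a", "b", "c"));
        System.out.println(calls[0] + " " + calls[1]); // prints 0 3
    }
}
```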

So there you go, your first Spark application is complete. To learn more, go through the documentation and examples on the Spark website, and subscribe to Freblogg for more tutorials.

Happy Sparking!

Image : http://www.datanami.com/wp-content/uploads/2014/12/spark-and-java-8.png
