Performance

From the Driver to the Executors

I have worked with a number of people who are new to Apache Spark™ and have an existing program they want to port to it. Spark supports a number of programming languages, is relatively easy to program with, and has a large and growing community behind it, so when it comes time to scale your application from a single machine to many, Spark is an attractive platform. But there are a few gotchas to look out for, and this post describes a common one that I have seen a number of times. The Spark tuning guide offers some tips, and in future blog posts we’ll look at several other examples, but today we’ll focus on offloading work from your driver program to the executors in the cluster.

Every Spark program has a driver component, which is launched when you submit your program with spark-submit, or when you start the spark-shell or pyspark console. This part of your program feels a lot like your original application. In fact, you can likely run your program as a Spark driver without any changes and get the same performance you had before.

Let’s start with an example. We’re going to write a program that searches some text for the longest run of Standard Pilish it contains. The idea is to find a sequence of words whose lengths correspond to the digits of π. For example, the phrase “May I have a Spark framework in Python, Scala, and Java8?” contains a sequence of words with lengths of 3.1415926535. We want our program to take any arbitrary text file as input and then find the longest sequence of Pilish within it.
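To make the rule concrete, here’s a quick, throwaway snippet (not part of the finder itself) that prints the word lengths of that phrase, assuming we strip punctuation and count digits as characters, so “Java8” counts as five:

import java.util.Arrays;

public class PilishPhraseDemo {
    public static void main(String[] args) {
        String phrase = "May I have a Spark framework in Python, Scala, and Java8?";
        // Strip punctuation, split on whitespace, and print each word's length.
        Arrays.stream(phrase.replaceAll("[^A-Za-z0-9 ]", "").split("\\s+"))
              .map(String::length)
              .forEach(System.out::print);  // prints 31415926535
        System.out.println();
    }
}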

To write our program, we need to know the digits of π. There are numerous examples of how to generate π, including in the Spark examples folder, so there’s no need to get into that here. For our purposes, we’ll just create an array of the first 100 digits to refer to. We can always make the array longer if we ever see a result of the full length.

The second thing we need is the code that applies the rules of Standard Pilish to a file. Here’s a simple Java program that does exactly this. We’ll use it as our starting point for porting to Spark. To keep the code short and concise, I’m omitting much of the error checking that belongs in a production-quality program.

public class PilishFinder1 {
    private static final byte[] digits = { 3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5, 8, ...

    private static class PilishMatch {
        int digits;
        String text;
        ...
    }

    // determine if this word is a match for this position in the digits of pi
    // returns the number of digits matched.
    private static int matches(int wordLen, int position) throws Exception {...}
    ...

    // take in a file handle, return the longest match
    public PilishMatch findLongestSequence(File file) {
        ...
        return new PilishMatch(longestMatchingDigits, returnValue);
    }

    public static void main(String[] args) {
        ...
        long startTime = System.currentTimeMillis();

        File folder = new File(args[0]);
        File[] listOfFiles = folder.listFiles();
        for (int i = 0; i < listOfFiles.length; i++) {
            if (listOfFiles[i].isFile()) {
                PilishMatch longestInThisFile = finder.findLongestSequence(listOfFiles[i]);
                if (longestInThisFile.digits > longestFound.digits) {
                    longestFound = longestInThisFile;
                    fileFoundIn = listOfFiles[i].getName();
                }
            }
        }

        long overallTime = System.currentTimeMillis() - startTime;
        ...
    }
}
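The matches helper is elided above (the full source is linked at the end of this post), but to give a feel for the rule it applies, here is a minimal, hypothetical sketch, assuming the usual Standard Pilish conventions: a word of one to nine letters stands for that digit, a ten-letter word stands for the digit 0, and a longer word stands for the two digits of its length.

    // Hypothetical sketch only; the real implementation may differ.
    private static int matches(int wordLen, int position) {
        if (wordLen <= 0 || position >= digits.length) {
            return 0;                                    // nothing left to match
        }
        if (wordLen <= 9) {
            return digits[position] == wordLen ? 1 : 0;  // one digit matched
        }
        if (wordLen == 10) {
            return digits[position] == 0 ? 1 : 0;        // ten letters stand for 0
        }
        // 11+ letters: the word's length supplies two consecutive digits
        if (position + 1 < digits.length
                && digits[position] == wordLen / 10
                && digits[position + 1] == wordLen % 10) {
            return 2;
        }
        return 0;
    }

findLongestSequence then presumably slides this check across the words of the file, restarting whenever a word fails to match, and keeps the longest run it sees.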

Our initial program takes a single argument pointing to a directory, checks each file in that directory one by one for a sequence of Pilish text, and returns the longest sequence found across all the files. It’s simple, small, and you can try it on your own favorite set of files. When I run it on a local directory of over 2,000 files totaling ~250 MB on my machine, it takes about 6.5 seconds to complete.

When the input data is local and small, as with this test data, the local program will always outperform the distributed program. But when working with large volumes of distributed data, a distributed program will produce an answer where the local program cannot. To handle a large set of files spread across a cluster, we’ll modify our program to source the files from HDFS rather than local disk, and use Spark to load those files into an RDD, which we can then search. Here is our new Spark-enabled PilishFinder program:

public class PilishFinder2 {
     ...
    // take in a file name and contents, return the longest match
    public PilishMatch findLongestSequence(Tuple2<String, String> file) {
        ...
        return new PilishMatch(longestMatchingDigits, returnValue);
    }

    public static void main(String[] args) {
        ...
        SparkConf conf = new SparkConf().setAppName("PilishFinder");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<String, String> filesAndNamesRDD = sc.wholeTextFiles(args[0]);
        PilishMatch longestFound = new PilishMatch(0, null);
        String fileFoundIn = null;
        Iterator<Tuple2<String, String>> iterator = filesAndNamesRDD.toLocalIterator();
        PilishFinder2 finder = new PilishFinder2();
        while (iterator.hasNext()) {
            Tuple2<String, String> fileNameContents = iterator.next();
            PilishMatch longestInThisFile = finder.findLongestSequence(fileNameContents);
            if (longestInThisFile.digits > longestFound.digits) {
                longestFound = longestInThisFile;
                fileFoundIn = fileNameContents._1;
            }
        }
        ...
    }
}

When I run this on a simple two-machine cluster, with the same files now in HDFS, it takes about 66.5 seconds, roughly 10x the time of the original program, not including startup and shutdown of Spark itself. We should expect some slowdown from moving to HDFS, but this seems excessive, and it is.

The problem is in our main class. Our driver program properly created an RDD in the cluster, but we then created an iterator that brings the contents of those files, one by one, over to the driver for processing. So although this is a valid Spark program, it’s not a good one, and it’s a pattern I have seen in multiple Spark programs. One telltale sign is a driver that burns a lot of CPU while the rest of the cluster sits unexpectedly idle.

Let’s modify our program again, to scan each file in the executors rather than in the driver. The main change we need to make is to process the files in place with an RDD.map(..) call, and determine the longest match with the RDD.max(..) operation. To do this, our class needs to implement the Function and Comparator interfaces so that we can pass it as an argument to map and max, and Serializable so that it can be shipped from the driver to the executors.

public class PilishFinder3 implements Function<Tuple2<String, String>, PilishFinder3.PilishMatch>, Comparator<PilishFinder3.PilishMatch>, Serializable {
    private static final long serialVersionUID = -6530967095767098059L;
    private static final byte[] digits = { 3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5, 8, ...

    public static class PilishMatch implements Serializable {
        private static final long serialVersionUID = -7898517200702663174L;
        int digits;
        String text;
        String fileName;
        ...
    }

    // take in a file name and contents, return the longest match
    public PilishMatch findLongestSequence(Tuple2<String, String> file) {
        ...
        return new PilishMatch(longestMatchingDigits, returnValue, file._1);
    }

    @Override public PilishMatch call(Tuple2<String, String> file) throws Exception {
        return findLongestSequence(file);
    }

    @Override public int compare(PilishMatch o1, PilishMatch o2) {
        return Integer.compare(o1.digits, o2.digits); // more digits ranks higher
    }

    public static void main(String[] args) {
        ...
        SparkConf conf = new SparkConf().setAppName("PilishFinder");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<String, String> filesAndNamesRDD = sc.wholeTextFiles(args[0]);
        PilishFinder3 finder = new PilishFinder3();
        PilishMatch result = filesAndNamesRDD.map(finder).max(finder);
        ...
    }
}
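As an aside, if you’d rather not implement Comparator at all, the same “keep the longest match” step can be expressed with reduce; something like this (a sketch, using a lambda for the Function2 argument) should behave equivalently:

        PilishMatch result = filesAndNamesRDD
                .map(finder)
                .reduce((a, b) -> a.digits >= b.digits ? a : b); // keep the match with more digits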

This last version takes just over 22 seconds on the same machines, a 3x speedup. I’m sure that there are additional optimizations that could be made to speed it up even further, and I encourage you to try this program on your own favorite data set.  All three versions of the source are available here.

Here are a few 5-digit sequences I found:

From the movie “Pulp Fiction”: “Can I have a towel”
From the book “Harry Potter 3: Prisoner of Azkaban”: “But I wish I could”
