The Art of Side Effects: Curing Apache Spark™ Streaming’s Amnesia (Part 2/2)

Maintaining batch-invariant state in Apache Spark™ Streaming is non-trivial. In Part 2 of this 2-part series, we'll implement two other solutions using accumulators and external storage, respectively. (Part 1 is here.)

Accumulators

Accumulators are shared variables that support only associative and commutative operations, which is what allows Spark to update them in parallel across tasks. One key property of accumulators is that only the driver program can read an accumulator's value, while workers can only "add" to it. They are extremely useful for calculating counts and sums across RDDs. In addition to these simple operations, accumulators can also hold Scala collections. For instance, one can create a HashMap-based accumulator via:

val mapAcc = ssc.sparkContext.accumulableCollection(mutable.HashMap[String, Int]())

and then add items to it:

mapAcc += (keyStr -> valInt)

This insertion of values is typically performed within a foreachRDD output operation.

In our particular use case, we need to keep track of a few statistics. So rather than embedding this logic within the core streaming flow, let's create a custom accumulator to provide this functionality. Spark provides two interfaces for implementing custom accumulators to this end: AccumulatorParam and AccumulableParam. The former is used when the values added to the accumulator have the same type as the accumulated value. For example, if the accumulator is a 2-tuple of ints, then only 2-tuples of ints can be added to it. In contrast, AccumulableParam allows a different type for the added value. In fact, under the hood, AccumulatorParam[T] simply extends AccumulableParam[T, T].

We will need to employ AccumulableParam, as our input value ("stockPrice": Float, "stockVolume": Long) differs in type from the values that we need to accumulate ("minVolume": Long, "maxVolume": Long, "priceCounter": Long). In addition, these metrics need to be maintained per stock symbol.

Each concrete implementation of AccumulableParam needs to implement the following three methods:

  • zero(): The identity value of the accumulator

  • addAccumulator(): Addition of a single value

  • addInPlace(): Merging of two accumulators. Invoked each time the values of sharded accumulators from different tasks need to be aggregated
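To make the division of labour between these three methods concrete, here is a minimal sketch in plain Scala that simulates the contract without Spark. The trait ParamSketch, the SumCount example, and the run helper are hypothetical names introduced only for illustration; they mirror the shape of AccumulableParam but are not Spark's implementation, and real Spark decides when and where each method is invoked.

```scala
// Hypothetical stand-in for AccumulableParam[R, T]; not part of Spark.
trait ParamSketch[R, T] {
  def zero(initial: R): R                  // identity value
  def addAccumulator(acc: R, value: T): R  // fold one value into an accumulator
  def addInPlace(left: R, right: R): R     // merge two accumulators
}

// Accumulates a (sum, count) pair from individual Int values,
// so the added type (Int) differs from the accumulated type.
object SumCount extends ParamSketch[(Long, Long), Int] {
  def zero(initial: (Long, Long)): (Long, Long) = (0L, 0L)
  def addAccumulator(acc: (Long, Long), value: Int): (Long, Long) =
    (acc._1 + value, acc._2 + 1)
  def addInPlace(left: (Long, Long), right: (Long, Long)): (Long, Long) =
    (left._1 + right._1, left._2 + right._2)
}

object AccumulatorContractSketch {
  // Each inner Seq plays the role of the records seen by one task.
  def run[R, T](param: ParamSketch[R, T], initial: R, partitions: Seq[Seq[T]]): R = {
    // Every "task" starts from zero and adds its own values one by one.
    val perTask = partitions.map(_.foldLeft(param.zero(initial))(param.addAccumulator))
    // The "driver" merges the per-task results into the initial value.
    perTask.foldLeft(initial)(param.addInPlace)
  }
}
```

Calling AccumulatorContractSketch.run(SumCount, (0L, 0L), Seq(Seq(1, 2), Seq(3))) folds each inner sequence separately and then merges the partial results, which is roughly the flow an accumulator goes through across tasks.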

The code for our custom StockAccum accumulator is presented in Listing 3. Internally it maintains a hash map indexed by the stock symbol to hold per stock stats. The second type parameter to AccumulableParam needs to represent the value to be added, which in this particular case is a 2-tuple of the form:

(String, (Float, Long)): ("stockSym", ("stockPrice", "stockVolume")) 

In the identity initialization method (line 2), we create a new HashMap. To enable two accumulators to be merged, the addInPlace method (line 5) walks over each stock symbol in the second map, takes the smaller of the two minimums and the larger of the two maximums, and stores the result in the first map. For the counter, it simply adds the values from the two maps. Finally, the addAccumulator method (line 15) folds a new record into the previous stats for its stock symbol (or initializes them if the symbol is encountered for the first time). The counter is incremented only if the current stock price exceeds 500 (line 19).

Listing 3: Custom accumulator to keep track of global stock stats

  1. object StockAccum extends AccumulableParam[mutable.HashMap[String, (Long, Long, Long)], (String, (Float, Long))] {
  2.   def zero(t: mutable.HashMap[String, (Long, Long, Long)]): mutable.HashMap[String, (Long, Long, Long)] = {
  3.     new mutable.HashMap[String, (Long, Long, Long)]()
  4.   }
  5.   def addInPlace(t1: mutable.HashMap[String, (Long, Long, Long)], t2: mutable.HashMap[String, (Long, Long, Long)]): mutable.HashMap[String, (Long, Long, Long)] = {
  6.     t1 ++= t2.map { // merge every symbol of t2 into t1
  7.       case (k, v2) => (k -> {
  8.         val v1 = t1.getOrElse(k, (Long.MaxValue, Long.MinValue, 0L))
  9.         val newMin = if (v2._1 < v1._1) v2._1 else v1._1
  10.         val newMax = if (v2._2 > v1._2) v2._2 else v1._2
  11.         (newMin, newMax, v1._3 + v2._3)
  12.       })
  13.     }
  14.   }
  15.   def addAccumulator(t1: mutable.HashMap[String, (Long, Long, Long)], t2: (String, (Float, Long))): mutable.HashMap[String, (Long, Long, Long)] = {
  16.     val prevStats = t1.getOrElse(t2._1, (Long.MaxValue, Long.MinValue, 0L))
  17.     val newVals = t2._2 // (stockPrice, stockVolume)
  18.     var newCount = prevStats._3
  19.     if (newVals._1 > 500.0) {
  20.       newCount += 1
  21.     }
  22.     val newMin = if (newVals._2 < prevStats._1) newVals._2 else prevStats._1
  23.     val newMax = if (newVals._2 > prevStats._2) newVals._2 else prevStats._2
  24.     t1 += t2._1 -> (newMin, newMax, newCount)
  25.   }
  26. }
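The merge step in Listing 3 is only safe because min, max, and addition give the same answer regardless of how partial results are grouped or ordered. The following sketch checks that property in isolation. It is an illustration under assumptions: it uses immutable maps rather than the mutable HashMap of Listing 3, and the object name MergeOrderSketch and the sample symbols and figures are made up.

```scala
object MergeOrderSketch {
  type Stats = (Long, Long, Long) // (minVolume, maxVolume, priceCounter)

  // Merge per-symbol stats following the rules described for addInPlace:
  // smaller minimum, larger maximum, counters added.
  def merge(a: Map[String, Stats], b: Map[String, Stats]): Map[String, Stats] =
    b.foldLeft(a) { case (acc, (sym, s2)) =>
      val s1 = acc.getOrElse(sym, (Long.MaxValue, Long.MinValue, 0L))
      acc.updated(sym, (math.min(s1._1, s2._1), math.max(s1._2, s2._2), s1._3 + s2._3))
    }

  def main(args: Array[String]): Unit = {
    // Made-up partial results from three hypothetical tasks.
    val t1: Map[String, Stats] = Map("IBM" -> (100L, 900L, 2L))
    val t2: Map[String, Stats] = Map("IBM" -> (50L, 400L, 1L), "AAPL" -> (10L, 20L, 0L))
    val t3: Map[String, Stats] = Map("AAPL" -> (5L, 70L, 3L))
    // Associativity: grouping of merges should not matter.
    println(merge(merge(t1, t2), t3) == merge(t1, merge(t2, t3)))
    // Commutativity: order of merges should not matter.
    println(merge(t1, t2) == merge(t2, t1))
  }
}
```

An operation that fails either check (for example, "keep the most recently seen volume") would give results that depend on task scheduling, which is why it does not fit the accumulator model.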

Listing 4 shows the usage of the accumulator. It replaces the foreachRDD operation in Listing 1. (Listing 1 appears in part 1 of this post.) Within the inner foreach (which is executed on worker nodes), we add values to the accumulator (line 4). These values are subsequently displayed in the driver process by printing the hash map from the accumulator (line 6).

Listing 4: Adding values to an accumulator

  1. val stateAccum = ssc.sparkContext.accumulable(new mutable.HashMap[String, (Long, Long, Long)]())(StockAccum)
  2. .foreachRDD(rdd => {
  3.   rdd.foreach({ stock =>
  4.     stateAccum += (stock._1, (stock._2._1, stock._2._2))
  5.   })
  6.   for ((sym, stats) <- stateAccum.value) printf("Symbol: %s, Stats: %s\n", sym, stats)
  7. })

As you can tell from the sample code, accumulators are easy to use and reason about, but their functionality is limited: 1) only the driver process can read from them, and 2) only associative and commutative operations can be performed. What if the application needs to keep arbitrary state and perform arbitrary operations on it?

External Solutions

One option for storing global values is to explicitly turn them into side effects and keep them in external storage. Under this design, in each batch, previous state would be read from external storage, transformed, and then written back. At the same time, this external storage needs to ensure low latency to match the performance of native in-JVM data structures. One such option is Redis, an in-memory key-value store.
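The read, transform, and write-back cycle can be sketched independently of any particular storage system. In the sketch below, the StateStore trait, the InMemoryStore class, and the update and processBatch functions are all hypothetical names introduced for illustration; the in-memory map merely stands in for an external store so that the example runs on its own, and the stats layout (minVolume, maxVolume, priceCounter) follows the running stock example.

```scala
import scala.collection.mutable

object ExternalStateSketch {
  type Stats = (Long, Long, Long) // (minVolume, maxVolume, priceCounter)

  // Hypothetical minimal interface; an external store would sit behind these two calls.
  trait StateStore {
    def read(key: String): Option[Stats]
    def write(key: String, stats: Stats): Unit
  }

  // In-memory stand-in, used here only so the sketch is self-contained.
  class InMemoryStore extends StateStore {
    private val data = mutable.HashMap[String, Stats]()
    def read(key: String): Option[Stats] = data.get(key)
    def write(key: String, stats: Stats): Unit = { data.update(key, stats) }
  }

  // Pure transformation: previous state plus one record yields the next state.
  def update(prev: Option[Stats], price: Float, volume: Long): Stats = {
    val (prevMin, prevMax, prevCount) = prev.getOrElse((Long.MaxValue, Long.MinValue, 0L))
    val newCount = if (price > 500.0) prevCount + 1 else prevCount
    (math.min(prevMin, volume), math.max(prevMax, volume), newCount)
  }

  // One batch: read, transform, and write back for every record.
  def processBatch(store: StateStore, batch: Seq[(String, (Float, Long))]): Unit =
    batch.foreach { case (sym, (price, volume)) =>
      store.write(sym, update(store.read(sym), price, volume))
    }
}
```

Because the state lives in the store rather than in the batch, calling processBatch twice with the same store carries the stats from the first batch into the second. Keeping update as a pure function also makes the per-record logic easy to test without a running storage server.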

Redis

Redis (REmote DIctionary Server) is an in-memory data structure store. It supports a wide range of common data types, including lists, sets, and hash maps. In addition, Redis contains out-of-the-box implementations of advanced structures such as bitmaps and HyperLogLogs. It also enables direct manipulation of these data types. For instance, the hash data structure supports set and get operations on individual fields. All of these structures are stored in memory for efficient lookup. To ensure fault tolerance, they are periodically synced to disk.

Operations are atomic at the level of individual commands, and multiple commands can be explicitly grouped into a single atomic transaction. For redundancy and scalability, Redis relies on asynchronous master-slave replication. Furthermore, it supports a cluster mode wherein data is sharded across nodes.

Continuing our running application, we will leverage Redis to store stock volume and price metrics. For each stock symbol, the application will store the min and max volume and the price counter in a hash map. Client libraries for Redis exist for all major programming languages. For Java, the package of choice is Jedis, which is what we will employ. Once Redis has been set up, add the following to your build definition file:

libraryDependencies += "redis.clients" % "jedis" % "2.7.3"

Listing 5 shows how Redis can be leveraged to store arbitrary data structures from Spark Streaming applications. In the per-partition foreach (line 2), we connect to the Redis server using a Jedis client connection object that takes the hostname of the server as input (line 3). Then, for each record, we first need to check whether the stock symbol key exists in Redis (lines 5 and 6). If it does not, the value tuple is initialized with default values (line 7). Otherwise, the previous values stored in Redis are updated (lines 9-17) and written back (line 18). In each batch interval, these values are also emitted to standard output in the driver JVM (lines 23-25). If the number of keys is small, it might be more efficient to batch (or partition) them into a single pipelined call to Redis (obtained via Jedis#pipelined()). Wrapping Redis operations in a foreachRDD operation allows us to use Redis to store boundless, batch-invariant state.

Listing 5: Keeping Spark Streaming application state in Redis

  1. .foreachRDD(rdd => {
  2.   rdd.foreachPartition({ part =>
  3.     val jedis = new Jedis(hostname) // one connection per partition, on the worker
  4.     part.foreach(f => {
  5.       val prev = jedis.hmget(f._1, "min", "max", "count")
  6.       if (prev(0) == null) {
  7.         jedis.hmset(f._1, mutable.HashMap("min" -> Long.MaxValue.toString, "max" -> Long.MinValue.toString, "count" -> 0.toString))
  8.       } else {
  9.         val prevLong = prev.toList.map(v => v.toLong)
  10.         var newCount = prevLong(2)
  11.         val newPrice = f._2._1
  12.         val newVolume = f._2._2
  13.         if (newPrice > 500.0) {
  14.           newCount += 1
  15.         }
  16.         val newMin = if (newVolume < prevLong(0)) newVolume else prevLong(0)
  17.         val newMax = if (newVolume > prevLong(1)) newVolume else prevLong(1)
  18.         jedis.hmset(f._1, mutable.HashMap("min" -> newMin.toString, "max" -> newMax.toString, "count" -> newCount.toString))
  19.       }
  20.     })
  21.     jedis.close()
  22.   })
  23.   val jedis = new Jedis(hostname) // separate connection in the driver for reporting
  24.   jedis.scan(0).getResult.foreach(sym => println("Symbol: %s, Stats: %s".format(sym, jedis.hmget(sym, "min", "max", "count").toString)))
  25.   jedis.close()
  26. })

That brings us to the end of this 2-part blog post. All code can be accessed online: https://github.com/ZubairNabi/prosparkstreaming/tree/master/Chap6

I hope now you too can master the art of side-effects in Spark Streaming.


You can find more such real-world, data-driven applications, code, and recipes in Zubair’s upcoming book “Pro Spark Streaming: The Zen of Real-time Analytics using Apache Spark”, which is going to be published by Apress Publishing in June, 2016. ISBN-13: 978-1484214800.

You can pre-order it from Amazon as well as the publisher’s website:
Amazon: http://www.amazon.com/Pro-Spark-Streaming-Real-time-Analytics/dp/1484214803
Apress: http://www.apress.com/9781484214800
