The Art of Side Effects: Curing Apache Spark Streaming’s Amnesia (Part 1/2)

Guy Pearce’s character in Christopher Nolan’s psychological thriller Memento suffers from anterograde amnesia brought on by a personal tragedy, which prevents him from forming new memories. Throughout the movie, his standard introduction is “I have this condition”.

If Spark Streaming could talk, it would describe a similar condition. Spark Streaming batches are stateless by design: all transformations during a batch only affect the RDDs in that batch. As a result, applications are side effect free: running the same application any number of times over the same input results in the same behavior and output. As in functional programming, this simplifies debugging and reasoning about the state of a program, as input and output paths are deterministic.

While side-effect-free applications have many advantages, statelessness complicates the design of a large class of applications that require batch-invariant state. For instance, keeping track of the monthly max, min, and average of a stock price and triggering a purchase when the average exceeds a threshold. Similarly, building a dynamic user-action profile on an e-commerce website for A/B testing also requires remembering state from previous batches.

In this 2-part blog post, we’ll sketch some recipes for maintaining batch-invariant state across RDDs. In this first part, we’ll look at two solutions: 1) static variables and 2) the updateStateByKey() transform. In the second, we’ll employ accumulators and external storage (Redis) to achieve the same goal.

Our running use case comes from the financial industry and illustrates Stock Market 2.0, which is driven by access to real-time stock market tickers and feeds. Specifically, we’ll grab the current stock price and volume for 10 major technology companies from Yahoo Finance and calculate various statistics. Yahoo Finance exposes both a web service and an advanced querying service enabled by YQL (Yahoo Query Language), which provides a SQL-like interface to query data from Yahoo APIs over HTTP. All the recipes in this post leverage its JSON response, which contains close to 80 fields.
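For reference, the URL-encoded q parameter used in the listings below decodes to the following YQL statement, with the 10 ticker symbols passed as a single quoted, comma-separated list:

    select * from yahoo.finance.quotes
    where symbol in ("IBM,GOOG,MSFT,AAPL,FB,ORCL,YHOO,TWTR,LNKD,INTC")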

In addition, Spark Streaming does not ship with a connector for generating RDDs from an HTTP source out of the box. To this end, we’ll make use of a custom HttpInputDStream creator.
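For readers who want a feel for what such a creator might look like, here is a minimal sketch built on Spark’s generic Receiver API. The HttpReceiver class, the Duration-typed interval parameter, and the blocking fetch via scala.io.Source are illustrative assumptions, not the exact implementation referenced above; only the HttpUtils.createStream(ssc, url, interval) shape mirrors the usage in the listings below.

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Duration, StreamingContext}
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.Receiver

    // Illustrative sketch: polls an HTTP endpoint once per interval and pushes
    // each response body into Spark Streaming as a single String record.
    class HttpReceiver(url: String, interval: Duration)
      extends Receiver[String](StorageLevel.MEMORY_ONLY) {

      override def onStart(): Unit = {
        // Poll on a background thread so onStart() returns immediately.
        new Thread("HTTP Receiver") {
          override def run(): Unit = {
            while (!isStopped()) {
              val body = scala.io.Source.fromURL(url, "UTF-8").mkString
              store(body)
              Thread.sleep(interval.milliseconds)
            }
          }
        }.start()
      }

      // The polling loop exits on its own once isStopped() becomes true.
      override def onStop(): Unit = {}
    }

    object HttpUtils {
      def createStream(ssc: StreamingContext, url: String, interval: Duration): ReceiverInputDStream[String] =
        ssc.receiverStream(new HttpReceiver(url, interval))
    }

A production version would additionally wrap the fetch in error handling and call the receiver’s restart() method on failure rather than letting the thread die.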

Static Variables

The simplest state is in the form of counters. For instance, how can we keep track of the max and min stock volume across all 10 securities, count the number of times any stock price has exceeded 500, and print a message when this counter reaches 1000? To this end, we can use static variables in the driver process in tandem with foreachRDD. Specifically, we can take advantage of the fact that foreachRDD is invoked in the driver process: if we update the value of any static variable there, the new state is visible across RDDs. The code below uses this approach.

Using static counters and foreachRDD to maintain statistics across RDDs/batches:


1. val globalMax: AtomicLong = new AtomicLong(Long.MinValue)
2. val globalMin: AtomicLong = new AtomicLong(Long.MaxValue)
3. val globalCounter500: AtomicLong = new AtomicLong(0)
4.
5. HttpUtils.createStream(ssc, url = "https://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20(%22IBM,GOOG,MSFT,AAPL,FB,ORCL,YHOO,TWTR,LNKD,INTC%22)%0A%09%09&format=json&diagnostics=true&env=http%3A%2F%2Fdatatables.org%2Falltables.env",
6.   interval = batchInterval)
7.   .flatMap(rec => {
8.     implicit val formats = DefaultFormats
9.     val query = parse(rec) \ "query"
10.     ((query \ "results" \ "quote").children)
11.       .map(rec => ((rec \ "symbol").extract[String], (rec \ "LastTradePriceOnly").extract[String].toFloat, (rec \ "Volume").extract[String].toLong))
12.   })
13.   .foreachRDD(rdd => {
14.     val stocks = rdd.take(10) // materialize the (at most) 10 records in the driver
15.     stocks.foreach(stock => { // runs in the driver JVM, not on the executors
16.       val price = stock._2
17.       val volume = stock._3
18.       if (volume > globalMax.get()) {
19.         globalMax.set(volume)
20.       }
21.       if (volume < globalMin.get()) {
22.         globalMin.set(volume)
23.       }
24.       if (price > 500) {
25.         globalCounter500.incrementAndGet()
26.       }
27.     })
28.     if (globalCounter500.get() > 1000L) {
29.       println("Global counter has reached 1000")
30.       println("Max ----> " + globalMax.get)
31.       println("Min ----> " + globalMin.get)
32.       globalCounter500.set(0)
33.     }
34.   })

There are two noteworthy aspects of the code above:

First, atomic variables are used to ensure that the calculations remain correct even in the face of concurrent access and modification.

Second, the projected data is ingested into the driver process (line 14). This works because a) we know the number of stock records a priori and b) the amount of data is small enough not to overwhelm the driver heap. Note that because all the data has been materialized in the driver program, the inner foreach (line 15) is executed in the driver JVM, not on the worker executors. That is why we can maintain global counters.

This approach works well if a) we know the number of counters a priori and b) the amount of global state is small enough to fit within the driver JVM’s memory. What happens when one or both of these conditions do not hold? For instance, what if we wanted to keep the max and min for each stock in the data stream but did not know the stock symbols of interest beforehand?

Let’s look at alternatives.

updateStateByKey()

The most obvious choice is updateStateByKey(), which allows us to track the state of each key across RDDs. Using a custom update function, we recompute the max, min, and count in each invocation. This is exactly what the code below does. (Be sure to set a checkpoint directory; a setup sketch follows the listing.)

In contrast to the previous example, all state manipulation takes place within the update function (lines 12-22). The batch-invariant state for each key is a tuple of the form (min, max, count). The current value of this tuple is passed to the update function as the second argument. The function can be enhanced to perform arbitrary computation and to hold any collection, such as a map.

Leveraging updateStateByKey() for per key statistics across RDDs:


1. HttpUtils.createStream(ssc, url = "https://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20(%22IBM,GOOG,MSFT,AAPL,FB,ORCL,YHOO,TWTR,LNKD,INTC%22)%0A%09%09&format=json&diagnostics=true&env=http%3A%2F%2Fdatatables.org%2Falltables.env",
2.   interval = batchInterval)
3.   .flatMap(rec => {
4.     implicit val formats = DefaultFormats
5.     val query = parse(rec) \ "query"
6.     ((query \ "results" \ "quote").children)
7.       .map(rec => ((rec \ "symbol").extract[String], ((rec \ "LastTradePriceOnly").extract[String].toFloat, (rec \ "Volume").extract[String].toLong)))
8.   })
9.   .updateStateByKey(updateState)
10.   .print()
11.
12. def updateState(values: Seq[(Float, Long)], state: Option[(Long, Long, Long)]): Option[(Long, Long, Long)] = {
13.   if (values.isEmpty) return state // no new records for this key in this batch
14.   val localMin = values.map(_._2).min
15.   val localMax = values.map(_._2).max
16.   val localCount500 = values.count(_._1 > 500)
17.   val (prevMin, prevMax, prevCount500) = state.getOrElse((Long.MaxValue, Long.MinValue, 0L))
18.   val newMin = if (localMin < prevMin) localMin else prevMin
19.   val newMax = if (localMax > prevMax) localMax else prevMax
20.   val newCount500 = prevCount500 + localCount500
21.   Some((newMin, newMax, newCount500))
22. }
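Both listings omit the surrounding driver scaffolding. As a rough sketch (the app name, master URL, batch interval, and checkpoint path below are placeholders), it might look like the following; note the ssc.checkpoint() call, which updateStateByKey() requires:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val batchInterval = Seconds(10)                  // placeholder batch interval
    val conf = new SparkConf()
      .setAppName("StockStats")                      // placeholder app name
      .setMaster("local[*]")                         // placeholder master URL
    val ssc = new StreamingContext(conf, batchInterval)
    ssc.checkpoint("/tmp/stock-stats-checkpoint")    // required by updateStateByKey()

    // ... build the DStream pipeline shown in the listings above ...

    ssc.start()
    ssc.awaitTermination()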

updateStateByKey() works well if the number of keys and the amount of batch-invariant state are small. This is primarily because RDDs and DStreams are immutable. On the plus side, this simplifies fault tolerance: Spark can regenerate any lost RDD by replaying data from the checkpoint. On the downside, it means that any state maintained via StateDStream (the DStream that makes updateStateByKey() operations possible) needs to be regenerated in every batch. Imagine we are tracking a million keys but only a handful of them need to be updated in each micro-batch. It is clearly overkill and suboptimal to create new copies of keys that have not been mutated. Can we do better?

The answer to that question and more will be presented in Part 2 of this 2-part series. Stay tuned.


You can find more such real-world, data-driven applications, code, and recipes in Zubair’s upcoming book “Pro Spark Streaming: The Zen of Real-time Analytics using Apache Spark”, which will be published by Apress in June 2016. ISBN-13: 978-1484214800.

You can pre-order it from Amazon as well as the publisher’s website:
Amazon: http://www.amazon.com/Pro-Spark-Streaming-Real-time-Analytics/dp/1484214803
Apress: http://www.apress.com/9781484214800
