Custom Spark Streaming DStreams and Receivers

At the very heart of Apache Spark Streaming lies the concept of a DStream, a wrapper around a continuous flow of RDDs. Each DStream holds a list of other DStreams that it depends on, a function to convert its input RDDs into output ones, and a time interval at which to invoke that function. DStreams are created either by transforming existing ones, say by applying a map or filter function (which internally create MappedDStreams and FilteredDStreams, respectively), or by reading from an external source, such as a file or message queue (the base class for which is InputDStream).
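For instance, a few standard transformations illustrate how new DStreams are derived from existing ones (a sketch assuming a StreamingContext `ssc` already exists and a text source is listening on a local socket; the host and port are illustrative):

```scala
// Assumes a StreamingContext `ssc` built elsewhere.
val lines = ssc.socketTextStream("localhost", 9999) // an InputDStream of text lines
val errors = lines.filter(_.contains("ERROR"))      // internally a FilteredDStream
val lengths =                    // internally a MappedDStream
```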

InputDStreams expose start() and stop() methods which need to be implemented by subclasses to signal the start and end of the data stream. It is important to highlight that an InputDStream executes on the same node as the driver program. Therefore, data sources that can be handled by one node can directly inherit from this class. For instance, FileInputDStream, which monitors an HDFS directory and creates an RDD from all of the files added to the directory in a batch interval, inherits directly from this class. On the other hand, high-volume data sources can easily stress the driver node, turning it into a bottleneck for the processing pipeline. For such sources, Spark Streaming exposes the ReceiverInputDStream interface (which inherits from InputDStream). The Spark codebase contains out-of-the-box receivers for popular sources such as Kafka, Flume, and MQTT, which employ the ReceiverInputDStream interface under the hood.

ReceiverInputDStreams are executed on worker nodes. Internally they rely on Receiver objects that are shipped out to worker nodes (custom ReceiverInputDStreams need to override the getReceiver() method to return the associated Receiver object). Each custom Receiver, in turn, needs to override the onStart() and onStop() methods, which are invoked by the subsystem at receiver initialization and shutdown time. In addition, receivers can override an optional preferredLocation() method with the hostname of a preferred execution node. Furthermore, they can control their own lifecycle by invoking the stop() and restart() methods or by reporting errors to the driver program via reportError(). Data items are pushed to the receiver via store(), which internally aggregates items into data blocks in the worker's memory.

In this post we will create a custom ReceiverInputDStream and Receiver for Apache Qpid[1], a messaging system with support for AMQP (Advanced Message Queuing Protocol). We will use its Java client and the JMS API to interface with the Qpid broker via the Scala API for Spark Streaming.

The first step is to create a QpidReceiver object that extends Receiver:

import javax.jms.{BytesMessage, Connection, Destination, Message, MessageConsumer, MessageListener, Session}
import org.apache.qpid.client.{AMQAnyDestination, AMQConnection}
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class QpidReceiver (
    storageLevel: StorageLevel,
    brokerURI: String,
    topic: String,
    create: Boolean
) extends Receiver[String](storageLevel) with Logging {
    var connection: Connection = _
    var consumer: MessageConsumer = _

The constructor accepts four arguments:

  1. storageLevel: The storage level of the generated InputDStream (and in turn its RDDs)
  2. brokerURI: The location of the Qpid broker. Example: "amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'"
  3. topic: Name of the Qpid topic
  4. create: Whether to create the Qpid topic if it does not exist at the broker

We then override the onStart() method that will be invoked when the receiver is initialized. This method sets up the connection to the broker and initializes all associated objects.

def onStart() {
    var addr = "ADDR:" + topic
    if (create) {
        addr += "; {create: always}"
    }
    try {
        connection = new AMQConnection(brokerURI)
        val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
        val queue: Destination = new AMQAnyDestination(addr)
        consumer = session.createConsumer(queue)
        logInfo("Connected to Qpid server")
        // The message listener registration described below also belongs here.
        connection.start()
        logInfo("Qpid connection started")
    } catch {
        case e: org.apache.qpid.AMQConnectionFailureException =>
            // Ask the supervisor to restart the receiver
            restart("Error! Problems while connecting", e)
    }
}
This piece of code sets up the connection to the broker and creates a session with AUTO_ACKNOWLEDGE, which makes the client acknowledge each received message to the broker automatically. A message consumer for the particular queue is then created, and the connection is started. In case of a connection exception, we restart the receiver.

Ordinarily, we would need to create a separate thread to listen to messages and then store() them, but the Qpid API exposes an asynchronous consumer out of the box that we will employ.

consumer.setMessageListener(new MessageListener() {
    override def onMessage(message: Message) {
        val bm: BytesMessage = message.asInstanceOf[BytesMessage]
        val data: Array[Byte] = new Array[Byte](bm.getBodyLength().intValue)
        bm.readBytes(data)       // copy the message body into the buffer
        store(new String(data))  // hand the payload over to Spark
    }
})
We simply create a new MessageListener and register it with the Qpid consumer. Its overridden onMessage() method receives each message, reads its payload, and stores it within the receiver instance via store(). Note that this registration needs to be performed within the onStart() method.

To complete the state machine, we also override the onStop() method of the receiver:

def onStop() {
    connection.close()  // also closes the session and its consumers
    logInfo("Disconnected from Qpid server")
}

Users do not need to manipulate this receiver directly, so we wrap it in a ReceiverInputDStream:

class QpidInputDStream (
    @transient ssc_ : StreamingContext,
    storageLevel: StorageLevel,
    brokerURI: String,
    topic: String,
    create: Boolean
) extends ReceiverInputDStream[String](ssc_) with Logging {
    def getReceiver(): Receiver[String] = {
        new QpidReceiver(storageLevel, brokerURI, topic, create)
    }
}
As mentioned before, each custom ReceiverInputDStream needs to return its receiver object from the getReceiver() method, which is exactly what QpidInputDStream does. So far we have only implemented an InputDStream for Scala applications. Extending it to support Java is trivial, as the Java interface differs only in the type of StreamingContext. The following piece of code enables us to invoke the same QpidInputDStream from both Scala and Java:

object QpidUtils {
    // For Scala applications
    def createStream(
        ssc: StreamingContext,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
        brokerURI: String,
        queue: String,
        create: Boolean
    ): DStream[String] = {
        new QpidInputDStream(ssc, storageLevel, brokerURI, queue, create)
    }

    // For Java applications
    def createStream(
        jssc: JavaStreamingContext,
        storageLevel: StorageLevel,
        brokerURI: String,
        queue: String,
        create: Boolean
    ): JavaDStream[String] = {
        createStream(jssc.ssc, storageLevel, brokerURI, queue, create)
    }
}

As an illustrative usage example, the following snippet of code uses this QpidInputDStream and then employs the canonical stateful word count application:

val lines = QpidUtils.createStream(ssc, StorageLevel.MEMORY_ONLY, brokerUri, tName, true)
val words = lines.flatMap(_.split(" "))
val wordEmit = => (x, 1))
val wordCounts = wordEmit.updateStateByKey(…)
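The update function passed to updateStateByKey() is elided above. One possible implementation, shown here purely as an illustration, simply accumulates counts across batches:

```scala
// Hypothetical update function: adds this batch's counts to the running total.
val updateFunc = (newCounts: Seq[Int], runningCount: Option[Int]) =>
  Some(newCounts.sum + runningCount.getOrElse(0))

val wordCounts = wordEmit.updateStateByKey[Int](updateFunc)
```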

We wrap up by noting that some extremely high-volume data sources might require more than one receiver to keep up with the data rate. In that case, it is good practice to load-balance the data across multiple topics at the producer end and then employ one receiver per topic. These n InputDStreams can then be merged into a single one using the union() transform for collective consumption by the subsequent processing pipeline.
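A fan-in of this kind might be sketched as follows (the topic names are illustrative, and `ssc` and `brokerUri` are assumed to exist as in the earlier example):

```scala
// Hypothetical fan-in: one receiver per topic, merged with union().
val topics = Seq("events-0", "events-1", "events-2")
val streams = { t =>
  QpidUtils.createStream(ssc, StorageLevel.MEMORY_ONLY, brokerUri, t, true)
}
val merged = ssc.union(streams)  // a single DStream fed by all the receivers
```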

You can download the entire source code here.


Spark Technology Center

