
Apache Spark™ Kernel Architecture

In the first part of the Apache Spark™ Kernel series, we stepped through the problem of enabling interactive applications against Apache Spark and how the Spark Kernel solves it. This week, we focus on the Spark Kernel’s architecture: how we achieve fault tolerance and scalability using Akka, why we chose ZeroMQ with the IPython/Jupyter message protocol, what the layers of functionality in the kernel are (see Figure 1 below), and how an interactive API from IPython, the Comm API, fits in.

Figure 1: Spark Kernel architecture

Akka

The Spark Kernel achieves fault tolerance and scalability through the Akka framework, which also provides an easy-to-extend concurrency model and a developer-friendly way to isolate code logic.

  1. Fault tolerance – Akka’s recovery model (inspired by Erlang) is “fail fast and restart”: when an actor encounters an error, it fails immediately (rather than hiding the problem) and is restarted in a clean state.
  2. Scalability – Akka’s actors are designed so that your code does not need to know whether there is one instance of an actor or one hundred, which makes load balancing simple and helps avoid bottlenecks.
  3. Concurrency – the message-passing system uses actors to process messages and provides a reliable, easy-to-understand form of concurrency.
  4. Isolation – code can easily be isolated at the actor level, which makes maintaining and extending the kernel painless.
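To make the fail-fast-and-restart and isolation points above concrete, here is a minimal sketch using Akka classic actors. The actor names and messages are hypothetical, not taken from the kernel’s codebase:

import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.Restart

// Hypothetical handler actor: it fails fast on anything it does not understand.
class ExecuteHandler extends Actor {
  def receive: Receive = {
    case code: String => println(s"executing: $code")
    case other        => throw new IllegalArgumentException(s"unexpected message: $other")
  }
}

// Supervisor that restarts the failed handler in a clean state.
class HandlerSupervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy() { case _: Exception => Restart }

  private val handler = context.actorOf(Props[ExecuteHandler](), "execute-handler")

  def receive: Receive = { case msg => handler.forward(msg) }
}

object SupervisionSketch extends App {
  val system     = ActorSystem("kernel-sketch")
  val supervisor = system.actorOf(Props[HandlerSupervisor](), "supervisor")

  supervisor ! "1 + 1" // handled normally
  supervisor ! 42      // throws inside the handler, which is then restarted
}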

Communication

The Spark Kernel uses ZeroMQ over TCP sockets as its messaging middleware and implements the IPython message protocol to support code execution and other interactive forms of communication between an application and an Apache Spark cluster.

ZeroMQ was chosen as the middleware for its responsiveness and its built-in socket behaviors, such as the pub/sub model, in which a publisher (PUB) socket sends messages to all subscriber (SUB) sockets. Furthermore, ZeroMQ is the middleware used by IPython/Jupyter to communicate with kernels, which means the Spark Kernel can be plugged in as the backend for notebooks without any changes to the IPython codebase.

The IPython message protocol was chosen for two main reasons: it enables the kernel to run against IPython, and it provides an interactive, language-agnostic protocol. Furthermore, the protocol is flexible in the kinds of messages it supports, including code execution, data inspection, and data synchronization between frontend and backend components.
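As a rough sketch of what such a message looks like on the wire (the field names follow the IPython protocol; the Scala type below is illustrative, not one of the kernel’s internal classes):

// Illustrative shape of an IPython protocol message as received over ZeroMQ.
case class ProtocolMessage(
  identities: Seq[String],  // ZeroMQ routing identities
  signature: String,        // HMAC-SHA256 over the four JSON frames below
  header: String,           // JSON: msg_id, msg_type, session, username, version
  parentHeader: String,     // JSON: header of the request this message responds to
  metadata: String,         // JSON: message-type-specific metadata
  content: String           // JSON payload, e.g. the code to execute
)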

Layers

The Spark Kernel is architected in layers, where each layer has a specific purpose in the processing of requests.

  • ZeroMQ intercepts ZeroMQ messages and passes them to the message parsing layer

  • Message Parsing and Validation converts between ZeroMQ messages and internally-understood kernel messages, and ensures that messages are valid by comparing the included HMAC signature against an HMAC signature generated by the kernel

  • Routing forwards messages to proper handlers (if incoming) or sockets (if outgoing) based on message type

  • Message Handling processes individual messages based on type to perform the associated action such as executing code or inspecting variables

  • Interpreter performs code execution, passing the result back to the associated message handler

ZeroMQ Layer

This layer contains five TCP sockets labelled heartbeat, shell, control, stdin, and iopub. Each socket has its own purpose following the IPython message protocol.

  1. The heartbeat socket is used to ensure that the kernel is still alive. Its only function is to receive messages and immediately echo them back to the sender.
  2. The shell socket is the primary form of communication from the client to the kernel. Typically, the client sends code execution requests and Comm messages (see the Comm API section) to this socket.
  3. The control socket enables clients to send high-priority messages, such as shutting down the kernel, that will not be delayed or blocked by processing other messages.
  4. The stdin socket is the reverse of the shell socket, i.e. it is used by the kernel to request information from the client. This socket is typically used when the kernel requires more information from the client to complete a task.
  5. The iopub socket acts as a broadcaster to all listening clients. Its role is to communicate any side effects or Comm messages out to connected notebooks and other clients.
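As an example, the heartbeat socket can be approximated with a simple echo loop. The sketch below uses the JeroMQ bindings, and the port number is illustrative; the real ports come from the kernel’s connection information:

import org.zeromq.{SocketType, ZContext}

// Minimal sketch of the heartbeat socket: echo every received frame back.
object HeartbeatSketch extends App {
  val context   = new ZContext()
  val heartbeat = context.createSocket(SocketType.REP)
  heartbeat.bind("tcp://*:9999") // illustrative port

  while (true) {
    val ping = heartbeat.recv(0) // block until the client sends a ping
    heartbeat.send(ping, 0)      // immediately echo the same bytes back
  }
}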

Message Parsing and Validation Layer

This layer both parses and validates incoming messages. Parsing translates ZeroMQ bytes into JSON strings and then converts the JSON into an internal kernel message structure. Validation uses an HMAC to verify the integrity and authenticity of each message: the kernel computes an HMAC-SHA256 signature over the incoming message using a secret key and compares it against the signature included with the message. Validation is disabled if the secret key provided to the kernel is empty.
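A hedged sketch of that check is shown below; per the IPython protocol, the signature covers the header, parent header, metadata, and content frames, and the method name here is illustrative:

import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

// Illustrative HMAC-SHA256 check: sign the message frames with the shared
// secret key and compare against the signature included in the message.
def isValidSignature(secretKey: String, signature: String, frames: Seq[String]): Boolean =
  if (secretKey.isEmpty) true // validation is disabled when no key is configured
  else {
    val mac = Mac.getInstance("HmacSHA256")
    mac.init(new SecretKeySpec(secretKey.getBytes("UTF-8"), "HmacSHA256"))
    frames.foreach(frame => mac.update(frame.getBytes("UTF-8")))

    val expected = mac.doFinal().map("%02x".format(_)).mkString
    expected == signature
  }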

Routing Layer

As the smallest layer in the kernel, the routing layer’s only job is to forward a message to the proper Akka actor based on the message type. This involves sending incoming messages to associated message handlers and sending outgoing messages through the proper sockets.
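A sketch of that idea, with illustrative names: the router keeps a map from message type to handler actor and forwards each parsed message to the matching handler.

import akka.actor.{Actor, ActorRef}

// Illustrative routing actor: look up the handler registered for a message
// type and forward the payload to it; unknown types are simply dropped here.
final case class ParsedMessage(msgType: String, payload: Any)

class MessageRouter(handlers: Map[String, ActorRef]) extends Actor {
  def receive: Receive = {
    case ParsedMessage(msgType, payload) =>
      handlers.get(msgType).foreach(_.forward(payload))
  }
}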

Message Handling Layer

For each incoming message type in the IPython message protocol, there is an associated Akka actor whose only job is to process messages of that type. For example, a code execution request is sent to the execute request handler, while a Comm open is sent to the Comm open handler. The message handling layer isolates the responsibilities of code in the kernel and makes it easy to scale out handling of the more frequent message types.
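For illustration, a handler dedicated to execute requests might look like the sketch below; the Interpreter trait and message classes are stand-ins, not the kernel’s actual interfaces:

import akka.actor.Actor

// Illustrative handler for a single message type: it receives execute
// requests, delegates to an interpreter, and replies with the result.
trait Interpreter { def interpret(code: String): String }

final case class ExecuteRequest(code: String)
final case class ExecuteReply(result: String)

class ExecuteRequestHandler(interpreter: Interpreter) extends Actor {
  def receive: Receive = {
    case ExecuteRequest(code) =>
      sender() ! ExecuteReply(interpreter.interpret(code))
  }
}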

Interpreter Layer

The interpreter layer contains an interpreter interface that executes code and retrieves the contents of variables to be sent back to the client. Currently, the Spark Kernel has only a single interpreter for Scala, although we are investigating adding additional interpreters for other languages.

The interpreter layer is separate from the message handling layer to better protect against failures, since the code is executed in different Akka actors. Furthermore, placing the logic in separate actors reduces the complexity of the code by following the “single responsibility” principle of object-oriented programming.

When the Spark cluster needs access to code generated by the interpreter, the cluster accesses an HTTP server serving class files created by the Scala interpreter.

Finally, objects like the Spark Context – used to construct and manipulate RDDs on the Spark cluster – are bound inside the interpreter for users of the kernel to reference in their code snippets.
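As a hedged sketch of what that binding amounts to, the snippet below drives the Scala 2.11-style REPL (IMain) directly and binds a locally created SparkContext as sc; it is not the kernel’s actual interpreter code:

import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain
import org.apache.spark.{SparkConf, SparkContext}

object InterpreterSketch extends App {
  val settings = new Settings
  settings.usejavacp.value = true // reuse the JVM classpath inside the REPL

  val interpreter = new IMain(settings)
  val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))

  // Bind the SparkContext so that code snippets can reference it as `sc`.
  interpreter.bind("sc", "org.apache.spark.SparkContext", sc)

  // Execute a snippet much like an execute request would be handled.
  interpreter.interpret("val count = sc.parallelize(1 to 100).count()")
}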

Summary of Layers

The five layers of the Spark Kernel work together to provide a fault-tolerant and scalable system for receiving code requests to run against an Apache Spark cluster. Messages are received and sent via the ZeroMQ Layer, and are interpreted and validated by the Message Parsing and Validation Layer. Requests and responses are routed by the Routing Layer to the appropriate handler in the Message Handling Layer to perform the task. Finally, the Interpreter Layer performs code compilation and execution, providing gateways like the Spark Context to tap into the resources of the Spark cluster.

Comm API

The Spark Kernel provides a form of communication other than code execution through the Comm API. As part of the IPython message protocol, the Comm API allows developers to define custom messages to communicate data and perform actions on both the frontend (client) and backend (kernel). See Figure 2 below for a diagram of the types of Comm API message passing between the frontend and backend.

Figure 2: Comm API message passing between the frontend and backend

Benefits

The Comm API provides two main benefits over normal code execution:

  1. Flexibility through the following:
     • Bidirectional communication (client or kernel can start sending messages)
     • The ability to programmatically define messages and their interactions (discussed in the upcoming client library post)
  2. Performance thanks to the following:
     • Avoids recompiling code
     • Does not keep execution state

Comm Open

The first message type of the Comm API is open. The goal of this message is to establish an association between the frontend and backend, using the specified id to correlate subsequent messages.

When an open is received, the target name is checked to ensure it has been registered. If so, the association is established and any callbacks tied to open messages for that target are invoked, receiving any JSON data included in the message. If the target has not been registered, a close message is sent back immediately.

The following is an illustration of the JSON structure of a Comm “open” message.

{
    "comm_id" : "u-u-i-d",
    "target_name" : "my_comm",
    "data" : {}
}

One way to picture this message is as creating an object in OOP. When you send an open message, the other side treats it like new target_name. If the target name does not exist, a close is sent back immediately. If the target name does exist, a link is established and any future messages with a matching id will be processed.

Comm Msg

The second message type of the Comm API is msg. This message sends data from one side to the other and acts as the primary form of communication using the Comm API.

The message requires a data field that will be passed to the other side. When a message is received, any associated callbacks will be invoked and provided the data as an argument.

The following is an illustration of the JSON structure of a Comm “msg” message.

{
    "comm_id" : "u-u-i-d",
    "data" : {}
}

The easiest way to view this message is as the invocation of a method for an object marked by the specified id. If the object has not been created (opened), then the message is ignored.

Comm Close

The third and final message type of the Comm API is close. The goal of this message is to sever the association between the frontend and backend for the specified id.

One side initiates the close event, while the other side receives the message and severs the link, invoking any associated close callbacks and providing those callbacks with the optional data.

The following is an illustration of the JSON structure of a Comm “close” message.

{
    "comm_id" : "u-u-i-d",
    "data" : {}
}

This message can be viewed as the cleanup when an object is being deallocated. Once the close is received, the instance matching the specified id will be removed (link severed) and the optional data will be provided to any callbacks (like invoking a destructor in a language like C++).

Coming up

For the last post in this Spark Kernel series, we will cover the Spark Kernel client library. Specifically, the post will explain how to initialize the client library against a remote Spark Kernel and how to use the library to communicate with the kernel.

You can find the Spark Kernel project on GitHub here: ibm-et/spark-kernel
