
From Config to Code with the Apache Spark Kernel Client Library

IBM innovator Chip Senkbeil offers a third post about the Spark Kernel project that IBM open-sourced in December. The project, which grew from the need to migrate and enable interactive applications on Spark, has evolved to include a Scala-based client library for communicating directly with the Kernel. No need to understand ZeroMQ or the IPython message protocol. The client library even lets you treat the Kernel as a remote service that you can run separately from the Spark cluster. Chip gives his step-by-step below.


In this third and final part of the Spark Kernel series (part 1, part 2), we will focus on the client library, a Scala-based library used to interface with the Spark Kernel. This library enables Scala applications to quickly communicate with a Spark Kernel without needing to understand ZeroMQ or the IPython message protocol. Furthermore, using the client library, Scala applications are able to treat the Spark Kernel as a remote service, meaning that they can run separately from a Spark cluster and use the kernel as a remote connection into the cluster.

[Figure: Spark Kernel Client Architecture]

The client library shares the majority of its code with the Spark Kernel’s codebase, meaning that it uses ZeroMQ and Akka underneath for message passing.

Before using the client library to submit code, the client needs to be initialized with the connection details of the kernel. Furthermore, any initialized client should be shut down when finished, as this ensures that the underlying actor system and ZeroMQ connections are closed properly.
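
To make that lifecycle concrete, the sketch below shows the overall pattern: create a client from the bootstrap, use it, and tear it down when finished. The createClient and execute calls are covered in the sections that follow; the shutdown call here is only an assumption for illustration, so consult the client library for the exact teardown method.

// Lifecycle sketch only: initialize, use, shut down
val client = clientBootstrap.createClient()  // connect to the kernel (see below)
client.execute("1 + 1")                      // submit code asynchronously (see below)
// ... react to results via callbacks (see below) ...
client.shutdown()                            // hypothetical teardown call; verify the
                                             // actual API for shutting down the client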

Initializing the Client

To initialize the client library, you need to first define the information needed for the client to connect to the kernel. This is provided via a Typesafe Config object.

The configuration object can be generated from a string in Scala fairly easily. The following is an example configuration, specifying the ports of the kernel (these can be any valid ports), the IP address used to reach the kernel, the transport protocol (used by ZeroMQ underneath), and the signature scheme and key used for authentication and validation on both the client and kernel.

// Typesafe Config is used to parse the connection information
import com.typesafe.config.ConfigFactory

val kernelJsonInformation: String = """
    {
        "stdin_port":   48691,
        "control_port": 40544,
        "hb_port":      43462,
        "shell_port":   44808,
        "iopub_port":   49691,
        "ip": "127.0.0.1",
        "transport": "tcp",
        "signature_scheme": "hmac-sha256",
        "key": ""
    }
  """
val config = ConfigFactory.parseString(kernelJsonInformation)

Once a configuration object has been loaded, you need to create an instance of the client bootstrap, which serves as a factory for client connections. It takes the Typesafe Config we created earlier as the single argument.

val clientBootstrap = new ClientBootstrap(config) 
  with StandardSystemInitialization
  with StandardHandlerInitialization

Finally, initializing the client and establishing the connection is performed by executing the createClient method on the client bootstrap as seen below.

client.execute("sc.parallelize(1 to 10).reduce(_ + _)")

Executing Code

To execute code against the Spark Kernel, you specify the code as a Scala string. The normal Java rules for embedding quotation marks and including Unicode characters still apply in Scala. The execute method itself is asynchronous, meaning that it will not block the rest of your application while waiting for a response.

client.execute("sc.parallelize(1 to 10).reduce(_ + _)")
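
Code that itself contains string literals can be submitted either by escaping the quotation marks, following the usual Java rules, or by using Scala's triple-quoted strings, which avoid escaping altogether. Both snippets below submit the same (purely illustrative) expression.

// Escaped quotation marks inside an ordinary string literal
client.execute("sc.parallelize(Seq(\"a\", \"b\", \"a\")).countByValue()")

// The same code using a triple-quoted string, which needs no escaping
client.execute("""sc.parallelize(Seq("a", "b", "a")).countByValue()""")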

However, executing code on its own does not enable you to acquire the result or respond to streaming data or unexpected errors. To react to responses from the kernel, you need to chain callback events to your code execution. The following are the three most common ways to obtain information back from the Spark Kernel after evaluating a code snippet.

client.execute("1 + 1")
    .onResult(result => println("Got result: " + result))
client.execute("1 / 0")
    .onError(err => println("Got error: " + err))
client.execute("""kernel.stream.sendAll("hello")""")
    .onStream(stream => println("Got stream: " + stream))
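
These callbacks can also be attached to a single execution. The sketch below assumes that each callback registration returns the execution handle so the calls can be chained; if that is not the case in your version of the client library, simply register the callbacks separately as shown above.

// Chained callbacks on one execution (assumes each on* method returns the
// execution handle for chaining; otherwise attach the callbacks separately)
client.execute("""kernel.stream.sendAll("working..."); 1 + 1""")
    .onStream(stream => println("Got stream: " + stream))
    .onResult(result => println("Got result: " + result))
    .onError(err => println("Got error: " + err))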

Comm API

The Comm API (see part two of the series) is available for use on both the client and kernel. It requires that all expected target names be registered on both the client and kernel before open/msg/close messages will be processed. Therefore, both the client and kernel should register their target names as shown below.

// For the client
client.comm.register("my target name")

// For the kernel
kernel.comm.register("my target name")

Once a target has been registered, you need to attach handler callbacks to it in order to respond to open, msg, and close messages. These handler callbacks are invoked whenever a message related to the target is received. The following provides an example of an interaction between the client and kernel using Comm messages.

// When the kernel receives a Comm open with the target "my target", it will
// send a Comm msg back that contains the response "Hello World!"
kernel.comm.register("my target").addOpenHandler { 
    (commWriter, commId, targetName, data) =>
        commWriter.writeMsg(Map("response" -> "Hello World!"))
}

// When the client receives a message for the target "my target", it will
// print out the message locally to standard out and then close the
// association
client.comm.register("my target").addMsgHandler {
    (commWriter, commId, data) =>
        println(data)
        commWriter.close()
}

// This would start a new link (logic for routing Comm messages using their
// specific ids) between the client and kernel for the target called
// "my target" from the client side (this could also be started from
// the kernel)
//
// You can think of creating the link as opening a file, which must be done
// before any other Comm operation, otherwise no messages can be sent
client.comm.open("my target")
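
The example above closes the association from the client side. A handler for the resulting close message can be attached on the kernel side in the same way as the open and msg handlers; the sketch below assumes an addCloseHandler registration analogous to those shown above, so verify the exact signature against the Comm API.

// Assumed close-handler registration, analogous to addOpenHandler and
// addMsgHandler above (verify the exact signature in the Comm API)
kernel.comm.register("my target").addCloseHandler {
    (commWriter, commId, data) =>
        println("Comm link " + commId + " was closed")
}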

Summary

In this post, we focused on the Spark Kernel’s client library, specifically on how it enables Scala applications to utilize the Spark Kernel without understanding ZeroMQ or the IPython message protocol. We stepped through examples of initializing the client library and connecting to a kernel, executing code and retrieving results using the client library, and passing messages between the client and kernel through the Comm API.

Overall, the Spark Kernel provides quite a few benefits to the Apache Spark developer. The kernel enables applications to connect to a Spark cluster remotely without needing to expose the entire cluster if it is behind a firewall. As seen in the client library examples above, the Spark Kernel avoids the friction of repackaging and shipping jars that comes with Spark Submit. Furthermore, the kernel removes the requirement of storing results in an external datastore.

With this, we have completed the third and final part of the Spark Kernel series. Through this series of posts, we hope that you have gained a better understanding of the Spark Kernel project, and we aim to continue to improve the project based on community interest and feedback.
