
0 to Life-Changing App: New Apache SystemML API on Spark Shell

SystemML on Spark Shell? Yes!

A very simple way of using SystemML for all of your machine learning and big data needs. This tutorial will get you set up and running SystemML on the Spark Shell like a star. But first, to refresh your memory, let me remind you that I am on a quest to create a life-changing app! I am new to the world of data science and am currently tackling the challenge of building an app using Apache SystemML and Apache Spark one step at a time. If you haven't already, make sure to check out my previous tutorials, which start here.

So far we've daydreamed about delightful data, complained about how hard it is to find good data, found good data, learned how to write Scala and NOW we will learn how to access SystemML from the Spark Shell.

Not familiar with the Spark shell? Here's a great tutorial. Not sure what SystemML is? Look here!

At a high level, SystemML handles the machine learning and mathematical part of your data science project. You can log into the Spark shell, load SystemML, load your data and write your linear algebra, statistical equations, matrices, etc. in far less code than the equivalent Spark syntax would take. It helps not only with mathematical exploration and machine learning algorithms, but it also lets you run on Spark, where you can work with data far too big for your local computer. Focusing on this step of your project, let's walk through how to set your computer up for SystemML's assumptions, how to load the Spark shell, load SystemML, load data and work through a few examples in Scala. (I promise a PySpark tutorial will come in the future!)

Now let's get going on our learning. First step: assumptions for SystemML.

Have Java, Scala, wget and Spark installed on your computer.

brew tap caskroom/cask  
brew install caskroom/cask/java  
brew install scala  
brew install wget  
brew install apache-spark  
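
If you want to double-check your setup, you can print each tool's version (totally optional, and the exact output will depend on the versions you installed):

java -version  
scala -version  
spark-submit --version  
wget --version  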

Now let's set up SystemML!

Download SystemML.

wget https://sparktc.ibmcloud.com/repo/latest/SystemML.jar  

Now type the following code to access the Spark Shell with SystemML.

spark-shell --executor-memory 4G --driver-memory 4G --jars SystemML.jar  

Now, using the Spark Shell (Scala), import the MLContext for SystemML.

import org.apache.sysml.api.mlcontext._  
import org.apache.sysml.api.mlcontext.ScriptFactory._  
val ml = new MLContext(sc)  

Congratulations!! NOW YOU ARE IN APACHE SYSTEMML!!

In the future you will just need to do the last two steps to get this going.
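
If you'd like a quick sanity check that your MLContext is ready to go, you can ask it about itself (assuming your SystemML build exposes these; the exact output depends on the version you downloaded):

ml.version  
ml.buildTime  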

Let's figure out how to load a script and run it as well as load data and run some examples.

These examples and tons of documentation can also be found here.

Here's a quick example: creating a script from a URL.
Here s1 is created by reading Univar-Stats.dml straight from a URL.

val uniUrl = "https://raw.githubusercontent.com/apache/incubator-systemml/master/scripts/algorithms/Univar-Stats.dml"  
val s1 = ScriptFactory.dmlFromUrl(uniUrl)  

More examples of how to load scripts can be found here.
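
For example, besides dmlFromUrl, ScriptFactory can also build a script from a local file or straight from a String. (The file path below is just a hypothetical example; point it at wherever you saved the script.)

val s2 = ScriptFactory.dmlFromFile("Univar-Stats.dml")  
val s3 = ScriptFactory.dml("x = 1 + 2; print(x)")  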

Our next step is to parallelize some data, read it in as two matrices (RDDs), and compute the sum of each, plus a message telling us which sum is greater.

scala> val data1 = sc.parallelize(Array("1.0,2.0", "3.0,4.0"))  
scala> val data2 = sc.parallelize(Array("5.0,6.0", "7.0,8.0"))  
scala> val s = """  
     | s1 = sum(m1);
     | s2 = sum(m2);
     | if (s1 > s2) {
     |  message = "s1 is greater"
     | } else if (s2 > s1) {
     |  message = "s2 is greater"
     | } else {
     |  message = "s1 and s2 are equal"
     | }
     | """

scala> val script = dml(s).in("m1", data1).in("m2", data2).out("s1", "s2", "message")  

You should get:

script: org.apache.sysml.api.mlcontext.Script =  
Inputs:  
[1] (RDD) m1: ParallelCollectionRDD[0] at parallelize at <console>:33
[2] (RDD) m2: ParallelCollectionRDD[1] at parallelize at <console>:33

Outputs:  
[1] s1
[2] s2
[3] message

Now print your script info. You should see:

scala> println(script.info)  
Script Type: DML

Inputs:  
[1] (RDD) m1: ParallelCollectionRDD[0] at parallelize at <console>:33
[2] (RDD) m2: ParallelCollectionRDD[1] at parallelize at <console>:33

Outputs:  
[1] s1
[2] s2
[3] message

Input Parameters:  
None

Input Variables:  
[1] m1
[2] m2

Output Variables:  
[1] s1
[2] s2
[3] message

Symbol Table:  
[1] (Matrix) m1: Matrix: null, [-1 x -1, nnz=-1, blocks (1 x 1)], csv, not-dirty
[2] (Matrix) m2: Matrix: null, [-1 x -1, nnz=-1, blocks (1 x 1)], csv, not-dirty

Script String:

s1 = sum(m1);  
s2 = sum(m2);  
if (s1 > s2) {  
 message = "s1 is greater"
} else if (s2 > s1) {
 message = "s2 is greater"
} else {
 message = "s1 and s2 are equal"
}

Script Execution String:  
m1 = read('');  
m2 = read('');

s1 = sum(m1);  
s2 = sum(m2);  
if (s1 > s2) {  
 message = "s1 is greater"
} else if (s2 > s1) {
 message = "s2 is greater"
} else {
 message = "s1 and s2 are equal"
}
write(s1, '');  
write(s2, '');  
write(message, '');  

Execute your script and get your results!

scala> val results = ml.execute(script)  
results: org.apache.sysml.api.mlcontext.MLResults =  
[1] (Double) s1: 10.0
[2] (Double) s2: 26.0
[3] (String) message: s2 is greater

Just as an example, you can assign a result to a value, x, and get it back in Double form.
Not familiar with Scala? Check this tutorial out!

scala> val x = results.getDouble("s1")  
x: Double = 10.0

scala> val y = results.getDouble("s2")  
y: Double = 26.0

scala> x + y  
res1: Double = 36.0  
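
Since message is a String rather than a Double, you can pull it out with getString (assuming the getString getter on MLResults); it should give you the same message we saw above:

scala> val message = results.getString("message")  
message: String = s2 is greater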

Here is another version. Because the API is very Scala-friendly, you can pull out your results as a Scala tuple.

scala> val (firstSum, secondSum, sumMessage) = results.getTuple[Double, Double, String]("s1", "s2", "message")  
firstSum: Double = 10.0  
secondSum: Double = 26.0  
sumMessage: String = s2 is greater  

Here is the really handy part. As another example, you can load in your data, run one short script and get a whole table of standard statistical measures for each feature!

Let's first get our data into Spark.
Because this step of our awesome, life-changing data science project/app focuses on mathematical exploration (very soon it will be about machine learning algorithms), we want to make sure our data is clean and ready to go. Let's load in some data and run a SystemML script.

scala> val habermanUrl = "http://archive.ics.uci.edu/ml/machine-learning-databases/haberman/haberman.data"

scala> val habermanList = scala.io.Source.fromURL(habermanUrl).mkString.split("\n")

scala> val habermanRDD = sc.parallelize(habermanList)

scala> val typesRDD = sc.parallelize(Array("1.0,1.0,1.0,2.0"))

scala> val scriptUrl = "https://raw.githubusercontent.com/apache/incubator-systemml/master/scripts/algorithms/Univar-Stats.dml"
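
One quick note before building the script: the next line refers to habermanMetadata and typesMetadata, which tell SystemML the shape of each input matrix. Here's a minimal sketch, assuming the Haberman data's 306 rows and 4 columns (MatrixMetadata comes from the mlcontext package we already imported):

scala> val habermanMetadata = new MatrixMetadata(306, 4)  
scala> val typesMetadata = new MatrixMetadata(1, 4)  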

scala> val script = dmlFromUrl(scriptUrl).in("A", habermanRDD, habermanMetadata).in("K", typesRDD, typesMetadata).in("$CONSOLE_OUTPUT", true)  
scala> val results = ml.execute(script)

-------------------------------------------------
Feature [1]: Scale  
 (01) Minimum             | 30.0
 (02) Maximum             | 83.0
 (03) Range               | 53.0
 (04) Mean                | 52.45751633986928
 (05) Variance            | 116.71458266366658
 (06) Std deviation       | 10.803452349303281
 (07) Std err of mean     | 0.6175922641866753
 (08) Coeff of variation  | 0.20594669940735139
 (09) Skewness            | 0.1450718616532357
 (10) Kurtosis            | -0.6150152487211726
 (11) Std err of skewness | 0.13934809593495995
 (12) Std err of kurtosis | 0.277810485320835
 (13) Median              | 52.0
 (14) Interquartile mean  | 52.16013071895425
-------------------------------------------------
Feature [2]: Scale  
 (01) Minimum             | 58.0
 (02) Maximum             | 69.0
 (03) Range               | 11.0
 (04) Mean                | 62.85294117647059
 (05) Variance            | 10.558630665380907
 (06) Std deviation       | 3.2494046632238507
 (07) Std err of mean     | 0.18575610076612029
 (08) Coeff of variation  | 0.051698529971741194
 (09) Skewness            | 0.07798443581479181
 (10) Kurtosis            | -1.1324380182967442
 (11) Std err of skewness | 0.13934809593495995
 (12) Std err of kurtosis | 0.277810485320835
 (13) Median              | 63.0
 (14) Interquartile mean  | 62.80392156862745
-------------------------------------------------
Feature [3]: Scale  
 (01) Minimum             | 0.0
 (02) Maximum             | 52.0
 (03) Range               | 52.0
 (04) Mean                | 4.026143790849673
 (05) Variance            | 51.691117539912135
 (06) Std deviation       | 7.189653506248555
 (07) Std err of mean     | 0.41100513466216837
 (08) Coeff of variation  | 1.7857418611299172
 (09) Skewness            | 2.954633471088322
 (10) Kurtosis            | 11.425776549251449
 (11) Std err of skewness | 0.13934809593495995
 (12) Std err of kurtosis | 0.277810485320835
 (13) Median              | 1.0
 (14) Interquartile mean  | 1.2483660130718954
-------------------------------------------------
Feature [4]: Categorical (Nominal)  
 (15) Num of categories   | 2
 (16) Mode                | 1
 (17) Num of modes        | 1
results: org.apache.sysml.api.mlcontext.MLResults =  
[1] (Matrix) baseStats: Matrix: scratch_space/_p5250_9.31.116.229/parfor/2_resultmerge1, [17 x 4, nnz=44, blocks (1000 x 1000)], binaryblock, dirty

You can also ask for the base stats.

scala> val baseStats = results.getMatrix("baseStats")  
baseStats: org.apache.sysml.api.mlcontext.Matrix = org.apache.sysml.api.mlcontext.Matrix@237cd4e5

scala> baseStats.  
asDataFrame          asDoubleMatrix       asInstanceOf         asJavaRDDStringCSV   asJavaRDDStringIJV   asMLMatrix           asMatrixObject       asRDDStringCSV  
asRDDStringIJV       isInstanceOf         toString             
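
If you'd rather work with a Spark DataFrame, asDataFrame (listed in the completions above) will convert the matrix for you. A quick sketch:

scala> val baseStatsDF = baseStats.asDataFrame  
scala> baseStatsDF.show(5)  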

You can also get the base stats as an RDD of strings. Note: the IJV format leaves out empty values, while CSV includes them. Here's an example of both:

scala> baseStats.asRDDString  
asRDDStringCSV   asRDDStringIJV   

scala> baseStats.asRDDStringCSV.collect  
res4: Array[String] = Array(30.0,58.0,0.0,0.0, 83.0,69.0,52.0,0.0, 53.0,11.0,52....1.0)

scala> baseStats.asRDDStringIJV.collect  
res5: Array[String] = Array(1 1 30.0, 1 2 58.0, 1 3 0.0, 1 4 0.0, 2 1 83.0, 2 2 69.0, 2 3 52.0, 2 4 0.0, ... 1...  

I think that's a great start to using SystemML with Spark Shell! Once you're done you can quit to exit.

:quit

You have successfully set up your computer to run SystemML and Spark, loaded the Spark shell, run scripts, loaded data and worked through some examples!! Congrats!!

Stay tuned for more tutorials and next steps on our life-changing app!

By Madison J. Myers

Spark Technology Center
