
Apache Spark™ 2.0: Migrating Applications

Many excellent fixes, enhancements, and new features are available in Apache Spark™ 2.0, as highlighted in What's New in Apache Spark™ 2.0. High-level guidance for migrating applications to Apache Spark™ 2.0 can be found in the Apache Spark™ SQL Programming Guide and the Apache Spark™ MLlib Guide.

This post provides a brief summary of sample code changes needed to migrate a Java application from Apache Spark™ 1.6 to Apache Spark™ 2.0. The migration effort depends on which Apache Spark™ APIs a given application uses. Note that a few breaking API changes introduced in the 2.0 release can cause compilation errors in an application that compiled cleanly against previous releases. The most common compilation errors when first updating a Java application for the 2.0 release are as follows:

  • DataFrame cannot be resolved to a type. The import org.apache.spark.sql.DataFrame cannot be resolved.
  • The methods fit, transform, train must override or implement a supertype method.
  • The return type is incompatible with PairFlatMapFunction<Iterator<...>>.call(Iterator<...>).

Each of these errors can be resolved by applying a group of straightforward code changes, as described below.

Replace DataFrame variable declarations and references with Dataset<Row>.

For Java applications, the type org.apache.spark.sql.DataFrame no longer exists because, in Scala, DataFrame has been redefined as a type alias for Dataset[Row]. So, in general, for each Java class that uses DataFrame, apply the following pattern.

Replace:

import org.apache.spark.sql.DataFrame;

With:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Change:

DataFrame df;

To:

Dataset<Row> df;
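
For reference, here is a minimal sketch of how the pattern looks in application code after the change; the SparkSession builder call and the people.json input path are hypothetical stand-ins for whatever the application actually uses:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DatasetRowExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("MigrationExample").getOrCreate();
        // In 1.6 this would have been: DataFrame df = sqlContext.read().json("people.json");
        Dataset<Row> df = spark.read().json("people.json");
        // Dataset<Row> supports the same DataFrame-style operations as before
        df.filter(df.col("age").gt(21)).show();
        spark.stop();
    }
}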

Additional background on the DataFrame and Dataset API changes in Apache Spark™ 2.0 can be found in SPARK-13244 Unify DataFrame and Dataset API.

Replace DataFrame with Dataset<?> in method overrides of MLlib API subclasses.

Note that there are cases where DataFrame needs to be replaced by Dataset<?> instead of Dataset<Row>. More specifically, subclasses of org.apache.spark.ml.Estimator, Transformer, Predictor, etc. that override methods such as fit(), transform(), and train() need to be updated as follows.

Change:

fit(DataFrame df)
transform(DataFrame df)
train(DataFrame df)

To:

fit(Dataset<?> df)
transform(Dataset<?> df)
train(Dataset<?> df)

This change is related to SPARK-14500 Accept Dataset[_] instead of DataFrame in MLlib APIs. In the Scala MLlib APIs, DataFrame was replaced by Dataset[_]; in Java, this translates to Dataset<?>.
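
To illustrate, here is a minimal sketch of a custom Transformer compiled against the 2.0 API; the class name and pass-through logic are hypothetical, with the other required overrides reduced to trivial bodies:

import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

public class CustomTransformer extends Transformer {
    @Override
    public Dataset<Row> transform(Dataset<?> dataset) {
        // The parameter is now Dataset<?> rather than DataFrame;
        // real transformation logic omitted for brevity.
        return dataset.toDF();
    }

    @Override
    public StructType transformSchema(StructType schema) {
        return schema;
    }

    @Override
    public Transformer copy(ParamMap extra) {
        return this;
    }

    @Override
    public String uid() {
        return "customTransformer";
    }
}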

Replace Iterable<> with Iterator<> for classes implementing PairFlatMapFunction.

If a Java class implements PairFlatMapFunction (or one of the other variations of FlatMapFunction), compiling against the 2.0 API reports an error like the following:

The return type is incompatible with PairFlatMapFunction<Iterator<...>>.call(Iterator<...>).

To resolve this, change the declared return type from Iterable to Iterator in the call() method override and import java.util.Iterator. In addition, modify the return statement to return an iterator() over the collection instead of the collection itself. Below is a partial code fragment illustrating what to modify in a class that implements FlatMapFunction and its corresponding call() method.

Change:

public class CustomFlatMapFunction implements FlatMapFunction<Tuple2<Integer, Iterable<String>>, String> {
    @Override
    public Iterable<String> call(Tuple2<Integer, Iterable<String>> arg0) throws Exception {
        ArrayList<String> collection = new ArrayList<String>();
        // Collection processing omitted
        return collection;
    }
}

To:

import java.util.ArrayList;
import java.util.Iterator;

public class CustomFlatMapFunction implements FlatMapFunction<Tuple2<Integer, Iterable<String>>, String> {
    @Override
    public Iterator<String> call(Tuple2<Integer, Iterable<String>> arg0) throws Exception {
        ArrayList<String> collection = new ArrayList<String>();
        // Collection processing omitted
        return collection.iterator();
    }
}

Although this is a breaking API change, the advantage is that functions no longer need to materialize all of their results in memory at once. See SPARK-3369 Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator for additional information and discussion.
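
Because the error message above names PairFlatMapFunction, here is a sketch of the same pattern for that interface under the 2.0 API; the word-pair logic and class name are hypothetical:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

public class WordPairFunction implements PairFlatMapFunction<Iterator<String>, String, Integer> {
    @Override
    public Iterator<Tuple2<String, Integer>> call(Iterator<String> lines) throws Exception {
        List<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
        while (lines.hasNext()) {
            for (String word : lines.next().split(" ")) {
                pairs.add(new Tuple2<String, Integer>(word, 1));
            }
        }
        // The 2.0 API requires returning an Iterator rather than the collection itself
        return pairs.iterator();
    }
}

A JavaRDD<String> of text lines can then apply this function with mapPartitionsToPair(new WordPairFunction()).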

Update deprecated or removed APIs.

Many deprecated APIs were removed in Apache Spark™ 2.0, as described in Removals, Behavior Changes and Deprecations. It's simpler to fix deprecated API warnings before switching to Apache Spark™ 2.0, since each deprecation warning typically identifies the newer method to call instead. Alternatively, the API documentation can be reviewed (e.g., the 1.6.2 API Documentation vs. the 2.0.0 API Documentation). Detailed API removals for Apache Spark™ SQL are listed under SPARK-12600 Remove deprecated methods in SQL / DataFrames.

Here's a simple example for org.apache.spark.sql.types.DataType.

Change:

DataType.fromCaseClassString("DoubleType")

To:

DataType.fromJson("\"double\"")

Note that fromJson() expects the JSON representation of the type (its typeName as a JSON string, hence the escaped quotes) rather than the old case class name.

Additional information, along with a list of JIRAs covering the new SparkSession and associated API changes, can be found under SPARK-13485 (Dataset-oriented) API evolution in Apache Spark™ 2.0.

There is also some refactoring in MLlib from SPARK-13944 Separate out local linear algebra as a standalone module without Spark dependency.

Change:

import org.apache.spark.mllib.linalg.Vector;

To:

import org.apache.spark.ml.linalg.Vector;
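
As a quick sanity check on the relocated package, here is a minimal sketch using the new Vector and Vectors classes; the values are arbitrary:

import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;

public class VectorExample {
    public static void main(String[] args) {
        // Dense vector from the relocated org.apache.spark.ml.linalg package
        Vector v = Vectors.dense(1.0, 2.0, 3.0);
        System.out.println(v.size()); // prints 3
    }
}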

As mentioned at the beginning of this post, the number of changes needed to update an application varies with its API usage. Although the changes may initially seem extensive, applying the few patterns shown in this post eliminates most of the issues and gets an application quickly up and running with the 2.0 API. Enjoy the power and flexibility of Apache Spark™ 2.0!
