Lessons from Updating the Twitter Classifier Apache Spark Reference Application

2016-11-15

I just updated the official Apache Spark Twitter Classifier Reference Application from Spark 1.4 / Scala 2.10 to Spark 2 / Scala 2.11. This post discusses some of the things I did from the point of view of a Scala programmer. A primary goal was to rewrite the reference application using idiomatic and functional-style Scala. This post briefly discusses two unique aspects that I addressed: command-line parsing and DRYing up the code by importing scopes. I did several other things to improve the reference application, such as modularizing the code and providing run scripts, but this post does not address them because those techniques are generally well understood.

I did not upgrade the reference application to Scala 2.12, which was released a couple of weeks ago, because Spark does not yet support Scala 2.12. Josh Rosen of Databricks wrote me and said “some of Spark’s dependencies and by Scala-version-specific code changes necessary to work around method overloads became ambiguous in 2.12. The umbrella ticket tracking 2.12 support can be found at https://issues.apache.org/jira/browse/SPARK-14220. One of the hardest pieces will be https://issues.apache.org/jira/browse/SPARK-14643 (see the linked design document on that issue). Lack of 2.12 support for Breeze and its dependencies is likely to be another serious blocker, but that might be avoidable by only publishing a subset of the projects with 2.12 to begin with (e.g. only Spark core / SQL at first).”

Command Line Parsing

I modified the reference application's command-line parsing to use Commander Scala, a library that supports idiomatic Scala, instead of Apache Commons CLI, the Java library that was previously used. The result is simple, clean, and terse code that is easy to understand and maintain. Commander Scala automatically generates the help message. Take a look at the collect command's parsing, shown next. Notice that it reuses common code (TwitterOptionParser) for parsing Twitter authentication parameters. The new code is much shorter than the previous code, easier to modify, and more flexible.

import com.github.acrisci.commander.Program
import java.io.File

abstract sealed case class CollectOptions(
  twitterOptions: TwitterOptions,
  overWrite: Boolean = false,
  tweetDirectory: File = new File(System.getProperty("user.home"), "/sparkTwitter/tweets/"),
  numTweetsToCollect: Int = 100,
  intervalInSecs: Int = 1,
  partitionsEachInterval: Int = 1
)

object CollectOptions extends TwitterOptionParser {
  override val _program = super._program
    .option(flags="-w, --overWrite", description="Overwrite all data files from a previous run")
    .usage("Collect [options]    ")

  def parse(args: Array[String]): CollectOptions = {
    val program: Program = _program.parse(args)
    if (program.args.length!=program.usage.split(" ").length-2) program.help

    new CollectOptions(
      twitterOptions = super.apply(args),
      overWrite = program.overWrite,
      tweetDirectory = new File(program.args.head.replaceAll("^~", System.getProperty("user.home"))),
      numTweetsToCollect = program.args(1).toInt,
      intervalInSecs = program.args(2).toInt,
      partitionsEachInterval = program.args(3).toInt
    ){}
  }
}
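The positional-argument handling in the parse method above (checking the argument count, expanding a leading ~, and converting strings to integers) can be sketched without any parsing library. This is a minimal illustration, not the reference application's code: PositionalArgs, expandHome, and the Either-based error reporting are hypothetical names and choices introduced only for this example.

```scala
import java.io.File
import java.util.regex.Matcher

// Hypothetical stand-in for the positional part of CollectOptions.
final case class PositionalArgs(
  tweetDirectory: File,
  numTweetsToCollect: Int,
  intervalInSecs: Int,
  partitionsEachInterval: Int
)

object PositionalArgs {
  // Replace a leading ~ with the home directory.
  // quoteReplacement guards against backslashes or $ in the home path.
  def expandHome(path: String, home: String = System.getProperty("user.home")): String =
    path.replaceAll("^~", Matcher.quoteReplacement(home))

  // Returns Left with a message when the argument count or a number is wrong.
  def parse(args: Seq[String]): Either[String, PositionalArgs] = args match {
    case Seq(dir, num, interval, partitions) =>
      try Right(PositionalArgs(new File(expandHome(dir)), num.toInt, interval.toInt, partitions.toInt))
      catch { case e: NumberFormatException => Left(s"Not an integer: ${e.getMessage}") }
    case _ =>
      Left(s"Expected 4 positional arguments but got ${args.length}")
  }
}
```

Returning an Either rather than printing help and exiting keeps the sketch easy to test. A real entry point would probably print the usage message on Left, as the Commander-based code does.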

Here is how to sidestep the Spark help message and display the help message for the collect entry point:

$ spark-shell \
  --class com.databricks.apps.twitterClassifier.Collect \
  --jars target/scala-2.11/spark-twitter-lang-classifier-assembly-2.0.0.jar \
  -- --help
Usage: Collect [options]    
Options:
  -h, --help                       output usage information
  -V, --version                    output the version number
  -w, --overWrite                  Overwrite all data files from a previous run
  -v, --accessTokenSecret [type]   Twitter OAuth Access Token Secret
  -t, --accessToken [type]         Twitter OAuth Access Token
  -s, --consumerSecret [type]      Twitter OAuth Consumer Secret
  -c, --consumerKey [type]         Twitter OAuth Consumer Key

Importing Inner Scope Into Another Object

Apache Spark is unusual in that you cannot encapsulate a Spark streaming context in a type instance. A memory overflow occurs when you try to instantiate a Scala trait or class that creates a Spark context. The solution is to use a Scala feature that is not widely known: the ability to import the members defined inside an object into another scope. This makes the code DRY (common code is not repeated) without using classes or traits.

Here is how I took advantage of this little-known Scala technique. First I defined the SparkSetup object within a package object so it was easy to find:

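import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object SparkSetup {
  // Created once, the first time SparkSetup is referenced
  val spark = SparkSession
    .builder
    .appName(getClass.getSimpleName.replace("$", ""))
    .getOrCreate()
  val sqlContext = spark.sqlContext
  val sc: SparkContext = spark.sparkContext
  sc.setLogLevel("ERROR")
}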

Next I imported all of the variables defined in SparkSetup into the Collect object’s scope, including sc, which was used twice, like this:

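import org.apache.spark.streaming.{Seconds, StreamingContext}

object Collect extends App {
  val options = CollectOptions.parse(args)
  import SparkSetup._ // brings spark, sqlContext and sc into scope
  val ssc = new StreamingContext(sc, Seconds(options.intervalInSecs))
  Collector.doIt(options, sc, ssc)
}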
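The same technique can be tried without Spark on the classpath. The sketch below is only an illustration under stated assumptions: SharedSetup, FirstEntryPoint, and SecondEntryPoint are hypothetical names, and a plain object stands in for the Spark context. It shows that an object's body runs once, and that every scope importing its members sees the same instances.

```scala
// Hypothetical stand-in for SparkSetup: the object body runs once, on first access.
object SharedSetup {
  private var bodyRuns = 0
  bodyRuns += 1                          // counts how often the body is executed
  val logLevel: String = "ERROR"
  val resource: AnyRef = new Object      // stands in for an expensive single-instance resource
  def timesInitialized: Int = bodyRuns
}

object FirstEntryPoint {
  import SharedSetup._                   // logLevel and resource are now in scope
  def describe: String = s"logging at $logLevel"
  def handle: AnyRef = resource
}

object SecondEntryPoint {
  import SharedSetup._                   // same members, same instances
  def handle: AnyRef = resource
}
```

Both entry points share the common setup without extending a trait or instantiating a class, which is the property the Collect object relies on.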

Want to learn more practical Scala techniques? Head over to ScalaCourses.com and enroll! The combination of the Introduction to Scala and Intermediate Scala courses will teach you everything you need to know to start your journey with Apache Spark.

Mike Slinn is the lead Scala instructor at ScalaCourses.com.

