Rapid Data Science with Veikkaus dcontext & Spark

Dcontext was part of a Veikkaus project for personalizing communication with Veikkaus customers. The project used data science to advance more responsible lottery playing in Finland. I wish to thank Veikkaus for advancing culture, sport, science and youth work, and for taking the high road in lottery-related communication.

Thanks to the most amazing people in the Veikkaus team. Thanks to Henry Verlin for the responsible personalization vision, Eetu Paloheimo for sponsoring the project, and Janne Vuorenmaa for enabling the contribution of Veikkaus solutions to the wider community. Thanks to Janne Pellikkä for sparring, and Joni Freeman for helping with the SBT spells.

Also: thanks to Li Jang and his Dynacode project for the inspiration and the example: http://www.javaworld.com/article/2071777/design-patterns/add-dynamic-java-code-to-your-application.html?page=2

The problem worth solving

Have you ever faced tight deadlines while impatiently waiting for massive data processing runs to slowly complete? Have you had to rerun large data processing pipelines after trivial errors?

Sometimes the data scientist's day may even look like this:

Processing huge amounts of data with complex algorithms can take a bit too much time. Kudos to Randall Munroe / XKCD for the original.

Data processing can be - and often is - insufferably slow. For example: if you are operating with large customer bases (e.g. a million customers) and the related data (impressions, clicks), then even basic operations like parsing & loading the data can take tens of seconds or minutes. Training a deep neural network once can take hours or days.

And this slowness can be a huge headache in the data scientist's daily routine, as the time taken by individual processing/testing runs has a drastic impact on data scientist productivity. Often, the data scientist has to do a certain amount of iterative runs/tests before the project is complete. If the project's productization requires 1000 separate test runs, and each run takes 6 seconds, the expert will spend well under two hours testing the solution, which is fine. But if each test/run takes 10 minutes, the testing alone may take weeks or months, and the runs will completely dominate the project's schedule. As a consequence: a project that would have taken several days with a 6-second iteration cycle can take several months instead.
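
To make the arithmetic concrete, here is a quick back-of-the-envelope check (runnable in a Scala console; the run counts and durations are just the illustrative figures from above):

// Back-of-the-envelope check of the iteration arithmetic above
val runs = 1000
val fastTotalHours    = runs * 6 / 3600.0      // 6 s per run    -> roughly 1.7 hours in total
val slowTotalWorkDays = runs * 10 / 60.0 / 8.0 // 10 min per run -> roughly 21 eight-hour days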

I personally see slow data processing runs as a major problem in the data science field. Slow runs do not only cause expense; it's also rather unmotivating and boring to wait for runs to complete. We - as human beings - want results fast. I want results fast, my customers want results fast, and everyone else (colleagues, superiors, stakeholders, end users) wants results fast. This is why we at Futurice, together with our customer Veikkaus, developed the Veikkaus dcontext.

 

Too big data?

The real problem with data science is the data: there is simply too much of it.

You may have hundreds of thousands of customers. Each customer may have a few thousand events, and each event may have countless features or details. Even if you concentrate the work on the most important & essential information, you may end up with gigabytes of data.

Having lots of data means one major thing: all operations on the data will take some time. Loading the data from the file system takes some time. Parsing the data takes some time. Preprocessing and preparing the data takes some time, and creating models from the data can take crazy amounts of time. Then you have the next steps, which relate to testing and analyzing the results; this work is highly iterative and requires a fast iteration cycle to be effective.

Also, you cannot always get rid of this slowness. Even after sampling, the data set may remain overly large. You can try a bigger machine or a bigger cluster, but in many operations the bottleneck is the IO, not the CPU, and doing the IO over the cluster's network may only make things worse. Even if you cut the processing time to tens of seconds, getting the delays under a second can hugely boost your productivity. The data processing is inevitably going to take some time, and significant improvements are going to be huge wins for you, the project and the end customer.

 

‘Do not repeat yourself’ principle to the rescue?

The easiest solution for dealing with slow operations is - of course - not doing them. Running a 10-minute operation is not a problem if you can store the results and reuse them in the following iterations. This is the reason why many data scientists prefer the consoles of scripting languages and tools like R or Python. You can also have a console for Scala and Spark.

Yet, using a scripting language is not always desirable. R and Python are both quite slow execution environments, and quite unattractive platforms for custom algorithms & code. If you want to speed up your test runs, then choosing a 10x-100x slower scripting environment may not be an option. This is especially true if you want to craft custom algorithms.

There are also problems related to the console-driven development approach. One problem is that you have to maintain the state within the console. If you want to run unit tests with the shared state, you have to first construct the state, and then provide it as a parameter for the newly modified code. This may be feasible (if clumsy) in an R or Python console. Still: in a Scala/Spark console, you cannot really reload code from your modified project. The only way to test your modifications in the console is to copy-paste the changes into the console. And if you have modified dependencies, you need to copy-paste those too. So it's lots of copy-pasting. Jupyter notebooks may help here, but in the long term: can you really maintain your production models in a notebook?

There has to be a better way!

 

Meet Veikkaus DContext

Veikkaus DContext was partly inspired by the Roman Census principle inherent in Big Data solutions. The basic idea is that the data shouldn't be taken to where the processing is; instead, the processing should be taken to where the data is.

In big data solutions, the data is located on the individual nodes of the cluster, and the algorithms may be sent to the nodes via JAR packaging. In DContext, the data/state is always kept prepared in RAM, and code changes are reloaded in real time by a dynamic class loader. Alternatively, dcontext can be used to maintain a SparkContext, which means that the data is kept prepared in the RAM of the Spark cluster's nodes.

DRY and Roman Census principles to the rescue! With dcontext you do the JVM boot, data loading and preliminary processing steps only once. The updated code is reloaded dynamically, while the processed data remains fully prepared in the JVM heap. This optimization can cut the iteration cycle from minutes or tens of seconds down to seconds.

DContext consists of a few simple parts:

  1. A dynamic class loader for reloading compiled class files into the JVM session.

  2. DContext itself, which is basically a glorified HashMap, for maintaining the data in an easily accessible form in the JVM heap (a toy sketch of this idea follows below the list).

  3. And a console (of course), for letting the user run tasks in the JVM session.
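
To make the 'glorified HashMap' idea concrete, here is a minimal toy sketch of the pattern (illustrative only: ToyContext, getOrBuild and the file path are made-up names, not the dcontext API):

import scala.collection.mutable

// A toy version of the idea: a map living in the JVM heap, where expensive
// values are built once and then reused by later (possibly reloaded) code.
class ToyContext {
  private val data = mutable.Map[String, Any]()

  // Return the cached value for `key`, or build and cache it on first access
  def getOrBuild[T](key: String)(build: => T): T =
    data.getOrElseUpdate(key, build).asInstanceOf[T]
}

// Usage: the slow load runs only on the first access; a rerun of the same
// task (even after the task's classes have been reloaded) reuses the data.
val context = new ToyContext
val customers = context.getOrBuild("customers") {
  io.Source.fromFile("data/customers.csv").getLines.toVector // hypothetical file
}

The real DContext adds the dynamic class loader and the console on top of this kind of heap-resident state.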

 

Using dcontext with fast-spark-test

To demonstrate how Dcontext works with data, we are going to examine fast-spark-test, available here. Fast-spark-test contains stock index data (SPY, DJIA, SPX) that can be rapidly processed and examined within a local Spark context.

Dcontext can be taken into use by cloning the repository, running "sbt publishLocal" in the dcontext directory, and adding the following snippet to the build.sbt file:

libraryDependencies ++= Seq("fi.veikkaus" %% "dcontext" % "0.2-SNAPSHOT")

There is also a separate step to starting the dcontext console, so that it can reload your dynamic class paths and drive your test tasks. Fast-spark-test contains the following SBT task for starting a dcontext console session:

testsh := {
 // Get the SBT test configuration class path
 val classpath =
   (fullClasspath in Test)
     .value
     .map(i=>i.data)

 // Separate SBT class path into A) static and B) dynamic parts
 //  - Dynamic paths are paths that contain the following string
 val dynamicPathFilter = (path:File) => {
   val p = path.getPath
   p.contains(f"target/scala-${scalaMajorVersion}")
 }
 val staticCP = classpath.filter(!dynamicPathFilter(_))
 val dynamicCP = classpath.filter(dynamicPathFilter)

 // The console arguments contain:
 //   1) dynamic class path
 //   2) mounted run context (containing tasks or unit tests)
 //   3) command for starting the console in interactive mode
 val args = Seq(
   f"-p ${dynamicCP.mkString(":")}",
    "-m fi.futurice.fastsparktest.TestContext",
    "-i")

 val mainClass = "fi.veikkaus.dcontext.Console"
 (runner in run).value.run(mainClass, staticCP, args, streams.value.log)
}

The script does 3 separate things:

  1. It separates the JVM classpath into a) a static classpath and b) a dynamic classpath. The static class path is given as an argument to the JVM. The dynamic class path is given to the DContext Console via the ‘-p’ flag. The Console will then use the dynamic class loader to load these classes dynamically, and to reload them as the classes are modified.

  2. It mounts fi.futurice.fastsparktest.TestContext as the run context using the -m flag. This context provides the test cases to run in the console.

  3. It starts the console in interactive mode with the -i flag.
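
One detail worth noting: scalaMajorVersion, which the task above uses to filter the class path, is not a built-in SBT key. In the example project it is presumably defined in build.sbt along these lines (a sketch; the exact definition in the repository may differ, and the Scala patch version is an assumption):

// A sketch, not copied from the repository
val scalaMajorVersion = "2.10"                 // inferred from the target/scala-2.10 paths in this post
scalaVersion := s"$scalaMajorVersion.6"        // the patch version here is an assumption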

 

Running dcontext 

Running the testsh task starts the console with the following prompt:

type -h for instructions:
$

Now, the DContext console contains things called 'dcontext layers'. These layers are basically hashmaps that can contain executable tasks or data, very much like filesystem directories. DContext mounts 3 layers, which are:

  1. The system layer (a bit like /sbin), which contains basic console tasks like -h, -m and -i.

  2. The mount layer (a bit like /usr/lib/myapp/bin), which contains the mounted application's tasks and tests.

  3. The data layer (like /tmp or /var/lib/myapp), which contains data created by the tasks started from dcontext.

Typing -h prints the available console commands and reveals the system layer content:

$ -h
-h                             this help
--reload                       reload classes
-q                             quits console
-p                             adds classpath
-m                             mounts context
-l                             lists context values
-i                             interactive console
-r                             remove data (e.g. '-r fooX' or '-r .*')
-d                             display class loader info

Typing -l prints the mount layer and data layer contents:

$ -l
test fi.futurice.fastsparktest.ExampleTest

These two layers currently contain only a single task, which was defined in fi.futurice.fastsparktest.TestContext:

package fi.futurice.fastsparktest

import fi.veikkaus.dcontext.ImmutableDContext

class TestContext
  extends ImmutableDContext(
    Map("test" -> new ExampleTest)) {}
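
If you want to expose more suites or tasks in the console, presumably you just add more entries to this map (a sketch; MyOtherTest and the "mytests" name are hypothetical):

// A sketch only: "mytests" and MyOtherTest are hypothetical
class TestContext
  extends ImmutableDContext(
    Map("test"    -> new ExampleTest,
        "mytests" -> new MyOtherTest)) {}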

ExampleTest is a unit test suite that uses the Futurice testtoys behavioral unit testing toolkit and dcontext for dynamic-code unit testing.

The test suite has a number of test cases, which can be listed with the -l flag:

$ test -l
config
setup
spy
djia
spyDf

Let’s try the ‘spyDf’ test, which prints information about the SPY Spark DataFrame:

$ test spyDf

...lots of Spark log...

columns:
  symbol
  date
  time
  open
  high
  low
  close
  volume

rows: 1128951

5029 ms.

The test takes 5 seconds, because loading and preparing the data frame takes some time. Now, let's rerun the test:

$ test spyDf

...lots of Spark log...

columns:
  symbol
  date
  time
  open
  high
  low
  close
  volume

rows: 1128951

81 ms.

The test now takes only 81 ms, because all the data was already loaded in the dcontext. We can see the loaded data by running the ‘-l’ command:

 

$ -l
fastsparktest.sparkContext     org.apache.spark.SparkContext
fastsparktest.spy              org.apache.spark.rdd.MapPartitionsRDD
fastsparktest.spyDf            org.apache.spark.sql.DataFrame
fastsparktest.sqlContext       org.apache.spark.sql.SQLContext
test                           fi.futurice.fastsparktest.ExampleTest

The data context not only contains the spy data (the text RDD and the data frame), but also the Spark infrastructure (the SQL and Spark contexts).

 

Modifying code dynamically

The spyDf test uses the data context defined in the DataContext.scala file. The Spark context is defined as a context variable (cvalc) like this:

val sparkContext = cvalc("sparkContext") { case c =>
 org.apache.log4j.PropertyConfigurator.configure("io/in/config.properties")

 val config =
   new SparkConf()
     .set("spark.driver.memory", "4g")
     .set("spark.executor.memory", "4g")
     .setMaster("local[4]")
     .setAppName("fast-spark-test")
 val sc = new SparkContext(config)
 sc.setLogLevel("WARN" )

 sc.addJar("target/scala-2.10/fast-spark-test_2.10-0.0.1-tests.jar")
 sc.addJar("target/scala-2.10/fast-spark-test_2.10-0.0.1.jar")

 sc
}{ case sc => sc.stop }

The “{ case sc => sc.stop }” snippet is run when the context variable gets closed.

The spy data (the raw text RDD and the data frame), on the other hand, is defined like this:

val spy = cval("spy") { case c =>
 sparkContext(c).textFile(spyFile.getPath)
}
val spyDf = cval("spyDf") { case c =>
 val sqlc = sqlContext(c)
 import sqlc.implicits._
 val first = spy(c).take(0)
 val rv =
   spy(c)
      .filter(!_.contains("Open")) // drop the CSV header line
     .map { s =>
       val split = s.split(",")
       DataRow(split(0), split(1), split(2), split(3).toDouble,
         split(4).toDouble,split(5).toDouble, split(6).toDouble, split(7).toDouble)
     }.toDF()
 rv.cache
 rv
}
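
For reference, the fastsparktest.sqlContext entry seen in the earlier data layer listing could be defined with the same cval pattern (a sketch; the actual definition in DataContext.scala may differ):

// A sketch following the same pattern; the real DataContext.scala may differ
val sqlContext = cval("sqlContext") { case c =>
  new org.apache.spark.sql.SQLContext(sparkContext(c))
}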

Now, let's examine the actual test case. The spyDf test case looks like this in the ExampleTest.scala file:

test("spyDf")((c, t) => {
 val df : DataFrame = spyDf(c)
 val count = df.count
 t.tln("columns:")
 df.columns.foreach { c => t.tln("  " + c) }
 t.tln
 t.tln("rows: " + count)
})
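
Adding your own test case follows the same pattern. For example, a hypothetical test that counts the distinct symbols in the data could look roughly like this (not part of ExampleTest.scala):

// A hypothetical extra test case, not part of ExampleTest.scala
test("symbols")((c, t) => {
  val df: DataFrame = spyDf(c)
  t.tln("distinct symbols: " + df.select("symbol").distinct.count)
})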

Now, in order to modify the code dynamically and see the changes, we need to A) compile the project in the background and B) repackage the JAR packages monitored by Spark. To do this, let's start a separate SBT process in a separate terminal with the following command:

$ sbt "~;package;test:package"

Then, let's modify the spyDf code by also printing the data frame:

test("spyDf")((c, t) => {
 val df : DataFrame = spyDf(c)
 val count = df.count
 t.tln("columns:")
 df.columns.foreach { c => t.tln("  " + c) }
 t.tln
 t.tln("rows: " + count)
 t.tln // new
 t.tln("the data frame:") // new
 tDf(t, df) // prints first lines of the data frame
})

After waiting for the background compilation to finish (it likely takes less than a second), we can rerun the test case in the console and get different results:

$ test spyDf
…lots of Spark log..
  columns:
    symbol
    date
    time
    open
    high
    low
    close
    volume
  
  rows: 1128951

! the data frame:
! 1128951 entries
! 8 columns

! symbol        |date          |time          |open          |high          |low           |close         |volume        
…lots of Spark log..
! SPY           |20040701      |0931          |114.25        |114.32        |114.24        |114.27        |216400.0      
! SPY           |20040701      |0932          |114.26        |114.33        |114.24        |114.31        |207200.0      
! SPY           |20040701      |0933          |114.3         |114.34        |114.28        |114.3         |83900.0       
! SPY           |20040701      |0934          |114.3         |114.32        |114.29        |114.32        |245500.0      
! SPY           |20040701      |0935          |114.29        |114.31        |114.29        |114.3         |69400.0       
! SPY           |20040701      |0936          |114.31        |114.34        |114.31        |114.32        |218200.0      
! SPY           |20040701      |0937          |114.33        |114.36        |114.32        |114.34        |59600.0       
! SPY           |20040701      |0938          |114.34        |114.34        |114.26        |114.28        |143300.0      
! ...

187 ms. 15 errors! [d]iff, [c]ontinue or [f]reeze?

The test was run in 187 ms, instead of the original 5 seconds, because the SparkContext and the spyDf data frame still remain in memory. If you have bigger files (e.g. a few gigabytes instead of 60 MB), this difference between the first run and the following iterations can be dramatic.

The run informed us about 15 errors, as the changes broke the behavioral unit test. In behavioral testing, the test suite merely compares the new results to the old ones; you can now inspect the differences (with the d key, if you have Meld) or freeze/accept the new version.

 

The current state

In our experience, dcontext can speed up the development cycle radically if you operate with big enough data (or just with Spark). In such situations, dcontext is worth the extra effort.

Yet, dcontext does require extra effort: it needs special SBT tasks for running it, and you need to organize the code base and tests somewhat differently. You also probably want separate mechanisms for starting the production environment (without dcontext) and for running the test runs (with dcontext), to avoid the dynamic class loading overhead. DContext can also be fragile, because of class-loading-related oddities. Dcontext can and will break...

  1. If you put instances of dynamically reloadable classes into dcontext: class version 1 (loaded previously) will be incompatible with class version 2 (loaded afterwards and required by the newly modified code), even if the classes are otherwise identical.

  2. If you dynamically load classes while the background compilation is still running: the dynamic class loader will attempt to load class files that are in an inconsistent/incomplete state, and fail irreversibly.

  3. If you modify the SBT file.

Whenever dcontext breaks down, the dcontext console/JVM has to be restarted (-q quits the console).

There are major improvements that could be made to make dcontext more effective. One basic improvement relates to better monitoring of the classpath content, so that dcontext won't break so easily. Another improvement could be replacing or expanding the current console with the Scala console.

 

More resources:

Check out dcontext and fast-spark-test on GitHub: https://github.com/VeikkausOy/dcontext. Fast-spark-test is available in the examples directory. Use the get_data.sh script to fetch the stock market data used by the test application. You will also need testtoys.

Also, check out dcolossus here. Dcolossus is a simple (and still evolving) wrapper on top of the colossus web server library. Dcolossus makes it possible to use dcontext inside a web server so that the code can be dynamically reloaded on every HTTP request. This makes it possible to rapidly develop big data web servers and services. (Of course, in production you probably want to disable this kind of dynamic class loading.)

All these projects are works in progress and under active development. There are issues and flaws. So please: if you spot issues or have feature wishes, feel free to send patches & pull requests!