MNIST Demo

We will construct an ML Pipeline composed of a Vector Assembler, a Binarizer, PCA and a Random Forest Model for handwritten image classification on the MNIST dataset. The goal of this exercise is not to train the optimal model, but rather to demonstrate how simple it is to train a pipeline in Spark and then deploy that same pipeline (data processing + the algorithm) outside of Spark.

The code for this tutorial is split up into two parts:

  • Spark ML Pipeline Code: Vanilla/out-of-the-box Spark code to train the ML Pipeline, which we serialize to Bundle.ML
  • MLeap Code: Load the serialized Bundle into MLeap and transform LeapFrames

Nouns

  • Estimator: The actual learning algorithm, which is trained (fit) against the data frame and produces a Model
  • Model: In Spark, the model is the code and metadata needed to score with an already trained algorithm
  • Transformer: Anything that transforms a data frame; it does not necessarily need to be trained by an estimator (e.g. a Binarizer)
  • LeapFrame: A dataframe structure used for storing your data and the associated schema
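
To make the first three nouns concrete, here is a minimal sketch using stock Spark ML classes. The toy data, the column names and the `toy...` value names are made up for illustration and are not part of the demo itself.

```scala
import org.apache.spark.ml.feature.{Binarizer, StringIndexer, StringIndexerModel}
import org.apache.spark.sql.SparkSession

// Local session for this sketch only
val spark = SparkSession.builder().master("local[*]").appName("nouns-sketch").getOrCreate()
import spark.implicits._

// Toy data (made up): a string label and a single pixel intensity
val toy = Seq(("7", 200.0), ("3", 15.0), ("7", 130.0)).toDF("label", "pixel")

// Transformer: a Binarizer needs no training, so it can transform right away
val toyBinarizer = new Binarizer()
  .setInputCol("pixel")
  .setOutputCol("pixel_binarized")
  .setThreshold(127.5)
val binarized = toyBinarizer.transform(toy)

// Estimator: a StringIndexer has to see the data before it can map labels to indices
val toyIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("label_index")

// Model: the result of fit() is itself a Transformer
val toyIndexerModel: StringIndexerModel = toyIndexer.fit(binarized)
val indexed = toyIndexerModel.transform(binarized)
```

The same split shows up in the pipeline built later: the StringIndexer is fit against the dataset, while the Binarizer is used as-is.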

Load the data

You can download the training and test datasets (gzipped, from s3). You will of course have to adjust the dataset paths (such as testDatasetPath) to point at your local copies.

The original data is hosted on Yann LeCun’s website.
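
The loading code itself is not shown in this section. The following is a minimal sketch, assuming the downloaded files are gzipped CSVs with a header row containing a `label` column and pixel columns `x0` to `x783` (the names the pipeline code expects). The helper name `loadMnistCsv` is hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: reads a headered CSV file into a DataFrame.
// Assumption: the file has a `label` column and pixel columns x0..x783.
// Spark decompresses files ending in .gz transparently.
def loadMnistCsv(spark: SparkSession, path: String): DataFrame =
  spark.read
    .option("header", "true")      // first row holds the column names
    .option("inferSchema", "true") // parse labels and pixel values as numbers
    .csv(path)
```

With a SparkSession `spark` in scope, `val dataset = loadMnistCsv(spark, ...)` called with your local training path gives the `dataset` used below, and the same call with `testDatasetPath` loads the test set.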

Build the ML Data Pipeline

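    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.{Binarizer, PCA, StringIndexer, VectorAssembler}

    // `dataset` is the training DataFrame from the "Load the data" step

    // Define Dependent and Independent Features
    val predictionCol = "label"
    val labels = Seq("0","1","2","3","4","5","6","7","8","9")
    val pixelFeatures = (0 until 784).map(x => s"x$x").toArray

    // Layer sizes for a neural-network classifier; a random forest does not use them
    val layers = Array[Int](pixelFeatures.length, 784, 800, labels.length)

    // Assemble the 784 pixel columns into a single vector column
    val vector_assembler = new VectorAssembler()
      .setInputCols(pixelFeatures)

    // Index the string labels; StringIndexer is an estimator, so fit it against the data
    val stringIndexer = { new StringIndexer()
      .setInputCol(predictionCol)
      .setOutputCol("label_index")
      .fit(dataset)
    }

    // Threshold pixel intensities at the midpoint of the 0-255 range
    val binarizer = new Binarizer()
      .setInputCol(vector_assembler.getOutputCol)
      .setThreshold(127.5)
      .setOutputCol("binarized_features")

    // Reduce the binarized pixels to 10 principal components
    val pca = new PCA().
      setInputCol(binarizer.getOutputCol).
      setOutputCol("pcaFeatures").
      setK(10)

    val featurePipeline = new Pipeline().setStages(Array(vector_assembler, stringIndexer, binarizer, pca))

    // Fit the feature pipeline, then transform the raw data with it
    val featureModel = featurePipeline.fit(dataset)
    val datasetWithFeatures = featureModel.transform(dataset)

    // Select only the columns needed for training and persist them
    val datasetPcaFeaturesOnly = datasetWithFeatures.select(stringIndexer.getOutputCol, pca.getOutputCol).persist()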

Train a Random Forest Model
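
The training code is not shown in this section. The following is a minimal sketch using Spark's `RandomForestClassifier`, assuming the column names produced by the feature pipeline above (`label_index` and `pcaFeatures`). The helper name `trainRandomForest` and the number of trees are illustrative assumptions, not tuned values.

```scala
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.sql.DataFrame

// Hypothetical helper: fits a random forest on the PCA features.
// Assumption: `training` has the columns "label_index" and "pcaFeatures".
def trainRandomForest(training: DataFrame): RandomForestClassificationModel =
  new RandomForestClassifier()
    .setLabelCol("label_index")    // output of the StringIndexer
    .setFeaturesCol("pcaFeatures") // output of the PCA stage
    .setNumTrees(20)               // illustrative value, not tuned
    .fit(training)
```

Calling `val rfModel = trainRandomForest(datasetPcaFeaturesOnly)` would give the `rfModel` that the serialization step refers to.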

Serialize the ML Data Pipeline and RF Model to Bundle.ML

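    import ml.combust.bundle.BundleFile
    import ml.combust.mleap.spark.SparkSupport._
    import org.apache.spark.ml.bundle.SparkBundleContext
    import org.apache.spark.ml.mleap.SparkUtil
    import resource._

    // Combine the fitted feature pipeline and the random forest model into one pipeline
    val pipeline = SparkUtil.createPipelineModel(uid = "pipeline", Array(featureModel, rfModel))
    // The bundle context carries a transformed dataset so the schema can be recorded
    val sbc = SparkBundleContext().withDataset(rfModel.transform(datasetWithFeatures))
    for (bf <- managed(BundleFile("jar:file:/tmp/mnist-spark-pipeline.zip"))) {
      pipeline.writeBundle.save(bf)(sbc).get
    }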

Deserialize to MLeap and Score New Data

The goal of this step is to show how to deserialize a bundle and use it to score LeapFrames without any Spark dependencies. The required download is available from our s3 bucket.
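
The scoring code in this section uses `mleapPipeline`, which is not defined here. The following is a minimal sketch of deserializing the bundle, assuming it was written to the path used in the serialization step and that the MLeap runtime and the scala-arm `resource` package are on the classpath.

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._

// Open the bundle, load it into the MLeap runtime and keep the root transformer.
// Assumption: the path matches the one the bundle was saved to earlier.
val mleapPipeline = (for (bf <- managed(BundleFile("jar:file:/tmp/mnist-spark-pipeline.zip"))) yield {
  bf.loadMleapBundle().get.root
}).opt.get
```

Nothing in this snippet imports Spark, which is the point of the exercise: only the MLeap runtime is needed at scoring time.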

Load the sample LeapFrame from the mleap-demo git repo (data/mnist.json)

    import ml.combust.mleap.runtime.serialization.FrameReader

    // Read the sample LeapFrame JSON and parse it
    val s = scala.io.Source.fromURL("file:///./mleap-demo/mnist.json").mkString
    val bytes = s.getBytes("UTF-8")
    // Depending on the MLeap version, fromBytes may return a Try; if so, append .get
    val frame = FrameReader("ml.combust.mleap.json").fromBytes(bytes)

    // Transform the LeapFrame using the deserialized pipeline (mleapPipeline)
    val frame2 = mleapPipeline.transform(frame).get