How to Extend Spark with H2O

H2O is an open source system, developed in Java by http://h2o.ai/, for machine learning. It offers a rich set of machine learning algorithms and a web-based data processing user interface. It offers the ability to develop in a range of languages: Java, Scala, Python, and R. It also has the ability to interface to Spark, HDFS, Amazon S3, SQL, and NoSQL databases. This chapter will concentrate on H2O's integration with Apache Spark using the Sparkling Water component of H2O. A simple example, developed in Scala and based on real data, will be used to create a deep-learning model. This chapter will:

• Examine the H2O functionality
• Consider the necessary Spark H2O environment
• Examine the Sparkling Water architecture
• Introduce and use the H2O Flow interface
• Introduce deep learning with an example
• Consider performance tuning
• Examine data quality

The next step will be to provide an overview of the H2O functionality, and of the Sparkling Water architecture that will be used in this chapter.

Overview

Since it is only possible to examine, and use, a small amount of H2O's functionality in this chapter, I thought that it would be useful to provide a list of all of the functional areas that it covers. This list is taken from the http://h2o.ai/ website at http://h2o.ai/product/algorithms/ and is based upon munging/wrangling data, modeling using the data, and scoring the resulting models:

Process: data profiling; summary statistics; aggregate, filter, bin, and derive columns; slice, log transform, and anonymize; variable creation; PCA; training and validation sampling plan

Model: Generalized Linear Models (GLM); decision trees; Gradient Boosting (GBM); K-Means; anomaly detection; deep learning; Naïve Bayes; grid search

Score tool: Predict; Confusion Matrix; AUC; Hit Ratio; PCA Score; Multi Model Scoring

The following section will explain the environment used for the Spark and H2O examples in this chapter, and it will also explain some of the problems encountered.

The processing environment

If any of you have examined my web-based blogs, or read my first book, Big Data Made Easy, you will see that I am interested in Big Data integration, and in how the big data tools connect. None of these systems exists in isolation. The data will start upstream, be processed in Spark plus H2O, and then the result will be stored, or moved to the next step in the ETL chain. Given this idea, in this example I will use Cloudera CDH HDFS for storage, and source my data from there. I could just as easily use S3, or an SQL or NoSQL database.

At the point of starting the development work for this chapter, I had a Cloudera CDH 5.1.3 cluster installed and working. I also had various Spark versions installed, and available for use. They are as follows:

• Spark 1.0 installed as CentOS services
• Spark 1.2 binary downloaded and installed
• Spark 1.3 built from a source snapshot

I thought that I would experiment to see which combinations of Spark and Hadoop I could get to work together. I downloaded Sparkling Water at http://h2o-release.s3.amazonaws.com/sparkling-water/master/98/index.html and used the 0.2.12-95 version. I found that Spark 1.0 worked with H2O, but the Spark libraries were missing some of the functionality that is used in many of the Sparkling Water-based examples.
Spark versions 1.2 and 1.3 caused the following error to occur:

15/04/25 17:43:06 ERROR netty.NettyTransport: failed to bind to /192.168.1.103:0, shutting down Netty transport
15/04/25 17:43:06 WARN util.Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.

The Spark master port number, although correctly configured in Spark, was not being picked up, and so the H2O-based application could not connect to Spark. After discussing the issue with the guys at H2O, I decided to upgrade to an H2O-certified version of both Hadoop and Spark. The recommended system versions that should be used are available at http://h2o.ai/product/recommended-systems-for-h2o/.

I upgraded my CDH cluster from version 5.1.3 to version 5.3 using the Cloudera Manager interface parcels page. This automatically provided Spark 1.2—the version that has been integrated into the CDH cluster. This solved all the H2O-related issues, and provided me with an H2O-certified Hadoop and Spark environment.

Installing H2O

For completeness, I will show you how I downloaded, installed, and used H2O. Although I finally settled on version 0.2.12-95, I first downloaded and used 0.2.12-92. This section is based on the earlier install, but the approach used to source the software is the same. The download link changes over time, so follow the Sparkling Water download option at http://h2o.ai/download/. This will source the zipped Sparkling Water release, as shown by the CentOS Linux long file listing here:

[hadoop@hc2r1m2 h2o]$ pwd ; ls -l
/home/hadoop/h2o
total 15892
-rw-r--r-- 1 hadoop hadoop 16272364 Apr 11 12:37 sparkling-water-0.2.12-92.zip

This zipped release file is unpacked using the Linux unzip command, and it results in a Sparkling Water release file tree:

[hadoop@hc2r1m2 h2o]$ unzip sparkling-water-0.2.12-92.zip
[hadoop@hc2r1m2 h2o]$ ls -d sparkling-water*
sparkling-water-0.2.12-92  sparkling-water-0.2.12-92.zip

I have moved the release tree to the /usr/local/ area using the root account, and created a simple symbolic link to the release called h2o. This means that my H2O-based build can refer to this link, and it doesn't need to change as new versions of Sparkling Water are sourced. I have also made sure, using the Linux chown command, that my development account, hadoop, has access to the release:

[hadoop@hc2r1m2 h2o]$ su -
[root@hc2r1m2 ~]# cd /home/hadoop/h2o
[root@hc2r1m2 h2o]# mv sparkling-water-0.2.12-92 /usr/local
[root@hc2r1m2 h2o]# cd /usr/local
[root@hc2r1m2 local]# chown -R hadoop:hadoop sparkling-water-0.2.12-92
[root@hc2r1m2 local]# ln -s sparkling-water-0.2.12-92 h2o
[root@hc2r1m2 local]# ls -lrt | grep sparkling
total 52
drwxr-xr-x 6 hadoop hadoop 4096 Mar 28 02:27 sparkling-water-0.2.12-92
lrwxrwxrwx 1 root root 25 Apr 11 12:43 h2o -> sparkling-water-0.2.12-92

The release has been installed on all the nodes of my Hadoop CDH clusters.

The build environment

From past examples, you will know that I favor SBT as a build tool for developing Scala source examples. I have created a development environment on the Linux CentOS 6.5 server called hc2r1m2, using the hadoop development account.
The development directory is called h2o_spark_1_2:

[hadoop@hc2r1m2 h2o_spark_1_2]$ pwd
/home/hadoop/spark/h2o_spark_1_2

My SBT build configuration file, named h2o.sbt, is located here; it contains the following:

[hadoop@hc2r1m2 h2o_spark_1_2]$ more h2o.sbt

name := "H 2 O"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.3.0"

libraryDependencies += "org.apache.spark" % "spark-core" % "1.2.0" from "file:///opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/spark-assembly-1.2.0-cdh5.3.3-hadoop2.5.0-cdh5.3.3.jar"

libraryDependencies += "org.apache.spark" % "mllib" % "1.2.0" from "file:///opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/spark-assembly-1.2.0-cdh5.3.3-hadoop2.5.0-cdh5.3.3.jar"

libraryDependencies += "org.apache.spark" % "sql" % "1.2.0" from "file:///opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/spark-assembly-1.2.0-cdh5.3.3-hadoop2.5.0-cdh5.3.3.jar"

libraryDependencies += "org.apache.spark" % "h2o" % "0.2.12-95" from "file:///usr/local/h2o/assembly/build/libs/sparkling-water-assembly-0.2.12-95-all.jar"

libraryDependencies += "hex.deeplearning" % "DeepLearningModel" % "0.2.12-95" from "file:///usr/local/h2o/assembly/build/libs/sparkling-water-assembly-0.2.12-95-all.jar"

libraryDependencies += "hex" % "ModelMetricsBinomial" % "0.2.12-95" from "file:///usr/local/h2o/assembly/build/libs/sparkling-water-assembly-0.2.12-95-all.jar"

libraryDependencies += "water" % "Key" % "0.2.12-95" from "file:///usr/local/h2o/assembly/build/libs/sparkling-water-assembly-0.2.12-95-all.jar"

libraryDependencies += "water" % "fvec" % "0.2.12-95" from "file:///usr/local/h2o/assembly/build/libs/sparkling-water-assembly-0.2.12-95-all.jar"

I have provided SBT configuration examples in the previous chapters, so I won't go into the line-by-line detail here. I have used file-based URLs to define the library dependencies, and have sourced the Hadoop JAR files from the Cloudera parcel path for the CDH install. The Sparkling Water JAR path is defined under the /usr/local/h2o/ link that was just created.

I use a Bash script called run_h2o.bash within this development directory to execute my H2O-based example code. It takes the application class name as a parameter, and is shown below:

[hadoop@hc2r1m2 h2o_spark_1_2]$ more run_h2o.bash

#!/bin/bash

SPARK_HOME=/opt/cloudera/parcels/CDH
SPARK_LIB=$SPARK_HOME/lib
SPARK_BIN=$SPARK_HOME/bin
SPARK_SBIN=$SPARK_HOME/sbin
SPARK_JAR=$SPARK_LIB/spark-assembly-1.2.0-cdh5.3.3-hadoop2.5.0-cdh5.3.3.jar

H2O_PATH=/usr/local/h2o/assembly/build/libs
H2O_JAR=$H2O_PATH/sparkling-water-assembly-0.2.12-95-all.jar

PATH=$SPARK_BIN:$PATH
PATH=$SPARK_SBIN:$PATH
export PATH

cd $SPARK_BIN

./spark-submit \
  --class $1 \
  --master spark://hc2nn.semtech-solutions.co.nz:7077 \
  --executor-memory 85m \
  --total-executor-cores 50 \
  --jars $H2O_JAR \
  /home/hadoop/spark/h2o_spark_1_2/target/scala-2.10/h-2-o_2.10-1.0.jar

This example of Spark application submission has already been covered, so again, I won't go into the detail. Setting the executor memory to a correct value was critical to avoiding out-of-memory issues and performance problems. This will be examined in the Performance tuning section.

As in the previous examples, the application Scala code is located in the src/main/scala subdirectory, under the development directory level. The next section will examine the Apache Spark and H2O architecture.
Architecture

The diagrams in this section have been sourced from the http://h2o.ai/ website, at http://h2o.ai/blog/2014/09/how-sparkling-water-brings-h2o-to-spark/, to provide a clear description of the way in which H2O Sparkling Water can be used to extend the functionality of Apache Spark. Both H2O and Spark are open source systems. Spark MLlib contains a great deal of functionality, while H2O extends this with a wide range of extra functionality, including deep learning. It offers tools to munge (transform), model, and score the data. It also offers a web-based user interface to interact with.

The next diagram, borrowed from http://h2o.ai/, shows how H2O integrates with Spark. As we already know, Spark has master and worker servers; the workers create executors to do the actual work. The following steps occur to run a Sparkling Water-based application:

1. Spark's submit command sends the Sparkling Water JAR to the Spark master.
2. The Spark master starts the workers, and distributes the JAR file.
3. The Spark workers start the executor JVMs to carry out the work.
4. The Spark executor starts an H2O instance.

The H2O instance is embedded within the executor JVM, and so it shares the JVM heap space with Spark. When all of the H2O instances have started, H2O forms a cluster, and then the H2O Flow web interface is made available.

[Diagram: a Sparkling Water cluster — spark-submit sends the application JAR to the Spark master (1), the master starts the workers and distributes the JAR (2), each worker starts an executor JVM (3), and each executor JVM hosts an embedded H2O instance (4).]

The preceding diagram explains how H2O fits into the Apache Spark architecture, and how it starts, but what about data sharing? How does data pass between Spark and H2O? The following diagram explains this:

[Diagram: data sharing in a Sparkling Water cluster — a data source such as HDFS is loaded into a Spark RDD inside the executor JVMs (1), the Spark RDD is converted into an H2O RDD (2), and H2O then processes the data within the same executor JVMs (3).]

A new H2O RDD data structure has been created for H2O and Sparkling Water. It is a layer based on top of an H2O frame, each column of which represents a data item, and is independently compressed to provide the best compression ratio.

In the deep learning example Scala code presented later in this chapter, you will see that a data frame has been created implicitly from a Spark schema RDD, and that a columnar data item, income, has been enumerated. I won't dwell on this now, as it will be explained later, but this is a practical example of the above architecture (a compact sketch of the conversion also follows at the end of this section):

val testFrame:DataFrame = schemaRddTest
testFrame.replace( testFrame.find("income"), testFrame.vec("income").toEnum)

In the Scala-based example that will be tackled in this chapter, the following actions will take place:

1. Data is sourced from HDFS, and is stored in a Spark RDD.
2. Spark SQL is used to filter the data.
3. The Spark schema RDD is converted into an H2O RDD.
4. The H2O-based processing and modeling occurs.
5. The results are passed back to Spark for accuracy checking.

To this point, the general architecture of H2O has been examined, and the product has been sourced for use. The development environment has been explained, and the process by which H2O and Spark integrate has been considered. Now, it is time to delve into a practical example of the use of H2O. First though, some real-world data must be sourced for modeling purposes.
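Before moving on to the data, the following compact sketch shows the Spark-to-H2O hand-off just described. It is not taken from the chapter's example code; it assumes an existing Spark context (sparkCxt), a Spark SQL context with a registered table called trainingTable (both are created later in the chapter), and the Sparkling Water 0.2.12 assembly on Spark 1.2:

// A minimal sketch, not the chapter's example: assumes sparkCxt, sqlContext, and a
// registered "trainingTable" already exist, as they do later in this chapter.
import org.apache.spark.h2o._

implicit val h2oContext = new H2OContext(sparkCxt).start()
import h2oContext._                        // implicit Spark <-> H2O conversions

// Spark side: a schema RDD produced by Spark SQL
val schemaRdd = sqlContext.sql("SELECT age, income FROM trainingTable")

// Spark -> H2O: assigning the schema RDD to an H2O DataFrame triggers the implicit
// conversion into H2O's compressed, columnar frame representation
val h2oFrame: DataFrame = schemaRdd
println( h2oFrame.numRows() + " rows, columns: " + h2oFrame.names().mkString(",") )

// H2O -> Spark: later in the chapter, toRDD[DoubleHolder](frame) is used to pull a
// single-column H2O frame (the model's predictions) back into a Spark RDD

The point to note is that the hand-off in both directions is just an implicit conversion brought into scope by the H2O context; no explicit copy code needs to be written.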
Sourcing the data

Since I have already used the Artificial Neural Net (ANN) functionality in Chapter 2, Apache Spark MLlib, to classify images, it seems only fitting that I use H2O deep learning to classify data in this chapter. In order to do this, I need to source data sets that are suitable for classification. I need either image data with associated image labels, or data containing vectors and a label that I can enumerate, so that I can force H2O to use its classification algorithm.

The MNIST test and training image data was sourced from yann.lecun.com/exdb/mnist/. It contains 50,000 training rows, and 10,000 rows for testing. It contains digital images of the numbers 0 to 9, and associated labels. I was not able to use this data as, at the time of writing, there was a bug in H2O Sparkling Water that limited the record size to 128 elements. The MNIST data has a record size of 28 x 28 + 1 elements for the image plus the label:

15/05/14 14:05:27 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 256, hc2r1m4.semtech-solutions.co.nz): java.lang.ArrayIndexOutOfBoundsException: -128

This issue should have been fixed and released by the time you read this, but in the short term I sourced another data set called income from http://www.cs.toronto.edu/delve/data/datasets.html, which contains Canadian employee income data. The following information shows the attributes and the data volume. It also shows the list of columns in the data, and a sample row of the data:

Number of attributes: 16
Number of cases: 45,225

age workclass fnlwgt education educational-num marital-status occupation relationship race gender capital-gain capital-loss hours-per-week native-country income

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K

I will enumerate the last column in the data—the income bracket, so <=50K will enumerate to 0. This will allow me to force the H2O deep learning algorithm to carry out classification rather than regression. I will also use Spark SQL to limit the data columns, and to filter the data.

Data quality is absolutely critical when creating an H2O-based example like the one described in this chapter. The next section examines the steps that can be taken to improve the data quality, and so save time.

Data Quality

When I import CSV data files from HDFS into my Spark Scala H2O example code, I can filter the incoming data. The following example code contains two filter lines; the first checks that a data line is not empty, while the second checks that the final column in each data row (income), which will be enumerated, is not empty:

val testRDD = rawTestData
  .filter(!_.isEmpty)
  .map(_.split(","))
  .filter( rawRow => !rawRow(14).trim.isEmpty )

I also needed to clean my raw data. There are two data sets, one for training and one for testing. It is important that the training and testing data have the following:

• The same number of columns
• The same data types
• Null values must be allowed for in the code
• The enumerated type values must match—especially for the labels

I encountered an error related to the enumerated label column, income, and the values that it contained. I found that my test data set rows were terminated with a full stop character ("."). When processed, this caused the training and the test data values to mismatch when enumerated.
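One way to guard against this particular mismatch is to normalize the label column while parsing the test data. The following is a minimal sketch rather than the chapter's code: it assumes rawTestData is the RDD of raw test CSV lines loaded later in the chapter, and the fifteen-column (0-14) layout described above; stripSuffix removes the trailing full stop so that the test labels enumerate to the same values as the training labels:

// Sketch: normalize the income label (column 14) while parsing the raw test lines,
// so "<=50K." becomes "<=50K" and matches the training data when enumerated.
val cleanedTestRDD = rawTestData
  .filter(!_.isEmpty)                              // drop empty lines
  .map(_.split(","))
  .filter(rawRow => !rawRow(14).trim.isEmpty)      // drop rows with an empty label
  .map { rawRow =>
    val cols = rawRow.map(_.trim)
    cols(14) = cols(14).stripSuffix(".")           // remove the trailing full stop
    cols
  }

The cleaned column arrays can then be mapped to Row objects exactly as the trainRDD and testRDD code later in this chapter does.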
So, I think that time and effort should be spent safeguarding the data quality as a pre-step to training and testing machine learning functionality, so that time is not lost and extra cost is not incurred.

Performance tuning

It is important to monitor the Spark application error and standard output logs in the Spark web user interface. If you see errors like the following:

05-15 13:55:38.176 192.168.1.105:54321 6375 Thread-10 ERRR: Out of Memory and no swap space left from hc2r1m1.semtech-solutions.co.nz/192.168.1.105:54321

or if you encounter instances where the application executors seem to hang without response, you may need to tune your executor memory. You need to do so if you see an error like the following in your executor log:

05-19 13:46:57.300 192.168.1.105:54321 10044 Thread-11 WARN: Unblock allocations; cache emptied but memory is low: OOM but cache is emptied: MEM_MAX = 89.5 MB, DESIRED_CACHE = 96.4 MB, CACHE = N/A, POJO = N/A, this request bytes = 36.4 MB

This can cause a loop, as the application requests more memory than is available, and then waits until the next iteration retries. The application can seem to hang until the executors are killed, and the tasks are re-executed on alternate nodes. A short task's run time can extend considerably because of such problems.

Monitor the Spark logs for these types of error. In the previous example, changing the executor memory setting in the spark-submit command removed the error, and reduced the runtime substantially. The memory value requested was reduced to a figure below that which is available:

--executor-memory 85m

Deep learning

Neural networks were introduced in Chapter 2, Apache Spark MLlib. This chapter builds upon that understanding by introducing deep learning, which uses deep neural networks. These are neural networks that are feature-rich, and contain extra hidden layers, so that their ability to extract data features is increased. These networks are generally feed-forward networks, where the feature characteristics are inputs to the input layer neurons. These neurons then fire and spread the activation through the hidden layer neurons to an output layer, which should present the feature label values. Errors in the output are then propagated back through the network (at least in back propagation), adjusting the neuron connection weight matrices so that classification errors are reduced during training.

The example image described in the H2O booklet at https://leanpub.com/deeplearning/read shows a deep learning network with four input neurons to the left, two hidden layers in the middle, and two output neurons. The arrows show both the connections between neurons, and the direction that activation takes through the network.

These networks are feature-rich because they provide the following options:

• Multiple training algorithms
• Automated network configuration
• The ability to configure many options:
  ° Structure: hidden layer structure
  ° Training: learning rate, annealing, and momentum

So, after giving this brief introduction to deep learning, it is now time to look at some of the sample Scala-based code. H2O provides a great deal of functionality; the classes that are needed to build and run the network have been developed for you.
You just need to do the following:

• Prepare the data and the parameters
• Create and train the model
• Validate the model with a second data set
• Score the validation data set output

When scoring your model, you should hope for a high value in percentage terms: your model must be able to accurately predict and classify your data.

Example code – income

This section examines the Scala-based H2O Sparkling Water deep learning example using the previous Canadian income data source. First, the Spark (Context, Conf, mllib, and RDD) and H2O (h2o, deeplearning, and water) classes are imported:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import hex.deeplearning.{DeepLearningModel, DeepLearning}
import hex.deeplearning.DeepLearningModel.DeepLearningParameters

import org.apache.spark.h2o._
import org.apache.spark.mllib
import org.apache.spark.mllib.feature.{IDFModel, IDF, HashingTF}
import org.apache.spark.rdd.RDD

import water.Key

Next, an application class called h2o_spark_dl2 is defined, the master URL is created, and then a configuration object is created, based on this URL and the application name. The Spark context is then created using the configuration object:

object h2o_spark_dl2 extends App {

  val sparkMaster = "spark://hc2nn.semtech-solutions.co.nz:7077"
  val appName = "Spark h2o ex1"
  val conf = new SparkConf()

  conf.setMaster(sparkMaster)
  conf.setAppName(appName)

  val sparkCxt = new SparkContext(conf)

An H2O context is created from the Spark context, and also an SQL context:

  import org.apache.spark.h2o._
  implicit val h2oContext = new org.apache.spark.h2o.H2OContext(sparkCxt).start()
  import h2oContext._

  import org.apache.spark.sql._
  implicit val sqlContext = new SQLContext(sparkCxt)

The H2O Flow user interface is started with the openFlow command:

  import sqlContext._
  openFlow

The training and testing data files are now defined (on HDFS) using the server URL, path, and file names:

  val server    = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
  val path      = "/data/spark/h2o/"

  val train_csv = server + path + "adult.train.data" // 32,562 rows
  val test_csv  = server + path + "adult.test.data"  // 16,283 rows

The CSV-based training and testing data is loaded using the Spark context's textFile method:

  val rawTrainData = sparkCxt.textFile(train_csv)
  val rawTestData  = sparkCxt.textFile(test_csv)

Now, the schema is defined in terms of a string of attributes. Then, a schema variable is created by splitting the string into a series of StructField instances, one per column. The data types are left as String, and the true value allows for Null values in the data:

  val schemaString = "age workclass fnlwgt education " +
                     "educationalnum maritalstatus " +
                     "occupation relationship race gender " +
                     "capitalgain capitalloss " +
                     "hoursperweek nativecountry income"

  val schema = StructType( schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, true)))

The raw training and testing CSV data lines are now split by commas into columns. The data is filtered on empty lines to ensure that the last column (income) is not empty. The actual data rows are created from the fifteen (0-14) trimmed elements in the raw CSV data.
Both the training and the test data sets are processed in the same way:

  val trainRDD = rawTrainData
    .filter(!_.isEmpty)
    .map(_.split(","))
    .filter( rawRow => !rawRow(14).trim.isEmpty )
    .map(rawRow => Row(
      rawRow(0).toString.trim,  rawRow(1).toString.trim,
      rawRow(2).toString.trim,  rawRow(3).toString.trim,
      rawRow(4).toString.trim,  rawRow(5).toString.trim,
      rawRow(6).toString.trim,  rawRow(7).toString.trim,
      rawRow(8).toString.trim,  rawRow(9).toString.trim,
      rawRow(10).toString.trim, rawRow(11).toString.trim,
      rawRow(12).toString.trim, rawRow(13).toString.trim,
      rawRow(14).toString.trim
    ))

  val testRDD = rawTestData
    .filter(!_.isEmpty)
    .map(_.split(","))
    .filter( rawRow => !rawRow(14).trim.isEmpty )
    .map(rawRow => Row(
      rawRow(0).toString.trim,  rawRow(1).toString.trim,
      rawRow(2).toString.trim,  rawRow(3).toString.trim,
      rawRow(4).toString.trim,  rawRow(5).toString.trim,
      rawRow(6).toString.trim,  rawRow(7).toString.trim,
      rawRow(8).toString.trim,  rawRow(9).toString.trim,
      rawRow(10).toString.trim, rawRow(11).toString.trim,
      rawRow(12).toString.trim, rawRow(13).toString.trim,
      rawRow(14).toString.trim
    ))

Spark schema RDD variables are now created for the training and test data sets by applying the schema variable created previously, using the SQL context's applySchema method:

  val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
  val testSchemaRDD  = sqlContext.applySchema(testRDD,  schema)

Temporary tables are created for the training and testing data:

  trainSchemaRDD.registerTempTable("trainingTable")
  testSchemaRDD.registerTempTable("testingTable")

Now, SQL is run against these temporary tables, both to filter the number of columns, and to potentially limit the data. I could have added a WHERE or LIMIT clause. This is a useful approach that enables me to manipulate both the column and the row-based data:

  val schemaRddTrain = sqlContext.sql(
    """SELECT
         age,workclass,education,maritalstatus,
         occupation,relationship,race,
         gender,hoursperweek,nativecountry,income
       FROM trainingTable """.stripMargin)

  val schemaRddTest = sqlContext.sql(
    """SELECT
         age,workclass,education,maritalstatus,
         occupation,relationship,race,
         gender,hoursperweek,nativecountry,income
       FROM testingTable """.stripMargin)

The H2O data frames are now created from the data. The final column in each data set (income) is enumerated, because this is the column that will form the deep learning label for the data. Also, enumerating this column forces the deep learning model to carry out classification rather than regression:

  val trainFrame:DataFrame = schemaRddTrain
  trainFrame.replace( trainFrame.find("income"), trainFrame.vec("income").toEnum)
  trainFrame.update(null)

  val testFrame:DataFrame = schemaRddTest
  testFrame.replace( testFrame.find("income"), testFrame.vec("income").toEnum)
  testFrame.update(null)

The enumerated income column of the test data is now saved, so that the values in this column can be used to score the tested model's prediction values:

  val testResArray = schemaRddTest.collect()
  val sizeResults  = testResArray.length
  var resArray     = new Array[Double](sizeResults)

  for ( i <- 0 to ( resArray.length - 1))
    resArray(i) = testFrame.vec("income").at(i)

The deep learning model parameters are now set up in terms of the number of epochs, or iterations, the data sets for training and validation, and the label column income, which will be used to classify the data. Also, we chose to use variable importance to determine which data columns are most important in the data.
The deep learning model is then created:

  val dlParams = new DeepLearningParameters()

  dlParams._epochs               = 100
  dlParams._train                = trainFrame
  dlParams._valid                = testFrame
  dlParams._response_column      = 'income
  dlParams._variable_importances = true

  val dl      = new DeepLearning(dlParams)
  val dlModel = dl.trainModel.get

The model is then scored against the test data set for predictions, and these income predictions are compared to the previously stored enumerated test data income values. Finally, an accuracy percentage is output from the test data:

  val testH2oPredict  = dlModel.score(schemaRddTest)('predict)
  val testPredictions = toRDD[DoubleHolder](testH2oPredict)
    .collect.map(_.result.getOrElse(Double.NaN))

  var resAccuracy = 0
  for ( i <- 0 to ( resArray.length - 1)) {
    if ( resArray(i) == testPredictions(i) )
      resAccuracy = resAccuracy + 1
  }

  println()
  println( "" )
  println( " Model Test Accuracy = " + 100*resAccuracy / resArray.length + " % " )
  println( "" )
  println()

In the last step, the application is stopped, the H2O functionality is terminated via a shutdown call, and then the Spark context is stopped:

  water.H2O.shutdown()
  sparkCxt.stop()

  println( " Script Finished " )

} // end application

Based upon a training data set of 32,000 income records, and a test data set of 16,000 income records, this deep learning model is quite accurate. It reaches an accuracy level of 83 percent, which is impressive for a few lines of code, small data sets, and just 100 epochs, as the run output shows:

Model Test Accuracy = 83 %

In the next section, I will examine some of the coding needed to process the MNIST data, even though that example could not be completed due to an H2O limitation at the time of coding.

The example code – MNIST

Since an MNIST image data record is so big, it presents problems while creating a Spark SQL schema, and while processing a data record. The records in this data are in CSV format, and are formed from a 28 x 28 digit image. Each line is then terminated by a label value for the image. I have created my schema by defining a function to create the schema string that represents the record, and then calling it:

def getSchema(): String = {

  var schema = ""
  val limit = 28*28

  for (i <- 1 to limit) {
    schema += "P" + i.toString + " "
  }
  schema += "Label"

  schema // return value
}

val schemaString = getSchema()
val schema = StructType( schemaString.split(" ")
  .map(fieldName => StructField(fieldName, IntegerType, false)))

The same general approach to deep learning can be taken for data processing as in the previous example, apart from the actual processing of the raw CSV data. There are too many columns to process individually, and they all need to be converted into integers to represent their data type. This can be done in one of two ways. In the first example, var args can be used to process all the elements in the row:

val trainRDD = rawTrainData.map( rawRow => Row( rawRow.split(",").map(_.toInt): _* ))

The second example uses the fromSeq method to process the row elements:

val trainRDD = rawTrainData.map( rawRow => Row.fromSeq( rawRow.split(",").map(_.toInt) ))

In the next section, the H2O Flow user interface will be examined to see how it can be used both to monitor H2O and to process data.
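Before that, here is a short sketch of how the parsed MNIST rows would then flow into H2O, following the same pattern as the income example. This is not the chapter's code; it assumes rawTrainData holds the MNIST CSV lines, that sqlContext and h2oContext exist as in the income example, and that the schema is the 785-column integer schema generated above:

// Sketch (not the chapter's code): apply the generated MNIST schema, convert to an
// H2O frame, and enumerate the Label column to force classification, as was done
// for the income data.
val trainRDD       = rawTrainData.map(rawRow => Row.fromSeq(rawRow.split(",").map(_.toInt)))
val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)

val trainFrame: DataFrame = trainSchemaRDD                  // Spark -> H2O
trainFrame.replace(trainFrame.find("Label"), trainFrame.vec("Label").toEnum)
trainFrame.update(null)

From here, the DeepLearningParameters setup would be the same as before, with _response_column set to 'Label instead of 'income.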
I have created some simple example code to start the H2O interface. As in the previous Scala-based code samples, all I need to do is create a Spark, an H2O context, and then call theopenFlow command, which will start the Flow interface. The following Scala code example just imports classes for Spark context, cong fi uration, and H2O. It then den fi es the cong fi uration in terms of the application name and the Spark cluster URL. A Spark context is then created using the cong fi uration object: import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.h2o._ object h2o_spark_ex2 extends App val sparkMaster = "spark://hc2nn.semtech-solutions.co.nz:7077" val appName = "Spark h2o ex2" val conf = new SparkConf() conf.setMaster(sparkMaster) conf.setAppName(appName) val sparkCxt = new SparkContext(conf) 208 www.finebook.ir