Intro to Apache Spark

http://databricks.com/
download slides:
training.databricks.com/workshop/itasworkshop.pdf
Licensed under a Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License

00: Getting Started – Introduction
installs + intros, while people arrive: 20 min

Intro: Online Course Materials
Resources for the course are available at:
databricks.com/sparktrainingresourcesitas
Download slides+code+data to your laptop:
training.databricks.com/workshop/itasworkshop.pdf
training.databricks.com/workshop/usb.zip (should have been provided on USB sticks)

Intro: Success Criteria
By end of day, participants will be comfortable with the following:
• open a Spark Shell
• develop Spark apps for typical use cases
• tour of the Spark API
• explore data sets loaded from HDFS, etc.
• review of Spark SQL, Spark Streaming, MLlib
• follow-up courses and certification
• developer community resources, events, etc.
• return to workplace and demo use of Spark

01: Getting Started – Installation
hands-on lab: 20 min

Installation: Let’s get started using Apache Spark,
in just four easy steps…
databricks.com/sparktrainingresourcesitas
for class, copy from the USB sticks
NB: please do not install/run Spark using:
• Homebrew on MacOSX
• Cygwin on Windows

Step 1: Install Java JDK 6/7 on MacOSX or Windows
oracle.com/technetwork/java/javase/downloads/jdk7downloads1880260.html
• follow the license agreement instructions
• then click the download for your OS
• need JDK instead of JRE (for Maven, etc.)

Step 2: Download Spark
we will use Spark 1.1.0
1. copy from the USB sticks
2. double click the archive file to open it
3. connect into the newly created directory
for a fallback: spark.apache.org/downloads.html

Step 3: Run Spark Shell
we’ll run Spark’s interactive shell… within the “spark” directory, run:
./bin/spark-shell
then from the “scala>” REPL prompt, let’s create some data…
val data = 1 to 10000

Step 4: Create an RDD
create an RDD based on that data…
val distData = sc.parallelize(data)
then use a filter to select values less than 10…
distData.filter(_ < 10).collect()

Checkpoint: what do you get for results?
gist.github.com/ceteri/f2c3486062c9610eac1dfile01repltxt

Installation: Optional Downloads: Python
For Python 2.7, check out Anaconda by Continuum Analytics for a full-featured platform:
store.continuum.io/cshop/anaconda/

Installation: Optional Downloads: Maven
Java builds later also require Maven, which you can download at:
maven.apache.org/download.cgi

02: Getting Started – Spark Deconstructed
lecture: 20 min

Spark Deconstructed: Let’s spend a few minutes on this Scala thing…
scala-lang.org/
Scala Crash Course
 Holden Karau
lintool.github.io/SparkTutorial/slides/day1Scalacrashcourse.pdf

Spark Deconstructed: Log Mining Example
// load error messages from a log into memory
// then interactively search for various patterns
// https://gist.github.com/ceteri/8ae5b9509a08c08a1132

// base RDD
val lines = sc.textFile("hdfs://...")

// transformed RDDs
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split("\t")).map(r => r(1))
messages.cache()

// action 1
messages.filter(_.contains("mysql")).count()

// action 2
messages.filter(_.contains("php")).count()

We start with Spark running on a cluster, submitting code to be evaluated on it: a Driver program coordinating several Workers.

At this point, take a look at the transformed RDD operator graph:

scala> messages.toDebugString
res5: String =
MappedRDD[4] at map at <console>:16 (3 partitions)
  MappedRDD[3] at map at <console>:16 (3 partitions)
    FilteredRDD[2] at filter at <console>:14 (3 partitions)
      MappedRDD[1] at textFile at <console>:12 (3 partitions)
        HadoopRDD[0] at textFile at <console>:12 (3 partitions)

Walking through the example on the cluster: the driver sends tasks to the workers; each worker reads its HDFS block, processes it, and caches the resulting partition. The first action (the "mysql" count) triggers the HDFS reads and populates the cache; the second action (the "php" count) is processed entirely from the cache, with no further HDFS reads.

Spark Deconstructed: Looking at the RDD transformations and actions from another perspective… transformations build up a graph of RDDs, and an action collapses the final RDD into a value returned to the driver:

// base RDD
val lines = sc.textFile("hdfs://...")

// transformed RDDs
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split("\t")).map(r => r(1))
messages.cache()

// action 1
messages.filter(_.contains("mysql")).count()

03: Getting Started – A Brief History
lecture: 35 min

A Brief History:
2002: MapReduce @ Google
2004: MapReduce paper
2006: Hadoop @ Yahoo!
2008: Hadoop Summit
2010: Spark paper
2014: Apache Spark top-level

A Brief History: MapReduce
circa 1979 – Stanford, MIT, CMU, etc.
 set/list operations in LISP, Prolog, etc., for parallel processing
 wwwformal.stanford.edu/jmc/history/lisp/lisp.htm circa 2004 – Google
 MapReduce: Simplified Data Processing on Large Clusters
 Jeffrey Dean and Sanjay Ghemawat
 research.google.com/archive/mapreduce.html circa 2006 – Apache
 Hadoop, originating from the Nutch Project
 Doug Cutting
 research.yahoo.com/files/cutting.pdf circa 2008 – Yahoo
 web scale search indexing
 Hadoop Summit, HUG, etc.
 developer.yahoo.com/hadoop/ circa 2009 – Amazon AWS
 Elastic MapReduce
 Hadoop modified for EC2/S3, plus support for Hive, Pig, Cascading, etc.
 aws.amazon.com/elasticmapreduce/ 35A Brief History: MapReduce Open Discussion: Enumerate several changes in data center technologies since 2002… 36A Brief History: MapReduce Rich Freitas, IBM Research pistoncloud.com/2013/04/storage andthemobilitygap/ meanwhile, spinny disks haven’t changed all that much… storagenewsletter.com/rubriques/hard diskdrives/hddtechnologytrendsibm/ 37A Brief History: MapReduce MapReduce use cases showed two major limitations: 1. difficultly of programming directly in MR 2. performance bottlenecks, or batch not fitting the use cases In short, MR doesn’t compose well for large applications Therefore, people built specialized systems as workarounds… 38A Brief History: MapReduce Pregel Giraph Dremel Drill Tez MapReduce Impala GraphLab Storm S4 Specialized Systems: General Batch Processing iterative, interactive, streaming, graph, etc. The State of Spark, and Where We're Going Next Matei Zaharia Spark Summit (2013) youtu.be/nU6vO2EJAb4 39A Brief History: Spark Developed in 2009 at UC Berkeley AMPLab, then open sourced in 2010, Spark has since become 
 one of the largest OSS communities in big data, with over 200 contributors in 50+ organizations “Organizations that are looking at big data challenges –
 including collection, ETL, storage, exploration and analytics –
should consider Spark for its in-memory performance and
 the breadth of its model. It supports advanced analytics
 solutions on Hadoop clusters, including the iterative model
 required for machine learning and graph analysis.” Gartner, Advanced Analytics and Data Science (2014) spark.apache.org 40A Brief History: Spark 2004 2010 MapReduce paper Spark paper 2002 2004 2006 2008 2010 2012 2014 2002 2008 2014 MapReduce Google Hadoop Summit Apache Spark toplevel 2006 Hadoop Yahoo Spark: Cluster Computing with Working Sets Matei Zaharia, Mosharaf Chowdhury, 
 Michael J. Franklin, Scott Shenker, Ion Stoica USENIX HotCloud (2010)
 people.csail.mit.edu/matei/papers/2010/hotcloudspark.pdf Resilient Distributed Datasets: A FaultTolerant Abstraction for InMemory Cluster Computing Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, 
 Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica NSDI (2012) usenix.org/system/files/conference/nsdi12/nsdi12final138.pdf 41A Brief History: Spark Unlike the various specialized systems, Spark’s goal was to generalize MapReduce to support new apps within same engine Two reasonably small additions are enough to express the previous models: fast data sharing • general DAGs • This allows for an approach which is more efficient for the engine, and much simpler 
for the end users.

A Brief History: Spark
Spark is used as libs, instead of specialized systems.

Some key points about Spark:
• handles batch, interactive, and real-time within a single framework
• native integration with Java, Python, Scala
• programming at a higher level of abstraction
• more general: map/reduce is just one set of supported constructs

A Brief History: Key distinctions for Spark vs. MapReduce
• generalized patterns: unified engine for many use cases
• lazy evaluation of the lineage graph: reduces wait states, better pipelining
• generational differences in hardware: off-heap use of large memory spaces
• functional programming / ease of use: reduction in cost to maintain large apps
• lower overhead for starting jobs
• less expensive shuffles

TL;DR: Smashing The Previous Petabyte Sort Record
databricks.com/blog/2014/11/05/sparkofficially setsanewrecordinlargescalesorting.html

TL;DR: Sustained Exponential Growth
Spark is one of the most active Apache projects
ohloh.net/orgs/apache

TL;DR: Spark Just Passed Hadoop in Popularity on Web
datanami.com/2014/11/21/sparkjustpassed hadooppopularitywebheres/
In October, Apache Spark (blue line) passed Apache Hadoop (red line) in popularity according to Google Trends.

TL;DR: Spark Expertise Tops Median Salaries within Big Data
oreilly.com/data/free/2014datascience salarysurvey.csp
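To make the “lazy evaluation of the lineage graph” distinction concrete, here is a minimal spark-shell sketch (our own illustration, not from the original slides); input.txt is a placeholder file name:

// Transformations only record lineage; nothing is read or computed yet.
val lines  = sc.textFile("input.txt")            // hypothetical input file
val errors = lines.filter(_.startsWith("ERROR")) // still lazy
val codes  = errors.map(_.split("\t")(0))        // still lazy

// Inspect the lineage graph that has been built up so far.
println(codes.toDebugString)

// Only an action forces evaluation; the whole pipeline runs as one job,
// so the intermediate RDDs are pipelined rather than materialized.
val n = codes.distinct().count()
println(s"distinct error codes: $n")

Nothing touches input.txt until count() runs; up to that point Spark has only recorded the recipe.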
04: Getting Started – Simple Spark Apps
lab: 20 min

Simple Spark Apps: WordCount
Definition: count how often each word appears in a collection of text documents.

This simple program provides a good test case for parallel processing, since it:
• requires a minimal amount of code
• demonstrates use of both symbolic and numeric values
• isn’t many steps away from search indexing
• serves as a “Hello World” for Big Data apps

A distributed computing framework that can run WordCount efficiently in parallel at scale can likely handle much larger and more interesting compute problems.

void map (String doc_id, String text):
  for each word w in segment(text):
    emit(w, "1");

void reduce (String word, Iterator group):
  int count = 0;
  for each pc in group:
    count += Int(pc);
  emit(word, String(count));

Simple Spark Apps: WordCount
WordCount in 3 lines of Spark vs. WordCount in 50+ lines of Java MR

Scala:
val f = sc.textFile("README.md")
val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wc.saveAsTextFile("wc_out")

Python:
from operator import add
f = sc.textFile("README.md")
wc = f.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
wc.saveAsTextFile("wc_out")

Checkpoint: how many “Spark” keywords?

Simple Spark Apps: Source Code
val format = new java.text.SimpleDateFormat("yyyy-MM-dd")

case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float, lng: Float)
case class Click (d: java.util.Date, uuid: String, landing_page: Int)

val reg = sc.textFile("reg.tsv").map(_.split("\t")).map(
  r => (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat))
)

val clk = sc.textFile("clk.tsv").map(_.split("\t")).map(
  c => (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt))
)

reg.join(clk).collect()

Simple Spark Apps: Operator Graph
scala> reg.join(clk).toDebugString
res5: String =
FlatMappedValuesRDD[46] at join at <console>:23 (1 partitions)
  MappedValuesRDD[45] at join at <console>:23 (1 partitions)
    CoGroupedRDD[44] at join at <console>:23 (1 partitions)
      MappedRDD[36] at map at <console>:16 (1 partitions)
        MappedRDD[35] at map at <console>:16 (1 partitions)
          MappedRDD[34] at textFile at <console>:16 (1 partitions)
            HadoopRDD[33] at textFile at <console>:16 (1 partitions)
      MappedRDD[40] at map at <console>:16 (1 partitions)
        MappedRDD[39] at map at <console>:16 (1 partitions)
          MappedRDD[38] at textFile at <console>:16 (1 partitions)
            HadoopRDD[37] at textFile at <console>:16 (1 partitions)

(The accompanying diagram shows the operator graph split into stages: two map stages that build the keyed RDDs from the cached partitions, followed by a final join() stage.)
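As a small extension of the join above (our own sketch, not part of the original lab), the joined pair RDD can be aggregated further, for example to count clicks per registered uuid:

import org.apache.spark.SparkContext._   // pair-RDD operations (automatic in the shell)

// reg and clk are the keyed RDDs defined above: (uuid, Register) and (uuid, Click)
val joined = reg.join(clk)               // RDD[(String, (Register, Click))]

// count how many clicks each registered uuid produced
val clicksPerUser = joined
  .map { case (uuid, _) => (uuid, 1) }
  .reduceByKey(_ + _)

clicksPerUser.collect().foreach(println)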
Simple Spark Apps: Assignment
Using the README.md and CONTRIBUTING.md files in the Spark directory:
1. create RDDs to filter each line for the keyword “Spark”
2. perform a WordCount on each, i.e., so the results are (K, V) pairs of (word, count)
3. join the two RDDs

Checkpoint: how many “Spark” keywords?
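One possible way to approach the assignment is sketched below (our own, not the official solution); it assumes spark-shell was launched from the Spark directory so both files are present:

import org.apache.spark.SparkContext._   // pair-RDD operations (automatic in the shell)

// one (word, count) RDD per file, keeping only lines that mention "Spark"
def sparkWordCount(path: String) =
  sc.textFile(path)
    .filter(_.contains("Spark"))
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

val readmeWC  = sparkWordCount("README.md")
val contribWC = sparkWordCount("CONTRIBUTING.md")

// join on the word, yielding (word, (countInReadme, countInContributing))
val joined = readmeWC.join(contribWC)
joined.collect().foreach(println)

// checkpoint: occurrences of the literal keyword "Spark" in each file
println(readmeWC.lookup("Spark"))
println(contribWC.lookup("Spark"))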
(break: 15 min)

05: Intro Spark Apps – Spark Essentials
lecture/lab: 45 min

Spark Essentials:
Intro apps, showing examples in both Scala and Python…
Let’s start with the basic concepts in:
spark.apache.org/docs/latest/scala programmingguide.html
using, respectively:
./bin/spark-shell
./bin/pyspark
alternatively, with IPython Notebook:
IPYTHON_OPTS="notebook --pylab inline" ./bin/pyspark

Spark Essentials: SparkContext
First thing that a Spark program does is create a SparkContext object, which tells Spark how to access a cluster.
In the shell for either Scala or Python, this is the sc variable, which is created automatically.
Other programs must use a constructor to instantiate a new SparkContext.
Then in turn SparkContext gets used to create other variables.

Scala:
scala> sc
res: spark.SparkContext = spark.SparkContext@470d1f30

Python:
>>> sc
<pyspark.context.SparkContext object at 0x7f7570783350>

Spark Essentials: Master
The master parameter for a SparkContext determines which cluster to use:

master            description
local             run Spark locally with one worker thread (no parallelism)
local[K]          run Spark locally with K worker threads (ideally set to # cores)
spark://HOST:PORT connect to a Spark standalone cluster; PORT depends on config (7077 by default)
mesos://HOST:PORT connect to a Mesos cluster; PORT depends on config (5050 by default)

spark.apache.org/docs/latest/cluster overview.html
(The cluster overview diagram: a Driver Program holding a SparkContext talks to a Cluster Manager, which manages Worker Nodes, each running an Executor with cached data and tasks.)

Spark Essentials: Clusters
1. master connects to a cluster manager to allocate resources across applications
2. acquires executors on cluster nodes – processes run compute tasks, cache data
3. sends app code to the executors
4. sends tasks for the executors to run
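The shell gives you sc for free; for reference, here is a minimal sketch (our own, not from the slides) of the constructor route in a standalone Scala app – the app name and local[2] master are just illustrative choices:

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextExample {
  def main(args: Array[String]): Unit = {
    // outside the shell there is no automatic sc; build one from a SparkConf
    val conf = new SparkConf()
      .setAppName("SparkContext Example")  // hypothetical app name
      .setMaster("local[2]")               // two local threads; use spark:// or mesos:// URLs on a cluster

    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 100)
    println("sum = " + rdd.reduce(_ + _))

    sc.stop()                              // release the context when done
  }
}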
Spark Essentials: RDD
Resilient Distributed Datasets (RDD) are the primary abstraction in Spark – a fault-tolerant collection of elements that can be operated on in parallel.

There are currently two types:
• parallelized collections – take an existing Scala collection and run functions on it in parallel
• Hadoop datasets – run functions on each record of a file in Hadoop distributed file system or any other storage system supported by Hadoop

Two types of operations on RDDs: transformations and actions.
• transformations are lazy (not computed immediately)
• the transformed RDD gets recomputed when an action is run on it (default)
• however, an RDD can be persisted into storage in memory or disk

Scala:
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

Python:
>>> data = [1, 2, 3, 4, 5]
>>> data
[1, 2, 3, 4, 5]
>>> distData = sc.parallelize(data)
>>> distData
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:229

Spark can create RDDs from any file stored in HDFS or other storage systems supported by Hadoop, e.g., local file system, Amazon S3, Hypertable, HBase, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat, and can also take a directory or a glob (e.g. /data/201404).

Scala:
scala> val distFile = sc.textFile("README.md")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08

Python:
>>> distFile = sc.textFile("README.md")
14/04/19 23:42:40 INFO storage.MemoryStore: ensureFreeSpace(36827) called with curMem=0, maxMem=318111744
14/04/19 23:42:40 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 36.0 KB, free 303.3 MB)
>>> distFile
MappedRDD[2] at textFile at NativeMethodAccessorImpl.java:-2
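To make the “lazy, recomputed unless persisted” behaviour visible, here is a small timing sketch (our own illustration) that can be pasted into spark-shell; exact timings will vary by machine:

// build a transformed RDD; nothing runs yet
val words = sc.textFile("README.md").flatMap(_.split(" "))

// simple helper to time an action
def time[A](label: String)(body: => A): A = {
  val t0 = System.nanoTime()
  val result = body
  println(f"$label: ${(System.nanoTime() - t0) / 1e6}%.1f ms")
  result
}

// without persistence, each action re-reads the file and re-runs flatMap
time("count #1")(words.count())
time("count #2")(words.count())

// cache() marks the RDD for in-memory storage; the next action fills the cache,
// and subsequent actions read from memory instead of recomputing
words.cache()
time("count #3, fills cache")(words.count())
time("count #4, from cache")(words.count())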
Spark Essentials: Transformations
Transformations create a new dataset from an existing one.

All transformations in Spark are lazy: they do not compute their results right away – instead they remember the transformations applied to some base dataset, which lets Spark:
• optimize the required calculations
• recover from lost data partitions

• map(func): return a new distributed dataset formed by passing each element of the source through a function func
• filter(func): return a new dataset formed by selecting those elements of the source on which func returns true
• flatMap(func): similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item)
• sample(withReplacement, fraction, seed): sample a fraction fraction of the data, with or without replacement, using a given random number generator seed
• union(otherDataset): return a new dataset that contains the union of the elements in the source dataset and the argument
• distinct([numTasks]): return a new dataset that contains the distinct elements of the source dataset
• groupByKey([numTasks]): when called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs
• reduceByKey(func, [numTasks]): when called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function
• sortByKey([ascending], [numTasks]): when called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument
• join(otherDataset, [numTasks]): when called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key
• cogroup(otherDataset, [numTasks]): when called on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples – also called groupWith
• cartesian(otherDataset): when called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements)

Spark Essentials: Transformations
Scala:
val distFile = sc.textFile("README.md")  // distFile is a collection of lines
distFile.map(l => l.split(" ")).collect()
distFile.flatMap(l => l.split(" ")).collect()

Python:
distFile = sc.textFile("README.md")
distFile.map(lambda x: x.split(' ')).collect()
distFile.flatMap(lambda x: x.split(' ')).collect()

(The anonymous functions passed to map and flatMap are closures.)

looking at the output, how would you
 compare results for map() vs. flatMap() 79Spark Essentials: Transformations Using closures is now possible in Java 8 with lambda expressions support, see the tutorial: databricks.com/blog/2014/04/14/Sparkwith Java8.html RDD RDD RDD transformations action RDD value 80Spark Essentials: Transformations Java 7: JavaRDDString distFile = sc.textFile("README.md"); // Map each line to multiple words JavaRDDString words = distFile.flatMap( new FlatMapFunctionString, String() public IterableString call(String line) return Arrays.asList(line.split(" ")); ); Java 8: JavaRDDString distFile = sc.textFile("README.md"); JavaRDDString words = distFile.flatMap(line Arrays.asList(line.split(" "))); 81Spark Essentials: Actions action description aggregate the elements of the dataset using a function func (which takes two arguments and returns one), 
 reduce(func) and should also be commutative and associative so 
 that it can be computed correctly in parallel return all the elements of the dataset as an array at 
 the driver program – usually useful after a filter or collect() other operation that returns a sufficiently small subset of the data return the number of elements in the dataset count() return the first element of the dataset – similar to first() take(1) return an array with the first n elements of the dataset – currently not executed in parallel, instead the driver take(n) program computes all the elements return an array with a random sample of num elements takeSample(withReplacement, of the dataset, with or without replacement, using the fraction, seed) given random number generator seed 82Spark Essentials: Actions action description write the elements of the dataset as a text file (or set 
 of text files) in a given directory in the local filesystem, HDFS or any other Hadoopsupported file system. saveAsTextFile(path) Spark will call toString on each element to convert 
 it to a line of text in the file write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoopsupported file system. 
 Only available on RDDs of keyvalue pairs that either saveAsSequenceFile(path) implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). only available on RDDs of type (K, V). Returns a 
 countByKey() `Map` of (K, Int) pairs with the count of each key run a function func on each element of the dataset – usually done for side effects such as updating an foreach(func) accumulator variable or interacting with external storage systems 83Spark Essentials: Actions Scala: val f = sc.textFile("README.md") val words = f.flatMap(l = l.split(" ")).map(word = (word, 1)) words.reduceByKey( + ).collect.foreach(println) Python: from operator import add f = sc.textFile("README.md") words = f.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)) words.reduceByKey(add).collect() 84Spark Essentials: Persistence Spark can persist (or cache) a dataset in memory across operations Each node stores in memory any slices of it that it computes and reuses them in other actions on that dataset – often making future actions more than 10x faster The cache is faulttolerant: if any partition 
 of an RDD is lost, it will automatically be recomputed using the transformations that originally created it 85Spark Essentials: Persistence transformation description Store RDD as deserialized Java objects in the JVM. 
 If the RDD does not fit in memory, some partitions 
 MEMORYONLY will not be cached and will be recomputed on the fly each time they're needed. This is the default level. Store RDD as deserialized Java objects in the JVM. 
 If the RDD does not fit in memory, store the partitions MEMORYANDDISK that don't fit on disk, and read them from there when they're needed. Store RDD as serialized Java objects (one byte array 
 per partition). This is generally more spaceefficient 
 MEMORYONLYSER than deserialized objects, especially when using a fast serializer, but more CPUintensive to read. Similar to MEMORYONLYSER, but spill partitions that don't fit in memory to disk instead of recomputing MEMORYANDDISKSER them on the fly each time they're needed. Store the RDD partitions only on disk. DISKONLY MEMORYONLY2, Same as the levels above, but replicate each partition 
 on two cluster nodes. MEMORYANDDISK2, etc See: 
 http://spark.apache.org/docs/latest/programmingguide.htmlrddpersistence 86Spark Essentials: Persistence Scala: val f = sc.textFile("README.md") val w = f.flatMap(l = l.split(" ")).map(word = (word, 1)).cache() w.reduceByKey( + ).collect.foreach(println) Python: from operator import add f = sc.textFile("README.md") w = f.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).cache() w.reduceByKey(add).collect() 87Spark Essentials: Broadcast Variables Broadcast variables let programmer keep a readonly variable cached on each machine rather than shipping a copy of it with tasks For example, to give every node a copy of 
 a large input dataset efficiently Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost 88Spark Essentials: Broadcast Variables Scala: val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar.value Python: broadcastVar = sc.broadcast(list(range(1, 4))) broadcastVar.value 89Spark Essentials: Accumulators Accumulators are variables that can only be “added” to through an associative operation Used to implement counters and sums, efficiently in parallel Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can extend 
for new types.

Only the driver program can read an accumulator’s value, not the tasks.

Spark Essentials: Accumulators
Scala:
val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value

Python:
accum = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4])

def f(x):
    global accum
    accum += x

rdd.foreach(f)
accum.value

(Note that accum.value is read driver-side.)

Spark Essentials: (K, V) pairs
Scala:
val pair = (a, b)
pair._1 // => a
pair._2 // => b

Python:
pair = (a, b)
pair[0] # => a
pair[1] # => b

Java:
Tuple2 pair = new Tuple2(a, b);
pair._1 // => a
pair._2 // => b

Spark Essentials: API Details
For more details about the Scala/Java API:
spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package
For more details about the Python API:
spark.apache.org/docs/latest/api/python/

06: Intro Spark Apps – Spark Examples
lecture/lab: 10 min

Spark Examples: Estimate Pi
Next, try using a Monte Carlo method to estimate the value of Pi
./bin/run-example SparkPi 2 local
wikipedia.org/wiki/MonteCarlomethod

import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)

    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices

    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)

    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

In this example, spark.parallelize(1 to n, slices) is the base RDD, the .map { … } block produces the transformed RDD, and .reduce(_ + _) is the action that returns a value to the driver.

Checkpoint: what estimate do you get for Pi?

Spark Examples: KMeans
Next, try using KMeans to cluster a set of
vector values:

cp ../data/examplesdata/kmeansdata.txt .
./bin/run-example SparkKMeans kmeansdata.txt 3 0.01 local

Based on the data set:
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2

Please refer to the source code in:
examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
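For comparison, here is a short sketch (our own, assuming Spark 1.x with MLlib available) that clusters the same kind of data with MLlib’s KMeans instead of the standalone example app; k=2 and 20 iterations are arbitrary choices:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// parse whitespace-separated vectors, e.g. "0.0 0.0 0.0"
val data = sc.textFile("kmeansdata.txt")
  .map(line => Vectors.dense(line.trim.split("\\s+").map(_.toDouble)))
  .cache()

// train with k = 2 clusters, 20 iterations
val model = KMeans.train(data, 2, 20)

model.clusterCenters.foreach(println)
println("cost = " + model.computeCost(data))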
Spark Examples: PageRank
Next, try using PageRank to rank the relationships in a graph:

cp ../data/examplesdata/pagerankdata.txt .
./bin/run-example SparkPageRank pagerankdata.txt 10 local

Based on the data set:
1 2
1 3
1 4
2 1
3 1
4 1

Please refer to the source code in:
examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala

(lunch: 60 min, ish)
Lunch: Depending on the venue:
• if not catered, we’re off to find food
• we’ll lock the room to secure valuables
Let’s take an hour or so… Networking is some of the best part
 of these workshops 10307: Data Workflows Unifying the Pieces lecture/demo: 45 minData Workflows: Again, unlike the various specialized systems, Spark’s goal was to generalize MapReduce to support new apps within same engine Two reasonably small additions allowed the previous specialized models to be expressed within Spark: fast data sharing • Pregel Giraph general DAGs • Dremel Drill Tez MapReduce Impala GraphLab Storm S4 Specialized Systems: General Batch Processing iterative, interactive, streaming, graph, etc. 105Data Workflows: Unifying the pieces into a single app: 
 Spark SQL, Streaming, MLlib, GraphX, etc. discuss how the same business logic can 
 • be deployed across multiple topologies demo Spark SQL, Spark Streaming • discuss MLlib, GraphX • Spark Spark MLlib GraphX SQL Streaming Spark Tachyon 106Data Workflows: Spark SQL blurs the lines between RDDs and relational tables spark.apache.org/docs/latest/sqlprogramming guide.html intermix SQL commands to query external data, along with complex analytics, in a single app: allows SQL extensions based on MLlib • Shark is being migrated to Spark SQL • Spark SQL: Manipulating Structured Data Using Spark
 Michael Armbrust, Reynold Xin (20140324)
 databricks.com/blog/2014/03/26/SparkSQL manipulatingstructureddatausingSpark.html 107Data Workflows: Spark SQL val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext. // Define the schema using a case class. case class Person(name: String, age: Int) // Create an RDD of Person objects and register it as a table. val people = sc.textFile("examples/src/main/resources/ people.txt").map(.split(",")).map(p = Person(p(0), p(1).trim.toInt)) people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sql("SELECT name FROM people WHERE age = 13 AND age = 19") // The results of SQL queries are SchemaRDDs and support all the // normal RDD operations. // The columns of a row in the result can be accessed by ordinal. teenagers.map(t = "Name: " + t(0)).collect().foreach(println) 108Data Workflows: Spark SQL val sqlContext import // Define the schema using a case class. case class // Create an RDD of Person objects and register it as a table. val people people.txt" Checkpoint: 
 people what name do you get // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers // The results of SQL queries are SchemaRDDs and support all the // normal RDD operations. // The columns of a row in the result can be accessed by ordinal. teenagers 109Data Workflows: Spark SQL: queries in HiveQL //val sc: SparkContext // An existing SparkContext. //NB: example on laptop lacks a Hive MetaStore val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) // Importing the SQL context gives access to all the // public SQL functions and implicit conversions. import hiveContext.   hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")   // Queries are expressed in HiveQL hql("FROM src SELECT key, value").collect().foreach(println) 110Data Workflows: Spark SQL: Parquet Parquet is a columnar format, supported 
 by many different Big Data frameworks http://parquet.io/ Spark SQL supports read/write of parquet files, 
 automatically preserving the schema of the 
 original data (HUGE benefits) Modifying the previous example… 111Data Workflows: Spark SQL: Parquet val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext. // Define the schema using a case class. case class Person(name: String, age: Int) // Create an RDD of Person objects and register it as a table. val people = sc.textFile("examples/src/main/resources/ people.txt").map(.split(",")).map(p = Person(p(0), p(1).trim.toInt)) people.registerTempTable("people") // The RDD is implicitly converted to a SchemaRDD
 allowing it to be stored using parquet. people.saveAsParquetFile("people.parquet") // Read in the parquet file created above. Parquet files are // selfdescribing so the schema is preserved. // The result of loading a parquet file is also a JavaSchemaRDD. val parquetFile = sqlContext.parquetFile("people.parquet") //Parquet files can also be registered as tables and then used in // SQL statements. parquetFile.registerTempTable("parquetFile") val teenagers = sql("SELECT name FROM parquetFile WHERE age = 13 AND age = 19") teenagers.collect().foreach(println)Data Workflows: Spark SQL: Parquet In particular, check out the query plan in the 
 console output: == Query Plan == Project name4:0 Filter ((age5:1 = 13) (age5:1 = 19)) ParquetTableScan name4,age5, (ParquetRelation people.parquet), None generated from the SQL query: SELECT name FROM parquetFile WHERE age = 13 AND age = 19 113Data Workflows: Spark SQL: Parquet An output directory get created for 
 each Parquet “file”: ls people.parquet/ .SUCCESS.crc .partr1.parquet.crc SUCCESS partr1.parquet .metadata.crc .partr2.parquet.crc metadata partr2.parquet   file people.parquet/partr1.parquet people.parquet/partr1.parquet: Par archive data gist.github.com/ceteri/ f2c3486062c9610eac1dfile05sparksqlparquettxt 114Data Workflows: Spark SQL: DSL Spark SQL also provides a DSL for queries Scala symbols represent columns in the underlying table, which are identifiers prefixed with a tick (') For a full list of the functions supported, see: spark.apache.org/docs/latest/api/scala/ index.htmlorg.apache.spark.sql.SchemaRDD …again, modifying the previous example For a comparison, check out LINQ:
 linqpad.net/WhyLINQBeatsSQL.aspx 115Data Workflows: Spark SQL: DSL val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext. // Define the schema using a case class. case class Person(name: String, age: Int) // Create an RDD of Person objects and register it as a table. val people = sc.textFile("examples/src/main/resources/ people.txt").map(.split(",")).map(p = Person(p(0), p(1).trim.toInt)) // The following is the same as // 'SELECT name FROM people WHERE age = 13 AND age = 19' val teenagers = people.where('age = 13).where('age = 19).select('name) // The results of SQL queries are SchemaRDDs and support all the // normal RDD operations. // The columns of a row in the result can be accessed by ordinal. teenagers.map(t = "Name: " + t(0)).collect().foreach(println) 116Data Workflows: Spark SQL: PySpark Let’s also take a look at Spark SQL in PySpark, using IPython Notebook… spark.apache.org/docs/latest/api/scala/ index.htmlorg.apache.spark.sql.SchemaRDD To launch: IPYTHONOPTS="notebook pylab inline" ./bin/pyspark 117Data Workflows: Spark SQL: PySpark from pyspark.sql import SQLContext, Row sqlCtx = SQLContext(sc) Load a text file and convert each line to a dictionary lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p0, age=int(p1))) Infer the schema, and register the SchemaRDD as a table. In future versions of PySpark we would like to add support for registering RDDs with other datatypes as tables peopleTable = sqlCtx.inferSchema(people) peopleTable.registerTempTable("people") SQL can be run over SchemaRDDs that have been registered as a table teenagers = sqlCtx.sql("SELECT name FROM people WHERE age = 13 AND age = 19") teenNames = teenagers.map(lambda p: "Name: " + p.name) teenNames.collect() 118Data Workflows: Spark Streaming Spark Streaming extends the core API to allow highthroughput, faulttolerant stream processing of live data streams spark.apache.org/docs/latest/streaming programmingguide.html Discretized Streams: A FaultTolerant Model for Scalable Stream Processing Matei Zaharia, Tathagata Das, Haoyuan Li, 
 Timothy Hunter, Scott Shenker, Ion Stoica Berkeley EECS (20121214) www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS2012259.pdf 119Data Workflows: Spark Streaming Data can be ingested from many sources: 
 Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc. Results can be pushed out to filesystems, databases, live dashboards, etc. Spark’s builtin machine learning algorithms and graph processing algorithms can be applied to data streams 120Data Workflows: Spark Streaming Comparisons: Twitter Storm • Yahoo S4 • Google MillWheel • 121Data Workflows: Spark Streaming in one terminal run the NetworkWordCount example in Spark Streaming expecting a data stream on the localhost:9999 TCP socket ./bin/runexample org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 in another terminal use Netcat http://nc110.sourceforge.net/ to generate a data stream on the localhost:9999 TCP socket nc lk 9999 hello world hi there fred what a nice world there 122Data Workflows: Spark Streaming // http://spark.apache.org/docs/latest/streamingprogrammingguide.html import org.apache.spark.streaming. import org.apache.spark.streaming.StreamingContext. // create a StreamingContext val ssc = new StreamingContext(sc, Seconds(10)) // create a DStream that will connect to serverIP:serverPort val lines = ssc.socketTextStream(serverIP, serverPort) // split each line into words val words = lines.flatMap(.split(" ")) // count each word in each batch val pairs = words.map(word = (word, 1)) val wordCounts = pairs.reduceByKey( + ) // print a few of the counts to the console wordCounts.print() ssc.start() // start the computation ssc.awaitTermination() // wait for the computation to terminate 123Data Workflows: Spark Streaming What the stream analysis produced: 14/04/19 13:41:28 INFO scheduler.TaskSetManager: Finished TID 3 in 17 ms on localhost (progress: 1/1) 14/04/19 13:41:28 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 14/04/19 13:41:28 INFO scheduler.DAGScheduler: Completed ResultTask(3, 1) 14/04/19 13:41:28 INFO scheduler.DAGScheduler: Stage 3 (take at DStream.scala:583) finished in 0.019 s 14/04/19 13:41:28 INFO spark.SparkContext: Job finished: take at DStream.scala:583, took 0.034258 s Time: 1397940088000 ms (hello,1) (what,1) (world,2) (there,2) (fred,1) (hi,1) (a,1) (nice,1) 124Data Workflows: MLlib // http://spark.apache.org/docs/latest/mllibguide.html val traindata = // RDD of Vector val model = KMeans.train(traindata, k=10) // evaluate the model val testdata = // RDD of Vector testdata.map(t = model.predict(t)).collect().foreach(println) MLI: An API for Distributed Machine Learning Evan Sparks, Ameet Talwalkar, et al. International Conference on Data Mining (2013) http://arxiv.org/abs/1310.5426 125Data Workflows: MLlib demo: Twitter Streaming Language Classifier
 databricks.gitbooks.io/databrickssparkreferenceapplications/ twitterclassifier/README.html 126Data Workflows: GraphX GraphX amplab.github.io/graphx/ extends the distributed faulttolerant collections API and interactive console of Spark with a new graph API which leverages recent advances in graph systems 
 (e.g., GraphLab) to enable users to easily and interactively build, transform, and reason about graph structured data at scale 127Data Workflows: GraphX unifying graphs and tables spark.apache.org/docs/latest/graphxprogramming guide.html ampcamp.berkeley.edu/bigdataminicourse/graph analyticswithgraphx.html 128Data Workflows: GraphX // http://spark.apache.org/docs/latest/graphxprogrammingguide.html import org.apache.spark.graphx. import org.apache.spark.rdd.RDD case class Peep(name: String, age: Int) val nodeArray = Array( (1L, Peep("Kim", 23)), (2L, Peep("Pat", 31)), (3L, Peep("Chris", 52)), (4L, Peep("Kelly", 39)), (5L, Peep("Leslie", 45)) ) val edgeArray = Array( Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 5L, 3), Edge(4L, 1L, 1), Edge(5L, 3L, 9) ) val nodeRDD: RDD(Long, Peep) = sc.parallelize(nodeArray) val edgeRDD: RDDEdgeInt = sc.parallelize(edgeArray) val g: GraphPeep, Int = Graph(nodeRDD, edgeRDD) val results = g.triplets.filter(t = t.attr 7) for (triplet results.collect) println(s"triplet.srcAttr.name loves triplet.dstAttr.name") 129Data Workflows: GraphX demo: Simple Graph Query
 gist.github.com/ceteri/c2a692b5161b23d92ed1 130Data Workflows: GraphX Introduction to GraphX Joseph Gonzalez, Reynold Xin youtu.be/mKEn9C5bRck 131(break) break: 15 min08: Spark in Production The Full SDLC lecture/lab: 75 minSpark in Production: In the following, let’s consider the progression through a full software development lifecycle, step by step: 1. build 2. deploy 3. monitor 134Spark in Production: Build builds: build/run a JAR using Java + Maven • SBT primer • build/run a JAR using Scala + SBT • 135Spark in Production: Build: Java The following sequence shows how to build 
 a JAR file from a Java app, using Maven maven.apache.org/guides/introduction/ introductiontothepom.html First, connect into a different directory 
 • where you have space to create several files Then run the following commands… • 136Spark in Production: Build: Java Java source (cutpaste 1st following slide) mkdir p src/main/java cat src/main/java/SimpleApp.java project model (cutpaste 2nd following slide) cat pom.xml copy a file to use for data cp SPARKHOME/README.md . build the JAR mvn clean package run the JAR mvn exec:java Dexec.mainClass="SimpleApp" 137Spark in Production: Build: Java / SimpleApp.java / import org.apache.spark.api.java.; import org.apache.spark.api.java.function.Function; public class SimpleApp public static void main(String args) String logFile = "README.md"; JavaSparkContext sc = new JavaSparkContext("local", "Simple App", "SPARKHOME", new String"target/simpleproject1.0.jar"); JavaRDDString logData = sc.textFile(logFile).cache(); long numAs = logData.filter(new FunctionString, Boolean() public Boolean call(String s) return s.contains("a"); ).count(); long numBs = logData.filter(new FunctionString, Boolean() public Boolean call(String s) return s.contains("b"); ).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); 138Spark in Production: Build: Java project groupIdedu.berkeley/groupId artifactIdsimpleproject/artifactId modelVersion4.0.0/modelVersion nameSimple Project/name packagingjar/packaging version1.0/version repositories repository idAkka repository/id urlhttp://repo.akka.io/releases/url /repository /repositories dependencies dependency Spark dependency groupIdorg.apache.spark/groupId artifactIdsparkcore2.10/artifactId version1.2.0/version /dependency dependency groupIdorg.apache.hadoop/groupId artifactIdhadoopclient/artifactId version2.2.0/version /dependency /dependencies /project 139Spark in Production: Build: Java Source files, commands, and expected output are shown in this gist: gist.github.com/ceteri/ f2c3486062c9610eac1dfile04javamaventxt …and the JAR file that we just used: ls target/simpleproject1.0.jar 140Spark in Production: Build: SBT builds: build/run a JAR using Java + Maven • SBT primer • build/run a JAR using Scala + SBT • 141Spark in Production: Build: SBT SBT is the Simple Build Tool for Scala: www.scalasbt.org/ This is included with the Spark download, and 
 does not need to be installed separately. Similar to Maven, however it provides for incremental compilation and an interactive shell, 
 among other innovations. SBT project uses StackOverflow for QA, 
that’s a good resource to study further: stackoverflow.com/tags/sbt

Spark in Production: Build: SBT

command   description
clean     delete all generated files (in the target directory)
package   create a JAR file
run       run the JAR (or main class, if named)
compile   compile the main sources (in src/main/scala and src/main/java directories)
test      compile and run all tests
console   launch a Scala interpreter
help      display detailed help for specified commands

Spark in Production: Build: Scala
builds:
• build/run a JAR using Java + Maven
• SBT primer
• build/run a JAR using Scala + SBT

Spark in Production: Build: Scala
The following sequence shows how to build
 a JAR file from a Scala app, using SBT First, this requires the “source” download, • not the “binary” Connect into the SPARKHOME directory • Then run the following commands… • 145Spark in Production: Build: Scala Scala source + SBT build script on following slides cd simpleapp ../sbt/sbt Dsbt.ivy.home=../sbt/ivy package ../spark/bin/sparksubmit \ class "SimpleApp" \ master local \ target/scala2.10/simpleproject2.101.0.jar 146Spark in Production: Build: Scala / SimpleApp.scala / import org.apache.spark.SparkContext import org.apache.spark.SparkContext. object SimpleApp def main(args: ArrayString) val logFile = "README.md" // Should be some file on your system val sc = new SparkContext("local", "Simple App", "SPARKHOME", List("target/scala2.10/simpleproject2.101.0.jar")) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line = line.contains("a")).count() val numBs = logData.filter(line = line.contains("b")).count() println("Lines with a: s, Lines with b: s".format(numAs, numBs)) 147Spark in Production: Build: Scala name := "Simple Project" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" "sparkcore2.10" "1.2.0" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" 148Spark in Production: Deploy deploy JAR to Hadoop cluster, using these alternatives: discuss how to run atop Apache Mesos • discuss how to install on CM • discuss how to run on HDP • discuss how to run on MapR • discuss how to run on EC2 • discuss using SIMR (run shell within MR job) • …or, simply run the JAR on YARN • 149Spark in Production: Deploy: Mesos deploy JAR to Hadoop cluster, using these alternatives: discuss how to run atop Apache Mesos • discuss how to install on CM • discuss how to run on HDP • discuss how to run on MapR • discuss how to run on EC2 • discuss using SIMR (run shell within MR job) • …or, simply run the JAR on YARN • 150Spark in Production: Deploy: Mesos Apache Mesos, from which Apache Spark 
 originated… Running Spark on Mesos
 spark.apache.org/docs/latest/runningonmesos.html Run Apache Spark on Apache Mesos
 tutorial based on Mesosphere + Google Cloud
 ceteri.blogspot.com/2014/09/sparkatopmesosongooglecloud.html Getting Started Running Apache Spark on Apache Mesos
 O’Reilly Media webcast
 oreilly.com/pub/e/2986 151Spark in Production: Deploy: CM deploy JAR to Hadoop cluster, using these alternatives: discuss how to run atop Apache Mesos • discuss how to install on CM • discuss how to run on HDP • discuss how to run on MapR • discuss how to run on EC2 • discuss using SIMR (run shell within MR job) • …or, simply run the JAR on YARN • 152Spark in Production: Deploy: CM Cloudera Manager 4.8.x: cloudera.com/content/clouderacontent/cloudera docs/CM4Ent/latest/ClouderaManagerInstallation Guide/cmigsparkinstallationstandalone.html 5 steps to install the Spark parcel • 5 steps to configure and start the Spark service • Also check out Cloudera Live: cloudera.com/content/cloudera/en/productsand services/clouderalive.html 153Spark in Production: Deploy: HDP deploy JAR to Hadoop cluster, using these alternatives: discuss how to run atop Apache Mesos • discuss how to install on CM • discuss how to run on HDP • discuss how to run on MapR • discuss how to run on EC2 • discuss using SIMR (run shell within MR job) • …or, simply run the JAR on YARN • 154Spark in Production: Deploy: HDP Hortonworks provides support for running 
 Spark on HDP: spark.apache.org/docs/latest/hadoopthirdparty distributions.html hortonworks.com/blog/announcinghdp21tech previewcomponentapachespark/ 155Spark in Production: Deploy: MapR deploy JAR to Hadoop cluster, using these alternatives: discuss how to run atop Apache Mesos • discuss how to install on CM • discuss how to run on HDP • discuss how to run on MapR • discuss how to run on EC2 • discuss using SIMR (run shell within MR job) • …or, simply run the JAR on YARN • 156Spark in Production: Deploy: MapR MapR Technologies provides support for running 
 Spark on the MapR distros: mapr.com/products/apachespark slideshare.net/MapRTechnologies/mapr databrickswebinar4x3 157Spark in Production: Deploy: EC2 deploy JAR to Hadoop cluster, using these alternatives: discuss how to run atop Apache Mesos • discuss how to install on CM • discuss how to run on HDP • discuss how to run on MapR • discuss how to run on EC2 • discuss using SIMR (run shell within MR job) • …or, simply run the JAR on YARN • 158Spark in Production: Deploy: EC2 Running Spark on Amazon AWS EC2: blogs.aws.amazon.com/bigdata/post/Tx15AY5C50K70RV/ InstallingApacheSparkonanAmazonEMRCluster 159Spark in Production: Deploy: SIMR deploy JAR to Hadoop cluster, using these alternatives: discuss how to run atop Apache Mesos • discuss how to install on CM • discuss how to run on HDP • discuss how to run on MapR • discuss how to run on EC2 • discuss using SIMR (run shell within MR job) • …or, simply run the JAR on YARN • 160Spark in Production: Deploy: SIMR Spark in MapReduce (SIMR) – quick way 
 for Hadoop MR1 users to deploy Spark: databricks.github.io/simr/ sparksummit.org/talk/reddysimrletyour sparkjobssimmerinsidehadoopclusters/ Sparks run on Hadoop clusters without 
 • any install or required admin rights SIMR launches a Hadoop job that only 
 • contains mappers, includes Scala+Spark ./simr jarfile mainclass parameters 
 —outdir= —slots=N —unique 161Spark in Production: Deploy: YARN deploy JAR to Hadoop cluster, using these alternatives: discuss how to run atop Apache Mesos • discuss how to install on CM • discuss how to run on HDP • discuss how to run on MapR • discuss how to rum on EMR • discuss using SIMR (run shell within MR job) • …or, simply run the JAR on YARN • 162Spark in Production: Deploy: YARN spark.apache.org/docs/latest/runningonyarn.html Simplest way to deploy Spark apps in production • Does not require admin, just deploy apps to your • Hadoop cluster Apache Hadoop YARN
 Arun Murthy, et al.
 amazon.com/dp/0321934504 163Spark in Production: Deploy: HDFS examples Exploring data sets loaded from HDFS… 1. launch a Spark cluster using EC2 script 2. load data files into HDFS 3. run Spark shell to perform WordCount NB: be sure to use internal IP addresses on 
 AWS for the “hdfs://…” URLs 164Spark in Production: Deploy: HDFS examples http://spark.apache.org/docs/latest/ec2scripts.html cd SPARKHOME/ec2   export AWSACCESSKEYID=AWSACCESSKEY export AWSSECRETACCESSKEY=AWSSECRETKEY ./sparkec2 k spark i /spark.pem s 2 z useast1b launch foo   can review EC2 instances and their security groups to identify master ssh into master ./sparkec2 k spark i /spark.pem s 2 z useast1b login foo   use ./ephemeralhdfs/bin/hadoop to access HDFS /root/ephemeralhdfs/bin/hadoop fs mkdir /tmp /root/ephemeralhdfs/bin/hadoop fs put CHANGES.txt /tmp   now is the time when we Spark cd /root/spark export SPARKHOME=(pwd) SPARKHADOOPVERSION=1.0.4 sbt/sbt assembly /root/ephemeralhdfs/bin/hadoop fs put CHANGES.txt /tmp ./bin/sparkshell 165Spark in Production: Deploy: HDFS examples / NB: replace host IP with EC2 internal IP address / val f = sc.textFile("hdfs://10.72.61.192:9000/foo/CHANGES.txt") val counts = f.flatMap(line = line.split(" ")).map(word = (word, 1)).reduceByKey( + ) counts.collect().foreach(println) counts.saveAsTextFile("hdfs://10.72.61.192:9000/foo/wc") 166Spark in Production: Deploy: HDFS examples Let’s check the results in HDFS… root/ephemeralhdfs/bin/hadoop fs cat /tmp/wc/part (Adds,1) (alpha,2) (ssh,1) (graphite,1) (canonical,2) (ASF,3) (display,4) (synchronization,2) (instead,7) (javadoc,1) (hsaputra/updatepomasf,1) … 167Spark in Production: Monitor review UI features spark.apache.org/docs/latest/monitoring.html http://master:8080/ http://master:50070/ verify: is my job still running • drilldown into workers and stages • examine stdout and stderr • discuss how to diagnose / troubleshoot • 168Spark in Production: Monitor: AWS Console 169Spark in Production: Monitor: Spark Console 17009: Summary Case Studies discussion: 30 minhttp://databricks.com/certifiedonspark 172Summary: Case Studies Spark at Twitter: Evaluation Lessons Learnt
 Sriram Krishnan
 slideshare.net/krishflix/seattlesparkmeetup sparkattwitter Spark can be more interactive, efficient than MR • Support for iterative algorithms and caching • More generic than traditional MapReduce • Why is Spark faster than Hadoop MapReduce • Fewer I/O synchronization barriers • Less expensive shuffle • More complex the DAG, greater the • performance improvement 173Summary: Case Studies Using Spark to Ignite Data Analytics
 
 ebaytechblog.com/2014/05/28/usingsparkto ignitedataanalytics/ 174Summary: Case Studies Hadoop and Spark Join Forces in Yahoo
 Andy Feng
 sparksummit.org/talk/fenghadoopandspark joinforcesatyahoo/ 175Summary: Case Studies Collaborative Filtering with Spark
 Chris Johnson
 slideshare.net/MrChrisJohnson/collaborative filteringwithspark collab filter (ALS) for music recommendation • Hadoop suffers from I/O overhead • show a progression of code rewrites, converting • a Hadoopbased app into efficient use of Spark 176Summary: Case Studies Stratio Streaming: a new approach to 
 Spark Streaming David Morales, Oscar Mendez 20140630 sparksummit.org/2014/talk/stratio streaminganewapproachtospark streaming Stratio Streaming is the union of a realtime • messaging bus with a complex event processing engine using Spark Streaming allows the creation of streams and queries on the fly • paired with Siddhi CEP engine and Apache Kafka • added global features to the engine such as auditing • and statistics 177Summary: Case Studies Open Sourcing Our Spark Job Server
 Evan Chan
 engineering.ooyala.com/blog/opensourcingour sparkjobserver github.com/ooyala/sparkjobserver • REST server for submitting, running, managing • Spark jobs and contexts company vision for Spark is as a multiteam big • data service shares Spark RDDs in one SparkContext among • multiple jobs 178Summary: Case Studies Sharethrough Uses Spark Streaming to Optimize Bidding in Real Time Russell Cardullo, Michael Ruggier
 20140325 databricks.com/blog/2014/03/25/ sharethroughandsparkstreaming.html the profile of a 24 x 7 streaming app is different than • an hourly batch job… take time to validate output against the input… • confirm that supporting objects are being serialized… • the output of your Spark Streaming job is only as • reliable as the queue that feeds Spark… integration of Algebird • 179Summary: Case Studies Guavus Embeds Apache Spark 
 into its Operational Intelligence Platform 
 Deployed at the World’s Largest Telcos Eric Carr 20140925 databricks.com/blog/2014/09/25/guavusembedsapachespark intoitsoperationalintelligenceplatformdeployedatthe worldslargesttelcos.html 4 of 5 top mobile network operators, 3 of 5 top • Internet backbone providers, 80 MSOs in NorAm analyzing 50 of US mobile data traffic, +2.5 PB/day • latency is critical for resolving operational issues • before they cascade: 2.5 MM transactions per second “analyze first” not “store first ask questions later” • 180Summary: Case Studies One platform for all: realtime, nearrealtime, 
 and offline video analytics on Spark
 Davis Shepherd, Xi Liu
 sparksummit.org/talk/oneplatformforall realtimenearrealtimeandofflinevideo analyticsonspark 18110: Summary FollowUp discussion: 20 mincertification: Apache Spark developer certificate program • http://oreilly.com/go/sparkcert • defined by Spark experts Databricks • assessed by O’Reilly Media • establishes the bar for Spark expertiseMOOCs: Anthony Joseph
 UC Berkeley begins 20150223 edx.org/course/ucberkeleyx/uc berkeleyxcs1001x introductionbig6181 Ameet Talwalkar
 UCLA begins 20150414 edx.org/course/ucberkeleyx/ ucberkeleyxcs1901x scalablemachine6066community: spark.apache.org/community.html events worldwide: goo.gl/2YqJZK video+preso archives: sparksummit.org resources: databricks.com/sparktrainingresources workshops: databricks.com/sparktrainingbooks: Learning Spark
 Fast Data Processing 
 Holden Karau, 
 with Spark
 Andy Konwinski, Holden Karau
 Matei Zaharia
 Packt (2013)
 O’Reilly (2015)
 shop.oreilly.com/product/ shop.oreilly.com/product/ 9781782167068.do 0636920028512.do Spark in Action
 Chris Fregly
 Manning (2015)
 sparkinaction.com/Strata CA
 San Jose, Feb 1820
 events: strataconf.com/strata2015 Spark Summit East
 NYC, Mar 1819
 sparksummit.org/east Strata EU
 London, May 57
 strataconf.com/bigdataconferenceuk2015 Spark Summit 2015
 SF, Jun 1517
 sparksummit.org