Using Apache Spark
Pat McDonough - Databricks

Apache Spark
spark.incubator.apache.org
github.com/apache/incubator-spark
user@spark.incubator.apache.org

The Spark Community
(+ You)

INTRODUCTION TO APACHE SPARK

What is Spark?
Fast and expressive cluster computing system compatible with Apache Hadoop
•  Efficient
   –  General execution graphs
   –  In-memory storage (up to 10× faster than Hadoop on disk, 100× in memory)
•  Usable
   –  Rich APIs in Java, Scala, Python (2-5× less code)
   –  Interactive shell

Key Concepts
Write programs in terms of transformations on distributed datasets.
•  Resilient Distributed Datasets (RDDs)
   –  Collections of objects spread across a cluster, stored in RAM or on disk
   –  Built through parallel transformations
   –  Automatically rebuilt on failure
•  Operations
   –  Transformations (e.g. map, filter, groupBy)
   –  Actions (e.g. count, collect, save)

Working With RDDs
  textFile = sc.textFile("SomeFile.txt")
  linesWithSpark = textFile.filter(lambda line: "Spark" in line)   # transformation: RDD -> RDD
  linesWithSpark.count()   # action -> 74
  linesWithSpark.first()   # action -> "Apache Spark"

Example: Log Mining
Load error messages from a log into memory, then interactively search for various patterns
(a runnable local sketch of this pattern follows the Language Support slide below).
  lines = spark.textFile("hdfs://...")                     # base RDD
  errors = lines.filter(lambda s: s.startswith("ERROR"))   # transformed RDD
  messages = errors.map(lambda s: s.split("\t")[2])
  messages.cache()
  messages.filter(lambda s: "mysql" in s).count()          # action: driver ships tasks to workers
  messages.filter(lambda s: "php" in s).count()            # second query is served from the cache
  . . .
(Diagram: the driver sends tasks to workers, which read HDFS blocks and cache partitions in memory.)
Full-text search of Wikipedia: 60 GB on 20 EC2 machines; 0.5 sec in memory vs. 20 sec on disk.

Scaling Down
Execution time (s) vs. fraction of working set in cache:
  cache disabled: 69, 25%: 58, 50%: 41, 75%: 30, fully cached: 12

Fault Recovery
RDDs track lineage information that can be used to efficiently recompute lost data.
  msgs = textFile.filter(lambda s: s.startswith("ERROR"))
                 .map(lambda s: s.split("\t")[2])
Lineage: HDFS file -> filter (func = startswith(...)) -> filtered RDD -> map (func = split(...)) -> mapped RDD

Language Support
•  Standalone programs: Python, Scala, Java
•  Interactive shells: Python, Scala
•  Performance: Java and Scala are faster due to static typing, but Python is often fine

  Python:
    lines = sc.textFile(...)
    lines.filter(lambda s: "ERROR" in s).count()

  Scala:
    val lines = sc.textFile(...)
    lines.filter(x => x.contains("ERROR")).count()

  Java:
    JavaRDD<String> lines = sc.textFile(...);
    lines.filter(new Function<String, Boolean>() {
      Boolean call(String s) {
        return s.contains("error");
      }
    }).count();
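For readers who want to try the log-mining pattern above outside a cluster, here is a minimal self-contained Python sketch. It runs against a local master; the file name sample.log, the app name, and the tab-separated log format are assumptions made for illustration rather than part of the original example.

  from pyspark import SparkContext

  # Local mode, one thread; the app name is illustrative
  sc = SparkContext("local", "LogMining")

  lines = sc.textFile("sample.log")                        # assumed local log file
  errors = lines.filter(lambda s: s.startswith("ERROR"))   # transformation (lazy)
  messages = errors.map(lambda s: s.split("\t")[2])        # third tab-separated field
  messages.cache()                                         # keep results in memory

  # Actions trigger the computation; the second count reuses the cache
  print(messages.filter(lambda s: "mysql" in s).count())
  print(messages.filter(lambda s: "php" in s).count())

  sc.stop()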
Interactive Shell
•  The fastest way to learn Spark
•  Available in Python and Scala
•  Runs as an application on an existing Spark cluster…
•  OR can run locally

Administrative GUIs
http://<Standalone Master>:8080 (by default)

JOB EXECUTION

Software Components
•  Spark runs as a library in your program (one SparkContext instance per app)
•  Runs tasks locally or on a cluster manager
   –  Mesos, YARN, or standalone mode
•  Accesses storage systems via the Hadoop InputFormat API
   –  Can use HBase, HDFS, S3, …
(Diagram: your application's SparkContext talks to a cluster manager or local threads, which launch Spark executors on workers; executors read from HDFS or other storage.)

Task Scheduler
•  General task graphs
•  Automatically pipelines functions
•  Data locality aware
•  Partitioning aware, to avoid shuffles
(Diagram: a job with groupBy, join, map, and filter split into Stages 1-3 of RDDs and cached partitions.)

Advanced Features
•  Controllable partitioning
   –  Speed up joins against a dataset
•  Controllable storage formats
   –  Keep data serialized for efficiency, replicate to multiple nodes, cache on disk
•  Shared variables: broadcasts, accumulators
•  See online docs for details

Local Execution
•  Just pass local or local[k] as the master URL
•  Debug using local debuggers
   –  For Java / Scala, just run your program in a debugger
   –  For Python, use an attachable debugger (e.g. PyDev)
•  Great for development & unit tests

Cluster Execution
•  Easiest way to launch is EC2:
     ./spark-ec2 -k keypair -i id_rsa.pem -s slaves \
       [launch|stop|start|destroy] clusterName
•  Several options for private clusters:
   –  Standalone mode (similar to Hadoop's deploy scripts)
   –  Mesos
   –  Hadoop YARN
•  Amazon EMR: tinyurl.com/spark-emr

WORKING WITH SPARK

Using the Shell
Launching:
  spark-shell
  pyspark   (IPYTHON=1)
Modes:
  MASTER=local ./spark-shell                # local, 1 thread
  MASTER=local[2] ./spark-shell             # local, 2 threads
  MASTER=spark://host:port ./spark-shell    # cluster

SparkContext
•  Main entry point to Spark functionality
•  Available in the shell as the variable sc
•  In standalone programs, you'd make your own (see later for details)

Creating RDDs
  # Turn a Python collection into an RDD
  sc.parallelize([1, 2, 3])

  # Load a text file from the local FS, HDFS, or S3
  sc.textFile("file.txt")
  sc.textFile("directory/*.txt")
  sc.textFile("hdfs://namenode:9000/path/file")

  # Use an existing Hadoop InputFormat (Java/Scala only)
  sc.hadoopFile(keyClass, valClass, inputFmt, conf)

Basic Transformations
  nums = sc.parallelize([1, 2, 3])

  # Pass each element through a function
  squares = nums.map(lambda x: x * x)           # => [1, 4, 9]

  # Keep elements passing a predicate
  even = squares.filter(lambda x: x % 2 == 0)   # => [4]

  # Map each element to zero or more others
  nums.flatMap(lambda x: range(x))              # => [0, 0, 1, 0, 1, 2]
  # range(x) is the sequence of numbers 0, 1, …, x-1

Basic Actions
  nums = sc.parallelize([1, 2, 3])

  # Retrieve RDD contents as a local collection
  nums.collect()                           # => [1, 2, 3]

  # Return the first K elements
  nums.take(2)                             # => [1, 2]

  # Count the number of elements
  nums.count()                             # => 3

  # Merge elements with an associative function
  nums.reduce(lambda x, y: x + y)          # => 6

  # Write elements to a text file
  nums.saveAsTextFile("hdfs://file.txt")
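Putting the basic transformations and actions together, the following short sketch can be pasted into the pyspark shell (it assumes the shell's predefined sc variable; the numbers are arbitrary):

  nums = sc.parallelize([1, 2, 3, 4, 5])
  evens_squared = nums.map(lambda x: x * x) \
                      .filter(lambda x: x % 2 == 0)   # transformations only: nothing runs yet
  print(evens_squared.collect())                      # action => [4, 16]
  print(evens_squared.reduce(lambda x, y: x + y))     # action => 20
  print(nums.flatMap(lambda x: range(x)).count())     # action => 15 (range(x) has x elements)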
Working with Key-Value Pairs
Spark's "distributed reduce" transformations operate on RDDs of key-value pairs.
  Python:  pair = (a, b)
           pair[0]   # => a
           pair[1]   # => b
  Scala:   val pair = (a, b)
           pair._1   // => a
           pair._2   // => b
  Java:    Tuple2 pair = new Tuple2(a, b);
           pair._1   // => a
           pair._2   // => b

Some Key-Value Operations
  pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])
  pets.reduceByKey(lambda x, y: x + y)   # => [(cat, 3), (dog, 1)]
  pets.groupByKey()                      # => [(cat, [1, 2]), (dog, [1])]
  pets.sortByKey()                       # => [(cat, 1), (cat, 2), (dog, 1)]
reduceByKey also automatically implements combiners on the map side.

Example: Word Count
  lines = sc.textFile("hamlet.txt")
  counts = lines.flatMap(lambda line: line.split(" ")) \
                .map(lambda word: (word, 1)) \
                .reduceByKey(lambda x, y: x + y)
(Diagram: "to be or" / "not to be" is split into words, each word is mapped to (word, 1), and the pairs are reduced by key to (be, 2), (not, 1), (or, 1), (to, 2).)

Other Key-Value Operations
  visits = sc.parallelize([("index.html", "1.2.3.4"),
                           ("about.html", "3.4.5.6"),
                           ("index.html", "1.3.3.1")])
  pageNames = sc.parallelize([("index.html", "Home"),
                              ("about.html", "About")])

  visits.join(pageNames)
  # ("index.html", ("1.2.3.4", "Home"))
  # ("index.html", ("1.3.3.1", "Home"))
  # ("about.html", ("3.4.5.6", "About"))

  visits.cogroup(pageNames)
  # ("index.html", (["1.2.3.4", "1.3.3.1"], ["Home"]))
  # ("about.html", (["3.4.5.6"], ["About"]))

Setting the Level of Parallelism
All the pair RDD operations take an optional second parameter for the number of tasks:
  words.reduceByKey(lambda x, y: x + y, 5)
  words.groupByKey(5)
  visits.join(pageViews, 5)

Using Local Variables
Any external variables you use in a closure will automatically be shipped to the cluster:
  query = sys.stdin.readline()
  pages.filter(lambda x: query in x).count()
Some caveats:
•  Each task gets a new copy (updates aren't sent back)
•  The variable must be Serializable / Pickle-able
•  Don't use fields of an outer object (ships all of it)

Closure Mishap Example
This is a problem:
  class MyCoolRddApp {
    val param = 3.14
    val log = new Log(...)
    ...
    def work(rdd: RDD[Int]) {
      rdd.map(x => x + param)
         .reduce(...)
    }
  }
  // NotSerializableException: MyCoolRddApp (or Log)

How to get around it:
  class MyCoolRddApp {
    ...
    def work(rdd: RDD[Int]) {
      val param_ = param
      rdd.map(x => x + param_)
         .reduce(...)
    }
  }
  // References only the local variable instead of this.param
(A Python analogue is sketched after the operator list below.)

More RDD Operators
•  map, filter, groupBy, sort, union, join, leftOuterJoin, rightOuterJoin
•  reduce, count, fold, reduceByKey, groupByKey, cogroup, cross, zip
•  sample, take, first, partitionBy, mapWith, pipe, save, ...
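The same caveat applies in Python. The sketch below is a hypothetical analogue of the Scala example (the class, field, and file names are invented for illustration): referencing self.param inside a lambda forces Spark to pickle the whole object, while copying the field into a local variable ships only that value.

  class MyCoolRddApp(object):
      def __init__(self):
          self.param = 3.14
          self.log = open("app.log", "w")   # an unpicklable member, like Log in the Scala example

      def work_bad(self, rdd):
          # Problem: the lambda closes over self, so all of MyCoolRddApp
          # (including the open file) must be pickled and shipped
          return rdd.map(lambda x: x + self.param).reduce(lambda a, b: a + b)

      def work_good(self, rdd):
          param = self.param                # copy into a local variable first
          return rdd.map(lambda x: x + param).reduce(lambda a, b: a + b)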
CREATING SPARK APPLICATIONS

Add Spark to Your Project
•  Scala / Java: add a Maven dependency on
     groupId: org.spark-project
     artifactId: spark-core_2.9.3
     version: 0.8.0
•  Python: run your program with our pyspark script

Create a SparkContext
  Scala:
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    val sc = new SparkContext("url", "name", "sparkHome", Seq("app.jar"))
    // arguments: cluster URL (or local / local[N]), app name,
    // Spark install path on the cluster, list of JARs with app code (to ship)

  Java:
    import org.apache.spark.api.java.JavaSparkContext;

    JavaSparkContext sc = new JavaSparkContext(
      "masterUrl", "name", "sparkHome", new String[] {"app.jar"});

  Python:
    from pyspark import SparkContext

    sc = SparkContext("masterUrl", "name", "sparkHome", ["library.py"])

Complete App
  import sys
  from pyspark import SparkContext

  if __name__ == "__main__":
      sc = SparkContext("local", "WordCount", sys.argv[0], None)
      lines = sc.textFile(sys.argv[1])
      counts = lines.flatMap(lambda s: s.split(" ")) \
                    .map(lambda word: (word, 1)) \
                    .reduceByKey(lambda x, y: x + y)
      counts.saveAsTextFile(sys.argv[2])

EXAMPLE APPLICATION: PAGERANK

Example: PageRank
•  Good example of a more complex algorithm
   –  Multiple stages of map & reduce
•  Benefits from Spark's in-memory caching
   –  Multiple iterations over the same data

Basic Idea
Give pages ranks (scores) based on links to them
•  Links from many pages => high rank
•  Link from a high-rank page => high rank
Image: en.wikipedia.org/wiki/File:PageRank-hi-res-2.png

Algorithm
1.  Start each page at a rank of 1
2.  On each iteration, have page p contribute rank_p / |neighbors_p| to its neighbors
3.  Set each page's rank to 0.15 + 0.85 × Σ contribs
(The original slides animate a four-page example over several iterations, starting from ranks of 1.0 everywhere and converging to a final state of 1.44, 1.37, 0.46, and 0.73.)

Scala Implementation
  val links = // load RDD of (url, neighbors) pairs
  var ranks = // load RDD of (url, rank) pairs

  for (i <- 1 to ITERATIONS) {
    val contribs = links.join(ranks).flatMap {
      case (url, (links, rank)) =>
        links.map(dest => (dest, rank / links.size))
    }
    ranks = contribs.reduceByKey(_ + _)
                    .mapValues(0.15 + 0.85 * _)
  }
  ranks.saveAsTextFile(...)
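For comparison with the Scala version above, here is a rough Python sketch of the same loop. The input path links.txt (whitespace-separated "url neighbor" pairs), the iteration count, and the output path are assumptions for illustration, and an existing SparkContext sc is assumed.

  # Build (url, [neighbors]) pairs and cache them, since they are reused every iteration
  links = sc.textFile("links.txt") \
            .map(lambda line: tuple(line.split())) \
            .groupByKey() \
            .mapValues(list) \
            .cache()
  ranks = links.mapValues(lambda _: 1.0)        # start every page at rank 1

  for i in range(10):                           # number of iterations chosen arbitrarily
      contribs = links.join(ranks).flatMap(
          lambda kv: [(dest, kv[1][1] / len(kv[1][0])) for dest in kv[1][0]])
      ranks = contribs.reduceByKey(lambda x, y: x + y) \
                      .mapValues(lambda s: 0.15 + 0.85 * s)

  ranks.saveAsTextFile("ranks_out")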
PageRank Performance
Iteration time (s), Hadoop vs. Spark:
  30 machines: Hadoop 171, Spark 23
  60 machines: Hadoop 80, Spark 14

Other Iterative Algorithms
Time per iteration (s), Hadoop vs. Spark:
  K-Means clustering: Hadoop 155, Spark 4.1
  Logistic regression: Hadoop 110, Spark 0.96

CONCLUSION

Conclusion
•  Spark offers a rich API to make data analytics fast: both fast to write and fast to run
•  Achieves 100× speedups in real applications
•  Growing community with 25+ companies contributing

Get Started
Up and running in a few steps:
•  Download
•  Unzip
•  Shell

Project Resources
•  Examples on the project site
•  Examples in the distribution
•  Documentation: http://spark.incubator.apache.org
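As a quick smoke test once the shell is up, something like the following can be typed at the pyspark prompt (the numbers are arbitrary; sc is predefined by the shell):

  data = sc.parallelize(range(1000))
  print(data.filter(lambda x: x % 2 == 0).count())   # expect 500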