Big Data Massive Parallel Processing (MapReduce)
Ghislain Fourny, Big Data, 6. Massive Parallel Processing (MapReduce)

Let's begin with a field experiment
- 200 blocks, 8 different shapes. How many of each?
- The 200 pieces are distributed to 10 volunteers.
- Task 1 (10 people): each volunteer counts, per shape, the pieces in their own pile.
- Task 2 (8 people), part 1, aka "the big mess": the pieces are exchanged so that each of the 8 people ends up responsible for a single shape.
- Task 2 (8 people), part 2: each person adds up the counts for their shape.
- Final summary: one total per shape.
- This is, in essence, what this lecture is about: task 1 is a map phase, the big mess is a shuffle, and task 2 is a reduce phase.

So far, we have...
- Storage as a file system (HDFS)
- Storage as tables (HBase)

Data is only useful if we can query it, and at this scale we must be able to query it in parallel.

Data processing
- Input data + Query = Output data.
- Data comes in chunks.
- The ideal case: the query can run on each chunk independently, fully in parallel.
- The worst case: the output depends on all of the input at once, with no parallelism possible.
- The typical case: somewhere in between, with local processing on each chunk followed by an exchange of intermediate results.
- Data processing: Map here...
...and shuffle there.

A common and useful subcase: MapReduce
- Input data, then Map tasks, then a Shuffle, then Reduce tasks, then output data.

Data model
- The map phase turns the input data into intermediate data; the intermediate data is shuffled and fed to the reduce phase, which produces the output data.
- Data shape: at every stage (input, intermediate, output), the data consists of key-value pairs.
- Data types: in general, the input pairs (key type 1, value type 1), the intermediate pairs (key type I, value type I), and the output pairs (key type A, value type A) may all have different types.
- Most often, though, the intermediate types are the same as the output types: map already emits (key type A, value type A) pairs, and reduce consumes and produces those same types.

Splitting and mapping
- Splitting: the input is split into individual key-value pairs (key 1, key 2, key 3, key 4, ...).
- Mapping function: each input pair is mapped to zero, one, or more intermediate pairs (e.g., key 1 may produce pairs with keys I and II).
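The map/shuffle/reduce data model described above can be simulated in a few lines of plain Java, with no Hadoop involved (the class and method names here, such as ShapeCount, are made up for illustration). Mapping emits one (shape, 1) pair per piece, shuffling groups the pairs by key, and reducing sums each group, just like the volunteers in the field experiment:

```java
import java.util.*;
import java.util.stream.*;

public class ShapeCount {

    // Map: each input piece becomes one intermediate (shape, 1) pair.
    static Map.Entry<String, Integer> map(String shape) {
        return Map.entry(shape, 1);
    }

    // Shuffle: group the intermediate pairs by key, so that each
    // "reducer" sees all values for exactly one shape.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey,
                Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }

    // Reduce: sum all counts for one shape.
    static int reduce(List<Integer> counts) {
        return counts.stream().mapToInt(Integer::intValue).sum();
    }

    // The whole pipeline: map, shuffle, reduce.
    static Map<String, Integer> run(List<String> pieces) {
        List<Map.Entry<String, Integer>> mapped =
                pieces.stream().map(ShapeCount::map).collect(Collectors.toList());
        Map<String, Integer> result = new TreeMap<>();
        shuffle(mapped).forEach((shape, counts) -> result.put(shape, reduce(counts)));
        return result;
    }
}
```

In a real cluster the three stages run on different machines; here they run sequentially, but the data flow is the same.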
- The mapping function runs in parallel: many map calls execute at the same time, each producing its own intermediate pairs.

Put it all together, then sort and partition
- Put it all together: collect the intermediate pairs from all the mappers.
- Sort by key: the intermediate pairs are sorted by key.
- Partition: the sorted pairs are partitioned by key, so that all pairs with the same key end up in the same partition, and hence at the same reducer.

The reduce function
- Reduce function: receives one key together with all of its values and typically produces one output pair (key A, value).
- With identical key sets: reduce maps (key A: value A, value B, value C) to a single (key A, value).
- Most generic form: reduce may also emit several pairs with different keys (key A, key B, ...). More is fine, but uncommon.
- Reduce functions run in parallel as well, one call per intermediate key.

Overall: Map, then Sort, then Partition, then Reduce.

Input/Output formats
- From/to tables: RDBMS and HBase (rows identified by a Row ID, e.g., 000, 002, 0A1, 1E0, 22A, 4A2).
- From/to files (e.g., on HDFS): Text, KeyValue, SequenceFile.

Text files
- Each line is one record: the key is the byte offset of the line in the file, the value is the line itself. For example: 0 = "Lorem ipsum dolor", 18 = "sit amet,", 27 = "consectetur", 38 = "adipiscing elit, sed", ...
- NLine variant: each record consists of N consecutive lines, keyed by the offset of the first one (here with N = 2): 0 = "Lorem ipsum dolor" + "sit amet,", 27 = "consectetur" + "adipiscing elit, sed", ...

KeyValue files
- Each line is split at a separator character (shown as ● on the slide): the line "Lorem●ipsum dolor" becomes key "Lorem" and value "ipsum dolor", and so on for the remaining lines.
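The byte-offset keys of text-file records can be computed with a small plain-Java sketch (the TextOffsets class name is made up for illustration; offsets here count one byte per '\n' terminator, which may differ slightly from the example numbers on the slide):

```java
import java.util.*;

public class TextOffsets {

    // TextInputFormat-style records for a text file: the key is the
    // byte offset where the line starts, the value is the line itself
    // (without its trailing newline).
    static Map<Long, String> records(String file) {
        Map<Long, String> out = new LinkedHashMap<>();
        long offset = 0;
        for (String line : file.split("\n")) {
            out.put(offset, line);
            offset += line.length() + 1; // +1 for the '\n' terminator
        }
        return out;
    }
}
```

For instance, "Lorem ipsum dolor" is 17 characters long, so with its newline the second line starts at offset 18.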
Sequence files
- Hadoop's binary format for storing generic key-value pairs.
- Record layout: KeyLength, Key, ValueLength, Value.

Optimization
- The question: how can we reduce (pun intended, as a memory aid) the amount of data shuffled around between the map tasks (on the mappers) and the reduce tasks (on the reducers)?
- The answer: Combine. A combine function runs on each mapper, between the map calls and the shuffle, pre-aggregating the intermediate (key type A, value type A) pairs locally before they travel over the network.

Combine: the 90% case
- Often, the combine function is identical to the reduce function. Disclaimer: there are assumptions.
- Assumption 1: the key and value types must be identical for reduce input and output.
- Assumption 2: the reduce function must be commutative and associative, so that values can be aggregated in any order and with any grouping.
- The general principle behind this optimization: bring the query to the data, not the data to the query.

MapReduce: the APIs
- Supported frameworks on Hadoop MapReduce: the native Java API and Streaming.

Java API: Mapper

    import org.apache.hadoop.mapreduce.Mapper;

    public class MyOwnMapper extends Mapper<K1, V1, K2, V2> {
      public void map(K1 key, V1 value, Context context)
          throws IOException, InterruptedException {
        ...
        K2 newkey = ...
        V2 newvalue = ...
        context.write(newkey, newvalue);
        ...
      }
    }

Java API: Reducer

    import org.apache.hadoop.mapreduce.Reducer;

    public class MyOwnReducer extends Reducer<K2, V2, K3, V3> {
      public void reduce(K2 key, Iterable<V2> values, Context context)
          throws IOException, InterruptedException {
        ...
        K3 newkey = ...
        V3 newvalue = ...
        context.write(newkey, newvalue);
        ...
      }
    }
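Assumption 2 above can be illustrated in plain Java (the CombinerCheck class name is made up for illustration): because integer addition is commutative and associative, pre-combining each mapper's values and then reducing the partial sums gives the same result as reducing all values at once.

```java
import java.util.*;

public class CombinerCheck {

    // A sum-style reduce function: fold all values for one key into one.
    static int reduceSum(List<Integer> values) {
        int total = 0;
        for (int v : values) total += v;
        return total;
    }

    // Reducing everything at once (no combiner).
    static int direct(List<Integer> values) {
        return reduceSum(values);
    }

    // Combining each mapper's local values first, then reducing the
    // partial results. Legal because + is commutative and associative;
    // a non-associative function (e.g., plain averaging) would break here.
    static int withCombine(List<List<Integer>> perMapper) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> chunk : perMapper) {
            partials.add(reduceSum(chunk));
        }
        return reduceSum(partials);
    }
}
```

The combiner's payoff is that each mapper ships one partial result per key instead of one pair per input record.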
Java API: Job

    import org.apache.hadoop.mapreduce.Job;

    public class MyMapReduceJob {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setMapperClass(MyOwnMapper.class);
        job.setReducerClass(MyOwnReducer.class);
        FileInputFormat.addInputPath(job, ...);
        FileOutputFormat.setOutputPath(job, ...);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

Java API: Combiner (= Reducer)
The same job setup, with one extra line registering the reducer class as the combiner as well:

    job.setCombinerClass(MyOwnReducer.class);

Java API: InputFormat classes
- RDBMS: DBInputFormat
- HBase: TableInputFormat
- Files (subclasses of FileInputFormat): KeyValueTextInputFormat (key-value files), SequenceFileInputFormat (sequence files), TextInputFormat, FixedLengthInputFormat, and NLineInputFormat (text files).

Java API: OutputFormat classes
- RDBMS: DBOutputFormat
- HBase: TableOutputFormat
- Files (subclasses of FileOutputFormat): SequenceFileOutputFormat (sequence files), TextOutputFormat (text files), MapFileOutputFormat.

MapReduce: the physical layer
- Possible storage layers under Hadoop MapReduce: the local filesystem, HDFS, S3, Azure Blob Storage.
- Typical numbers: several TBs of data, processed by a large number of map tasks on 1000s of nodes.

Hadoop infrastructure (version 1)
- One NameNode and many DataNodes: a master-slave architecture.
Master-slave architecture
- One master, many slaves.

Hadoop infrastructure (version 1), continued
- The master runs the NameNode together with the JobTracker; every worker runs a DataNode together with a TaskTracker.
- The guiding principle, again: bring the query to the data.

Tasks and splits
- A task is either a map task or a reduce task.
- The input is divided into splits: 1 split = 1 map task.
- In practice, 1 split = 1 block (subject to the configured minimum and maximum split sizes).

Splits vs. blocks: a possible confusion
- Logical level (MapReduce): splits, consisting of records (key/value pairs).
- Physical level (HDFS): blocks, consisting of bits.
- Records can cross block boundaries: a record that starts in one block may end in the next one, forcing a remote read by the map task.
- Fine-tuning the split size to match the blocks avoids most of these remote reads.

Hadoop infrastructure: map tasks
- The NameNode + JobTracker knows where the blocks of /dir/file are stored.
- There are as many map tasks as splits; each map task is scheduled, whenever possible, on a node that holds the corresponding block.
- Occasionally it is not possible to colocate a task and its block.
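Under the "1 split = 1 block" rule above, the number of map tasks for a file is just a ceiling division. A plain-Java sketch (the Splits class name is made up for illustration; 128 MiB is a common HDFS block size, but it is configurable):

```java
public class Splits {

    // One split per block, the common case: the number of map tasks for
    // a file is ceil(fileSize / splitSize), where splitSize is usually
    // the HDFS block size (subject to min/max split-size settings).
    static long numMapTasks(long fileSizeBytes, long splitSizeBytes) {
        return (fileSizeBytes + splitSizeBytes - 1) / splitSizeBytes;
    }
}
```

For example, a 1 GB file with 128 MiB blocks yields 8 splits, and therefore 8 map tasks.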
Hadoop infrastructure: reduce tasks
- There are only a few reduce tasks, far fewer than map tasks.
- Shuffling happens in between: the intermediate pairs flow across the cluster from the map tasks to the reduce tasks.

Two issues with version 1
- Issue 1: tight coupling of resource management and MapReduce job logic inside the JobTracker.
- Issue 2: scalability: there is only one JobTracker for the entire cluster.

Shuffling phase
- Each mapper sorts its output key-value pairs.
- Key-value pairs are spilled to disk if necessary.
- Each reducer gets its key-value pairs from the mappers over HTTP.

(photo: kirtchanut / 123RF Stock Photo)

YARN
- YARN stands for Yet Another Resource Negotiator.
- Its responsibilities: job scheduling, resource management, monitoring.
- Architecture: one ResourceManager, a NodeManager on every machine, and containers allocated on the NodeManagers.
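During the shuffling phase described above, each reducer fetches exactly the keys assigned to its partition. Hadoop's default is hash partitioning (its HashPartitioner); a minimal plain-Java sketch of the idea (the Partitioner class name here is made up for illustration):

```java
public class Partitioner {

    // Hash partitioning: a key is routed to partition
    // (hash & Integer.MAX_VALUE) % numReducers. Masking the sign bit
    // keeps the index non-negative, and every pair with the same key
    // deterministically lands at the same reducer.
    static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }
}
```

Determinism is the essential property: two mappers emitting the same key must send it to the same reduce task, or the groups would be split.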
It does ring a bell, doesn't it? This is once more a master-slave architecture: one master, many slaves.
- HDFS: the NameNode (which knows /dir/file1, /dir/file2, /file3) is the master; the DataNodes are the slaves.
- YARN: the ResourceManager is the master; the NodeManagers are the slaves, and containers run on the NodeManagers.

ResourceManager
- The ResourceManager consists of a Scheduler and an ApplicationsManager.

Life of a YARN job
- The client posts a job to the ResourceManager (ApplicationClientProtocol).
- The ResourceManager allocates a container in which an ApplicationMaster for the job is started.
- The ApplicationMaster asks the ResourceManager for further containers (ApplicationMasterProtocol).
- The ApplicationMaster communicates with its containers (ContainerManagementProtocol) to execute and monitor them.

Next week's outlook: forward compatibility with DAGs of tasks.