Big data processing: MapReduce programming

Applying MapReduce patterns to big data

This chapter covers
■ Learning how to join data with map-side and reduce-side joins
■ Understanding how a secondary sort works
■ Discovering how partitioning works and how to globally sort data

With your data safely in HDFS, it's time to learn how to work with that data in MapReduce. Previous chapters showed some MapReduce snippets in action when working with data serialization. In this chapter we'll look at how to work effectively with big data in MapReduce to solve common problems.

MAPREDUCE BASICS
If you want to understand the mechanics of MapReduce and how to write basic MapReduce programs, it's worthwhile to read Hadoop in Action by Chuck Lam.

MapReduce contains many powerful features, but in this chapter we'll focus on joining, sorting, and sampling. These three patterns are important because they're natural operations you'll want to perform on your big data, and the goal of your clusters should be to squeeze as much performance as possible from your MapReduce jobs. The ability to join disparate and sparse data is a powerful MapReduce feature, but an awkward one in practice, so we'll also look at advanced techniques to optimize join operations with large datasets. Examples of joins include combining log files with reference data from a database and inbound link calculations on web graphs. Sorting in MapReduce is also a black art, and we'll dive into the depths of MapReduce to understand how it works by examining two techniques that everyone will encounter at some point: secondary sort and total order sorting. We'll wrap things up with a look at sampling in MapReduce, which provides the opportunity to quickly iterate over a large dataset by working with a small subset of that data.

4.1 Joining
Joins are relational constructs used to combine relations together (you're probably familiar with them in the context of databases). In MapReduce, joins are applicable in situations where you have two or more datasets you want to combine. An example would be when you want to combine your users (which you extracted from your OLTP database) with your log files that contain user activity details. Scenarios where it would be useful to combine these datasets include the following:

■ Data aggregations based on user demographics (such as differences in user habits between teenagers and users in their 30s)
■ Sending an email to users who haven't used the website for a prescribed number of days
■ A feedback loop that examines a user's browsing habits, allowing your system to recommend previously unexplored site features to the user

All of these scenarios require you to join datasets together, and the two most common types of joins are inner joins and outer joins. Inner joins compare all tuples in relations L and R, and produce a result if a join predicate is satisfied. In contrast, outer joins don't require both tuples to match based on a join predicate, and instead can retain a record from L or R even if no match exists. Figure 4.1 shows the different types of joins.

In this section we'll look at three joining strategies in MapReduce that support the two most common types of joins (inner and outer).
Figure 4.1 Different types of joins (inner join, left and right outer joins, and full outer join) shown as Venn diagrams used to combine relations together

The three strategies perform the join either in the map phase or in the reduce phase by taking advantage of MapReduce's sort-merge architecture:

1 Repartition join - A reduce-side join for situations where you're joining two or more large datasets together
2 Replication join - A map-side join that works in situations where one of the datasets is small enough to cache
3 Semi-join - Another map-side join where one dataset is initially too large to fit into memory, but after some filtering can be reduced down to a size that fits in memory

After we cover these joining strategies, we'll include a decision tree so you can see what the best join strategy is for your situation.

4.1.1 Repartition join
A repartition join is a reduce-side join that takes advantage of MapReduce's sort-merge to group together records. It's implemented as a single MapReduce job, and it can support an N-way join, where N is the number of datasets being joined.

The map phase is responsible for reading the data from the various datasets, determining the join value for each record, and emitting that join value as the output key. The output value contains the data that you'll want to include when you combine the datasets in the reducer to produce the job output.

A single reducer invocation receives all of the values for a join key emitted by the map function and partitions the data into N partitions, one per data source. After the reducer has read all of the input records for the join value and partitioned them in memory, it performs a Cartesian product across all partitions and emits the results of each join. Figure 4.2 shows the repartition join at a high level.

Figure 4.2 A basic MapReduce implementation of a repartition join: the map function filters, projects, and emits a key/value pair whose key is the join field; each reducer invocation partitions the values by data source and then performs a Cartesian product across the partitions.

FILTERING AND PROJECTION
With a repartition join, and with MapReduce in general, it's a good idea to cut down on the amount of data sent between the map and reduce phases, because it's expensive to sort and transfer data between the two phases over a network. If reduce-side work can't be avoided, as in the case of the repartition join, a good practice is to filter and project as much as possible in the map phase. Filtering is the act of discarding map input records that you don't need to include in the job output. Projection is a relational algebra term and refers to cutting down the fields sent to the reducer. For example, if you're working with user data and you only care about the results of the join containing the age of a user, your map task should only project (or emit) the age field, and not any of the other fields for the user.

TECHNIQUE 19 Optimized repartition joins
The book Hadoop in Action contains an example of how to implement a repartition join using the org.apache.hadoop.contrib.utils.join Hadoop contrib package. The contrib package does all of the heavy lifting and only requires a handful of methods to be implemented.

The contrib implementation of the repartition join is not space efficient; it requires all of the output values for a given join value to be loaded into memory before it can perform the multiway join.
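To make that memory cost concrete, the following is a minimal sketch (not the contrib code; the class names and record layouts are assumptions for illustration) of a naive reduce-side join that tags each record with its source and buffers every value for a join key before producing the cross product:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class NaiveRepartitionJoin {

  // Tag each record with its source ("U" = users, "L" = logs) and emit the
  // join key (the user name, assumed to be the first field) as the map output key.
  public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    private String tag;

    @Override
    protected void setup(Context context) {
      String file = ((FileSplit) context.getInputSplit()).getPath().getName();
      tag = file.startsWith("users") ? "U" : "L";
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] parts = value.toString().split("\\s+", 2);
      context.write(new Text(parts[0]), new Text(tag + "\t" + parts[1]));
    }
  }

  // Buffer *all* values for a join key, split by source, then emit the
  // Cartesian product of the two partitions (an inner join).
  public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      List<String> users = new ArrayList<String>();
      List<String> logs = new ArrayList<String>();
      for (Text v : values) {
        String s = v.toString();
        (s.startsWith("U") ? users : logs).add(s.substring(2));
      }
      for (String user : users) {
        for (String log : logs) {
          context.write(key, new Text(user + "\t" + log));
        }
      }
    }
  }
}

Because both lists are held in memory for each join key, a key with many matching records on both sides can easily exhaust the reducer's heap.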
It's more efficient to load the smaller of the datasets into memory and then iterate over the larger dataset, performing the join along the way.

Problem
You want to perform a repartition join in MapReduce, but you want to do so without the overhead of caching all the records in the reducer.

Solution
This technique uses an optimized repartition join framework that caches just one of the datasets being joined to reduce the amount of data cached in the reducers.

Discussion
Appendix D includes an implementation of an optimized repartition join framework that's modeled after the org.apache.hadoop.contrib.utils.join contrib package. This optimized framework caches records only from the smaller of the two datasets to cut down on the memory overhead of caching all the records. Figure 4.3 shows the improved repartition join in action, and figure 4.4 shows a class diagram broken into two parts, with a generic framework and some sample implementation classes. Users of the join framework must provide concrete implementations of the OptimizedDataJoinMapperBase and OptimizedDataJoinReducerBase classes.

Figure 4.3 An optimized MapReduce implementation of a repartition join: the map function filters, projects, and emits a composite key containing the join field and a dataset identifier; the reducer caches the values from the smaller dataset, then iterates over the values from the larger dataset and joins them with the cached values.

Let's say you want to join together some user details and logs that contain information about user activity. The first step is to determine which of the two datasets is smaller. For a reasonably sized website with these two datasets, it's likely that the user dataset will be smaller than the activity logs.

Figure 4.4 Class diagram showing the main classes in the framework and a sample implementation. The framework supplies OptimizedJobBase plus the abstract classes OptimizedDataJoinMapperBase (with abstract methods generateInputTag, genMapOutputValue, and genGroupKey) and OptimizedDataJoinReducerBase (with the abstract combine method); the sample implementation classes are SampleMap and SampleReduce.

The user data in the following example consists of the user's name, age, and state:

cat test-data/ch4/users.txt
anne 22 NY
joe 39 CO
alison 35 NY
mike 69 VA
marie 27 OR
jim 21 OR
bob 71 CA
mary 53 NY
dave 36 VA
dude 50 CA

The user activity logs contain the user's name, the action performed, and the source IP address. This file would normally be much larger than the users file:

cat test-data/ch4/user-logs.txt
jim logout 93.24.237.12
mike new_tweet 87.124.79.252
bob new_tweet 58.133.120.100
mike logout 55.237.104.36
jim new_tweet 93.24.237.12
marie view_user 122.158.130.90

First, you must provide the implementation of the OptimizedDataJoinMapperBase abstract class, which is called on the map side.
The implementation class is responsible for creating the map output key and value, as well as informing the framework whether the current input split being worked on is the smaller of the datasets being joined:

public class SampleMap extends OptimizedDataJoinMapperBase {

  private boolean smaller;

  @Override
  protected Text generateInputTag(String inputFile) {
    // Hardcode the fact that the users file is the smaller of the two files.
    smaller = inputFile.contains("users.txt");
    // This method needs to return a unique identifier for the supplied
    // input file, so echo back the filename.
    return new Text(inputFile);
  }

  @Override
  protected String generateGroupKey(Object key,
                                    OptimizedTaggedMapOutput output) {
    // This MapReduce job will use KeyValueTextInputFormat, so the key
    // contains the user name, which is the join field.
    return key.toString();
  }

  @Override
  protected boolean isInputSmaller(String inputFile) {
    // Indicate whether this input split is from the smaller file.
    return smaller;
  }

  @Override
  protected OptimizedTaggedMapOutput generateTaggedMapOutput(Object value) {
    // Generate the output that will be sent to the reducer. Again, because
    // this job uses KeyValueTextInputFormat, the value holds the user
    // details, which are echoed back to the caller.
    return new TextTaggedMapOutput((Text) value);
  }
}

GitHub source: https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch4/joins/improved/SampleMap.java

Next up, you'll write an implementation of the OptimizedDataJoinReducerBase abstract class, which is called on the reduce side. In this class you're passed a map output key and two map output values from different datasets, and you need to return the reduce output tuple:

public class SampleReduce extends OptimizedDataJoinReducerBase {

  private TextTaggedMapOutput output = new TextTaggedMapOutput();
  private Text textOutput = new Text();

  @Override
  protected OptimizedTaggedMapOutput combine(String key,
                                             OptimizedTaggedMapOutput smallValue,
                                             OptimizedTaggedMapOutput largeValue) {
    // This is an inner join, so if either value is null, return null,
    // which results in no reducer output for the key.
    if (smallValue == null || largeValue == null) {
      return null;
    }
    // Combine both values as the reducer output for the key.
    Object[] values = new Object[] { smallValue.getData(), largeValue.getData() };
    textOutput.set(StringUtils.join(values, "\t"));
    output.setData(textOutput);
    return output;
  }
}

GitHub source: https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch4/joins/improved/SampleReduce.java
Finally, the job driver code needs to indicate the InputFormat class and set up the secondary sort:

job.setInputFormat(KeyValueTextInputFormat.class);

job.setMapOutputKeyClass(CompositeKey.class);
job.setMapOutputValueClass(TextTaggedMapOutput.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setPartitionerClass(CompositeKeyPartitioner.class);
job.setOutputKeyComparatorClass(CompositeKeyComparator.class);
job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class);

GitHub source: https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch4/joins/improved/SampleMain.java

You're ready to run the join:

hadoop fs -put test-data/ch4/users.txt users.txt
hadoop fs -put test-data/ch4/user-logs.txt user-logs.txt

bin/run.sh com.manning.hip.ch4.joins.improved.SampleMain \
  users.txt,user-logs.txt \
  output

The first argument lists the files being joined, and the second is the job output directory. When the job completes, you can view the results:

hadoop fs -cat output/part*
bob 71 CA new_tweet 58.133.120.100
jim 21 OR logout 93.24.237.12
jim 21 OR new_tweet 93.24.237.12
jim 21 OR login 198.184.237.49
marie 27 OR login 58.133.120.100
marie 27 OR view_user 122.158.130.90
mike 69 VA new_tweet 87.124.79.252
mike 69 VA logout 55.237.104.36

If you refer back to the original files you joined, you can see that because you implemented an inner join, the output doesn't include entries for users such as anne and alison, who weren't in the log file.

Summary
My join implementation improves on the Hadoop contrib join by buffering only the values of the smaller dataset. But it still suffers from the problem of all the data being transmitted between the map and reduce phases, which is an expensive network cost to incur. Further, while the Hadoop contrib join package can support N-way joins, my implementation only supports two-way joins.

A simple mechanism to further reduce the memory footprint of the reduce-side join is to be aggressive about projections in the map function. Projection is the act of cutting down on the fields that the map emits. For example, if you're working with user data and you only care about the result of the join containing the age of a user, the map task should only project (or emit) the age field, and not any of the other fields for the user. This results in less network traffic between the map and reduce tasks, and it also cuts down on the reducer memory overhead when performing the join.

My repartition join implementation supports filtering and projections, like the original join contrib package. Filtering is supported by allowing the genMapOutputValue method to return NULL, and projections are supported by allowing the same method to define the contents of the output value.

What if you want to avoid the overhead of sorting and transferring all of your data over the network to the reducer? The solution brings us to the next two join strategies, the replicated join and the semi-join.

4.1.2 Replicated joins
A replicated join is a map-side join, and it gets its name from its function: the smallest of the datasets is replicated to all the map hosts. The replicated join is predicated on the fact that one of the datasets being joined is small enough to be cached in memory.
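Before walking through the mechanics, here is a minimal sketch of the idea, assuming the small users.txt file has been pushed to the distributed cache and that each log record starts with the user name; the class shown is illustrative and is not the appendix D framework:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative map-side (replicated) join: cache the small users dataset in
// each map task and probe it for every record of the large log dataset.
public class ReplicatedJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

  private final Map<String, String> users = new HashMap<String, String>();

  @Override
  protected void setup(Context context) throws IOException {
    // Load the small users file from the distributed cache into a hashtable.
    Path[] cached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    BufferedReader reader = new BufferedReader(new FileReader(cached[0].toString()));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        String[] parts = line.split("\\s+", 2);  // user name, then the user details
        users.put(parts[0], parts[1]);
      }
    } finally {
      reader.close();
    }
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Each log record starts with the user name, which is the join key.
    String[] parts = value.toString().split("\\s+", 2);
    String userDetails = users.get(parts[0]);
    if (userDetails != null) {  // inner join: drop log records with no matching user
      context.write(new Text(parts[0]), new Text(userDetails + "\t" + parts[1]));
    }
  }
}

On the driver side you would add the small file's URI to the distributed cache with DistributedCache.addCacheFile and run the job with zero reduce tasks.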
In more detail, you'll use the distributed cache to copy the small dataset to the nodes running the map tasks, and use the initialization method of each map task to load the small dataset into a hashtable. (Hadoop's distributed cache copies files located on the MapReduce client host, or files in HDFS, to the slave nodes before any map or reduce tasks are executed on the nodes. Tasks can read these files from their local disk to use as part of their work.) You then use the key from each record fed to the map function from the large dataset to look up the small dataset hashtable, and perform a join between the large dataset record and all of the records from the small dataset that match the join value. Figure 4.5 shows how the replicated join works in MapReduce.

Figure 4.5 Map-only replicated join: the map initialization reads the small users dataset from the distributed cache and builds a user hashtable, and the map function then looks up and joins the users being fed into it from the large dataset, emitting the combined user values.

The implementation of the replicated join is straightforward, and you can see a demonstration of it in Hadoop in Action. Appendix D provides a generalized framework to perform replicated joins, which can work with data from any InputFormat and OutputFormat. (We'll use this framework in the next technique.) This join framework dynamically determines whether the contents of the distributed cache or the input split should be cached, depending on which is smaller.

Is there a way to utilize map-side joins in cases where initially neither of the datasets is small enough to fit in memory? Time to look at semi-joins.

4.1.3 Semi-joins
Imagine a situation where you're working with two large datasets that you want to join, such as user logs and user data from an OLTP database. Neither of these datasets is small enough to cache in a map task's memory, so it would seem you'll have to resign yourself to performing a reduce-side join. Not necessarily; ask yourself this question: would one of the datasets fit into memory if you were to remove all records that didn't match a record from the other dataset? In this example there's a good chance that the users that appear in your logs are a small percentage of the overall set of users in your OLTP database, so by removing all the OLTP users that don't appear in your logs, you could get that dataset down to a size that fits into memory. If this is the case, the semi-join is the solution. Figure 4.6 shows the three MapReduce jobs you'll execute to perform a semi-join. Let's look at what's involved in writing a semi-join.

TECHNIQUE 20 Implementing a semi-join
When faced with the challenge of joining two large datasets together, the obvious choice is to go with a repartition join, which leverages the full MapReduce framework to perform the join on the reduce side. In fact, this may be your only option if you can't filter one of the datasets down to a size that can be cached on the map side. However, if you believe that you could pare one dataset down to a manageable size, you may not have to use a repartition join.

Problem
You want to join large datasets together and at the same time avoid the overhead of the shuffle and sort phases.

Solution
In this technique you will use three MapReduce jobs to join two datasets together to avoid the overhead of a reduce-side join.
This technique is useful in situations where you're working with large datasets, but one of them can be reduced down to a size that fits into the memory of a task by filtering out records that don't match the other dataset.

Discussion
For this technique you'll leverage the replicated join code I wrote (see appendix D) to implement the last two steps of this technique. Here we'll break down the three jobs illustrated in figure 4.6.

Figure 4.6 The three MapReduce jobs that comprise a semi-join: the first job operates on the user logs and produces a unique set of user names, the second performs a replicated join between the unique log users and the users dataset to exclude users that didn't exist in the logs, and the third is another replicated join that combines the filtered users with the original user logs.

JOB 1
The function of the first MapReduce job is to produce a set of unique user names that exist in the log files. You do this by having the map function perform a projection of the user name, and in turn using the reducers to emit the user names. To cut down on the amount of data transferred between the map and reduce phases, have the map task cache all of the user names in a HashSet and emit the values of the HashSet in the cleanup method. Figure 4.7 shows the flow of this job.

Figure 4.7 The first job in the semi-join operates on the large dataset (the user logs) and produces a unique set of user names that exist in the log files: each map emits the unique names it sees, and a single reducer produces the final unique set of user names.

The following code shows the MapReduce job (GitHub source: https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch4/joins/semijoin/UniqueHashedKeyJob.java):

public static class Map extends Mapper<Text, Text, Text, NullWritable> {

  // Cache of the user names seen by this map task.
  private Set<String> keys = new HashSet<String>();

  @Override
  protected void map(Text key, Text value, Context context)
      throws IOException, InterruptedException {
    // Add the user name to the cache.
    keys.add(key.toString());
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // Write out the distinct user names seen by this task.
    Text outputKey = new Text();
    for (String key : keys) {
      outputKey.set(key);
      context.write(outputKey, NullWritable.get());
    }
  }
}

public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
  @Override
  protected void reduce(Text key, Iterable<NullWritable> values, Context context)
      throws IOException, InterruptedException {
    // Write out each unique user name once.
    context.write(key, NullWritable.get());
  }
}

The result of the first job is a unique set of users that appear in the log files.

JOB 2
The second step is an elaborate filtering MapReduce job, where the goal is to remove users from the user dataset that don't exist in the log data.
This is a map-only job that uses a replicated join to cache the user names that appear in the log files and join them with the users dataset. The unique-users output from job 1 will be substantially smaller than the entire user dataset, which makes it the natural selection for caching. Figure 4.8 shows the flow of this job.

Figure 4.8 The second job in the semi-join performs a replicated join between the unique log user names (held in the distributed cache) and the users dataset, retaining only the users that have entries in the log files.

This is a good time to take a quick look at the replicated join framework in appendix D. The framework has built-in support for KeyValueTextInputFormat and TextOutputFormat, and it assumes that the key produced by the KeyValueTextInputFormat is the join key. As it happens, this is also how your data is laid out. You can see the class diagram for the framework in figure 4.9. The GenericReplicatedJoin class performs the join.

Figure 4.9 The replicated join framework class diagram. The GenericReplicatedJoin mapper exposes three extensible methods, readFromInputFormat, getDistributedCacheReader, and join, alongside its setup, map, and cleanup methods; supporting classes include DistributedCacheFileReader, TextDistributedCacheFileReader, and Pair<K,V>.

The first three methods in the GenericReplicatedJoin class, as shown in figure 4.9, are extensible and allow the behavior of the replicated join to be customized. The readFromInputFormat method can be used to work with any InputFormat, and the getDistributedCacheReader method can be overridden to support any file format in the distributed cache. For this step you're interested in the join method, which produces the output key and value for the job. The default implementation combines the values of both datasets to produce the final output value. You want to change this to output only the value from the users table, as follows (GitHub source: https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch4/joins/semijoin/ReplicatedFilterJob.java):

public class ReplicatedFilterJob extends GenericReplicatedJoin {
  @Override
  public Pair join(Pair inputSplitPair, Pair distCachePair) {
    // Emit only the record from the input split (the users dataset),
    // discarding the value from the distributed cache.
    return inputSplitPair;
  }
}

You also need to add the files from job 1 into the distributed cache:

for (FileStatus f : fs.listStatus(uniqueUsersPath)) {
  if (f.getPath().getName().startsWith("part")) {
    DistributedCache.addCacheFile(f.getPath().toUri(), conf);
  }
}

That's it.
Let's take a look at the complete driver code, which leverages the GenericReplicatedJoin class:

public class ReplicatedFilterJob extends GenericReplicatedJoin {

  public static void runJob(Path usersPath, Path uniqueUsersPath, Path outputPath)
      throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Add the unique user names produced by job 1 to the distributed cache.
    for (FileStatus f : fs.listStatus(uniqueUsersPath)) {
      if (f.getPath().getName().startsWith("part")) {
        DistributedCache.addCacheFile(f.getPath().toUri(), conf);
      }
    }

    Job job = new Job(conf);
    job.setJarByClass(ReplicatedFilterJob.class);
    job.setMapperClass(ReplicatedFilterJob.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(KeyValueTextInputFormat.class);

    outputPath.getFileSystem(conf).delete(outputPath, true);

    FileInputFormat.setInputPaths(job, usersPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    if (!job.waitForCompletion(true)) {
      throw new Exception("Job failed");
    }
  }

  @Override
  public Pair join(Pair inputSplitPair, Pair distCachePair) {
    return inputSplitPair;
  }
}

The output of the second job is the filtered users that also existed in the log output.

JOB 3
In this final step you'll combine the filtered users produced from job 2 with the original user logs. Ostensibly, the filtered users are now small enough to fit into memory, allowing you to put them in the distributed cache:

// Add the filtered user files produced by job 2 to the distributed cache.
FileStatus usersStatus = fs.getFileStatus(usersPath);
for (FileStatus f : fs.listStatus(usersPath)) {
  if (f.getPath().getName().startsWith("part")) {
    DistributedCache.addCacheFile(f.getPath().toUri(), conf);
  }
}

Figure 4.10 shows the flow of this job.

Figure 4.10 The third job in the semi-join is also a replicated join: it caches the filtered users that exist in the logs and joins them with the original user logs to produce the final join results.

Again you're using the replicated join to perform the join, but this time you won't tweak the behavior of the join method, because you want the data from both datasets to appear in the final output.

Run the code and look at the output produced by each of the previous steps:

bin/run.sh com.manning.hip.ch4.joins.semijoin.Main \
  users.txt user-logs.txt output

The output directory shows three subdirectories corresponding to the three jobs you ran:

hadoop fs -ls output
/user/aholmes/output/filtered
/user/aholmes/output/result
/user/aholmes/output/unique

The output of the first job is the unique user names in the log file:

hadoop fs -cat output/unique/part*
bob
jim
marie
mike

The second job's output shows the users file filtered by users that were in the log file:

hadoop fs -cat output/filtered/part*
mike 69 VA
marie 27 OR
jim 21 OR
bob 71 CA

The final output has the results of the join between the user logs and the filtered users:

hadoop fs -cat output/result/part*
jim logout 93.24.237.12 21 OR
mike new_tweet 87.124.79.252 69 VA
bob new_tweet 58.133.120.100 71 CA
mike logout 55.237.104.36 69 VA
jim new_tweet 93.24.237.12 21 OR
marie view_user 122.158.130.90 27 OR
jim login 198.184.237.49 21 OR
marie login 58.133.120.100 27 OR

The output shows the logical progression of the jobs in the semi-join and the final join output.
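For reference, here is one way the three jobs could be chained from a single driver. This is a sketch only: the runJob signature for ReplicatedFilterJob comes from the code above, but the UniqueHashedKeyJob runJob signature and the FinalJoinJob class name are assumptions, and the actual Main class in the book's repository may differ.

import org.apache.hadoop.fs.Path;

// Hypothetical driver that chains the three semi-join jobs together.
public class SemiJoinDriver {
  public static void main(String[] args) throws Exception {
    Path usersPath = new Path(args[0]);      // full users dataset
    Path userLogsPath = new Path(args[1]);   // user activity logs
    Path outputPath = new Path(args[2]);

    Path uniqueUsersPath = new Path(outputPath, "unique");      // job 1 output
    Path filteredUsersPath = new Path(outputPath, "filtered");  // job 2 output
    Path resultPath = new Path(outputPath, "result");           // job 3 output

    // Job 1: produce the distinct user names seen in the logs.
    UniqueHashedKeyJob.runJob(userLogsPath, uniqueUsersPath);

    // Job 2: filter the users dataset down to users that appear in the logs.
    ReplicatedFilterJob.runJob(usersPath, uniqueUsersPath, filteredUsersPath);

    // Job 3: replicated join of the original logs with the filtered users.
    FinalJoinJob.runJob(userLogsPath, filteredUsersPath, resultPath);
  }
}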
Summary
In this technique we looked at how to use a semi-join to combine two datasets together. The semi-join construct involves more steps than the other joins, but it's a powerful way to leverage a map-side join even when working with large datasets (with the caveat that one of the datasets must be reduced to a size that fits in memory).

With the three join strategies in hand, you may be wondering which one you should use in which circumstance.

4.1.4 Picking the best join strategy for your data
Each of the join strategies we've covered has different strengths and weaknesses, so how do you determine which one is best suited for the data you're working with? Figure 4.11 shows a decision tree you can use, which is modeled after the decision tree presented in the paper "A Comparison of Join Algorithms" (see http://pages.cs.wisc.edu/jignesh/publ/hadoopjoin.pdf).

Figure 4.11 Decision tree for selecting a join strategy: if one dataset can be loaded into memory, use the replicated join; if not, but you're willing to perform a preprocessing step that filters one dataset down to a size that can be loaded into memory, use the semi-join; otherwise, use the repartition join.

I'll summarize the decision tree shown in the figure with the following three points:

■ If one of your datasets is small enough to fit into a mapper's memory, the map-only replicated join is sufficient.
■ If both datasets are large, and one dataset can be substantially reduced by prefiltering elements that don't match the other, the semi-join works well.
■ If you can't preprocess your data and your data sizes are too large to cache, which means you have to perform the join in the reducer, the repartition join needs to be used.

It's possible to have reduce-side joins in MapReduce because MapReduce sorts and correlates the map output keys together. In the next section we'll look at common sorting techniques in MapReduce.

4.2 Sorting
MapReduce sorts data for two reasons: sorting allows MapReduce to group the map output keys together so that reduce tasks can be called once per unique map key, and sorting allows users to sort job outputs when they have specific use cases that need sorting. Examples of these use cases include data analysis jobs where you want to see the top N most popular users or web pages.

In this section we'll look at two particular scenarios where you want to tweak the behavior of MapReduce sorting. First we'll look at the secondary sort, which allows you to sort the values for a reducer key. Secondary sorts are useful when you want some data to arrive at your reducer ahead of other data, as in the case of the optimized repartition join earlier in this chapter. Secondary sorts are also useful if you want your job output to be sorted by a secondary key. An example of this would be if you want to perform a primary sort of stock data by stock symbol, and then a secondary sort on the time of each stock quote during a day. Secondary sorts are used in many of the techniques in this book, ranging from optimizing the repartition join to graph algorithms such as friends-of-friends.

The second scenario we'll cover in this section is sorting data across all the reducer outputs. This is useful in situations where you want to extract the top or bottom N elements from a dataset.

4.2.1 Secondary sort
As you saw earlier in the joining section, you need secondary sorts to allow some records to arrive at a reducer ahead of other records. Secondary sorts require an understanding of both data arrangement and data flows in MapReduce.
Figure 4.12 shows the three elements that impact data arrangement and flow (partitioning, sorting, and grouping) and how they're integrated into MapReduce.

Figure 4.12 An overview of where sorting, partitioning, and grouping occur in MapReduce. The three aspects that affect sorting and grouping are the partitioner, which determines which reducer will receive a record; the sort RawComparator, which determines how records are sorted; and the group RawComparator, which determines how sorted records are logically grouped together for a single reducer function call.

The partitioner is invoked as part of the map output collection process, and is used to determine which reducer should receive the map output. The sorting RawComparator is used to sort the map outputs within their respective partitions, and is used on both the map and reduce sides. Finally, the grouping RawComparator is responsible for determining the group boundaries across the sorted records. The default behavior in MapReduce is for all three functions to operate on the entire output key emitted by map functions.

TECHNIQUE 21 Implementing a secondary sort
Secondary sorts are useful when you want some of the values for a unique map key to arrive at a reducer ahead of other values. I'll show the value of secondary sorting in other techniques in this book, such as the optimized repartition join and the friends-of-friends algorithm in chapter 7.

Problem
You want to order the values sent to a single reducer invocation for a natural key.

Solution
This technique covers writing your partitioner, sort comparator, and grouping comparator classes, which are required for secondary sort to work.

Discussion
In this technique we'll look at how to use secondary sort to order people's names. You'll use the primary sort to order people's last names, and the secondary sort to order their first names.

To support secondary sort you need to create a composite output key, which will be emitted by your map functions. The composite key contains two parts:

1 The natural key, which is the key to use for joining purposes
2 The secondary key, which is the key to use to order all of the values sent to the reducer for the natural key

Figure 4.13 shows the composite key for your user. It also shows a composite value that provides reducer-side access to the secondary key. The entire composite key is used for sorting.

Figure 4.13 The user composite key and value. The composite key holds the last name (the "natural key," used for partitioning and grouping) and the first name (the "secondary key," used only during sorting); the composite value carries the natural value plus the first name, which provides reduce-side access to the secondary key.

Let's go through the partitioning, sorting, and grouping phases and implement them for your user. But before that, you need to write your composite key class.

COMPOSITE KEY
The composite key contains both the first and last name.
It implements WritableComparable, which is recommended for Writable classes that are emitted as keys from map functions (GitHub source: https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch4/sort/secondary/Person.java):

public class Person implements WritableComparable<Person> {

  private String firstName;
  private String lastName;

  @Override
  public void readFields(DataInput in) throws IOException {
    this.firstName = in.readUTF();
    this.lastName = in.readUTF();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(firstName);
    out.writeUTF(lastName);
  }
  ...
}

Figure 4.14 shows the configuration names and methods to call to set the partitioning, sorting, and grouping classes, and it also shows what part of the composite key each class uses.

Figure 4.14 Partitioning, sorting, and grouping settings and key utilization. The partitioner (mapred.partitioner.class, a.k.a. JobConf.setPartitionerClass) uses only the natural key, so that all records for a natural key go to the same reducer. The sort comparator (mapred.output.key.comparator.class, a.k.a. JobConf.setOutputKeyComparatorClass) sorts using the entire composite key. The grouping comparator (mapred.output.value.groupfn.class, a.k.a. JobConf.setOutputValueGroupingComparator) compares only the natural key, ignoring the secondary sort key.

Let's look at the implementation code for each of these classes.
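As a starting point, here is a minimal sketch of what those three classes could look like for the Person key. It uses the newer org.apache.hadoop.mapreduce API and assumes that Person exposes getFirstName and getLastName accessors and that the map output value is a Text; the book's actual classes in the GitHub repository may differ.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Partition on the natural key (last name) only, so that every record for a
// given last name is sent to the same reducer regardless of first name.
class PersonNamePartitioner extends Partitioner<Person, Text> {
  @Override
  public int getPartition(Person key, Text value, int numPartitions) {
    return (key.getLastName().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

// Sort comparator: order map outputs by the entire composite key,
// last name first and then first name.
class PersonComparator extends WritableComparator {
  protected PersonComparator() {
    super(Person.class, true);
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    Person p1 = (Person) a;
    Person p2 = (Person) b;
    int cmp = p1.getLastName().compareTo(p2.getLastName());
    return cmp != 0 ? cmp : p1.getFirstName().compareTo(p2.getFirstName());
  }
}

// Grouping comparator: compare only the natural key, so that a single reduce
// call receives all values for a last name, with the values already ordered
// by first name thanks to the sort comparator.
class PersonNameComparator extends WritableComparator {
  protected PersonNameComparator() {
    super(Person.class, true);
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    return ((Person) a).getLastName().compareTo(((Person) b).getLastName());
  }
}

In the driver, these would be wired up with job.setPartitionerClass(PersonNamePartitioner.class), job.setSortComparatorClass(PersonComparator.class), and job.setGroupingComparatorClass(PersonNameComparator.class), the new-API equivalents of the JobConf settings shown in figure 4.14.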
