MapReduce Types and Formats in Hadoop

MapReduce has a simple model of data processing: inputs and outputs for the map and reduce functions are key-value pairs. This chapter looks at the MapReduce model in detail and, in particular, at how data in various formats, from simple text to structured binary objects, can be used with this model.

MapReduce Types

The map and reduce functions in Hadoop MapReduce have the following general form:

    map: (K1, V1) → list(K2, V2)
    reduce: (K2, list(V2)) → list(K3, V3)

In general, the map input key and value types (K1 and V1) are different from the map output types (K2 and V2). However, the reduce input must have the same types as the map output, although the reduce output types may be different again (K3 and V3). The Java API mirrors this general form:

    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

      public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
      }

      protected void map(KEYIN key, VALUEIN value,
                         Context context) throws IOException, InterruptedException {
        // ...
      }
    }

    public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

      public class Context extends ReducerContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
      }

      protected void reduce(KEYIN key, Iterable<VALUEIN> values,
                            Context context) throws IOException, InterruptedException {
        // ...
      }
    }

The context objects are used for emitting key-value pairs, so they are parameterized by the output types. The signature of the write() method is:

    public void write(KEYOUT key, VALUEOUT value)
        throws IOException, InterruptedException

Since Mapper and Reducer are separate classes, the type parameters have different scopes, and the actual type argument of KEYIN (say) in the Mapper may be different from the type of the type parameter of the same name (KEYIN) in the Reducer. For instance, in the maximum temperature example from earlier chapters, KEYIN is replaced by LongWritable for the Mapper and by Text for the Reducer. Similarly, even though the map output types and the reduce input types must match, this is not enforced by the Java compiler. The type parameters are named differently from the abstract types (KEYIN versus K1, and so on), but the form is the same.

If a combine function is used, it has the same form as the reduce function (and is an implementation of Reducer), except that its output types are the intermediate key and value types (K2 and V2), so they can feed the reduce function:

    map: (K1, V1) → list(K2, V2)
    combine: (K2, list(V2)) → list(K2, V2)
    reduce: (K2, list(V2)) → list(K3, V3)

Often the combine and reduce functions are the same, in which case K3 is the same as K2, and V3 is the same as V2.

The partition function operates on the intermediate key and value types (K2 and V2) and returns the partition index. In practice, the partition is determined solely by the key (the value is ignored):

    partition: (K2, V2) → integer

Or in Java:

    public abstract class Partitioner<KEY, VALUE> {
      public abstract int getPartition(KEY key, VALUE value, int numPartitions);
    }
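As a concrete (and purely illustrative) use of this interface, the following sketch partitions Text keys by their first character; the class name and the key/value types are assumptions made for the example, not something defined in the text above.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Hypothetical partitioner over (K2, V2) = (Text, IntWritable): the value is
    // ignored, mirroring the (K2, V2) → integer form shown above.
    public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
          return 0;
        }
        // charAt() returns the Unicode code point at the given byte offset
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
      }
    }

Like the built-in hash partitioner discussed later in the chapter, this keeps all records with the same key in the same partition, so they arrive at the same reducer.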
MapReduce signatures in the old API

In the old API the signatures are very similar, and actually name the type parameters K1, V1, and so on, although the constraints on the types are exactly the same in both the old and new APIs:

    public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
      void map(K1 key, V1 value,
               OutputCollector<K2, V2> output, Reporter reporter) throws IOException;
    }

    public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
      void reduce(K2 key, Iterator<V2> values,
                  OutputCollector<K3, V3> output, Reporter reporter) throws IOException;
    }

    public interface Partitioner<K2, V2> extends JobConfigurable {
      int getPartition(K2 key, V2 value, int numPartitions);
    }

So much for the theory; how does this help you configure MapReduce jobs? Table 7-1 summarizes the configuration options for the new API (and Table 7-2 does the same for the old API). Each is divided into the properties that determine the types and those that have to be compatible with the configured types.

Input types are set by the input format. So, for instance, a TextInputFormat generates keys of type LongWritable and values of type Text. The other types are set explicitly by calling the methods on the Job (or JobConf in the old API). If not set explicitly, the intermediate types default to the (final) output types, which default to LongWritable and Text. So if K2 and K3 are the same, you don't need to call setMapOutputKeyClass(), since it falls back to the type set by calling setOutputKeyClass(). Similarly, if V2 and V3 are the same, you only need to use setOutputValueClass().

It may seem strange that these methods for setting the intermediate and final output types exist at all. After all, why can't the types be determined from a combination of the mapper and the reducer? The answer has to do with a limitation in Java generics: type erasure means that the type information isn't always present at runtime, so Hadoop has to be given it explicitly. This also means that it's possible to configure a MapReduce job with incompatible types, because the configuration isn't checked at compile time. The settings that have to be compatible with the MapReduce types are listed in the lower part of Table 7-1. Type conflicts are detected at runtime during job execution, and for this reason, it is wise to run a test job using a small amount of data to flush out and fix any type incompatibilities.

Table 7-1. Configuration of MapReduce types in the new API

    Properties for configuring types:
      mapreduce.job.inputformat.class              setInputFormatClass()              sets K1, V1
      mapreduce.map.output.key.class               setMapOutputKeyClass()             sets K2
      mapreduce.map.output.value.class             setMapOutputValueClass()           sets V2
      mapreduce.job.output.key.class               setOutputKeyClass()                sets K3
      mapreduce.job.output.value.class             setOutputValueClass()              sets V3

    Properties that must be consistent with the types:
      mapreduce.job.map.class                      setMapperClass()                   K1, V1, K2, V2
      mapreduce.job.combine.class                  setCombinerClass()                 K2, V2
      mapreduce.job.partitioner.class              setPartitionerClass()              K2, V2
      mapreduce.job.output.key.comparator.class    setSortComparatorClass()           K2
      mapreduce.job.output.group.comparator.class  setGroupingComparatorClass()       K2
      mapreduce.job.reduce.class                   setReducerClass()                  K2, V2, K3, V3
      mapreduce.job.outputformat.class             setOutputFormatClass()             K3, V3

Table 7-2. Configuration of MapReduce types in the old API

    Properties for configuring types:
      mapred.input.format.class                    setInputFormat()                   sets K1, V1
      mapred.mapoutput.key.class                   setMapOutputKeyClass()             sets K2
      mapred.mapoutput.value.class                 setMapOutputValueClass()           sets V2
      mapred.output.key.class                      setOutputKeyClass()                sets K3
      mapred.output.value.class                    setOutputValueClass()              sets V3

    Properties that must be consistent with the types:
      mapred.mapper.class                          setMapperClass()                   K1, V1, K2, V2
      mapred.map.runner.class                      setMapRunnerClass()                K1, V1, K2, V2
      mapred.combiner.class                        setCombinerClass()                 K2, V2
      mapred.partitioner.class                     setPartitionerClass()              K2, V2
      mapred.output.key.comparator.class           setOutputKeyComparatorClass()      K2
      mapred.output.value.groupfn.class            setOutputValueGroupingComparator() K2
      mapred.reducer.class                         setReducerClass()                  K2, V2, K3, V3
      mapred.output.format.class                   setOutputFormat()                  K3, V3
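To illustrate the setter methods listed above, here is a rough configuration fragment (not from the original text; MyMapper, MyReducer, and the type choices are placeholders) for a job whose intermediate types (K2, V2) differ from its final output types (K3, V3), so both sets of setters are needed:

    // Sketch only: MyMapper and MyReducer are hypothetical classes whose type
    // parameters match the classes declared here.
    Job job = new Job(conf);
    job.setInputFormatClass(TextInputFormat.class);   // K1 = LongWritable, V1 = Text

    job.setMapperClass(MyMapper.class);               // Mapper<LongWritable, Text, Text, IntWritable>
    job.setMapOutputKeyClass(Text.class);             // K2
    job.setMapOutputValueClass(IntWritable.class);    // V2

    job.setReducerClass(MyReducer.class);             // Reducer<Text, IntWritable, Text, DoubleWritable>
    job.setOutputKeyClass(Text.class);                // K3
    job.setOutputValueClass(DoubleWritable.class);    // V3
    job.setOutputFormatClass(TextOutputFormat.class);

If K2 and K3 (and V2 and V3) had been the same, the two setMapOutput*Class() calls could have been dropped, as described above.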
The Default MapReduce Job

What happens when you run MapReduce without setting a mapper or a reducer? Let's try it by running this minimal MapReduce program:

    public class MinimalMapReduce extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.printf("Usage: %s [generic options] <input> <output>\n",
              getClass().getSimpleName());
          ToolRunner.printGenericCommandUsage(System.err);
          return -1;
        }

        Job job = new Job(getConf());
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MinimalMapReduce(), args);
        System.exit(exitCode);
      }
    }

The only configuration that we set is an input path and an output path. We run it over a subset of our weather data with the following:

    % hadoop MinimalMapReduce "input/ncdc/all/190{1,2}.gz" output

We do get some output: one file named part-r-00000 in the output directory. Here's what the first few lines look like (truncated to fit the page):

    0→0029029070999991901010106004+64333+023450FM-12+000599999V0202701N01591...
    0→0035029070999991902010106004+64333+023450FM-12+000599999V0201401N01181...
    135→0029029070999991901010113004+64333+023450FM-12+000599999V0202901N00821...
    141→0035029070999991902010113004+64333+023450FM-12+000599999V0201401N01181...
    270→0029029070999991901010120004+64333+023450FM-12+000599999V0209991C00001...
    282→0035029070999991902010120004+64333+023450FM-12+000599999V0201401N01391...

Each line is an integer followed by a tab character (shown here as →), followed by the original weather data record. Admittedly, it's not a very useful program, but understanding how it produces its output does provide some insight into the defaults that Hadoop uses when running MapReduce jobs. Example 7-1 shows a program that has exactly the same effect as MinimalMapReduce, but explicitly sets the job settings to their defaults.
Example 7-1. A minimal MapReduce driver, with the defaults explicitly set

    public class MinimalMapReduceWithDefaults extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
          return -1;
        }

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(Mapper.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setPartitionerClass(HashPartitioner.class);

        job.setNumReduceTasks(1);
        job.setReducerClass(Reducer.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
        System.exit(exitCode);
      }
    }

We've simplified the first few lines of the run() method by extracting the logic for printing usage and setting the input and output paths into a helper method. Almost all MapReduce drivers take these two arguments (input and output), so reducing the boilerplate code here is a good thing. Here are the relevant methods in the JobBuilder class for reference:

    public static Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)
        throws IOException {

      if (args.length != 2) {
        printUsage(tool, "<input> <output>");
        return null;
      }
      Job job = new Job(conf);
      job.setJarByClass(tool.getClass());
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      return job;
    }

    public static void printUsage(Tool tool, String extraArgsUsage) {
      System.err.printf("Usage: %s [genericOptions] %s\n\n",
          tool.getClass().getSimpleName(), extraArgsUsage);
      GenericOptionsParser.printGenericCommandUsage(System.err);
    }

Going back to MinimalMapReduceWithDefaults in Example 7-1, although there are many other default job settings, the ones set explicitly here are those most central to running a job. Let's go through them in turn.

The default input format is TextInputFormat, which produces keys of type LongWritable (the offset of the beginning of the line in the file) and values of type Text (the line of text). This explains where the integers in the final output come from: they are the line offsets.

The default mapper is just the Mapper class, which writes the input key and value unchanged to the output:

    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

      protected void map(KEYIN key, VALUEIN value,
                         Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
      }
    }

Mapper is a generic type, which allows it to work with any key or value types. In this case, the map input and output key is of type LongWritable and the map input and output value is of type Text.

The default partitioner is HashPartitioner, which hashes a record's key to determine which partition the record belongs in. Each partition is processed by a reduce task, so the number of partitions is equal to the number of reduce tasks for the job:

    public class HashPartitioner<K, V> extends Partitioner<K, V> {

      public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }

The key's hash code is turned into a nonnegative integer by bitwise ANDing it with the largest integer value. It is then reduced modulo the number of partitions to find the index of the partition that the record belongs in.
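The following throwaway snippet (an illustration written for this discussion, not Hadoop code) shows why the bitwise AND matters: hashCode() can be negative, and without masking, the modulo could produce a negative partition index.

    // Standalone sketch of the HashPartitioner arithmetic shown above.
    public class PartitionDemo {

      static int partitionFor(Object key, int numReduceTasks) {
        // mask off the sign bit, then take the remainder
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }

      public static void main(String[] args) {
        // Any key type with a sensible hashCode() behaves the same way.
        System.out.println(partitionFor("1901", 5));   // always in the range 0..4
        System.out.println(partitionFor("1902", 5));
      }
    }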
By default, there is a single reducer, and therefore a single partition, so the action of the partitioner is irrelevant in this case since everything goes into one partition. However, it is important to understand the behavior of HashPartitioner when you have more than one reduce task. Assuming the key's hash function is a good one, the records will be evenly allocated across reduce tasks, with all records sharing the same key being processed by the same reduce task.

You may have noticed that we didn't set the number of map tasks. The reason for this is that the number is equal to the number of splits that the input is turned into, which is driven by the size of the input and the file's block size (if the file is in HDFS). The options for controlling split size are discussed in "FileInputFormat input splits" on page 236.

Choosing the Number of Reducers

The single reducer default is something of a gotcha for new users to Hadoop. Almost all real-world jobs should set this to a larger number; otherwise, the job will be very slow since all the intermediate data flows through a single reduce task. (Note that when running under the local job runner, only zero or one reducers are supported.)

The optimal number of reducers is related to the total number of available reducer slots in your cluster. The total number of slots is found by multiplying the number of nodes in the cluster by the number of slots per node (which is determined by the value of the mapred.tasktracker.reduce.tasks.maximum property, described in "Environment Settings" on page 305). One common setting is to have slightly fewer reducers than total slots, which gives one wave of reduce tasks (and tolerates a few failures without extending job execution time). If your reduce tasks are very big, it makes sense to have a larger number of reducers (resulting in two waves, for example) so that the tasks are more fine-grained and failure doesn't affect job execution time significantly.
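For example (the cluster numbers here are assumptions for illustration, not recommendations from the text), a cluster of 20 nodes with two reduce slots each has 40 reduce slots, so a driver might ask for slightly fewer reducers than that to get a single wave:

    // Sketch: derive the reducer count from assumed cluster capacity, inside a driver's run() method.
    int nodes = 20;
    int reduceSlotsPerNode = 2;   // the value of mapred.tasktracker.reduce.tasks.maximum
    int totalReduceSlots = nodes * reduceSlotsPerNode;

    job.setNumReduceTasks(totalReduceSlots - 2);   // slightly fewer than the total: one wave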
The default reducer is Reducer, again a generic type, which simply writes all its input to its output:

    public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

      protected void reduce(KEYIN key, Iterable<VALUEIN> values,
                            Context context) throws IOException, InterruptedException {
        for (VALUEIN value : values) {
          context.write((KEYOUT) key, (VALUEOUT) value);
        }
      }
    }

For this job, the output key is LongWritable and the output value is Text. In fact, all the keys for this MapReduce program are LongWritable and all the values are Text, since these are the input keys and values, and the map and reduce functions are both identity functions, which by definition preserve type. Most MapReduce programs, however, don't use the same key or value types throughout, so you need to configure the job to declare the types you are using, as described in the previous section.

Records are sorted by the MapReduce system before being presented to the reducer. In this case, the keys are sorted numerically, which has the effect of interleaving the lines from the input files into one combined output file.

The default output format is TextOutputFormat, which writes out records, one per line, by converting keys and values to strings and separating them with a tab character. This is why the output is tab-separated: it is a feature of TextOutputFormat.

The default Streaming job

In Streaming, the default job is similar, but not identical, to the Java equivalent. The minimal form is:

    % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
      -input input/ncdc/sample.txt \
      -output output \
      -mapper /bin/cat

Notice that you have to supply a mapper: the default identity mapper will not work. The reason has to do with the default input format, TextInputFormat, which generates LongWritable keys and Text values. However, Streaming output keys and values (including the map keys and values) are always both of type Text. (Except when used in binary mode, from version 0.21.0 onward, via the -io rawbytes or -io typedbytes options. Text mode, -io text, is the default.) The identity mapper cannot change LongWritable keys to Text keys, so it fails.

When we specify a non-Java mapper and the input format is TextInputFormat, Streaming does something special. It doesn't pass the key to the mapper process; it just passes the value. (For other input formats the same effect can be achieved by setting stream.map.input.ignoreKey to true.) This is actually very useful, since the key is just the line offset in the file and the value is the line, which is all most applications are interested in. The overall effect of this job is to perform a sort of the input.

With more of the defaults spelled out, the command looks like this:

    % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
      -input input/ncdc/sample.txt \
      -output output \
      -inputformat org.apache.hadoop.mapred.TextInputFormat \
      -mapper /bin/cat \
      -partitioner org.apache.hadoop.mapred.lib.HashPartitioner \
      -numReduceTasks 1 \
      -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
      -outputformat org.apache.hadoop.mapred.TextOutputFormat

The -mapper and -reducer arguments take a command or a Java class. A combiner may optionally be specified using the -combiner argument.

Keys and values in Streaming

A Streaming application can control the separator that is used when a key-value pair is turned into a series of bytes and sent to the map or reduce process over standard input. The default is a tab character, but it is useful to be able to change it in the case that the keys or values themselves contain tab characters.

Similarly, when the map or reduce writes out key-value pairs, they may be separated by a configurable separator. Furthermore, the key from the output can be composed of more than the first field: it can be made up of the first n fields (defined by stream.num.map.output.key.fields or stream.num.reduce.output.key.fields), with the value being the remaining fields. For example, if the output from a Streaming process was a,b,c (and the separator is a comma) and n is two, then the key would be parsed as a,b and the value as c.

Separators may be configured independently for maps and reduces. The properties are listed in Table 7-3 and shown in a diagram of the data flow path in Figure 7-1.

These settings do not have any bearing on the input and output formats. For example, if stream.reduce.output.field.separator were set to be a colon, say, and the reduce stream process wrote the line a:b to standard out, then the Streaming reducer would know to extract the key as a and the value as b. With the standard TextOutputFormat, this record would be written to the output file with a tab separating a and b. You can change the separator that TextOutputFormat uses by setting mapred.textoutputformat.separator.

A list of Streaming configuration parameters can be found on the Hadoop website at http://hadoop.apache.org/mapreduce/docs/current/streaming.html#Configurable+parameters.

Table 7-3. Streaming separator properties

    stream.map.input.field.separator
        Type: String. Default: \t.
        The separator to use when passing the input key and value strings to the stream map process as a stream of bytes.

    stream.map.output.field.separator
        Type: String. Default: \t.
        The separator to use when splitting the output from the stream map process into key and value strings for the map output.

    stream.num.map.output.key.fields
        Type: int. Default: 1.
        The number of fields separated by stream.map.output.field.separator to treat as the map output key.

    stream.reduce.input.field.separator
        Type: String. Default: \t.
        The separator to use when passing the input key and value strings to the stream reduce process as a stream of bytes.

    stream.reduce.output.field.separator
        Type: String. Default: \t.
        The separator to use when splitting the output from the stream reduce process into key and value strings for the final reduce output.

    stream.num.reduce.output.key.fields
        Type: int. Default: 1.
        The number of fields separated by stream.reduce.output.field.separator to treat as the reduce output key.

Figure 7-1. Where separators are used in a Streaming MapReduce job (figure not reproduced here)
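Putting a few of these properties together, an invocation along the following lines (a sketch: the script names are hypothetical, and the -D generic options must come before the Streaming-specific ones) would treat the first two comma-separated fields of the map output as the key:

    % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
      -D stream.map.output.field.separator=, \
      -D stream.num.map.output.key.fields=2 \
      -input input/ncdc/sample.txt \
      -output output \
      -mapper my_map_script.sh \
      -reducer my_reduce_script.sh

With this configuration, a map output line of a,b,c is split into the key a,b and the value c, as described above.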
Input Formats

Hadoop can process many different types of data formats, from flat text files to databases. In this section, we explore the different formats available.

Input Splits and Records

As we saw in Chapter 2, an input split is a chunk of the input that is processed by a single map. Each map processes a single split. Each split is divided into records, and the map processes each record (a key-value pair) in turn. Splits and records are logical: there is nothing that requires them to be tied to files, for example, although in their most common incarnations, they are. In a database context, a split might correspond to a range of rows from a table and a record to a row in that range (this is precisely what DBInputFormat does, an input format for reading data from a relational database).

Input splits are represented by the Java class InputSplit (which, like all of the classes mentioned in this section, is in the org.apache.hadoop.mapreduce package; see the classes in org.apache.hadoop.mapred for the old MapReduce API counterparts):

    public abstract class InputSplit {
      public abstract long getLength() throws IOException, InterruptedException;
      public abstract String[] getLocations() throws IOException, InterruptedException;
    }

An InputSplit has a length in bytes and a set of storage locations, which are just hostname strings. Notice that a split doesn't contain the input data; it is just a reference to the data. The storage locations are used by the MapReduce system to place map tasks as close to the split's data as possible, and the size is used to order the splits so that the largest get processed first, in an attempt to minimize the job runtime (this is an instance of a greedy approximation algorithm).

As a MapReduce application writer, you don't need to deal with InputSplits directly, as they are created by an InputFormat. An InputFormat is responsible for creating the input splits and dividing them into records. Before we see some concrete examples of InputFormat, let's briefly examine how it is used in MapReduce.
Here's the interface:

    public abstract class InputFormat<K, V> {

      public abstract List<InputSplit> getSplits(JobContext context)
          throws IOException, InterruptedException;

      public abstract RecordReader<K, V>
          createRecordReader(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException;
    }

The client running the job calculates the splits for the job by calling getSplits(), then sends them to the jobtracker, which uses their storage locations to schedule map tasks to process them on the tasktrackers. On a tasktracker, the map task passes the split to the createRecordReader() method on InputFormat to obtain a RecordReader for that split. A RecordReader is little more than an iterator over records, and the map task uses one to generate record key-value pairs, which it passes to the map function. We can see this by looking at the Mapper's run() method:

    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
      cleanup(context);
    }

After running setup(), nextKeyValue() is called repeatedly on the Context (which delegates to the identically named method on the RecordReader) to populate the key and value objects for the mapper. The key and value are retrieved from the RecordReader by way of the Context and passed to the map() method for it to do its work. When the reader gets to the end of the stream, the nextKeyValue() method returns false, and the map task runs its cleanup() method, then completes.

It's not shown in the code snippet, but for reasons of efficiency RecordReader implementations will return the same key and value objects on each call to getCurrentKey() and getCurrentValue(). Only the contents of these objects are changed by the reader's nextKeyValue() method. This can be a surprise to users, who might expect keys and values to be immutable and not to be reused. This causes problems when a reference to a key or value object is retained outside the map() method, as its value can change without warning. If you need to do this, make a copy of the object you want to hold on to. For example, for a Text object, you can use its copy constructor: new Text(value). The situation is similar with reducers. In this case, the value objects in the reducer's iterator are reused, so you need to copy any that you need to retain between calls to the iterator (see Example 8-14).
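To make that warning concrete, here is a small illustrative mapper (the class name and its purpose are assumptions made for the example) that retains values between calls and must therefore copy them:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch: a mapper that remembers lines for later use. Because the framework
    // reuses the Text object passed to map(), each retained value is copied with
    // Text's copy constructor; adding 'value' itself would silently corrupt the list.
    public class CollectingMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

      private List<Text> retained = new ArrayList<Text>();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        retained.add(new Text(value));   // copy, don't keep the reused object
        context.write(key, value);
      }
    }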
Finally, note that the Mapper's run() method is public and may be customized by users. MultithreadedMapper is an implementation that runs mappers concurrently in a configurable number of threads (set by mapreduce.mapper.multithreadedmapper.threads). For most data processing tasks, it confers no advantage over the default implementation. However, for mappers that spend a long time processing each record, because they contact external servers, for example, it allows multiple mappers to run in one JVM with little contention. See "Fetcher: A multithreaded MapRunner in action" on page 575 for an example of an application that uses the multi-threaded version (using the old API).

FileInputFormat

FileInputFormat is the base class for all implementations of InputFormat that use files as their data source (see Figure 7-2). It provides two things: a place to define which files are included as the input to a job, and an implementation for generating splits for the input files. The job of dividing splits into records is performed by subclasses.

Figure 7-2. InputFormat class hierarchy (figure not reproduced here)

FileInputFormat input paths

The input to a job is specified as a collection of paths, which offers great flexibility in constraining the input to a job. FileInputFormat offers four static convenience methods for setting a Job's input paths:

    public static void addInputPath(Job job, Path path)
    public static void addInputPaths(Job job, String commaSeparatedPaths)
    public static void setInputPaths(Job job, Path... inputPaths)
    public static void setInputPaths(Job job, String commaSeparatedPaths)

The addInputPath() and addInputPaths() methods add a path or paths to the list of inputs. You can call these methods repeatedly to build the list of paths. The setInputPaths() methods set the entire list of paths in one go (replacing any paths set on the Job in previous calls).

A path may represent a file, a directory, or, by using a glob, a collection of files and directories. A path representing a directory includes all the files in the directory as input to the job. See "File patterns" on page 67 for more on using globs.

The contents of a directory specified as an input path are not processed recursively. In fact, the directory should only contain files: if the directory contains a subdirectory, it will be interpreted as a file, which will cause an error. The way to handle this case is to use a file glob or a filter to select only the files in the directory based on a name pattern. Alternatively, set mapred.input.dir.recursive to true to force the input directory to be read recursively.

The add and set methods allow files to be specified by inclusion only. To exclude certain files from the input, you can set a filter using the setInputPathFilter() method on FileInputFormat:

    public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)

Filters are discussed in more detail in "PathFilter" on page 68.

Even if you don't set a filter, FileInputFormat uses a default filter that excludes hidden files (those whose names begin with a dot or an underscore). If you set a filter by calling setInputPathFilter(), it acts in addition to the default filter. In other words, only non-hidden files that are accepted by your filter get through.

Paths and filters can be set through configuration properties, too (Table 7-4), which can be handy for Streaming and Pipes. Setting paths is done with the -input option for both Streaming and Pipes interfaces, so setting paths directly is not usually needed.

Table 7-4. Input path and filter properties

    mapred.input.dir
        Type: comma-separated paths. Default: none.
        The input files for a job. Paths that contain commas should have those commas escaped by a backslash character. For example, the glob {a,b} would be escaped as {a\,b}.

    mapred.input.pathFilter.class
        Type: PathFilter classname. Default: none.
        The filter to apply to the input files for a job.
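For example (the paths and the filter class below are illustrative, not taken from the text), a driver might build up its input from a directory and some extra files, and then exclude temporary files on top of the default hidden-file filter:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    // Sketch: assembling input paths and adding a filter. The paths and the ".tmp"
    // suffix are hypothetical.
    public class InputSetup {

      public static class ExcludeTmpFilter implements PathFilter {
        @Override
        public boolean accept(Path path) {
          return !path.getName().endsWith(".tmp");
        }
      }

      public static void configureInput(Job job) throws IOException {
        FileInputFormat.addInputPath(job, new Path("input/ncdc/all"));            // a directory
        FileInputFormat.addInputPaths(job, "input/extra/a.gz,input/extra/b.gz");  // comma-separated paths
        FileInputFormat.setInputPathFilter(job, ExcludeTmpFilter.class);
      }
    }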
FileInputFormat input splits

Given a set of files, how does FileInputFormat turn them into splits? FileInputFormat splits only large files. Here "large" means larger than an HDFS block. The split size is normally the size of an HDFS block, which is appropriate for most applications; however, it is possible to control this value by setting various Hadoop properties, as shown in Table 7-5.

Table 7-5. Properties for controlling split size

    mapred.min.split.size
        Type: int. Default: 1.
        The smallest valid size in bytes for a file split.

    mapred.max.split.size
        Type: long. Default: Long.MAX_VALUE, that is, 9223372036854775807.
        The largest valid size in bytes for a file split. (This property is not present in the old MapReduce API, with the exception of CombineFileInputFormat. Instead, it is calculated indirectly as the size of the total input for the job divided by the guide number of map tasks specified by mapred.map.tasks, or the setNumMapTasks() method on JobConf. Because mapred.map.tasks defaults to 1, this makes the maximum split size the size of the input.)

    dfs.block.size
        Type: long. Default: 64 MB, that is, 67108864.
        The size of a block in HDFS in bytes.

The minimum split size is usually 1 byte, although some formats have a lower bound on the split size. (For example, sequence files insert sync entries every so often in the stream, so the minimum split size has to be large enough to ensure that every split has a sync point to allow the reader to resynchronize with a record boundary.)

Applications may impose a minimum split size: by setting this to a value larger than the block size, they can force splits to be larger than a block. There is no good reason for doing this when using HDFS, since doing so will increase the number of blocks that are not local to a map task.

The maximum split size defaults to the maximum value that can be represented by a Java long type. It has an effect only when it is less than the block size, forcing splits to be smaller than a block.

The split size is calculated by the formula (see the computeSplitSize() method in FileInputFormat):

    max(minimumSize, min(maximumSize, blockSize))

and by default:

    minimumSize < blockSize < maximumSize

so the split size is blockSize. Various settings for these parameters and how they affect the final split size are illustrated in Table 7-6.

Table 7-6. Examples of how to control the split size

    Minimum split size 1 (default), maximum split size Long.MAX_VALUE (default), block size 64 MB (default) → split size 64 MB.
        By default, the split size is the same as the default block size.

    Minimum split size 1 (default), maximum split size Long.MAX_VALUE (default), block size 128 MB → split size 128 MB.
        The most natural way to increase the split size is to have larger blocks in HDFS, by setting dfs.block.size, or on a per-file basis at file construction time.

    Minimum split size 128 MB, maximum split size Long.MAX_VALUE (default), block size 64 MB (default) → split size 128 MB.
        Making the minimum split size greater than the block size increases the split size, but at the cost of locality.

    Minimum split size 1 (default), maximum split size 32 MB, block size 64 MB (default) → split size 32 MB.
        Making the maximum split size less than the block size decreases the split size.
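The formula is easy to verify by hand. The following throwaway class (a sketch that mirrors the formula above; it is not Hadoop's actual computeSplitSize() source) reproduces the rows of Table 7-6:

    // Sketch: max(minimumSize, min(maximumSize, blockSize)).
    public class SplitSizeDemo {

      static long splitSize(long minimumSize, long maximumSize, long blockSize) {
        return Math.max(minimumSize, Math.min(maximumSize, blockSize));
      }

      public static void main(String[] args) {
        long mb = 1024 * 1024;
        System.out.println(splitSize(1, Long.MAX_VALUE, 64 * mb));        // 67108864  (64 MB)
        System.out.println(splitSize(1, Long.MAX_VALUE, 128 * mb));       // 134217728 (128 MB)
        System.out.println(splitSize(128 * mb, Long.MAX_VALUE, 64 * mb)); // 134217728 (128 MB)
        System.out.println(splitSize(1, 32 * mb, 64 * mb));               // 33554432  (32 MB)
      }
    }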
Small files and CombineFileInputFormat

Hadoop works better with a small number of large files than a large number of small files. One reason for this is that FileInputFormat generates splits in such a way that each split is all or part of a single file. If the file is very small ("small" means significantly smaller than an HDFS block) and there are a lot of them, each map task will process very little input, and there will be a lot of map tasks (one per file), each of which imposes extra bookkeeping overhead. Compare a 1 GB file broken into sixteen 64 MB blocks with 10,000 or so 100 KB files. The 10,000 files use one map each, and the job time can be tens or hundreds of times slower than the equivalent one with a single input file and 16 map tasks.

The situation is alleviated somewhat by CombineFileInputFormat, which was designed to work well with small files. Where FileInputFormat creates a split per file, CombineFileInputFormat packs many files into each split so that each mapper has more to process. Crucially, CombineFileInputFormat takes node and rack locality into account when deciding which blocks to place in the same split, so it does not compromise the speed at which it can process the input in a typical MapReduce job.

Of course, if possible, it is still a good idea to avoid the many-small-files case, since MapReduce works best when it can operate at the transfer rate of the disks in the cluster, and processing many small files increases the number of seeks that are needed to run a job. Also, storing large numbers of small files in HDFS is wasteful of the namenode's memory. One technique for avoiding the many-small-files case is to merge small files into larger files by using a SequenceFile: the keys can act as filenames (or a constant such as NullWritable, if not needed) and the values as file contents. See Example 7-4. But if you already have a large number of small files in HDFS, then CombineFileInputFormat is worth trying.

CombineFileInputFormat isn't just good for small files; it can bring benefits when processing large files, too. Essentially, CombineFileInputFormat decouples the amount of data that a mapper consumes from the block size of the files in HDFS. If your mappers can process each block in a matter of seconds, you could use CombineFileInputFormat with the maximum split size set to a small multiple of the block size (by setting the mapred.max.split.size property in bytes) so that each mapper processes more than one block. In return, the overall processing time falls, since proportionally fewer mappers run, which reduces the overhead in task bookkeeping and startup time associated with a large number of short-lived mappers.

Since CombineFileInputFormat is an abstract class without any concrete classes (unlike FileInputFormat), you need to do a bit more work to use it. (Hopefully, common implementations will be added to the library over time.) For example, to have the CombineFileInputFormat equivalent of TextInputFormat, you would create a concrete subclass of CombineFileInputFormat and implement the getRecordReader() method.
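As a rough sketch of the tip above (the factor of four and the 64 MB block size are assumptions for illustration, not recommendations from the text), the maximum split size could be set to a few blocks' worth of bytes so that each mapper handles several blocks:

    // Sketch: pack roughly four 64 MB blocks into each split, inside a driver's run() method.
    long blockSize = 64L * 1024 * 1024;   // assumes the 64 MB default block size
    job.getConfiguration().setLong("mapred.max.split.size", 4 * blockSize);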
Preventing splitting

Some applications don't want files to be split, so that a single mapper can process each input file in its entirety. For example, a simple way to check if all the records in a file are sorted is to go through the records in order, checking whether each record is not less than the preceding one. Implemented as a map task, this algorithm will work only if one map processes the whole file. (This is how the mapper in SortValidator.RecordStatsChecker is implemented.)

There are a couple of ways to ensure that an existing file is not split. The first (quick-and-dirty) way is to increase the minimum split size to be larger than the largest file in your system. Setting it to its maximum value, Long.MAX_VALUE, has this effect. The second is to subclass the concrete subclass of FileInputFormat that you want to use, to override the isSplitable() method to return false. (In the method name isSplitable(), "splitable" has a single "t." It is usually spelled "splittable," which is the spelling I have used in this book.) For example, here's a nonsplittable TextInputFormat:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class NonSplittableTextInputFormat extends TextInputFormat {
      @Override
      protected boolean isSplitable(JobContext context, Path file) {
        return false;
      }
    }

File information in the mapper

A mapper processing a file input split can find information about the split by calling the getInputSplit() method on the Mapper's Context object. When the input format derives from FileInputFormat, the InputSplit returned by this method can be cast to a FileSplit to access the file information listed in Table 7-7.

In the old MapReduce API, Streaming, and Pipes, the same file split information is made available through properties that can be read from the mapper's configuration. (In the old MapReduce API this is achieved by implementing configure() in your Mapper implementation to get access to the JobConf object.) In addition to the properties in Table 7-7, all mappers and reducers have access to the properties listed in "The Task Execution Environment" on page 212.

Table 7-7. File split properties

    getPath()
        Property name: map.input.file. Type: Path/String.
        The path of the input file being processed.

    getStart()
        Property name: map.input.start. Type: long.
        The byte offset of the start of the split from the beginning of the file.

    getLength()
        Property name: map.input.length. Type: long.
        The length of the split in bytes.

In the next section, we will see how to use a FileSplit when we need to access the split's filename.
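For example, a mapper that wants the filename of the split it is working on (an illustrative sketch, not text from the original) can cast the result of getInputSplit():

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    // Sketch: tag each output value with the name of the file it came from.
    // The cast is safe only when the input format derives from FileInputFormat.
    public class FileNameMapper extends Mapper<LongWritable, Text, Text, Text> {

      private Path path;

      @Override
      protected void setup(Context context) {
        FileSplit split = (FileSplit) context.getInputSplit();
        path = split.getPath();
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        context.write(new Text(path.getName()), value);
      }
    }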
Processing a whole file as a record

A related requirement that sometimes crops up is for mappers to have access to the full contents of a file. Not splitting the file gets you part of the way there, but you also need to have a RecordReader that delivers the file contents as the value of the record. The listing for WholeFileInputFormat in Example 7-2 shows a way of doing this.

Example 7-2. An InputFormat for reading a whole file as a record

    public class WholeFileInputFormat
        extends FileInputFormat<NullWritable, BytesWritable> {

      @Override
      protected boolean isSplitable(JobContext context, Path file) {
        return false;
      }

      @Override
      public RecordReader<NullWritable, BytesWritable> createRecordReader(
          InputSplit split, TaskAttemptContext context) throws IOException,
          InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
      }
    }

WholeFileInputFormat defines a format where the keys are not used, represented by NullWritable, and the values are the file contents, represented by BytesWritable instances. It defines two methods. First, the format is careful to specify that input files should never be split, by overriding isSplitable() to return false. Second, we implement createRecordReader() to return a custom implementation of RecordReader, which appears in Example 7-3.

Example 7-3. The RecordReader used by WholeFileInputFormat for reading a whole file as a record

    class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

      private FileSplit fileSplit;
      private Configuration conf;
      private BytesWritable value = new BytesWritable();
      private boolean processed = false;
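      // The excerpt ends here, cutting Example 7-3 short. The methods below are a
      // hedged reconstruction of how such a record reader is typically completed
      // (read the whole split into the BytesWritable value exactly once); they are
      // not the book's verbatim listing.

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
      }

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
          byte[] contents = new byte[(int) fileSplit.getLength()];
          Path file = fileSplit.getPath();
          FileSystem fs = file.getFileSystem(conf);
          FSDataInputStream in = null;
          try {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
          } finally {
            IOUtils.closeStream(in);
          }
          processed = true;
          return true;
        }
        return false;
      }

      @Override
      public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
      }

      @Override
      public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
      }

      @Override
      public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
      }

      @Override
      public void close() throws IOException {
        // nothing to close: the stream is closed in nextKeyValue()
      }
    }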