XML Data Serialization formats and Methods

data serialization formats and what is XML data serialization language and data serialization methods and data serialization in hadoop
CecilGalot Profile Pic
CecilGalot,United Kingdom,Teacher
Published Date:06-08-2017
Your Website URL(Optional)
Data serialization— working with text This chapter covers ■ Working with text, XML, and JSON ■ Understanding SequenceFiles, Avro, and Protocol Buffers ■ Working with custom data formats MapReduce offers straightforward, well-documented support for working with sim- ple data formats such as log files. But the use of MapReduce has evolved beyond log files to more sophisticated data serialization formats—such as text, XML, and JSON—to the point that its documentation and built-in support runs dry. The goal of this chapter is to document how to work with common data serialization formats, as well as to examine more structured serialization formats and compare their fit- ness for use with MapReduce. 83 84 CHAPTER 3 Data serialization—working with text and beyond Imagine that you want to work with the ubiquitous data serialization formats XML and JSON. These formats work in a straightforward manner in most programming lan- guages, with several tools available to help you with marshalling, unmarshalling, and validating where applicable. Working with XML and JSON in MapReduce, however, poses two equally important challenges. First, though MapReduce requires classes that can support reading and writing a particular data serialization format, there’s a good chance it doesn’t have such classes to support the serialization format you’re working with. Second, MapReduce’s power lies in its ability to parallelize reading your input data. If your input files are large (think hundreds of megabytes or more), it’s crucial that the classes reading your serialization format be able to split your large files so multiple map tasks can read them in parallel. We’ll start this chapter by tackling the problem of how to work with serialization formats such as XML and JSON. Then we’ll compare and contrast data serialization formats that are better suited to working with big data. I’ll also show how to use these serialization formats with MapReduce. The final hurdle is when you need to work with a file format that’s proprietary, or a less common file format for which no read/write bindings exist in MapReduce. I’ll show you how to write your own classes to read/ write your file format. Data serialization support in MapReduce is a property of the input and output classes that read and write MapReduce data. Let’s start with an overview of how MapReduce supports data input and output. This chapter assumes you’re familiar with XML and JSON data formats. Wikipedia provides some good background articles on XML and JSON, if needed. You should also have some experience writing MapReduce programs and understand the basic con- cepts of HDFS and MapReduce input and output. Chuck Lam’s book, Hadoop in Action from Manning, represents a good resource on this topic. 3.1 Understanding inputs and outputs in MapReduce Your data might be XML files sitting behind a number of FTP servers, text log files sit- 1 ting on a central web server, or Lucene indexes in HDFS. How does MapReduce sup- port reading and writing to these different serialization structures across the various storage mechanisms? You’ll need to know the answer in order to support a specific serialization format. Figure 3.1 shows the high-level data flows through MapReduce and identifies the actors responsible for various parts of the flow. On the input side you see that some work (Create split) is performed outside of the map phase, and other work is per- formed as part of the map phase (Read split). All of the output work is performed in the reduce phase (Write output). 1 Apache Lucene is an information retrieval project that stores data in an inverted index data structure opti- mized for full-text search. More information is available at http://lucene.apache.org/.Understanding inputs and outputs in MapReduce 85 The partitioner's job is to logically funnel map outputs to the reducers. Input Output RecordReader. InputFormat.getSplits Mapper.map Partitioner.getPartition Reducer.reduce RecordWriter.write nextKeyValue Map phase Reduce phase k,v k,list(v) Create Write Read split Map Partition Reduce split output The map and reduce functions are typically written by the user to The InputFormat and address a specific use case. The RecordWriter writes the RecordReader are responsible for reduce output to the destination determining what data to feed data sink, which is the final into the map function. resting place of this MapReduce data flow. Figure 3.1 High-level input and output actors in MapReduce Figure 3.2 shows the same flow with a map-only job. In a map-only job the MapReduce framework still uses the OutputFormat and RecordWriter classes to write the outputs directly to the data sink. Let’s walk through the data flow and describe the responsibilities of the various actors. As we do this, we’ll also look at the relevant code from the built-in TextInput- Format and TextOutputFormat classes to better understand the concepts. The Text- InputFormat and TextOutputFormat classes read and write line-oriented text files. 3.1.1 Data input The two classes that support data input in MapReduce are InputFormat and Record- Reader. The InputFormat class is consulted to determine how the input data should be Input Output RecordReader. InputFormat.getSplits Mapper.map RecordWriter.write nextKeyValue Map phase Create Write Read split Map split output Figure 3.2 Input and output actors in MapReduce with no reducers 86 CHAPTER 3 Data serialization—working with text and beyond Type definitions for map input keys and values. ,QSXW)9RUPDW. Partition the input SOW/LVW L,QSXW6SOLWJHW6V -RE&RQWHWFRQWHW data into InputSplits. 9FUHDWHFRU5HFRUG5HDGHUG5HDGHU ,VSO.6SOH5QSXWLWLW 7DVNWWH PSW&RQWHWFRQWHW Figure 3.3 The annotated Create a RecordReader to InputFormat class and its read data from the job inputs. three contracts partitioned for the map tasks, and the RecordReader performs the reading of data from the inputs. INPUTFORMAT Every job in MapReduce must define its inputs according to contracts specified in the InputFormat abstract class. InputFormat implementers must fulfill three contracts: first, they describe type information for map input keys and values; next, they specify how the input data should be partitioned; and finally, they indicate the RecordReader instance that should read the data from source. Figure 3.3 shows the InputFormat class and how these three contracts are defined. Arguably the most crucial contract is that of determining how to divide the input data. In MapReduce nomenclature these divisions are referred to as input splits. The input splits directly impact the map parallelism because each split is processed by a single map task. Working with an InputFormat that is unable to create multiple input splits over a single data source (such as a file) will result in a slow map phase because the file will be processed sequentially. The TextInputFormat class (view source at http://goo.gl/VOMcJ) provides an imple- mentation of the InputFormat class’s createRecordReader method but delegates the cal- culation of input splits to its parent class, FileInputFormat. The following code shows the relevant parts of the TextInputFormat class: The parent class, FileInputFormat, provides public class TextInputFormat all of the input split extends FileInputFormatLongWritable, Text functionality. Override public RecordReaderLongWritable, Text createRecordReader(InputSplit split, TaskAttemptContext context) String delimiter = context.getConfiguration().get( The default record delimiter is newline, "textinputformat.record.delimiter"); but it can be overridden with byte recordDelimiterBytes = null; textinputformat.record.delimiter. if (null = delimiter) recordDelimiterBytes = delimiter.getBytes(); Understanding inputs and outputs in MapReduce 87 return new LineRecordReader(recordDelimiterBytes); Construct the RecordReader to read the data from the data source. ... The code in FileInputFormat (source at http://goo.gl/mQfq1) to determine the input splits is a little more complicated. A simplified form of the code is shown in the follow- ing example to portray the main elements of this method: The listStatus method public ListInputSplit getSplits(JobContext job determines all the input ) throws IOException files for the job. ListInputSplit splits = new ArrayListInputSplit(); ListFileStatusfiles = listStatus(job); for (FileStatus file: files) Path path = file.getPath(); Retrieve all of BlockLocation blkLocations = the file blocks. FileSystem.getFileBlockLocations(file, 0, length); long splitSize = file.getBlockSize(); The size of the splits is the same as the block size for the file. Each file can have while (splitsRemaining()) a different block size. splits.add(new FileSplit(path, ...)); return splits; Create a split for each file block and add it to the result. The following code is an example of how you specify the InputFormat to use for a MapReduce job: job.setInputFormatClass(TextInputFormat.class); RECORDREADER The RecordReader class is used by MapReduce in the map tasks to read data from an input split and provide each record in the form of a key/value pair for use by map- pers. A task is commonly created for each input split, and each task has a single RecordReader that’s responsible for reading the data for that input split. Figure 3.4 shows the abstract methods you must implement. As shown in a previous section, the TextInputFormat class created a LineRecordReader to read records from the input splits. The LineRecordReader directly extends the RecordReader class and leverages the LineReader class to read lines from the input split. The LineRecordReader uses the byte offset in the file for the map key and the contents of the line for the map value. I’ve included a simplified version of the LineRecordReader in the following example (source at http://goo.gl/iIS59): public class LineRecordReader extends RecordReaderLongWritable, Text private LineReader in; 88 CHAPTER 3 Data serialization—working with text and beyond private LongWritable key = new LongWritable(); private Text value = new Text(); public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException FileSplit split = (FileSplit) genericSplit; Open an InputStream to Seek to the start the input split file. // open the file and seek to the start of the split of the input split. FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); fileIn.seek(start); If you aren’t at the start of the file, you need to figure out in = new LineReader(fileIn, job); Create a new where to start reading the lines. The only way to do this is to LineReader that keep reading characters until you hit a newline, at which point if (notAtStartOfFile) can read lines from you’re ready to start supplying lines to the map. start += in.readLine(...); a stream. After the initialize method is called, the nextKeyValue method is called repeatedly by the MapReduce framework until such a public boolean nextKeyValue() throws IOException time as it returns false, which signifies the Set the byte end of the input split. key.set(pos); offset in the return in.readLine(value, ...) 0; file as the key. Read the next line into the value. If you’ve gone beyond the end of the input split, you return false. Because the LineReader class is easy, we’ll skip that code. The next step will be a look at how MapReduce supports data outputs. Type definitions for map Initialization, which could involve input keys and values. seeking into a file and determining the logical starting point of the next record. Reads the next record from RecordReaderKEYIN,VALUEIN file and returns a flag void initialize(InputSplit split, TaskAttemptContext context) indicating if the end of the split has been reached. boolean nextKeyValue() KEYIN getCurrentKey() Returns the current record's key. VALUEIN getCurrentValue() Returns the current record's value. void close() Returns the current progress of the reader. Closes any resources associated with the data source. Figure 3.4 The annotated RecordReader class and its abstract methodsUnderstanding inputs and outputs in MapReduce 89 Type definitions for reduce output keys and values. Create a RecordWriter OutputFormatK,V instance to write data RecordWriterK, V getRecordWriter(TaskAttemptContext context) to the destination. void checkOutputSpecs(JobContext context) Verify the output details OutputCommitter getOutputCommitter(TaskAttemptContext context) associated with the MapReduce job are correct. Get the associated OutputCommitter. OutputCommitters are responsible for “finalizing” the output after successful task and job completion. Figure 3.5 The annotated OutputFormat class 3.1.2 Data output MapReduce uses a similar process for supporting output data as it does for input data. Two classes must exist, an OutputFormat and a RecordWriter. The OutputFormat performs some basic validation of the data sink properties, and the RecordWriter writes each reducer output to the data sink. OUTPUTFORMAT Much like the InputFormat class, the OutputFormat class, as shown in figure 3.5, defines the contracts that implementers must fulfill, including checking the information related to the job output, providing a RecordWriter, and specifying an output commit- ter, which allows writes to be staged and then made “permanent” upon task and/or job success. Please refer to chapter 5 for more details on output committing. Just like the TextInputFormat, the TextOutputFormat also extends a base class, File- OutputFormat, which takes care of some complicated logistics such as output commit- ting, which we’ll cover further in this chapter. For now let’s take a look at the work the TextOutputFormat is performing (source at http://goo.gl/8ab7Z): public class TextOutputFormatK, V extends FileOutputFormatK, V public RecordWriterK, V getRecordWriter(TaskAttemptContext job ) throws IOException, InterruptedException boolean isCompressed = getCompressOutput(job); The default key/value separator is the tab character, but this can be changed with String keyValueSeparator= conf.get( Creates a unique the mapred.textoutputformat.separator "mapred.textoutputformat.separator", "\t"); filename for the configuration setting. reducer in a Path file = getDefaultWorkFile(job, extension); temporary directory. FileSystem fs = file.getFileSystem(conf); Creates the FSDataOutputStream fileOut = fs.create(file, false); output file. 90 CHAPTER 3 Data serialization—working with text and beyond Returns a return new LineRecordWriterK, V( RecordWriter fileOut, keyValueSeparator); used to write to the file. The following code is an example of how you specify the OutputFormat that should be usedforaMapReducejob: job.setOutputFormatClass(TextOutputFormat.class); RECORDWRITER You’ll use the RecordWriter to write the reducer outputs to the destination data sink. It’sasimpleclass,asfigure3.6illustrates. The TextOutputFormat returned a LineRecordWriter object (LineRecordWriter is an inner class of TextOutputFormat) to perform the writing to file. A simplified version of thatclass(sourceathttp://goo.gl/8ab7Z)isshowninthefollowingexample: protected static class LineRecordWriterK, V extends RecordWriterK, V protected DataOutputStream out; public synchronized void write(K key, V value) throws IOException writeObject(key); Write out the key, out.write(keyValueSeparator); separator, value, and writeObject(value); newline. out.write(newline); Write out the private void writeObject(Object o) throws IOException Object to the out.write(o); output stream. While on the map side it’s the InputFormat that determines how many map tasks are executed, on the reducer side the number of tasks is solely based on the value for Type definitions for reduce output keys and values. HUULW.95HFRUG: Write a logical key/value record to the XH YDO9YRLGZULWH .NH\ destination data sink. WWHPFRQW&HWHWRQWDVNSW YRLGFORVH7 Clean up any resources related to the Figure 3.6 The annotated destination data sink. RecordWriter class overview TECHNIQUE 12 MapReduce and XML 91 mapred.reduce.tasks set by the client (or if it isn’t set, the value is picked up from mapred-site.xml,ormapred-default.xmlifitdoesn’texistinthesitefile). Now that you know what’s involved in working with input and output data in MapReduce, it’s timetoapplythat knowledgeto solvingsomecommon dataserializa- tion problems. Your first step in this data serialization journey is to learn how to work withfileformatssuchas XML. 3.2 Processing common serialization formats XML and JSON are industry-standard data interchange formats. Their ubiquity in the technology industry is evidenced by their heavy adoption in data storage and exchange. 3.2.1 XML XML has existed since 1998 as a mechanism to represent data that’s readable by machineandhumanalike.Itbecameauniversallanguagefordataexchangebetween systems. It’s employed by many standards today such as SOAP and RSS, and used as an opendataformatforproductssuchasMicrosoftOffice. TECHNIQUE 12 MapReduce and XML While MapReduce comes bundled with an InputFormat that works with text, it doesn’t come with one that supports XML. Working on a single XML file in parallel in MapReduce is tricky because XML doesn’t contain a synchronization marker in its dataformat. Problem You want to work with large XML files in MapReduce and be able to split and process theminparallel. Solution Mahout’s XMLInputFormatcanbeusedtoworkwithXMLfilesinHDFSwithMapReduce. It reads records that are delimited by a specific XML begin and end tag. This tech- niquealsocovershow XMLcanbeemittedasoutputinMapReduceoutput. Discussion MapReducedoesn’tcontainbuilt-insupportfor XML,sowe’llturntoanotherApache project,Mahout,amachinelearningsystem,toprovidean XML InputFormat.Toshow- case the XML InputFormat, let’s write a MapReduce job that uses Mahout’s XML Input- FormattoreadpropertynamesandvaluesfromHadoop’sconfigurationfiles.Thefirst stepwillbetosetupthejobconfiguration: Define the string form of the XML start tag. Your job is taking Hadoop config files as input, where each Define the conf.set("xmlinput.start", "property"); configuration entry uses the property tag. string form of conf.set("xmlinput.end", "/property"); the XML end Set the Mahout XML tag. job.setInputFormatClass(XmlInputFormat.class); input format class. 92 CHAPTER 3 Data serialization—working with text and beyond Looking at the previous code, it quickly becomes apparent that Mahout’s XML Input-Format is rudimentary; you need to tell it an exact sequence of start and end XML tags that will be searched in the file. Looking at the source of the InputFormat 2 confirms this: private boolean next(LongWritable key, Text value) throws IOException if (fsin.getPos() end && readUntilMatch(startTag, false)) try buffer.write(startTag); if (readUntilMatch(endTag, true)) key.set(fsin.getPos()); value.set(buffer.getData(), 0, buffer.getLength()); return true; finally buffer.reset(); return false; Next you need to write a mapper to consume Mahout’s XML InputFormat. The XML element in Text form has been supplied, so you’ll need to use an XML parser to extract content from the XML. A mapper to work with XML public static class Map extends MapperLongWritable, Text, Text, Text Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException String document = value.toString(); System.out.println("'" + document + "'"); try XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new ByteArrayInputStream(document.getBytes())); String propertyName = "; String propertyValue = "; String currentElement = "; while (reader.hasNext()) int code = reader.next(); switch (code) case START_ELEMENT: currentElement = reader.getLocalName(); 2 GitHub source—https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/ manning/hip/ch3/xml/XmlInputFormat.javaTECHNIQUE 12 MapReduce and XML 93 break; case CHARACTERS: if (currentElement.equalsIgnoreCase("name")) propertyName += reader.getText(); else if (currentElement.equalsIgnoreCase("value")) propertyValue += reader.getText(); break; reader.close(); context.write(propertyName.trim(), propertyValue.trim()); catch (Exception e) log.error("Error processing '" + document + "'", e); The map is given a Text instance, which contains a String representation of the data between the start and end tags. In this code you use Java’s built-in Streaming API for XML (StAX) parser to extract the key and value for each property and output them. If you run the MapReduce job against Cloudera’s core-site.xml and use the HDFS cat command to show the output, you’ll see the following output: hadoop fs -put HADOOP_HOME/conf/core-site.xml core-site.xml bin/run.sh com.manning.hip.ch3.xml.HadoopPropertyXMLMapReduce \ core-site.xml output hadoop fs -cat output/part fs.default.name hdfs://localhost:8020 hadoop.tmp.dir /var/lib/hadoop-0.20/cache/user.name hadoop.proxyuser.oozie.hosts hadoop.proxyuser.oozie.groups This output shows that you’ve successfully worked with XML as an input serialization format with MapReduce. Not only that—you can support huge XML files since the InputFormat supports splitting XML. WRITING XML Having successfully read XML, the next question is how do you write XML? In your reducer you have callbacks that occur before and after your main reduce method is 3 called, which you can use to emit a start and end tag, as shown in the following example. 3 GitHub source—https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/ manning/hip/ch3/xml/SimpleXmlOutputMapReduce.java 94 CHAPTER 3 Data serialization—working with text and beyond A reducer to emit start and end tags public static class Reduce extends ReducerText, Text, Text, Text Override Use the setup method to write protected void setup( the root element start tag. Context context) throws IOException, InterruptedException context.write(new Text("configuration"), null); Override Use the cleanup method to write protected void cleanup( the root element end tag. Context context) throws IOException, InterruptedException context.write(new Text("/configuration"), null); private Text outputKey = new Text(); Construct a child XML element for public void reduce(Text key, IterableText values, each key/value combination provided Context context) in the reducer. throws IOException, InterruptedException for (Text value : values) outputKey.set(constructPropertyXml(key, value)); context.write(outputKey, null); Emit the XML element. public static String constructPropertyXml(Text name, Text value) StringBuilder sb = new StringBuilder(); sb.append("propertyname").append(name) .append("/namevalue").append(value) .append("/value/property"); return sb.toString(); This could also be embedded in an OutputFormat, but I’ll leave that as an exercise for the reader. Writing an OutputFormat class is covered in section 3.4.1. PIG If you want to work with XML in Pig, the Piggybank library (a user-contributed library of useful Pig code, detailed in chapter 10) contains an XMLLoader. It works in a way sim- ilar to this technique and captures all of the content between a start and end tag, sup- plying it as a single byte array field in a Pig tuple. HIVE Currently, no means exists for working with XML in Hive. You would have to write a 4 custom SerDe, which we’ll cover in chapter 10. 4 SerDe is a shortened form of Serializer/Deserializer, and is the mechanism that allows Hive to read and write data in HDFS.TECHNIQUE 13 MapReduce and JSON 95 Summary Mahout’s XML InputFormat certainly helps you work with XML. But it’s sensitive to an exact string match of both the start and end element names. If the element tag can contain attributes with variable values, or the generation of the element can’t be con- trolled and could result in XML namespace qualifiers being used, then this approach may not work for you. Also problematic will be situations where the element name you specify is used as a descendant child element. If you have control over the XML laid out in the input, this exercise can be simpli- fied by having a single XML element per line. This will let you use the built-in MapReduce text-based InputFormats (such as TextInputFormat), which treat each line as a record and split accordingly to preserve that demarcation. Another option worth considering is that of a preprocessing step, where you could convert the original XML into a separate line per XML element, or convert it into an altogether different data format such as a SequenceFile or Avro, both of which solve the splitting problem for you. A streaming class called StreamXmlRecordReader also allows you to work with XML in your streaming code. Now that you have a handle on how to work with XML, let’s tackle another popular serialization format, JSON. 3.2.2 JSON JSON shares the machine- and human-readable traits of XML, and has existed since the early 2000s. It’s less verbose than XML, and doesn’t have the rich typing and vali- dation features available in XML. TECHNIQUE 13 MapReduce and JSON Imagine you have some code that’s downloading JSON data from a streaming REST service and every hour writes a file into HDFS. The data amount that’s being down- loaded is large, so each file being produced is multiple gigabytes in size. You’ve been asked to write a MapReduce job that can take as input these large JSON files. What you have here is a problem in two parts: first, MapReduce doesn’t come with an InputFormat that works with JSON. Second, how does one even go about splitting JSON? Figure 3.7 shows the problem with splitting JSON. To split files, given a random offset in a file, you’ll need to be able to determine the start of the next JSON element. This is made more challenging when working with JSON because it’s a hierar- chical data format and the same element name can be used in multiple levels, as shown in the figure. JSON is harder to partition into distinct segments than a format such as XML because JSON doesn’t have a token (like an end tag in XML) to denote the start or end of a record. 96 CHAPTER 3 Data serialization—working with text and beyond Input split N Figure 3.7 Example of issue with JSON and multiple input splits Problem You want to work with JSON inputs in MapReduce and also ensure that input JSON files can be partitioned for concurrent reads. Solution The Elephant Bird LzoJsonInputFormat input format is used as a basis to create an input format class to work with JSON elements. This technique also covers another approach using my open source project that can work with multiline JSON. Discussion 5 Elephant Bird, an open source project that contains some useful utilities for work- ing with LZOP compression, has an LzoJsonInputFormat that can read JSON, though it requires that the input file be LZOP-compressed. You’ll use the Elephant Bird code as a template for your own JSON InputFormat, which doesn’t have the LZOP compres- sion requirement. We’re cheating with this solution, which assumes that each JSON record is on a separate line. My JsonInputFormat is simple and does nothing other than construct and return a JsonRecordReader, so we’ll skip over that code. The JsonRecordReader emits LongWritable, MapWritable key/value pairs to the mapper, where the MapWritable is a map of JSON element names and their values. Let’s take a look at how this 5 See https://github.com/kevinweil/elephant-bird. TECHNIQUE 13 MapReduce and JSON 97 RecordReader works. It leverages the LineRecordReader, which is a built-in MapReduce reader that emits a record for each line. To convert the line to a MapWritable, the 6 reader uses the following method: public static boolean decodeLineToJson(JSONParser parser, Text line, MapWritable value) try JSONObject jsonObj = (JSONObject)parser.parse(line.toString()); for (Object key: jsonObj.keySet()) Text mapKey = new Text(key.toString()); Text mapValue = new Text(); if (jsonObj.get(key) = null) mapValue.set(jsonObj.get(key).toString()); value.put(mapKey, mapValue); return true; catch (ParseException e) LOG.warn("Could not json-decode string: " + line, e); return false; catch (NumberFormatException e) LOG.warn("Could not parse field into number: " + line, e); return false; 7 The reader uses the json-simple parser to parse the line into a JSON object, and then iterates over the keys and puts the keys and values into a MapWritable. The mapper is given the JSON data in LongWritable, MapWritable pairs and can process the data accord- ingly. You can view this basic code for the MapReduce job in the GitHub repository. I’ll demonstrate this technique using the following JSON: "results" : "created_at" : "Thu, 29 Dec 2011 21:46:01 +0000", "from_user" : "grep_alex", "text" : "RT kevinweil: After a lot of hard work by ..." , "created_at" : "Mon, 26 Dec 2011 21:18:37 +0000", "from_user" : "grep_alex", "text" : "miguno pull request has been merged, thanks again" 6 GitHub source—https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/ manning/hip/ch3/json/JsonInputFormat.java 7 See http://code.google.com/p/json-simple/. 98 CHAPTER 3 Data serialization—working with text and beyond Because this technique assumes a JSON object per line, the following shows the JSON file you’ll work with: "created_at" : "Thu, 29 Dec 2011 21:46:01 +0000","from_user" : ... "created_at" : "Mon, 26 Dec 2011 21:18:37 +0000","from_user" : ... Now copy the JSON file into HDFS and run your MapReduce code. The MapReduce code writes each JSON key/value as the job output: hadoop fs -put test-data/ch3/singleline-tweets.json \ singleline-tweets.json bin/run.sh com.manning.hip.ch3.json.JsonMapReduce \ singleline-tweets.json output hadoop fs -cat output/part text RT kevinweil: After a lot of hard work by ... from_user grep_alex created_at Thu, 29 Dec 2011 21:46:01 +0000 text miguno pull request has been merged, thanks again from_user grep_alex created_at Mon, 26 Dec 2011 21:18:37 +0000 WRITING JSON An approach similar to what we looked at in section 3.2.1 for writing XML could also be used to write JSON. PIG Elephant Bird contains a JsonLoader and an LzoJsonLoader, which you can use to work with JSON in Pig. These loaders work with line-based JSON. Each Pig tuple contains a chararray field for each JSON element in the line. HIVE Hive contains a DelimitedJSONSerDe, which can serialize JSON, but, unfortunately, not deserialize it, so you can’t load data into Hive using this SerDe. Summary This solution assumes that the JSON input is structured with a line per JSON object. How would you work with JSON objects that are across multiple lines? An experimen- 8 tal project on GitHub works with multiple input splits over a single JSON file. This approach searches for a specific JSON member and retrieves the containing object. 9 You can also review a Google Code project called hive-json-serde, which can sup- port both serialization and deserialization. As you can see, using XML and JSON in MapReduce is kludgy and has rigid require- ments about how to lay out your data. Support for these two formats in MapReduce is also complex and error prone, since neither lends itself naturally to splitting. Clearly you need to look at alternative file formats that have built-in support for splittability. 8 A multiline JSON InputFormat: https://github.com/alexholmes/json-mapreduce. 9 See http://code.google.com/p/hive-json-serde/. TECHNIQUE 13 Big data serialization formats 99 The next step is to compare more sophisticated file formats, which are better suited to working with MapReduce, such as Avro and SequenceFiles. 3.3 Big data serialization formats Unstructured text works well when you’re working with scalar or tabular data. Semi- structured text formats such as XML and JSON can model more sophisticated data structures that include composite fields, or hierarchical data. But when you’re work- ing with big data volumes you’ll need serialization formats with compact serialized forms that natively support partitioning and have schema evolution features. In this section we’ll compare the serialization formats that work best with big data in MapReduce, and follow up with how you can use them with MapReduce. 3.3.1 Comparing SequenceFiles, Protocol Buffers, Thrift, and Avro It’s important to make certain considerations when choosing a file format. I’ve selected the following criteria based on my belief that these are the important charac- teristics for big data serialization: ■ Code generation—The ability to generate Java classes and utilities that can be used for serialization and deserialization. ■ Versioning—The ability for the file format to support backward or forward compatibility. ■ Language support—The programming languages supported by the library. ■ Transparent compression—The ability for the file format to handle compressing records internally. ■ Splittability—The ability of the file format to support multiple input splits. ■ Native support in MapReduce—The input/output formats that support reading and writing files in their native format (that is, produced directly from the data format library). ■ Pig and Hive support—The Pig Store and Load Functions (referred to as Funcs) and Hive SerDe classes to support the data format. Table 3.1 compares three data serialization frameworks to see how they stack up against each other. Additional background on these technologies is provided in the succeeding section. Now let’s look at each of these formats in more detail. SEQUENCEFILE The SequenceFile format was created to work with MapReduce, Pig, and Hive, and therefore integrates well with all of those tools. Its shortcomings are mainly its lack of code generation and versioning support, as well as limited language support. 100 CHAPTER 3 Data serialization—working with text and beyond Table 3.1 Feature comparison of data serialization frameworks Native Code Language Transparent Pig and Hive Library Versioning Splittable support in generation support compression support MapReduce SequenceFile No No Java, Yes Yes Yes Yes Python Protocol Yes Yes C++, No No No No Buffers (optional) Java, Python, Perl, Ruby a Thrift Yes Yes C, C++, No No No No (mandatory) Java, Python, Ruby, Perl Avro Yes Yes C, C++, Yes Yes Yes Pig only (Hive (optional) Java, coming soon, Python, see HIVE-895) Ruby, C a Thrift does support compression, but not in the Java library. PROTOCOL BUFFERS The Protocol Buffers format has been used heavily by Google for interoperability. Its strengths are its versioning support and compact binary format. Downsides include its lack of support in MapReduce (or in any third-party software) for reading files gener- ated by Protocol Buffers serialization. Not all is lost, however; we’ll look at how Ele- phant Bird uses Protocol Buffers serialization within a higher-level container file in section 3.3.3. THRIFT Thrift was developed at Facebook as a data serialization and RPC framework. It doesn’t have support in MapReduce for its native data serialization format, though it can sup- port different wire-level data representations, including JSON and various binary encodings. Thrift also includes an RPC layer with various types of servers, including a nonblocking implementation. We’ll ignore the RPC capabilities for this chapter and focus on the data serialization. AVRO The Avro format was Doug Cutting’s creation to help address the shortcomings of SequenceFiles. Based on certain evaluation criteria, Avro seems to be the best fit as a data serialization framework in Hadoop. SequenceFile is a close second, due to its inherent compatibility with Hadoop (it was designed for use with Hadoop). You can review a useful project at https://github.com/eishay/jvm-serializers/ wiki/, which runs various benchmarks to compare file formats based on items such as serialization and deserialization times. It contains benchmarks for Avro, Protocol Buf- fers, and Thrift, along with a number of other frameworks. Big data serialization formats 101 After looking at how the various data serialization frameworks compare, we’ll ded- icate the next few sections to showing you how you can work with them. We’ll start things off with a look at SequenceFiles. 3.3.2 SequenceFiles Because SequenceFiles were created for use with MapReduce, +HDGHU arguably they offer the highest level of integration support in conjunction with MapReduce, Pig, and Hive. SequenceFiles are HUVLRQ9 a splittable binary file format that stores data in the form of .H\FODQVVDPH key/value pairs. All SequenceFiles share the same header for- DOXHFODQVVDPH9 mat, as shown in figure 3.8. SequenceFiles come in three types, which vary based FRPSUHVVHG,V" on how you apply compression. In addition, each type has EORFNFRPS,VUHVVHG" its own corresponding Writer classes. &RPSUHVVLRQFRGHF UNCOMPRESSED Uncompressed SequenceFiles are written using the 0HWDGDDW SequenceFile.Writer class. No advantage exists for this over 6\QF the compressed formats, since compression generally reduces your storage footprint and is more efficient for Figure 3.8 Sequence- reads and writes. The file format is shown in figure 3.9. File header format RECORD-COMPRESSED Record compression SequenceFiles are written using the SequenceFile.RecordCompress- Writer class. When a record is added to the SequenceFile, it’s immediately compressed and written to the file. The disadvantage to this approach is that your compression ratio will suffer compared to block compression. This file format is shown, along with uncompressed SequenceFiles, in figure 3.9. Two of the three SequenceFile formats .... (uncompressed and Header Record 1 Record 2 Sync Record 3 record-compressed) utilize the same file format. .... Record length Key length Key Value The only difference between uncompressed and record-compressed is compression of the value. Figure 3.9 Record-based SequenceFile format and its uncompressed SequenceFiles 102 CHAPTER 3 Data serialization—working with text and beyond BLOCK-COMPRESSED Block-compression SequenceFiles are written using the SequenceFile.BlockCompress- Writer class. By default the block size is the same as the HDFS block size, although this can be overridden. The advantage to this compression is that it’s more aggressive; the whole block is compressed, rather than at the record level. Data isn’t written until it reaches the block size, at which point the whole block is compressed, resulting in good overall compression. The file format is shown in figure 3.10. You only need one Reader class (SequenceFile.Reader) to read all three types of SequenceFiles. Even the Writer is abstracted because you can call SequenceFile.create- Writer to choose the preferred format and it returns a base class that can be used for writing regardless of compression. SequenceFiles have a pluggable serialization framework. Written keys and values must have a related org.apache.hadoop.io.serializer.Serializer and Deserializer for marshalling and unmarshalling. Hadoop comes with four serializers: Avro, Java, 10 Tether (for binary data contained within a TetherData class), and Writable (the default serializer). CUSTOM SEQUENCEFILE SERIALIZATION If you want your SequenceFile to contain objects that aren’t Writable or Serializable, you’ll need to implement your own Serializer and register it. You register it by updating core-site.xml and appending the class name of the custom serialization implementation to the io.serializations property. Block-compressed Header Block 1 Sync Block 2 Sync Block 3 file format Number of records Length Key lengths Each one of these fields contains N Length Keys entries, where N = number of records. Length Value lengths They are also all Length Values compressed. Figure 3.10 Block-based SequenceFile format 10 Writable is an interface in Hadoop used to support general-purpose data serialization, and is used for sending data across the wire between Hadoop components. Yahoo has a good introduction to Writables at http://developer.yahoo.com/hadoop/tutorial/module5.htmlwritable.