Data serialization—working with text and beyond
This chapter covers
■ Working with text, XML, and JSON
■ Understanding SequenceFiles, Avro, and Protocol Buffers
■ Working with custom data formats
MapReduce offers straightforward, well-documented support for working with simple data formats such as log files. But the use of MapReduce has evolved beyond log files to more sophisticated data serialization formats—such as text, XML, and JSON—to the point that its documentation and built-in support run dry. The goal of this chapter is to document how to work with common data serialization formats, as well as to examine more structured serialization formats and compare their fitness for use with MapReduce.
Imagine that you want to work with the ubiquitous data serialization formats XML and JSON. These formats work in a straightforward manner in most programming languages, with several tools available to help you with marshalling, unmarshalling, and validating where applicable. Working with XML and JSON in MapReduce, however, poses two equally important challenges. First, though MapReduce requires classes that can support reading and writing a particular data serialization format, there's a good chance it doesn't have such classes to support the serialization format you're working with. Second, MapReduce's power lies in its ability to parallelize reading your input data. If your input files are large (think hundreds of megabytes or more), it's crucial that the classes reading your serialization format be able to split your large files so multiple map tasks can read them in parallel.
We'll start this chapter by tackling the problem of how to work with serialization formats such as XML and JSON. Then we'll compare and contrast data serialization formats that are better suited to working with big data. I'll also show how to use these serialization formats with MapReduce. The final hurdle is when you need to work with a file format that's proprietary, or a less common file format for which no read/write bindings exist in MapReduce. I'll show you how to write your own classes to read/write your file format.
Data serialization support in MapReduce is a property of the input and output classes that read and write MapReduce data. Let's start with an overview of how MapReduce supports data input and output.
This chapter assumes you're familiar with XML and JSON data formats. Wikipedia provides some good background articles on XML and JSON, if needed. You should also have some experience writing MapReduce programs and understand the basic concepts of HDFS and MapReduce input and output. Chuck Lam's book, Hadoop in Action from Manning, represents a good resource on this topic.
3.1 Understanding inputs and outputs in MapReduce
Your data might be XML files sitting behind a number of FTP servers, text log files sitting on a central web server, or Lucene indexes in HDFS.¹ How does MapReduce support reading and writing to these different serialization structures across the various storage mechanisms? You'll need to know the answer in order to support a specific serialization format.
Figure 3.1 shows the high-level data flows through MapReduce and identifies the actors responsible for various parts of the flow. On the input side you see that some work (Create split) is performed outside of the map phase, and other work is performed as part of the map phase (Read split). All of the output work is performed in the reduce phase (Write output).
¹ Apache Lucene is an information retrieval project that stores data in an inverted index data structure optimized for full-text search. More information is available at http://lucene.apache.org/.
Figure 3.1 High-level input and output actors in MapReduce. On the input side, InputFormat.getSplits creates the splits and RecordReader.nextKeyValue reads them; the InputFormat and RecordReader are responsible for determining what data to feed into the map function. The map and reduce functions are typically written by the user to address a specific use case. The partitioner's job is to logically funnel map outputs to the reducers, and the RecordWriter writes the reduce output to the destination data sink, which is the final resting place of this MapReduce data flow.
Figure 3.2 shows the same flow with a map-only job. In a map-only job the MapReduce framework still uses the OutputFormat and RecordWriter classes to write the outputs directly to the data sink.
Let's walk through the data flow and describe the responsibilities of the various actors. As we do this, we'll also look at the relevant code from the built-in TextInputFormat and TextOutputFormat classes to better understand the concepts. The TextInputFormat and TextOutputFormat classes read and write line-oriented text files.
3.1.1 Data input
The two classes that support data input in MapReduce are InputFormat and RecordReader. The InputFormat class is consulted to determine how the input data should be partitioned for the map tasks, and the RecordReader performs the reading of data from the inputs.

Figure 3.2 Input and output actors in MapReduce with no reducers. The flow is the same as in figure 3.1, minus the partition and reduce steps: InputFormat.getSplits creates the splits, RecordReader.nextKeyValue reads them, Mapper.map processes the records, and RecordWriter.write sends the map output directly to the data sink.

Figure 3.3 The annotated InputFormat class and its three contracts: the type definitions for map input keys and values; getSplits(JobContext context), which partitions the input data into InputSplits; and createRecordReader(InputSplit split, TaskAttemptContext context), which creates a RecordReader to read data from the job inputs.
INPUTFORMAT
Every job in MapReduce must define its inputs according to contracts specified in the InputFormat abstract class. InputFormat implementers must fulfill three contracts: first, they describe type information for map input keys and values; next, they specify how the input data should be partitioned; and finally, they indicate the RecordReader instance that should read the data from its source. Figure 3.3 shows the InputFormat class and how these three contracts are defined.
Arguably the most crucial contract is that of determining how to divide the input
data. In MapReduce nomenclature these divisions are referred to as input splits. The
input splits directly impact the map parallelism because each split is processed by a
single map task. Working with an InputFormat that is unable to create multiple input
splits over a single data source (such as a file) will result in a slow map phase because
the file will be processed sequentially.
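To make the splitting contract concrete, the following is a minimal sketch, not taken from the book's accompanying code, of a FileInputFormat subclass that deliberately disables splitting by overriding isSplitable; the class name NonSplittableTextInputFormat is made up for illustration, and reusing the built-in LineRecordReader is just one possible choice. An InputFormat like this forces each file to be consumed by a single map task, which is exactly the slow, sequential behavior described above.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Hypothetical InputFormat that turns splitting off, so each input file
// is handed to a single map task in its entirety.
public class NonSplittableTextInputFormat
    extends FileInputFormat<LongWritable, Text> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // Returning false yields one split per file, so a large file is
    // processed sequentially by one mapper.
    return false;
  }

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    // Reuse the built-in line reader; only the split calculation changes.
    return new LineRecordReader();
  }
}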
The TextInputFormat class (view source at http://goo.gl/VOMcJ) provides an implementation of the InputFormat class's createRecordReader method but delegates the calculation of input splits to its parent class, FileInputFormat. The following code shows the relevant parts of the TextInputFormat class:
// The parent class, FileInputFormat, provides all of the input split
// functionality.
public class TextInputFormat
    extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text>
      createRecordReader(InputSplit split,
                         TaskAttemptContext context) {
    // The default record delimiter is newline, but it can be overridden
    // with textinputformat.record.delimiter.
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes();
    // Construct the RecordReader to read the data from the data source.
    return new LineRecordReader(recordDelimiterBytes);
  }
  ...
}
The code in FileInputFormat (source at http://goo.gl/mQfq1) to determine the input splits is a little more complicated. A simplified form of the code is shown in the following example to portray the main elements of this method:
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> splits = new ArrayList<InputSplit>();
  // The listStatus method determines all the input files for the job.
  List<FileStatus> files = listStatus(job);
  for (FileStatus file : files) {
    Path path = file.getPath();
    // Retrieve all of the file blocks.
    BlockLocation[] blkLocations =
        FileSystem.getFileBlockLocations(file, 0, length);
    // The size of the splits is the same as the block size for the file.
    // Each file can have a different block size.
    long splitSize = file.getBlockSize();
    while (splitsRemaining()) {
      // Create a split for each file block and add it to the result.
      splits.add(new FileSplit(path, ...));
    }
  }
  return splits;
}
The following code is an example of how you specify the InputFormat to use for a
MapReduce job:
job.setInputFormatClass(TextInputFormat.class);
RECORDREADER
The RecordReader class is used by MapReduce in the map tasks to read data from an input split and provide each record in the form of a key/value pair for use by mappers. A task is commonly created for each input split, and each task has a single RecordReader that's responsible for reading the data for that input split. Figure 3.4 shows the abstract methods you must implement.
As shown in a previous section, the TextInputFormat class created a LineRecordReader
to read records from the input splits. The LineRecordReader directly extends the
RecordReader class and leverages the LineReader class to read lines from the input split.
The LineRecordReader uses the byte offset in the file for the map key and the contents
of the line for the map value. I’ve included a simplified version of the LineRecordReader
in the following example (source at http://goo.gl/iIS59):
public class LineRecordReader
    extends RecordReader<LongWritable, Text> {
  private LineReader in;
  private LongWritable key = new LongWritable();
  private Text value = new Text();

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;

    // Open an InputStream to the input split file and seek to the
    // start of the split.
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());
    fileIn.seek(start);

    // Create a new LineReader that can read lines from a stream.
    in = new LineReader(fileIn, job);

    // If you aren't at the start of the file, you need to figure out where
    // to start reading the lines. The only way to do this is to keep
    // reading characters until you hit a newline, at which point you're
    // ready to start supplying lines to the map.
    if (notAtStartOfFile) {
      start += in.readLine(...);
    }
  }

  // After the initialize method is called, the nextKeyValue method is
  // called repeatedly by the MapReduce framework until it returns false,
  // which signifies the end of the input split.
  public boolean nextKeyValue() throws IOException {
    // Set the byte offset in the file as the key.
    key.set(pos);
    // Read the next line into the value. If you've gone beyond the end of
    // the input split, you return false.
    return in.readLine(value, ...) > 0;
  }
}
Because the LineReader class is easy, we’ll skip that code. The next step will be a look at
how MapReduce supports data outputs.
Figure 3.4 The annotated RecordReader class and its abstract methods: the type definitions for map input keys and values; initialize(InputSplit split, TaskAttemptContext context), which could involve seeking into a file and determining the logical starting point of the next record; nextKeyValue(), which reads the next record and returns a flag indicating whether the end of the split has been reached; getCurrentKey() and getCurrentValue(), which return the current record's key and value; getProgress(), which returns the current progress of the reader; and close(), which closes any resources associated with the data source.
Figure 3.5 The annotated OutputFormat class and its contracts: the type definitions for reduce output keys and values; getRecordWriter(TaskAttemptContext context), which creates a RecordWriter instance to write data to the destination; checkOutputSpecs(JobContext context), which verifies that the output details associated with the MapReduce job are correct; and getOutputCommitter(TaskAttemptContext context), which returns the associated OutputCommitter. OutputCommitters are responsible for "finalizing" the output after successful task and job completion.
3.1.2 Data output
MapReduce uses a similar process for supporting output data as it does for input data.
Two classes must exist, an OutputFormat and a RecordWriter. The OutputFormat performs
some basic validation of the data sink properties, and the RecordWriter writes each
reducer output to the data sink.
OUTPUTFORMAT
Much like the InputFormat class, the OutputFormat class, as shown in figure 3.5, defines the contracts that implementers must fulfill, including checking the information related to the job output, providing a RecordWriter, and specifying an output committer, which allows writes to be staged and then made "permanent" upon task and/or job success. Please refer to chapter 5 for more details on output committing.
Just like the TextInputFormat, the TextOutputFormat also extends a base class, FileOutputFormat, which takes care of some complicated logistics such as output committing, which we'll cover further in this chapter. For now let's take a look at the work the TextOutputFormat is performing (source at http://goo.gl/8ab7Z):
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {

  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    boolean isCompressed = getCompressOutput(job);

    // The default key/value separator is the tab character, but this can
    // be changed with the mapred.textoutputformat.separator
    // configuration setting.
    String keyValueSeparator = conf.get(
        "mapred.textoutputformat.separator", "\t");

    // Create a unique filename for the reducer in a temporary directory.
    Path file = getDefaultWorkFile(job, extension);

    // Create the output file.
    FileSystem fs = file.getFileSystem(conf);
    FSDataOutputStream fileOut = fs.create(file, false);

    // Return a RecordWriter used to write to the file.
    return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  }
}
The following code is an example of how you specify the OutputFormat that should be used for a MapReduce job:

job.setOutputFormatClass(TextOutputFormat.class);
RECORDWRITER
You'll use the RecordWriter to write the reducer outputs to the destination data sink. It's a simple class, as figure 3.6 illustrates.
The TextOutputFormat returned a LineRecordWriter object (LineRecordWriter is an inner class of TextOutputFormat) to perform the writing to file. A simplified version of that class (source at http://goo.gl/8ab7Z) is shown in the following example:
protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
  protected DataOutputStream out;

  public synchronized void write(K key, V value) throws IOException {
    // Write out the key, separator, value, and newline.
    writeObject(key);
    out.write(keyValueSeparator);
    writeObject(value);
    out.write(newline);
  }

  // Write out the Object to the output stream.
  private void writeObject(Object o) throws IOException {
    out.write(o);
  }
}
While on the map side it’s the InputFormat that determines how many map tasks are
executed, on the reducer side the number of tasks is solely based on the value for
mapred.reduce.tasks set by the client (or if it isn't set, the value is picked up from mapred-site.xml, or mapred-default.xml if it doesn't exist in the site file).

Figure 3.6 The annotated RecordWriter class overview: the type definitions for reduce output keys and values; write(K key, V value), which writes a logical key/value record to the destination data sink; and close(TaskAttemptContext context), which cleans up any resources related to the destination data sink.
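To tie the input side, the output side, and the reducer count together, here's a hedged sketch of a complete driver using the org.apache.hadoop.mapreduce API; it deliberately uses the identity Mapper and Reducer base classes so it compiles on its own, and the reducer count of 10 and the command-line paths are arbitrary examples rather than anything prescribed in this chapter.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class IdentityDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "identity pass-through");
    job.setJarByClass(IdentityDriver.class);

    // The input and output format classes covered in this section.
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // The base Mapper and Reducer classes act as identity functions;
    // a real job would plug in its own implementations here.
    job.setMapperClass(Mapper.class);
    job.setReducerClass(Reducer.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    // The client, not the InputFormat, decides the number of reducers.
    job.setNumReduceTasks(10);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}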
Now that you know what’s involved in working with input and output data in
MapReduce, it’s timetoapplythat knowledgeto solvingsomecommon dataserializa-
tion problems. Your first step in this data serialization journey is to learn how to work
withfileformatssuchas XML.
3.2 Processing common serialization formats
XML and JSON are industry-standard data interchange formats. Their ubiquity in the
technology industry is evidenced by their heavy adoption in data storage and
exchange.
3.2.1 XML
XML has existed since 1998 as a mechanism to represent data that's readable by machine and human alike. It became a universal language for data exchange between systems. It's employed by many standards today such as SOAP and RSS, and used as an open data format for products such as Microsoft Office.
TECHNIQUE 12 MapReduce and XML
While MapReduce comes bundled with an InputFormat that works with text, it doesn't come with one that supports XML. Working on a single XML file in parallel in MapReduce is tricky because XML doesn't contain a synchronization marker in its data format.
Problem
You want to work with large XML files in MapReduce and be able to split and process them in parallel.
Solution
Mahout's XMLInputFormat can be used to work with XML files in HDFS with MapReduce. It reads records that are delimited by a specific XML begin and end tag. This technique also covers how XML can be emitted as MapReduce output.
Discussion
MapReduce doesn't contain built-in support for XML, so we'll turn to another Apache project, Mahout, a machine learning system, to provide an XML InputFormat. To showcase the XML InputFormat, let's write a MapReduce job that uses Mahout's XML InputFormat to read property names and values from Hadoop's configuration files. The first step will be to set up the job configuration:
// Define the string form of the XML start tag. Your job is taking Hadoop
// config files as input, where each configuration entry uses the
// <property> tag.
conf.set("xmlinput.start", "<property>");

// Define the string form of the XML end tag.
conf.set("xmlinput.end", "</property>");

// Set the Mahout XML input format class.
job.setInputFormatClass(XmlInputFormat.class);
Looking at the previous code, it quickly becomes apparent that Mahout's XML InputFormat is rudimentary; you need to tell it an exact sequence of start and end XML tags that will be searched in the file. Looking at the source of the InputFormat confirms this:²
private boolean next(LongWritable key, Text value)
    throws IOException {
  if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
    try {
      buffer.write(startTag);
      if (readUntilMatch(endTag, true)) {
        key.set(fsin.getPos());
        value.set(buffer.getData(), 0, buffer.getLength());
        return true;
      }
    } finally {
      buffer.reset();
    }
  }
  return false;
}
Next you need to write a mapper to consume Mahout’s XML InputFormat. The XML
element in Text form has been supplied, so you’ll need to use an XML parser to
extract content from the XML.
A mapper to work with XML

public static class Map extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void map(LongWritable key, Text value,
                     Mapper.Context context)
      throws IOException, InterruptedException {
    String document = value.toString();
    System.out.println("'" + document + "'");
    try {
      XMLStreamReader reader =
          XMLInputFactory.newInstance().createXMLStreamReader(
              new ByteArrayInputStream(document.getBytes()));
      String propertyName = "";
      String propertyValue = "";
      String currentElement = "";
      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
          case START_ELEMENT:
            currentElement = reader.getLocalName();
            break;
          case CHARACTERS:
            if (currentElement.equalsIgnoreCase("name")) {
              propertyName += reader.getText();
            } else if (currentElement.equalsIgnoreCase("value")) {
              propertyValue += reader.getText();
            }
            break;
        }
      }
      reader.close();
      context.write(new Text(propertyName.trim()),
          new Text(propertyValue.trim()));
    } catch (Exception e) {
      log.error("Error processing '" + document + "'", e);
    }
  }
}

² GitHub source—https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch3/xml/XmlInputFormat.java
The map is given a Text instance, which contains a String representation of the data
between the start and end tags. In this code you use Java’s built-in Streaming API for
XML (StAX) parser to extract the key and value for each property and output them. If
you run the MapReduce job against Cloudera’s core-site.xml and use the HDFS cat
command to show the output, you’ll see the following output:
hadoop fs -put $HADOOP_HOME/conf/core-site.xml core-site.xml

bin/run.sh com.manning.hip.ch3.xml.HadoopPropertyXMLMapReduce \
  core-site.xml output

hadoop fs -cat output/part

fs.default.name hdfs://localhost:8020
hadoop.tmp.dir /var/lib/hadoop-0.20/cache/${user.name}
hadoop.proxyuser.oozie.hosts
hadoop.proxyuser.oozie.groups
This output shows that you’ve successfully worked with XML as an input serialization
format with MapReduce. Not only that—you can support huge XML files since the
InputFormat supports splitting XML.
WRITING XML
Having successfully read XML, the next question is how do you write XML? In your reducer you have callbacks that occur before and after your main reduce method is called, which you can use to emit a start and end tag, as shown in the following example.³

³ GitHub source—https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch3/xml/SimpleXmlOutputMapReduce.java
A reducer to emit start and end tags

public static class Reduce
    extends Reducer<Text, Text, Text, Text> {

  // Use the setup method to write the root element start tag.
  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    context.write(new Text("<configuration>"), null);
  }

  // Use the cleanup method to write the root element end tag.
  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    context.write(new Text("</configuration>"), null);
  }

  private Text outputKey = new Text();

  // Construct a child XML element for each key/value combination
  // provided in the reducer.
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      outputKey.set(constructPropertyXml(key, value));
      // Emit the XML element.
      context.write(outputKey, null);
    }
  }

  public static String constructPropertyXml(Text name, Text value) {
    StringBuilder sb = new StringBuilder();
    sb.append("<property><name>").append(name)
        .append("</name><value>").append(value)
        .append("</value></property>");
    return sb.toString();
  }
}
This could also be embedded in an OutputFormat, but I’ll leave that as an exercise for
the reader. Writing an OutputFormat class is covered in section 3.4.1.
PIG
If you want to work with XML in Pig, the Piggybank library (a user-contributed library of useful Pig code, detailed in chapter 10) contains an XMLLoader. It works in a way similar to this technique and captures all of the content between a start and end tag, supplying it as a single byte array field in a Pig tuple.
HIVE
Currently, no means exists for working with XML in Hive. You would have to write a custom SerDe,⁴ which we'll cover in chapter 10.

⁴ SerDe is a shortened form of Serializer/Deserializer, and is the mechanism that allows Hive to read and write data in HDFS.
Summary
Mahout's XML InputFormat certainly helps you work with XML. But it's sensitive to an exact string match of both the start and end element names. If the element tag can contain attributes with variable values, or the generation of the element can't be controlled and could result in XML namespace qualifiers being used, then this approach may not work for you. Also problematic will be situations where the element name you specify is used as a descendant child element.
If you have control over the XML laid out in the input, this exercise can be simplified by having a single XML element per line. This will let you use the built-in MapReduce text-based InputFormats (such as TextInputFormat), which treat each line as a record and split accordingly to preserve that demarcation.
Another option worth considering is that of a preprocessing step, where you could
convert the original XML into a separate line per XML element, or convert it into an
altogether different data format such as a SequenceFile or Avro, both of which solve
the splitting problem for you.
A streaming class called StreamXmlRecordReader also allows you to work with XML in
your streaming code.
Now that you have a handle on how to work with XML, let’s tackle another popular
serialization format, JSON.
3.2.2 JSON
JSON shares the machine- and human-readable traits of XML, and has existed since the early 2000s. It's less verbose than XML, and doesn't have the rich typing and validation features available in XML.
TECHNIQUE 13 MapReduce and JSON
Imagine you have some code that's downloading JSON data from a streaming REST service and every hour writes a file into HDFS. The amount of data being downloaded is large, so each file being produced is multiple gigabytes in size.
You've been asked to write a MapReduce job that can take as input these large JSON files. What you have here is a problem in two parts: first, MapReduce doesn't come with an InputFormat that works with JSON. Second, how does one even go about splitting JSON? Figure 3.7 shows the problem with splitting JSON. To split files, given a random offset in a file, you'll need to be able to determine the start of the next JSON element. This is made more challenging when working with JSON because it's a hierarchical data format and the same element name can be used in multiple levels, as shown in the figure.
JSON is harder to partition into distinct segments than a format such as XML because JSON doesn't have a token (like an end tag in XML) to denote the start or end of a record.
Figure 3.7 Example of issue with JSON and multiple input splits
Problem
You want to work with JSON inputs in MapReduce and also ensure that input JSON
files can be partitioned for concurrent reads.
Solution
The Elephant Bird LzoJsonInputFormat input format is used as a basis to create an
input format class to work with JSON elements. This technique also covers another
approach using my open source project that can work with multiline JSON.
Discussion
Elephant Bird,⁵ an open source project that contains some useful utilities for working with LZOP compression, has an LzoJsonInputFormat that can read JSON, though it requires that the input file be LZOP-compressed. You'll use the Elephant Bird code as a template for your own JSON InputFormat, which doesn't have the LZOP compression requirement.
We're cheating with this solution, which assumes that each JSON record is on a separate line. My JsonInputFormat is simple and does nothing other than construct and return a JsonRecordReader, so we'll skip over that code. The JsonRecordReader emits LongWritable, MapWritable key/value pairs to the mapper, where the MapWritable is a map of JSON element names and their values.

⁵ See https://github.com/kevinweil/elephant-bird.

Let's take a look at how this RecordReader works. It leverages the LineRecordReader, which is a built-in MapReduce reader that emits a record for each line. To convert the line to a MapWritable, the reader uses the following method:⁶
public static boolean decodeLineToJson(JSONParser parser, Text line,
                                       MapWritable value) {
  try {
    JSONObject jsonObj = (JSONObject) parser.parse(line.toString());
    for (Object key : jsonObj.keySet()) {
      Text mapKey = new Text(key.toString());
      Text mapValue = new Text();
      if (jsonObj.get(key) != null) {
        mapValue.set(jsonObj.get(key).toString());
      }
      value.put(mapKey, mapValue);
    }
    return true;
  } catch (ParseException e) {
    LOG.warn("Could not json-decode string: " + line, e);
    return false;
  } catch (NumberFormatException e) {
    LOG.warn("Could not parse field into number: " + line, e);
    return false;
  }
}
The reader uses the json-simple⁷ parser to parse the line into a JSON object, and then iterates over the keys and puts the keys and values into a MapWritable. The mapper is given the JSON data in LongWritable, MapWritable pairs and can process the data accordingly. You can view this basic code for the MapReduce job in the GitHub repository.
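To show what a mapper consuming those LongWritable, MapWritable pairs might look like, here's a minimal sketch of my own (the JsonFieldMapper name is made up, and the actual JsonMapReduce code in the GitHub repository may differ); it simply emits each JSON element name and value as a Text key/value pair.

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: receives one MapWritable per JSON record and
// writes each JSON element name and its value as a Text key/value pair.
public class JsonFieldMapper
    extends Mapper<LongWritable, MapWritable, Text, Text> {

  @Override
  protected void map(LongWritable key, MapWritable value, Context context)
      throws IOException, InterruptedException {
    for (Map.Entry<Writable, Writable> entry : value.entrySet()) {
      // Both keys and values were stored as Text by the RecordReader.
      context.write((Text) entry.getKey(), (Text) entry.getValue());
    }
  }
}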
I'll demonstrate this technique using the following JSON:

{
  "results" : [
    {
      "created_at" : "Thu, 29 Dec 2011 21:46:01 +0000",
      "from_user" : "grep_alex",
      "text" : "RT kevinweil: After a lot of hard work by ..."
    },
    {
      "created_at" : "Mon, 26 Dec 2011 21:18:37 +0000",
      "from_user" : "grep_alex",
      "text" : "miguno pull request has been merged, thanks again"
    }
  ]
}

⁶ GitHub source—https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch3/json/JsonInputFormat.java
⁷ See http://code.google.com/p/json-simple/.

Because this technique assumes a JSON object per line, the following shows the JSON file you'll work with:

{"created_at" : "Thu, 29 Dec 2011 21:46:01 +0000","from_user" : ...
{"created_at" : "Mon, 26 Dec 2011 21:18:37 +0000","from_user" : ...
Now copy the JSON file into HDFS and run your MapReduce code. The MapReduce
code writes each JSON key/value as the job output:
hadoop fs -put test-data/ch3/singleline-tweets.json \
  singleline-tweets.json

bin/run.sh com.manning.hip.ch3.json.JsonMapReduce \
  singleline-tweets.json output

hadoop fs -cat output/part

text RT kevinweil: After a lot of hard work by ...
from_user grep_alex
created_at Thu, 29 Dec 2011 21:46:01 +0000
text miguno pull request has been merged, thanks again
from_user grep_alex
created_at Mon, 26 Dec 2011 21:18:37 +0000
WRITING JSON
An approach similar to what we looked at in section 3.2.1 for writing XML could also
be used to write JSON.
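As a rough sketch of that idea, assuming the json-simple library referenced earlier and a made-up JsonOutputReducer class name (this isn't code from the book), a reducer could build one JSON object per key/value pair and emit it as a single line of text, mirroring the line-per-object layout this technique expects on the input side.

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.json.simple.JSONObject;

// Hypothetical reducer that writes one JSON object per output line.
public class JsonOutputReducer
    extends Reducer<Text, Text, Text, NullWritable> {

  private final Text outputKey = new Text();

  @Override
  @SuppressWarnings("unchecked")
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      // Build a small JSON object for each key/value combination.
      JSONObject json = new JSONObject();
      json.put("key", key.toString());
      json.put("value", value.toString());
      outputKey.set(json.toJSONString());
      context.write(outputKey, NullWritable.get());
    }
  }
}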
PIG
Elephant Bird contains a JsonLoader and an LzoJsonLoader, which you can use to work
with JSON in Pig. These loaders work with line-based JSON. Each Pig tuple contains a
chararray field for each JSON element in the line.
HIVE
Hive contains a DelimitedJSONSerDe, which can serialize JSON, but, unfortunately, not
deserialize it, so you can’t load data into Hive using this SerDe.
Summary
This solution assumes that the JSON input is structured with a line per JSON object. How would you work with JSON objects that are across multiple lines? An experimental project on GitHub⁸ works with multiple input splits over a single JSON file. This approach searches for a specific JSON member and retrieves the containing object. You can also review a Google Code project called hive-json-serde,⁹ which can support both serialization and deserialization.
As you can see, using XML and JSON in MapReduce is kludgy and has rigid requirements about how to lay out your data. Support for these two formats in MapReduce is also complex and error prone, since neither lends itself naturally to splitting. Clearly you need to look at alternative file formats that have built-in support for splittability.

⁸ A multiline JSON InputFormat: https://github.com/alexholmes/json-mapreduce.
⁹ See http://code.google.com/p/hive-json-serde/.
The next step is to compare more sophisticated file formats, which are better
suited to working with MapReduce, such as Avro and SequenceFiles.
3.3 Big data serialization formats
Unstructured text works well when you're working with scalar or tabular data. Semistructured text formats such as XML and JSON can model more sophisticated data structures that include composite fields, or hierarchical data. But when you're working with big data volumes you'll need serialization formats with compact serialized forms that natively support partitioning and have schema evolution features.
In this section we'll compare the serialization formats that work best with big data in MapReduce, and follow up with how you can use them with MapReduce.
3.3.1 Comparing SequenceFiles, Protocol Buffers, Thrift, and Avro
It's important to take certain considerations into account when choosing a file format. I've selected the following criteria based on my belief that these are the important characteristics for big data serialization:
■ Code generation—The ability to generate Java classes and utilities that can be used for serialization and deserialization.
■ Versioning—The ability for the file format to support backward or forward compatibility.
■ Language support—The programming languages supported by the library.
■ Transparent compression—The ability for the file format to handle compressing records internally.
■ Splittability—The ability of the file format to support multiple input splits.
■ Native support in MapReduce—The input/output formats that support reading and writing files in their native format (that is, produced directly from the data format library).
■ Pig and Hive support—The Pig Store and Load Functions (referred to as Funcs) and Hive SerDe classes to support the data format.
Table 3.1 compares these data serialization frameworks to see how they stack up against each other. Additional background on these technologies is provided in the succeeding section. Now let's look at each of these formats in more detail.
SEQUENCEFILE
The SequenceFile format was created to work with MapReduce, Pig, and Hive, and therefore integrates well with all of those tools. Its shortcomings are mainly its lack of code generation and versioning support, as well as limited language support.
Table 3.1 Feature comparison of data serialization frameworks

Library           Code generation  Versioning  Language support                  Transparent compression  Splittable  Native support in MapReduce  Pig and Hive support
SequenceFile      No               No          Java, Python                      Yes                      Yes         Yes                          Yes
Protocol Buffers  Yes (optional)   Yes         C++, Java, Python, Perl, Ruby     No                       No          No                           No
Thrift            Yes (mandatory)  Yes         C, C++, Java, Python, Ruby, Perl  No (a)                   No          No                           No
Avro              Yes (optional)   Yes         C, C++, Java, Python, Ruby, C#    Yes                      Yes         Yes                          Pig only (Hive coming soon, see HIVE-895)

a. Thrift does support compression, but not in the Java library.
PROTOCOL BUFFERS
The Protocol Buffers format has been used heavily by Google for interoperability. Its strengths are its versioning support and compact binary format. Downsides include its lack of support in MapReduce (or in any third-party software) for reading files generated by Protocol Buffers serialization. Not all is lost, however; we'll look at how Elephant Bird uses Protocol Buffers serialization within a higher-level container file in section 3.3.3.
THRIFT
Thrift was developed at Facebook as a data serialization and RPC framework. It doesn't have support in MapReduce for its native data serialization format, though it can support different wire-level data representations, including JSON and various binary encodings. Thrift also includes an RPC layer with various types of servers, including a nonblocking implementation. We'll ignore the RPC capabilities for this chapter and focus on the data serialization.
AVRO
The Avro format was Doug Cutting’s creation to help address the shortcomings of
SequenceFiles. Based on certain evaluation criteria, Avro seems to be the best fit as a
data serialization framework in Hadoop. SequenceFile is a close second, due to its
inherent compatibility with Hadoop (it was designed for use with Hadoop).
You can review a useful project at https://github.com/eishay/jvm-serializers/wiki/, which runs various benchmarks to compare file formats based on items such as serialization and deserialization times. It contains benchmarks for Avro, Protocol Buffers, and Thrift, along with a number of other frameworks.
After looking at how the various data serialization frameworks compare, we'll dedicate the next few sections to showing you how you can work with them. We'll start things off with a look at SequenceFiles.
3.3.2 SequenceFiles
Because SequenceFiles were created for use with MapReduce, arguably they offer the highest level of integration support in conjunction with MapReduce, Pig, and Hive. SequenceFiles are a splittable binary file format that stores data in the form of key/value pairs. All SequenceFiles share the same header format, as shown in figure 3.8.

Figure 3.8 SequenceFile header format: version, key class name, value class name, is compressed?, is block compressed?, compression codec, metadata, and sync.

SequenceFiles come in three types, which vary based on how you apply compression. In addition, each type has its own corresponding Writer classes.
UNCOMPRESSED
Uncompressed SequenceFiles are written using the SequenceFile.Writer class. No advantage exists for this over the compressed formats, since compression generally reduces your storage footprint and is more efficient for reads and writes. The file format is shown in figure 3.9.
RECORD-COMPRESSED
Record compression SequenceFiles are written using the SequenceFile.RecordCompressWriter class. When a record is added to the SequenceFile, it's immediately compressed and written to the file. The disadvantage to this approach is that your compression ratio will suffer compared to block compression. This file format is shown, along with uncompressed SequenceFiles, in figure 3.9.

Figure 3.9 Record-based SequenceFile format and its uncompressed SequenceFiles. Two of the three SequenceFile formats (uncompressed and record-compressed) utilize the same file format: a header followed by records and periodic sync markers, where each record holds the record length, key length, key, and value. The only difference between uncompressed and record-compressed is compression of the value.
BLOCK-COMPRESSED
Block-compression SequenceFiles are written using the SequenceFile.BlockCompressWriter class. By default the block size is the same as the HDFS block size, although this can be overridden. The advantage to this compression is that it's more aggressive; the whole block is compressed, rather than at the record level. Data isn't written until it reaches the block size, at which point the whole block is compressed, resulting in good overall compression. The file format is shown in figure 3.10.
You only need one Reader class (SequenceFile.Reader) to read all three types of SequenceFiles. Even the Writer is abstracted because you can call SequenceFile.createWriter to choose the preferred format and it returns a base class that can be used for writing regardless of compression.
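As a quick illustration, here's a hedged sketch of writing and then reading a block-compressed SequenceFile through SequenceFile.createWriter and SequenceFile.Reader, using the FileSystem-based overloads from this era of Hadoop; the path, key/value classes, and record contents are arbitrary examples, not anything mandated by the format.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/example.seq");  // arbitrary example path

    // createWriter picks the Writer implementation for you; here we ask
    // for block compression.
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, path, IntWritable.class, Text.class,
        SequenceFile.CompressionType.BLOCK);
    try {
      writer.append(new IntWritable(1), new Text("first record"));
      writer.append(new IntWritable(2), new Text("second record"));
    } finally {
      writer.close();
    }

    // A single Reader class handles all three SequenceFile types.
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    try {
      IntWritable key = new IntWritable();
      Text value = new Text();
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
    } finally {
      reader.close();
    }
  }
}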
SequenceFiles have a pluggable serialization framework. Written keys and values must have a related org.apache.hadoop.io.serializer.Serializer and Deserializer for marshalling and unmarshalling. Hadoop comes with four serializers: Avro, Java, Tether (for binary data contained within a TetherData class), and Writable¹⁰ (the default serializer).
CUSTOM SEQUENCEFILE SERIALIZATION
If you want your SequenceFile to contain objects that aren’t Writable or
Serializable, you’ll need to implement your own Serializer and register it.
You register it by updating core-site.xml and appending the class name of
the custom serialization implementation to the io.serializations property.
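Here's a small sketch of what that registration could look like when done programmatically rather than by editing core-site.xml; com.example.MyCustomSerialization is a made-up class name standing in for your own serialization implementation.

import org.apache.hadoop.conf.Configuration;

public class RegisterSerialization {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Append the custom class to the existing io.serializations list
    // rather than replacing it, so the built-in serializers keep working.
    String existing = conf.get("io.serializations", "");
    String updated = existing.isEmpty()
        ? "com.example.MyCustomSerialization"
        : existing + ",com.example.MyCustomSerialization";
    conf.set("io.serializations", updated);

    System.out.println(conf.get("io.serializations"));
  }
}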
Figure 3.10 Block-based SequenceFile format: a header followed by blocks separated by sync markers. Each block holds the number of records, followed by length-prefixed sections for the key lengths, keys, value lengths, and values; each of these sections contains N entries, where N is the number of records, and they are all compressed.
¹⁰ Writable is an interface in Hadoop used to support general-purpose data serialization, and is used for sending data across the wire between Hadoop components. Yahoo has a good introduction to Writables at http://developer.yahoo.com/hadoop/tutorial/module5.html#writable.