How to Process Unstructured Data in Hadoop

Processing Data in Hadoop

In the previous chapters we've covered considerations around modeling data in Hadoop and how to move data in and out of Hadoop. Once we have data loaded and modeled in Hadoop, we'll of course want to access and work with that data. In this chapter we review the frameworks available for processing data in Hadoop.

With processing, just like everything else with Hadoop, we have to understand the available options before deciding on a specific framework. These options give us the knowledge to select the correct tool for the job, but they also add confusion for those new to the ecosystem. This chapter is written with the goal of giving you the knowledge to select the correct tool based on your specific use cases.

We will open the chapter by reviewing the main execution engines: the frameworks directly responsible for executing data processing tasks on Hadoop clusters. This includes the well-established MapReduce framework, as well as newer options such as data flow engines like Spark. We'll then move to higher-level abstractions such as Hive, Pig, Crunch, and Cascading. These tools are designed to provide easier-to-use abstractions over lower-level frameworks such as MapReduce.

For each processing framework, we'll provide:

• An overview of the framework
• A simple example using the framework
• Rules for when to use the framework
• Recommended resources for further information on the framework

After reading this chapter, you will gain an understanding of the various data processing options, but not deep expertise in any of them. Our goal in this chapter is to give you confidence that you are selecting the correct tool for your use case. If you want more detail, we'll provide references for you to dig deeper into a particular tool.

Shared Nothing Architectures

Before we dive into the specifics of each framework, note one thing they all have in common: as much as possible, they attempt to implement a shared nothing architecture. In distributed systems, this is an architecture where each node is completely independent of other nodes in the system. There are no shared resources that can become bottlenecks. The lack of shared resources refers to physical resources such as memory, disks, and CPUs: instead of using centralized storage, Hadoop's processing frameworks use the distributed HDFS storage. It also refers to a lack of shared data: in these frameworks, each node processes a distinct subset of the data and there's no need to manage access to shared data. Shared nothing architectures are very scalable: because there are no shared resources, the addition of nodes adds resources to the system and does not introduce further contention. These architectures are also fault-tolerant: each node is independent, so there are no single points of failure, and the system can quickly recover from a failure of an individual node. As you read this chapter, notice how each framework preserves the principles of shared nothing architecture while its other details differ.

MapReduce

The MapReduce model was introduced in a white paper by Jeffrey Dean and Sanjay Ghemawat from Google called MapReduce: Simplified Data Processing on Large Clusters. This paper described a programming model and an implementation for processing and generating large data sets. This programming model provided a way to develop applications to process large data sets in parallel, without many of the programming challenges usually associated with developing distributed, concurrent applications.
The shared nothing architecture described by this model provided a way to implement a system that could be scaled through the addition of more nodes, while also providing fault tolerance when individual nodes or processes failed.

MapReduce Overview

The MapReduce programming paradigm breaks processing into two basic phases: a map phase and a reduce phase. The input and output of each phase are key-value pairs.

The processes executing the map phase are called mappers. Mappers are Java processes (JVMs) that normally start up on nodes that also contain the data they will process. Data locality is an important principle of MapReduce; with large data sets, moving the processing to the servers that contain the data is much more efficient than moving the data across the network. Examples of the types of processing typically performed in mappers are parsing, transformation, and filtering. When the mapper has processed the input data, it outputs a key-value pair to the next phase, the sort and shuffle.

In the sort and shuffle phase, data is sorted and partitioned. We will discuss the details of how this works later in the chapter. This partitioned and sorted data is sent over the network to reducer JVMs that read the data ordered and partitioned by the keys. When a reducer process gets these records, the reduce() function can do any number of operations on the data, but most likely the reducer will write out some amount of the data, or an aggregate, to a store like HDFS or HBase. To summarize, there are two sets of JVMs: one gets data unsorted and the other gets data sorted and partitioned. There are many more parts to MapReduce that we will touch on in a minute, but Figure 3-1 shows what has been described so far.

Figure 3-1. MapReduce sort and shuffle

The following are some typical characteristics of MapReduce processing:

• Mappers process input in key-value pairs and are only able to process a single pair at a time. The number of mappers is set by the framework, not the developer.
• Mappers pass key-value pairs as output to reducers, but can't pass information to other mappers. Reducers can't communicate with other reducers either.
• Mappers and reducers typically don't use much memory and the JVM heap size is set relatively low.
• Each reducer typically, although not always, has a single output stream: by default a set of files named part-r-00000, part-r-00001, and so on, in a single HDFS directory.
• The output of the mapper and the reducer is written to disk. If the output of the reducer requires additional processing, the entire data set will be written to disk and then read again. This pattern is called a synchronization barrier and is one of the major reasons MapReduce is considered inefficient for iterative processing of data.

Before we go into the lower-level details of MapReduce, it is important to note that MapReduce has two major weaknesses that make it a poor option for iterative algorithms. The first is startup time: even if you are doing almost nothing in the MapReduce processing, there is a loss of 10 to 30 seconds just in startup cost. Second, MapReduce writes to disk frequently in order to facilitate fault tolerance. Later in this chapter, when we study Spark, we will learn that all this disk I/O isn't required. Figure 3-2 illustrates how many times MapReduce reads and writes to disk during typical processing.

Figure 3-2. MapReduce I/O
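To make the flow of key-value pairs concrete before we look at the individual components, here is a minimal sketch of the classic word count job. The class names are illustrative and not part of the example used later in this chapter: the mapper emits each word with a count of 1, the framework sorts and partitions the pairs by word, and the reducer sums the counts for each word.

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // Mapper: parses each input line and emits (word, 1) pairs.
    public class WordCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static final IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
          if (token.length() > 0) {
            word.set(token);
            context.write(word, ONE);
          }
        }
      }
    }

    // Reducer: receives (word, [1, 1, ...]) after the sort and shuffle
    // and writes (word, total) to the output, typically on HDFS.
    class WordCountReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

      private final IntWritable total = new IntWritable();

      @Override
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
          sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
      }
    }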
One of the things that makes MapReduce so powerful is the fact that it is made not just of map and reduce tasks, but rather multiple components working together. Each one of these components can be extended by the developer. Therefore, in order to make the most of MapReduce, it is important to understand its basic building blocks in detail. In the next section we'll start with a detailed look into the map phase in order to work toward this understanding.

There are a number of good references that provide more detail on MapReduce than we can go into here, including implementations of various algorithms. Some good resources are Hadoop: The Definitive Guide, Hadoop in Practice, and MapReduce Design Patterns by Donald Miner and Adam Shook (O'Reilly).

Map phase

Next, we provide a detailed overview of the major components involved in the map phase of a MapReduce job.

InputFormat. MapReduce jobs access their data through the InputFormat class. This class implements two important methods:

getSplits()
This method implements the logic of how input will be distributed between the map processes. The most commonly used InputFormat is the TextInputFormat, which will generate an input split per block and give the location of the block to the map task. The framework will then execute a mapper for each of the splits. This is why developers usually assume the number of mappers in a MapReduce job is equal to the number of blocks in the data set it will process. This method determines the number of map processes and the cluster nodes on which they will execute, but because it can be overridden by the developer of the MapReduce job, you have complete control over the way in which files are read. For example, the NMapInputFormat in the HBase code base allows you to directly set the number of mappers executing the job.

getReader()
This method provides a reader to the map task that allows it to access the data it will process. Because the developer can override this method, MapReduce can support any data type. As long as you can provide a method that reads the data into a writable object, you can process it with the MapReduce framework.

RecordReader. The RecordReader class reads the data blocks and returns key-value records to the map task. The implementation of most RecordReaders is surprisingly simple: a RecordReader instance is initialized with the start position in the file for the block it needs to read and the URI of the file in HDFS. After seeking to the start position, each call to nextKeyValue() will find the next row delimiter and read the next record. This pattern is illustrated in Figure 3-3.

Figure 3-3. MapReduce RecordReader

The MapReduce framework and other ecosystem projects provide RecordReader implementations for many file formats: text delimited, SequenceFile, Avro, Parquet, and more. There are even RecordReaders that don't read any data: NMapInputFormat returns a NullWritable as the key and value to the mapper. This is to make sure the map() method gets called once.
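As a small illustration of the control the InputFormat gives you, the following sketch prevents files from being split, so each mapper processes a whole file, while delegating record reading to the standard line reader. The class name is our own, not part of the example code in this chapter; a job would opt in with job.setInputFormatClass(WholeFileTextInputFormat.class).

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

    // One mapper per file: getSplits() in FileInputFormat consults
    // isSplitable(), so returning false yields a single split per file.
    public class WholeFileTextInputFormat
        extends FileInputFormat<LongWritable, Text> {

      @Override
      protected boolean isSplitable(JobContext context, Path file) {
        return false;
      }

      @Override
      public RecordReader<LongWritable, Text> createRecordReader(
          InputSplit split, TaskAttemptContext context) {
        // Reuse the standard reader: byte offset as key, line as value.
        return new LineRecordReader();
      }
    }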
Mapper.setup(). Before the map method of the map task gets called, the mapper's setup() method is called once. This method is used by developers to initialize variables and file handles that will later get used in the map process. Very frequently the setup() method is used to retrieve values from the configuration object.

Every component in Hadoop is configured via a Configuration object, which contains key-value pairs and is passed to the map and reduce JVMs when the job is executed. The contents of this object can be found in job.xml. By default the Configuration object contains information regarding the cluster that every JVM requires for successful execution, such as the URI of the NameNode and the process coordinating the job (e.g., the JobTracker when Hadoop is running within the MapReduce v1 framework, or the ApplicationMaster when it's running with YARN).

Values can be added to the Configuration object in the driver when the job is set up, before the map and reduce tasks are launched. Once the job is executing, the mappers and reducers can access the Configuration object at any time to retrieve these values. Here is a simple example of a setup() method that gets a Configuration value to populate a member variable:

    public String fooBar;
    public final String FOO_BAR_CONF = "custom.foo.bar.conf";

    @Override
    public void setup(Context context) throws IOException {
      fooBar = context.getConfiguration().get(FOO_BAR_CONF);
    }

Note that anything you put in the Configuration object can be read through the JobTracker (in MapReduce v1) or ApplicationMaster (in YARN). These processes have a web UI that is often left unsecured and readable to anyone with access to its URL, so we recommend against passing sensitive information such as passwords through the Configuration object. A better method is to pass the URI of a password file in HDFS, which can have proper access permissions. The map and reduce tasks can then read the content of the file and get the password if the user executing the MapReduce job has sufficient privileges.
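As a sketch of that password-file approach (the configuration key, class name, and one-line file layout are assumptions for illustration, not part of the example job in this chapter), the setup() method can read a small file from HDFS instead of taking the secret from the job configuration:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SecureLookupMapper extends Mapper<LongWritable, Text, Text, Text> {

      // Only the HDFS location of the password file travels in the job
      // configuration; the file itself is protected by HDFS permissions.
      public static final String PASSWORD_FILE_CONF = "custom.password.file";

      private String password;

      @Override
      public void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        Path passwordFile = new Path(conf.get(PASSWORD_FILE_CONF));

        FileSystem fs = passwordFile.getFileSystem(conf);
        try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(fs.open(passwordFile), StandardCharsets.UTF_8))) {
          // Assumes the secret is the first line of the file.
          password = reader.readLine();
        }
      }

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // Use the password here, e.g., to authenticate against an external system.
      }
    }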
Mapper.map. The map() method is the heart of the mapper. Even if you decide to use the defaults and not implement any other component of the map task, you will still need to implement a map() method. This method has three inputs: a key, a value, and a context. The key and value are provided by the RecordReader and contain the data that the map() method should process. The context is an object that provides common actions for a mapper: sending output to the reducer, reading values from the Configuration object, and incrementing counters to report on the progress of the map task.

When the map task writes to the reducer, the data it is writing is buffered and sorted. MapReduce will attempt to sort it in memory, with the available space defined by the io.sort.mb configuration parameter. If the memory buffer is too small, not all the output data can be sorted in memory. In this case the data is spilled to the local disk of the node where the map task is running and sorted on disk.

Partitioner. The partitioner implements the logic of how data is partitioned between the reducers. The default partitioner will simply take the key, hash it using a standard hash function, and divide by the number of reducers. The remainder determines the target reducer for the record. This guarantees equal distribution of data between the reducers, which helps ensure that reducers can begin and end at the same time. But if there is any requirement for keeping certain values together for processing by the reducers, you will need to override the default and implement a custom partitioner.

One such example is a secondary sort. Suppose that you have a time series, for example stock market pricing information. You may wish to have each reducer scan all the trades of a given stock ticker ordered by the time of the trade in order to look for correlations in pricing over time. In this case you will define the key as ticker-time. The default partitioner could send records belonging to the same stock ticker to different reducers, so you will also want to implement your own partitioner to make sure the ticker symbol is used for partitioning the records to reducers, but the timestamp is not. Here is a simple code example of how this type of partitioner method would be implemented:

    public static class CustomPartitioner extends Partitioner<Text, Text> {
      @Override
      public int getPartition(Text key, Text value, int numPartitions) {
        // Assumes a fixed-width, five-character ticker symbol at the start of the key.
        String ticker = key.toString().substring(0, 5);
        return Math.abs(ticker.hashCode() % numPartitions);
      }
    }

We simply extract the ticker symbol out of the key and use only the hash of this part for partitioning, instead of the entire key.

Mapper.cleanup(). The cleanup() method is called after the map() method has executed for all records. This is a good place to close files and to do some last-minute reporting, for example writing a message to the log with the final status.

Combiner. Combiners in MapReduce can provide an easy method to reduce the amount of network traffic between the mappers and reducers. Let's look at the famous word count example. In word count, the mapper takes each input line, splits it into individual words, and writes out each word with "1" after it to indicate the current count, like the following:

    the = 1
    cat = 1
    and = 1
    the = 1
    hat = 1

If a combine() method is defined, it can aggregate the values produced by the mapper. It executes locally on the same node where the mapper executes, so this aggregation reduces the output that is later sent through the network to the reducer. The reducer will still have to aggregate the results from different mappers, but this will be over significantly smaller data sets. It is important to remember that you have no control over whether the combiner will execute. Therefore, the output of the combiner has to be identical in format to the output of the mapper, because the reducer will have to process either of them. Also note that the combiner executes after the output of the mapper is already sorted, so you can assume that the input of the combiner is sorted. In our example, this would be the output of a combiner:

    and = 1
    cat = 1
    hat = 1
    the = 2
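For word count, the summing reducer already satisfies the requirement that combiner output match mapper output, so it can double as the combiner. A minimal driver sketch, reusing the WordCountMapper and WordCountReducer classes from the earlier sketch (again, illustrative names, not this chapter's example job), wires it in like this:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class WordCountDriver {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(WordCountDriver.class);
        job.setJobName("WordCount");

        job.setMapperClass(WordCountMapper.class);
        // The combiner pre-aggregates map output on the map node; the framework
        // may run it zero or more times, which is safe here because summing is
        // associative and the combiner's output format matches the mapper's.
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }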
Reducer

The reduce task is not as complex as the map task, but there are a few components of which you should be aware.

Shuffle. Before the reduce stage begins, the reduce tasks copy the output of the mappers from the map nodes to the reduce nodes. Since each reducer needs to aggregate data from multiple mappers, we can't have each reducer just read the data locally in the way map tasks do. Copying data over the network is mandatory, so a high-throughput network within the cluster will improve processing times significantly. This is the main reason why using a combiner can be very effective; aggregating the results of the mapper before sending them over the network will speed up this phase significantly.

Reducer.setup(). The reducer setup() step is very similar to the map setup(). The method executes before the reducer starts processing individual records and is typically used to initialize variables and file handles.

Reducer.reduce(). Similar to the map() method in the mapper, the reduce() method is where the reducer does most of the data processing. There are a few significant differences in the inputs, however:

• The keys are sorted.
• The value parameter has changed to values. So for one key, the input will be all the values for that key, allowing you to then perform any type of aggregation and processing for all the values of the key. It is important to remember that a key and all its values will never be split across more than one reducer; this seems obvious, but often developers are surprised when one reducer takes significantly longer to finish than the rest. This is typically the result of that reducer processing a key that has significantly more values than the rest. This kind of skew in the way data is partitioned is a very common cause of performance concerns, and as a result a skilled MapReduce developer will invest significant effort in making sure data is partitioned between the reducers as evenly as possible while still aggregating the values correctly.
• In the map() method, calling context.write(K,V) stores the output in a buffer that is later sorted and read by the reducer. In the reduce() method, calling context.write(K,V) sends the output to the OutputFormat, which we will discuss shortly.

Reducer.cleanup(). Similar to the mapper cleanup() method, the reducer cleanup() method is called after all the records are processed. This is where you can close files and log overall status.

OutputFormat. Just as the InputFormat class handles access to the input data, the OutputFormat class is responsible for formatting and writing the output data, typically to HDFS. Custom output formats are rare compared to custom input formats. The main reason is that developers rarely have control over the input data, which often arrives in legacy formats from legacy systems. On the other hand, the output data can be standardized and there are several suitable output formats already available for you to use. There is always a client with a new and unique input format, but generally one of the available output formats will be suitable. In the first chapter we discussed the most common data formats available and made recommendations regarding which ones to use in specific situations.

The OutputFormat class works a little bit differently than InputFormat. In the case of a large file, InputFormat would split the input across multiple map tasks, each handling a small subset of a single input file. With the OutputFormat class, a single reducer will always write a single file, so on HDFS you will get one file per reducer. The files will be named something like part-r-00000, with the numbers ascending as the task numbers of the reducers ascend.

It is interesting to note that if there are no reducers, the output format is called by the mapper. In that case, the files will be named part-m-00000, part-m-00001, and so on, replacing the r with an m. This is just the common naming format, however, and different output formats can use different variations. For example, the Avro output format uses part-m-00000.avro as its naming convention.

Example for MapReduce

Of all the approaches to data processing in Hadoop that will be included in this chapter, MapReduce requires the most code by far. As verbose as this example will seem, if we included every part of MapReduce here, it would easily be another 20 pages. We will look at a very simple example: joining and filtering the two data sets shown in Figure 3-4.
Figure 3-4. Data sets for the joining and filtering example

The data processing requirements in this example include:

• Join Foo to Bar by FooBarId and BarId.
• Filter Foo and remove all records where FooVal is greater than a user-defined value, fooValueMaxFilter.
• Filter the joined table and remove all records where the sum of FooVal and BarVal is greater than another user parameter, joinValueMaxFilter.
• Use counters to track the number of rows we removed.

MapReduce jobs always start by creating a Job instance and executing it. Here is an example of how this is done:

    public int run(String[] args) throws Exception {
      String inputFoo = args[0];
      String inputBar = args[1];
      String output = args[2];
      String fooValueMaxFilter = args[3];
      String joinValueMaxFilter = args[4];
      int numberOfReducers = Integer.parseInt(args[5]);

      Job job = Job.getInstance();
      job.setJarByClass(JoinFilterExampleMRJob.class);
      job.setJobName("JoinFilterExampleMRJob");

      Configuration config = job.getConfiguration();
      config.set(FOO_TABLE_CONF, inputFoo);
      config.set(BAR_TABLE_CONF, inputBar);
      config.set(FOO_VAL_MAX_CONF, fooValueMaxFilter);
      config.set(JOIN_VAL_MAX_CONF, joinValueMaxFilter);

      job.setInputFormatClass(TextInputFormat.class);
      TextInputFormat.addInputPath(job, new Path(inputFoo));
      TextInputFormat.addInputPath(job, new Path(inputBar));

      job.setOutputFormatClass(TextOutputFormat.class);
      TextOutputFormat.setOutputPath(job, new Path(output));

      job.setMapperClass(JoinFilterMapper.class);
      job.setReducerClass(JoinFilterReducer.class);
      job.setPartitionerClass(JoinFilterPartitioner.class);

      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(Text.class);

      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);

      job.setNumReduceTasks(numberOfReducers);

      job.waitForCompletion(true);
      return 0;
    }

Let's drill down into the job setup code:

• Job.getInstance() constructs the Job object that will hold all the information needed for the execution of our MapReduce job. While not mandatory, it is good practice to name the job so it will be easy to find in various logs or web UIs.
• As we discussed, when setting up the job you can add values to the Configuration object that will be available to all map and reduce tasks. Here we add the values that will be used for filtering the records, so they are defined by the user as arguments when the job is executed, not hardcoded into the map and reduce tasks.
• Next we set the input and output directories. There can be multiple input paths, and they can be either files or entire directories. But unless a special output format is used, there is only one output path and it must be a directory, so each reducer can create its own output file in that directory.
• This is also where we configure the classes that will be used in this job: the mapper, reducer, partitioner, and the input and output formats. In our example we only need a mapper, reducer, and partitioner; we will soon show the code used to implement each of those. Note that for the output we use Text as the value class but NullWritable as the key class. This is because we are only interested in the values for the final output; the keys will simply be ignored and not written to the reducer output files.
• While the number of mappers is controlled by the input format, we have to configure the number of reducers directly. If the number of reducers is set to 0, we would get a map-only job. The default number of reducers is defined at the cluster level, but is typically overridden by the developers of specific jobs because they are more familiar with the size of the data involved and how it is partitioned.
• Finally, we fire off the configured MapReduce job to the cluster and wait for its success or failure.
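The run() method above is typically invoked through ToolRunner, which also parses generic Hadoop options such as -D settings. The following main() is a sketch of that common pattern and assumes JoinFilterExampleMRJob extends Configured and implements Tool; that detail is not shown in the excerpt above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class JoinFilterExampleLauncher {
      public static void main(String[] args) throws Exception {
        // ToolRunner injects the parsed Configuration into the Tool instance
        // and then calls its run(String[]) method with the remaining arguments.
        int exitCode = ToolRunner.run(new Configuration(),
            new JoinFilterExampleMRJob(), args);
        System.exit(exitCode);
      }
    }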
Now let's look at the mapper:

    public class JoinFilterMapper extends Mapper<LongWritable, Text, Text, Text> {

      boolean isFooBlock = false;
      int fooValFilter;

      public static final int FOO_ID_INX = 0;
      public static final int FOO_VALUE_INX = 1;
      public static final int FOO_BAR_ID_INX = 2;

      public static final int BAR_ID_INX = 0;
      public static final int BAR_VALUE_INX = 1;

      Text newKey = new Text();
      Text newValue = new Text();

      @Override
      public void setup(Context context) {
        Configuration config = context.getConfiguration();
        fooValFilter = config.getInt(JoinFilterExampleMRJob.FOO_VAL_MAX_CONF, -1);

        String fooRootPath = config.get(JoinFilterExampleMRJob.FOO_TABLE_CONF);
        FileSplit split = (FileSplit) context.getInputSplit();
        if (split.getPath().toString().contains(fooRootPath)) {
          isFooBlock = true;
        }
      }

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String[] cells = StringUtils.split(value.toString(), "|");

        if (isFooBlock) {
          int fooValue = Integer.parseInt(cells[FOO_VALUE_INX]);

          if (fooValue <= fooValFilter) {
            newKey.set(cells[FOO_BAR_ID_INX] + "|" + JoinFilterExampleMRJob.FOO_SORT_FLAG);
            newValue.set(cells[FOO_ID_INX] + "|" + cells[FOO_VALUE_INX]);
            context.write(newKey, newValue);
          } else {
            context.getCounter("Custom", "FooValueFiltered").increment(1);
          }
        } else {
          newKey.set(cells[BAR_ID_INX] + "|" + JoinFilterExampleMRJob.BAR_SORT_FLAG);
          newValue.set(cells[BAR_VALUE_INX]);
          context.write(newKey, newValue);
        }
      }
    }

A few things to note in the mapper code:

• As we discussed, the mapper's setup() method is used to read predefined values from the Configuration object. Here we are getting the fooValMax filter value that we will use later in the map() method for filtering.
• Each map task will read data from a file block that belongs to either the Foo or the Bar data set. We need to be able to tell which is which, so we can filter only the data from the Foo table and so we can add this information to the output key, where it will be used by the reducer for joining the data sets. The setup() method identifies which block we are processing in this task; later, in the map() method, we use this value to separate the logic for processing the Foo and Bar data sets.
• In the map() method we use the block identifier we defined earlier, and we use the fooValMax value for filtering.
• Also note the method to increment a counter in MapReduce. A counter has a group and a counter name, and both can be set and incremented by the map and reduce tasks. Counters are reported at the end of the job and are also tracked by the various UIs while the job is executing, so they are a good way to give users feedback on job progress, as well as to give developers useful information for debugging and troubleshooting.
• Finally, note how we set the output key: first there is the value used to join the data sets, followed by a separator ("|") and then a flag marking the record with A if it arrived from the Bar data set and B if it arrived from Foo. This means that when the reducer receives a key and an array of values to join, the values from the Bar data set will appear first (since keys are sorted). To perform the join we only have to store the Bar data set in memory until the Foo values start arriving. Without the flag to assist in sorting the values, we would need to store the entire data set in memory when joining.
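Because the requirements call for tracking how many rows were removed, it is worth noting that the driver can read these counters back once the job finishes. A small sketch (the group and counter names match the ones incremented in the mapper above; the helper class is our own):

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;

    public class CounterReport {
      // Call after job.waitForCompletion(true) returns in the driver.
      public static void reportFilteredRows(Job job) throws Exception {
        Counter filtered = job.getCounters()
            .findCounter("Custom", "FooValueFiltered");
        System.out.println("Foo records filtered by fooValueMaxFilter: "
            + filtered.getValue());
      }
    }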
Now let's look at the partitioner. We need to implement a custom partitioner because we are using a multipart key that contains the join key plus the sort flag. We need to partition only by the join ID, so that records with the same join key end up in the same reducer regardless of whether they originally arrived from data set Foo or Bar. This is essential for joining them, because a single reducer will need to join all values with the same key. To do this, we partition on the ID part of the key only, not the entire composite key, as shown:

    public class JoinFilterPartitioner extends Partitioner<Text, Text> {
      @Override
      public int getPartition(Text key, Text value, int numberOfReducers) {
        String keyStr = key.toString();
        String pk = keyStr.substring(0, keyStr.length() - 2);
        return Math.abs(pk.hashCode() % numberOfReducers);
      }
    }

In the partitioner we get the join key out of the map output key and apply the partitioning method to this part of the key only, as we discussed previously.

Next we'll look at the reducer and how it joins the two data sets:

    public class JoinFilterReducer extends Reducer<Text, Text, NullWritable, Text> {

      int joinValFilter;
      String currentBarId = "";
      List<Integer> barBufferList = new ArrayList<Integer>();
      Text newValue = new Text();

      @Override
      public void setup(Context context) {
        Configuration config = context.getConfiguration();
        joinValFilter = config.getInt(JoinFilterExampleMRJob.JOIN_VAL_MAX_CONF, -1);
      }

      @Override
      public void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
        String keyString = key.toString();
        String barId = keyString.substring(0, keyString.length() - 2);
        String sortFlag = keyString.substring(keyString.length() - 1);

        if (!currentBarId.equals(barId)) {
          barBufferList.clear();
          currentBarId = barId;
        }

        if (sortFlag.equals(JoinFilterExampleMRJob.BAR_SORT_FLAG)) {
          for (Text value : values) {
            barBufferList.add(Integer.parseInt(value.toString()));
          }
        } else {
          if (barBufferList.size() > 0) {
            for (Text value : values) {
              for (Integer barValue : barBufferList) {
                String[] fooCells = StringUtils.split(value.toString(), "|");

                int fooValue = Integer.parseInt(fooCells[1]);
                int sumValue = barValue + fooValue;

                if (sumValue <= joinValFilter) {
                  newValue.set(fooCells[0] + "|" + barId + "|" + sumValue);
                  context.write(NullWritable.get(), newValue);
                } else {
                  context.getCounter("custom", "joinValueFiltered").increment(1);
                }
              }
            }
          } else {
            System.out.println("Matching with nothing");
          }
        }
      }
    }

Two points to note in the reducer:

• Because we used a flag to assist in sorting, we get all the records from the Bar data set for a given join key first. As we receive them, we store all the Bar records in a list in memory.
• As we process the Foo records, we loop through the cached Bar records to execute the join. This is a simple implementation of a nested-loops join.

When to Use MapReduce

As you can see from the example, MapReduce is a very low-level framework. The developer is responsible for very minute details of operation, and there is a significant amount of setup and boilerplate code. Because of this, MapReduce code typically has more bugs and higher costs of maintenance. However, there is a subset of problems, such as file compaction,[1] distributed file-copy, or row-level data validation, which translates to MapReduce quite naturally.
At other times, code written in MapReduce can take advantage of properties of the input data to improve performance. For example, if we know the input files are sorted, we can use MapReduce to optimize merging of data sets in ways that higher-level abstractions can't. We recommend MapReduce for experienced Java developers who are comfortable with the MapReduce programming paradigm, especially for problems that translate to MapReduce naturally or where detailed control of the execution has significant advantages.

[1] We discuss compaction in more detail in Chapter 4.

Spark

In 2009, Matei Zaharia and his team at UC Berkeley's AMPLab researched possible improvements to the MapReduce framework. Their conclusion was that while the MapReduce model is useful for large-scale data processing, the MapReduce framework is limited to a very rigid data flow model that is unsuitable for many applications. For example, applications such as iterative machine learning or interactive data analysis can benefit from reusing a data set cached in memory for multiple processing tasks. MapReduce forces writing data to disk at the end of each job execution and reading it again from disk for the next. When you combine this with the fact that jobs are limited to a single map step and a single reduce step, you can see how the model can be significantly improved by a more flexible framework.

Out of this research came Spark, a new processing framework for big data that addresses many of the shortcomings in the MapReduce model. Since its introduction, Spark has grown to be the second largest Apache top-level project (after HDFS), with 150 contributors.

Spark Overview

Spark is different from MapReduce in several important ways.

DAG Model

Looking back at MapReduce, you only had two processing phases: map and reduce. With the MapReduce framework, it is only possible to build complete applications by stringing together sets of map and reduce tasks. These complex chains of tasks are known as directed acyclic graphs, or DAGs, illustrated in Figure 3-5.

Figure 3-5. Directed acyclic graphs

DAGs contain series of actions connected to each other in a workflow. In the case of MapReduce, the DAG is a series of map and reduce tasks used to implement the application. The use of DAGs to define Hadoop applications is not new: MapReduce developers implemented them, and they are used within all high-level abstractions that use MapReduce. Oozie even allows users to define these workflows of MapReduce tasks in XML and use an orchestration framework to monitor their execution.

What Spark adds is that the engine itself creates those complex chains of steps from the application's logic, rather than the DAG being an abstraction added externally to the model. This allows developers to express complex algorithms and data processing pipelines within the same job and allows the framework to optimize the job as a whole, leading to improved performance.

For more on Spark, see the Apache Spark site. There are still relatively few texts available on Spark, but Learning Spark by Holden Karau, et al. (O'Reilly) provides a comprehensive introduction to Spark. For more advanced Spark usage, see Advanced Analytics with Spark by Sandy Ryza, et al. (O'Reilly).

Overview of Spark Components

Before we get to the example, it is important to go over the different parts of Spark at a high level. Figure 3-6 shows the major Spark components.
Spark components Let’s discuss the components in this diagram from left to right: • The driver is the code that includes the “main” function and defines the resilient distributed datasets (RDDs) and their transformations. RDDs are the main data structures we will use in our Spark programs, and will be discussed in more detail in the next section. • Parallel operations on the RDDs are sent to the DAG scheduler, which will opti‐ mize the code and arrive at an efficient DAG that represents the data processing steps in the application. • The resulting DAG is sent to the cluster manager. The cluster manager has infor‐ mation about the workers, assigned threads, and location of data blocks and is 96 Chapter 3: Processing Data in Hadoopresponsible for assigning specific processing tasks to workers. The cluster man‐ ager is also the service that handles DAG play-back in the case of worker failure. As we explained earlier, the cluster manager can be YARN, Mesos, or Spark’s cluster manager. • The worker receives units of work and data to manage. The worker executes its specific task without knowledge of the entire DAG, and its results are sent back to the driver application. Basic Spark Concepts Before we go into the code for our filter-join-filter example, let’s talk about the main components of writing applications in Spark. Resilient Distributed Datasets RDDs are collections of serializable elements, and such a collection may be parti‐ tioned, in which case it is stored on multiple nodes. An RDD may reside in memory or on disk. Spark uses RDDs to reduce I/O and maintain the processed data set in memory, while still tolerating node failures without having to restart the entire job. RDDs are typically created from a Hadoop input format (a file on HDFS, for exam‐ ple), or from transformations applied on existing RDDs. When creating an RDD from an input format, Spark determines the number of partitions by the input for‐ mat, very similar to the way splits are determined in MapReduce jobs. When RDDs are transformed, it is possible to shuffle the data and repartition it to any number of partitions. RDDs store their lineage—the set of transformations that was used to create the cur‐ rent state, starting from the first input format that was used to create the RDD. If the data is lost, Spark will replay the lineage to rebuild the lost RDDs so the job can con‐ tinue. Figure 3-7 is a common image used to illustrate a DAG in spark. The inner boxes are RDD partitions; the next layer is an RDD and single chained operation. Spark 97Figure 3-7. Spark DAG Now let’s say we lose the partition denoted by the black box in Figure 3-8. Spark would replay the “Good Replay” boxes and the “Lost Block” boxes to get the data needed to execute the final step. Figure 3-8. Spark DAGafter a lost partition Note that there are multiple types of RDDs, and not all transformations are possible on every RDD. For example, you can’t join an RDD that doesn’t contain a key-value pair. 98 Chapter 3: Processing Data in Hadoop
