Common Hadoop Processing Patterns

With an understanding of how to access and process data on Hadoop, we'd like to move on to discuss how to solve some fairly common problems in Hadoop using some of the tools we discussed in Chapter 3. We'll cover the following data processing tasks, which in addition to being common patterns in processing data on Hadoop, also have a fairly high degree of complexity in implementation:

• Removing duplicate records by primary key (compaction)
• Using windowing analytics
• Updating time series data

We'll go into more detail on these patterns next, and take a deep dive into how they're implemented. We'll present implementations of these patterns in both Spark and SQL (for Impala and Hive). You'll note that we're not including implementations in MapReduce; this is because of the size and complexity of the code in MapReduce, as well as the move toward newer processing frameworks such as Spark and abstractions such as SQL.

Pattern: Removing Duplicate Records by Primary Key

Duplicate records are a common occurrence when you are working with data in Hadoop, for two primary reasons:

Resends during data ingest
As we've discussed elsewhere in the book, it's difficult to ensure that records are sent exactly once, and it's not uncommon to have to deal with duplicate records during ingest processing.

Deltas (updated records)
HDFS is a "write once and read many" filesystem. Making modifications at a record level is not a simple thing to do. In the delta use case we have an existing data set with a primary key (or composite key), and updated records are being added to that data set.

We cover methods for dealing with the first case, fully duplicate records, elsewhere in this book—for example, in the clickstream case study in Chapter 8—so we'll discuss the second case, record updates, in this example. This requires implementing processing to rewrite an existing data set so that it shows only the newest version of each record.

If you're familiar with HBase, you might have noted that this is similar to the way HBase works. At a high level, a region in HBase has an HFile that holds values linked to a key. When new data is added, a second HFile is added with keys and values. During cleanup activities called compactions, HBase does a merge join to execute this deduplication pattern, as shown in Figure 4-1.

Figure 4-1. HBase compaction

Note that we're omitting some additional complexity, such as HBase's support for versioning, in the preceding example.

Data Generation for Deduplication Example

Before we get into examples of implementing this pattern, let's first look at some code to create test data. We are going to use the Scala object GenDedupInput, which uses the HDFS API to create a file on HDFS and write out records in the following format:

PrimaryKey,timeStamp,value

We'll write x records and y unique primary keys.
This means if we set x to 100 and y to 10, we will get something close to 10 duplicate records for every primary key, as seen in this example:

object GenDedupInput {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      println("outputPath numberOfRecords numberOfUniqueRecords")
      return
    }

    // The output file that will hold the data
    val outputPath = new Path(args(0))
    // Number of records to be written to the file
    val numberOfRecords = args(1).toLong
    // Number of unique primary keys
    val numberOfUniqueRecords = args(2).toInt

    // Open fileSystem to HDFS
    val fileSystem = FileSystem.get(new Configuration())

    // Create buffered writer
    val writer = new BufferedWriter(
      new OutputStreamWriter(fileSystem.create(outputPath)))

    val r = new Random()

    // This loop will write out all the records
    // Every primary key will get about
    // numberOfRecords/numberOfUniqueRecords records
    for (i <- 0L until numberOfRecords) {
      val uniqueId = r.nextInt(numberOfUniqueRecords)
      // Format: key,timeStamp,value
      writer.write(uniqueId + "," + i + "," + r.nextInt(10000))
      writer.newLine()
    }

    writer.close()
  }
}

Code Example: Spark Deduplication in Scala

Now that we've created our test data in HDFS, let's look at the code to deduplicate these records in the SparkDedupExecution object:

object SparkDedupExecution {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("inputPath outputPath")
      return
    }

    // Set up given parameters
    val inputPath = args(0)
    val outputPath = args(1)

    // Set up Spark conf and connection
    val sparkConf = new SparkConf().setAppName("SparkDedupExecution")
    sparkConf.set("spark.cleaner.ttl", "120000")
    val sc = new SparkContext(sparkConf)

    // Read data in from HDFS
    val dedupOriginalDataRDD = sc.hadoopFile(inputPath,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      1)

    // Get the data in a key-value format
    val keyValueRDD = dedupOriginalDataRDD.map(t => {
      val splits = t._2.toString.split(",")
      (splits(0), (splits(1), splits(2)))
    })

    // Reduce by key so we will only get one record for every primary key
    val reducedRDD =
      keyValueRDD.reduceByKey((a, b) => if (a._1.compareTo(b._1) > 0) a else b)

    // Format the data to a human-readable format and write it back out to HDFS
    reducedRDD
      .map(r => r._1 + "," + r._2._1 + "," + r._2._2)
      .saveAsTextFile(outputPath)
  }
}

Let's break this code down to discuss what's going on. We'll skip the setup code, which just gets the user arguments and sets up the SparkContext, and skip to the following code that will get our duplicate record data from HDFS:

val dedupOriginalDataRDD = sc.hadoopFile(inputPath,
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  1)

There are many ways to read data in Spark, but for this example we'll use the hadoopFile() method so we can show how existing input formats can be used. If you have done much MapReduce programming, you will be familiar with TextInputFormat, which is one of the most basic input formats available. The TextInputFormat provides functionality that allows Spark or MapReduce jobs to break up a directory into files, which are then broken up into blocks to be processed by different tasks.
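As an aside (not part of the original example), the same data could also be read with Spark's textFile() method, which wraps TextInputFormat for you; only the parsing step would change slightly, since each element arrives as a plain String rather than a (LongWritable, Text) pair:

// Hypothetical alternative read: textFile() uses TextInputFormat under the hood
// and returns RDD[String], so a downstream map() would split each line directly.
val dedupOriginalLinesRDD = sc.textFile(inputPath)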
The next item of code is the first map() function:

val keyValueRDD = dedupOriginalDataRDD.map(t => {
  val splits = t._2.toString.split(",")
  (splits(0), (splits(1), splits(2)))
})

This code will run in parallel across different workers and parse the incoming records into a Tuple object that has two values representing a key and a value. This key-value structure is required for the next piece of code, which uses the reduceByKey() method. As you might guess by the name, in order to use the reduceByKey() method we need a key.

Now let's look at the code that calls reduceByKey():

val reducedRDD =
  keyValueRDD.reduceByKey((a, b) => if (a._1.compareTo(b._1) > 0) a else b)

The reduceByKey() method takes a function that takes a left and right value and returns a value of the same type. The goal of reduceByKey() is to combine all values of the same key. In the word count example, it is used to add all the counts of a single word to get the total count. In our example, a and b are (timestamp, value) tuples, and we return whichever compares as greater on the timestamp. Since the key we're reducing by is the primary key, this function makes sure we keep only one record per primary key—hence, deduplicating the data by keeping a single record (the one with the greatest timestamp value) for each primary key.

The last bit of code will just write the results back to HDFS:

reducedRDD
  .map(r => r._1 + "," + r._2._1 + "," + r._2._2)
  .saveAsTextFile(outputPath)

We will get a text output file for every partition in Spark, similar to the way MapReduce outputs a file for each mapper or reducer at the end of a MapReduce job.

Code Example: Deduplication in SQL

Now we'll turn to the venerable SQL—well, more accurately, HiveQL, although the examples in this chapter will work with either Hive or Impala. First, we need to put our test data into a table using this data definition language (DDL) query:

CREATE EXTERNAL TABLE COMPACTION_TABLE (
  PRIMARY_KEY STRING,
  TIME_STAMP BIGINT,
  EVENT_VALUE STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'compaction_data';

Now that we have a table, let's look at the query to perform the deduplication:

SELECT
  A.PRIMARY_KEY,
  A.TIME_STAMP,
  MAX(A.EVENT_VALUE)
FROM COMPACTION_TABLE A JOIN (
  SELECT
    PRIMARY_KEY AS P_KEY,
    MAX(TIME_STAMP) AS TIME_SP
  FROM COMPACTION_TABLE
  GROUP BY PRIMARY_KEY
) B
WHERE A.PRIMARY_KEY = B.P_KEY AND A.TIME_STAMP = B.TIME_SP
GROUP BY A.PRIMARY_KEY, A.TIME_STAMP

Here we have a two-level-deep SQL query. The inner SELECT gets the latest TIME_STAMP for each PRIMARY_KEY. The outer SELECT takes the results of the inner SELECT and uses them to pull out the latest EVENT_VALUE. Also note that we apply a MAX() function to EVENT_VALUE; this is because we only want a single value, so if we have two EVENT_VALUEs with the same timestamp we'll keep the greater one for our new record.

Pattern: Windowing Analysis

Windowing functions provide the ability to scan over an ordered sequence of events within some window—for example, a specific slice of time. This pattern is very powerful and is useful in many industries:

• It can be used in finance to gain a better understanding of changing security prices.
• In sensor monitoring, it's useful in predicting failure from abnormalities in readings.
• It can be used in churn analysis for trying to predict if a customer will leave you based on behavior patterns.
• In gaming, it can help to identify trends as users progress from novice to expert.

To illustrate, we'll use an example that relates to the finance use case: finding peaks and valleys in equity prices in order to provide some insight into price changes. A peak is a record that has a lower price before it and a lower price after it, while a valley is just the opposite, with higher prices on both sides, as shown in Figure 4-2.

Figure 4-2. Peaks and valleys in stock prices over time

To implement this example we'll need to maintain a window of stock prices in order to determine where the peaks and valleys occur. Note that a simple example like this makes it possible for us to show the solution in both SQL and Spark. As windowing analysis gets more complex, SQL becomes a less suitable solution.

Data Generation for Windowing Analysis Example

Let's create some test data containing records with a value that goes up and down, similar to stock prices. The following code example takes the same input parameters as our last data generation tool—numberOfRecords and numberOfUniqueIds—although the resulting records will be somewhat different:

Primary key
An identifier for each sequence of events we are analyzing—for example, a stock ticker symbol. This will be based on the numberOfUniqueIds input parameter.

Incrementing counter
This will be unique for every record in the generated data.

Event value
This will have a value that increases and decreases for a random set of records for a given primary key.

Let's take a look at the code to generate this test data:

def main(args: Array[String]): Unit = {
  if (args.length == 0) {
    println("outputPath numberOfRecords numberOfUniqueIds")
    return
  }

  val outputPath = new Path(args(0))
  val numberOfRecords = args(1).toInt
  val numberOfUniqueIds = args(2).toInt

  val fileSystem = FileSystem.get(new Configuration())

  val writer = new BufferedWriter(
    new OutputStreamWriter(fileSystem.create(outputPath)))

  val r = new Random()

  var direction = 1
  var directionCounter = r.nextInt(numberOfUniqueIds * 10)
  var currentPointer = 0

  for (i <- 0 until numberOfRecords) {
    val uniqueId = r.nextInt(numberOfUniqueIds)

    currentPointer = currentPointer + direction
    directionCounter = directionCounter - 1

    // Periodically flip the direction so the value goes up and down
    if (directionCounter == 0) {
      directionCounter = r.nextInt(numberOfUniqueIds * 10)
      direction = direction * -1
    }

    writer.write(uniqueId + "," + i + "," + currentPointer)
    writer.newLine()
  }

  writer.close()
}

Code Example: Peaks and Valleys in Spark

Now let's look at the code to implement this pattern in Spark. There's quite a bit going on in the following code example, so after we present the code we'll drill down further to help you understand what's going on.
You'll find this code in the SparkPeaksAndValleysExecution.scala file:

object SparkPeaksAndValleysExecution {
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("inputPath outputPath numberOfPartitions")
      return
    }

    val inputPath = args(0)
    val outputPath = args(1)
    val numberOfPartitions = args(2).toInt

    val sparkConf = new SparkConf().setAppName("SparkTimeSeriesExecution")
    sparkConf.set("spark.cleaner.ttl", "120000")
    val sc = new SparkContext(sparkConf)

    // Read in the data
    var originalDataRDD = sc.hadoopFile(inputPath,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      1).map(r => {
      val splits = r._2.toString.split(",")
      (new DataKey(splits(0), splits(1).toLong), splits(2).toInt)
    })

    // Partitioner to partition by primaryKey only
    val partitioner = new Partitioner {
      override def numPartitions: Int = numberOfPartitions

      override def getPartition(key: Any): Int = {
        Math.abs(key.asInstanceOf[DataKey].uniqueId.hashCode() % numPartitions)
      }
    }

    // Partition and sort
    val partedSortedRDD =
      new ShuffledRDD[DataKey, Int, Int](
        originalDataRDD,
        partitioner).setKeyOrdering(implicitly[Ordering[DataKey]])

    // MapPartitions to do windowing
    val pivotPointRDD = partedSortedRDD.mapPartitions(it => {

      val results = new mutable.MutableList[PivotPoint]

      // Keeping context
      var lastUniqueId = "foobar"
      var lastRecord: (DataKey, Int) = null
      var lastLastRecord: (DataKey, Int) = null

      var position = 0

      it.foreach(r => {
        position = position + 1

        // Reset the context when we move to a new primary key
        if (!lastUniqueId.equals(r._1.uniqueId)) {
          lastRecord = null
          lastLastRecord = null
        }

        // Finding peaks and valleys
        if (lastRecord != null && lastLastRecord != null) {
          if (lastRecord._2 < r._2 && lastRecord._2 < lastLastRecord._2) {
            results.+=(new PivotPoint(r._1.uniqueId,
              position,
              lastRecord._1.eventTime,
              lastRecord._2,
              false))
          } else if (lastRecord._2 > r._2 && lastRecord._2 > lastLastRecord._2) {
            results.+=(new PivotPoint(r._1.uniqueId,
              position,
              lastRecord._1.eventTime,
              lastRecord._2,
              true))
          }
        }

        lastUniqueId = r._1.uniqueId
        lastLastRecord = lastRecord
        lastRecord = r
      })

      results.iterator
    })

    // Format output
    pivotPointRDD.map(r => {
      val pivotType = if (r.isPeak) "peak" else "valley"
      r.uniqueId + "," +
        r.position + "," +
        r.eventTime + "," +
        r.eventValue + "," +
        pivotType
    }).saveAsTextFile(outputPath)
  }

  class DataKey(val uniqueId: String, val eventTime: Long)
      extends Serializable with Comparable[DataKey] {
    override def compareTo(other: DataKey): Int = {
      val compare1 = uniqueId.compareTo(other.uniqueId)
      if (compare1 == 0) {
        eventTime.compareTo(other.eventTime)
      } else {
        compare1
      }
    }
  }

  class PivotPoint(val uniqueId: String,
                   val position: Int,
                   val eventTime: Long,
                   val eventValue: Int,
                   val isPeak: Boolean) extends Serializable {}
}

Walking through the code:

• Reading the input: nothing too interesting here. We're simply reading the input data and parsing it into easy-to-consume objects.
• The partitioner is where things get interesting. We're defining a partitioner here, just like defining a custom partitioner with MapReduce. A partitioner helps us decide which records go to which worker after the shuffle process. We need a custom partitioner because we have a two-part key: primary_key and position. We want to sort by both, but we only want to partition by the primary_key, so we get output like that shown in Figure 4-3.
• The ShuffledRDD is the shuffle action that partitions and sorts the data for us. Note that the 1.3 release of Spark provides a transformation called repartitionAndSortWithinPartitions(), which would provide this functionality for us, but since this is coded with Spark 1.2 we need to manually implement the shuffle.
• The mapPartitions() method allows us to run through each primary_key in order of position. This is where the windowing happens.
• The lastRecord and lastLastRecord variables are the context information we need in order to find peaks and valleys and to know if we have changed primary_keys. Remember, to find a peak or a valley we need to know the record before and the one after. So we have the current record, lastRecord, and lastLastRecord, and we can determine whether lastRecord is a peak or a valley by comparing it against the others.
• The comparisons determine whether we're at a peak or at a valley.
• And finally, the last block of code formats the records and writes them to HDFS.

Figure 4-3. Partitioning in the peaks and valleys example—here we partition the sequences into two groups, so we can distribute the analysis on two workers while still keeping all events for each sequence together

Code Example: Peaks and Valleys in SQL

As in the previous example, we'll first create a table over our test data:

CREATE EXTERNAL TABLE PEAK_AND_VALLEY_TABLE (
  PRIMARY_KEY STRING,
  POSITION BIGINT,
  EVENT_VALUE BIGINT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'PeakAndValleyData';

Now that we have our table, we need to order the records and then use the lead() and lag() functions, which provide the context of the surrounding records:

SELECT
  PRIMARY_KEY,
  POSITION,
  EVENT_VALUE,
  CASE
    WHEN LEAD_EVENT_VALUE IS NULL OR LAG_EVENT_VALUE IS NULL THEN 'EDGE'
    WHEN EVENT_VALUE < LEAD_EVENT_VALUE AND EVENT_VALUE < LAG_EVENT_VALUE THEN 'VALLEY'
    WHEN EVENT_VALUE > LEAD_EVENT_VALUE AND EVENT_VALUE > LAG_EVENT_VALUE THEN 'PEAK'
    ELSE 'SLOPE'
  END AS POINT_TYPE
FROM
(
  SELECT
    PRIMARY_KEY,
    POSITION,
    EVENT_VALUE,
    LEAD(EVENT_VALUE, 1, null) OVER
      (PARTITION BY PRIMARY_KEY ORDER BY POSITION) AS LEAD_EVENT_VALUE,
    LAG(EVENT_VALUE, 1, null) OVER
      (PARTITION BY PRIMARY_KEY ORDER BY POSITION) AS LAG_EVENT_VALUE
  FROM PEAK_AND_VALLEY_TABLE
) A

Although this SQL is not overly complex, it is big enough that we should break it down to explain what's going on:

• The subquery is where we do all the windowing logic: it puts the values that appear before and after the current value in the same row, using lead() and lag(). Figure 4-4 shows an example of the input and output of this subquery.
• After execution of the subquery, all the information we need is in the same record, and the outer query can use that record to determine whether each point is one of the following: an edge, a point on the leftmost or rightmost part of the window timeline; a peak, a value that has a lower value before and after it; a valley, a value that has a higher value before and after it; or a slope point, a value that has a lower value on one side and a higher value on the other.

Figure 4-4. Input and output of the preceding subquery: the first step groups the events for each sequence together and sorts them by the order the events occurred; the second step adds to each event the values from the previous and following events, which is then used by the main query to detect peaks and valleys

Windowing and SQL

Even in this example the SQL code is more concise than the Spark code, and could be considered simpler.
At the same time, we need to consider the limitations. Doing multiple complex windowing operations in SQL typically means chaining queries, each of which must read from and write to disk, which increases I/O and leads to a corresponding decrease in performance. With Spark we only need to order the data once and can then perform N windowing operations on it, using the functionality provided by Java or Scala to hold information in memory and operate on it.

Pattern: Time Series Modifications

For our final example we are going to mix components from the first two examples. We will update records based on a primary key while also keeping all of the history. This allows a record to know when it was current and when it expired, providing information about an entity along with a start and stop time, as shown in Figure 4-5.

Figure 4-5. The price of Apple stock over time

The start and stop times mark the time range over which that record represented the truth. If the stop time is null, the record is the current truth for that entity. In the preceding example, we can see the current price of Apple stock is 42.

The problem comes in when we need to update this table. As shown in Figure 4-6, when we add a new record with a price of 43, we need to close out the previous record with a new end time that equals the new record's start time. The darker cells are cells that need to be updated.

Figure 4-6. Adding a new record to the Apple stock price table

At face value this looks like a very simple problem, but in fact it can be complex to implement when you are dealing with updates in large data sets. We'll discuss some of the challenges and approaches to address them in the following discussion.

Use HBase and Versioning

One common way of storing this information is as versions in HBase. HBase has a way to store every change you make to a record as versions. Versions are defined at a column level and ordered by modification timestamp, so you can go to any point in time to see what the record looked like.

The advantage of this approach is that modifications are very fast, as is getting the latest values. There is, however, some penalty in getting historical versions, and a major disadvantage in performing large scans or block cache reads. Large scans will suffer because they have to pass over all the versions of one record before reaching the next record, so the larger your version count, the slower your large scans will be. Block cache reads are disadvantageous because when retrieving a record HBase pulls a whole 64 KB HFile block into memory, and if your history is large, it may be pulling other versions into memory instead of the actual record you want. Lastly, this data model does not keep the start and stop time in the same record: you need to look at a couple of versions to figure out when a version started and ended.

Use HBase with a RowKey of RecordKey and StartTime

In order to have HBase include the start and stop time in the same record, we have to create a composite key of RecordKey and StartTime. The difference from the versioning solution is that the record now has a column for stop time. Note that the start time in the RowKey needs to be a reverse epoch number so that records sort from newest to oldest.

To modify such a schema when a new version of a record is added, we first need to do a single-record scan with the RecordKey as the start RowKey.
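To make the mechanics concrete, here is a minimal sketch of that lookup-and-update flow using the HBase 1.x client API. The table name, column family, and qualifiers are hypothetical, and the reverse epoch is simply Long.MaxValue minus the start time; this illustrates the approach described here and in the next paragraph, and is not code from the chapter:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Scan}
import org.apache.hadoop.hbase.util.Bytes

object HBaseTimeSeriesSketch {
  def addNewVersion(recordKey: String, newStartTime: Long, newValue: String): Unit = {
    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = connection.getTable(TableName.valueOf("TIME_SERIES"))
    val cf = Bytes.toBytes("d")

    // RowKey = recordKey + reverse epoch of the start time,
    // so the newest version of a record sorts first
    def rowKey(startTime: Long): Array[Byte] =
      Bytes.toBytes(recordKey + "|" + (Long.MaxValue - startTime))

    // Single-record scan starting at the recordKey prefix
    // returns the most current version of that record
    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes(recordKey + "|"))
    scan.setCaching(1)
    val scanner = table.getScanner(scan)
    val current = scanner.next()
    scanner.close()

    // Put 1: close out the current version with a stop time
    if (current != null &&
        Bytes.toString(current.getRow).startsWith(recordKey + "|")) {
      val closeCurrent = new Put(current.getRow)
      closeCurrent.addColumn(cf, Bytes.toBytes("stopTime"), Bytes.toBytes(newStartTime))
      table.put(closeCurrent)
    }

    // Put 2: add the new current version with an open-ended stop time
    val addNew = new Put(rowKey(newStartTime))
    addNew.addColumn(cf, Bytes.toBytes("value"), Bytes.toBytes(newValue))
    addNew.addColumn(cf, Bytes.toBytes("stopTime"), Bytes.toBytes(-1L))
    table.put(addNew)

    table.close()
    connection.close()
  }
}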
This will return the most current version of the RecordKey. With that information in hand, we need to do two puts: one to update the stop time in the current record, and one to add a new current record. When we want to get the version for any given time, we just do a single-row scan of the RecordKey and the reverse epoch of the time for the record we wish to fetch.

This solution still has the large scan and block cache problems, but now we can get a version quickly and have the stop time in the same row. Just remember this comes at the cost of an additional get and put upon insertion of the record.

Use HDFS and Rewrite the Whole Table

If we remove HBase from the picture and just do the simplest HDFS implementation, we would have all the data in HDFS and we'd refresh the table as we get new data on some periodic basis—for example, once daily. This solution might seem very expensive, but with Hadoop we can rewrite terabytes of data in a short period of time. As data sets get larger, there are techniques we can use to optimize the execution—for example, separate partitions for the most current records. We'll discuss that solution next.

Use Partitions on HDFS for Current and Historical Records

A smarter implementation on HDFS is to put the most current records in one partition and the historical records in another partition. This allows us to rewrite just the latest versions rather than every version in the history; we then only append records to the historical partition. The big win here is that the execution time is fixed to the number of current records, as opposed to getting longer and longer with every version of history added.

We'll provide an example of this technique next, again using Spark and SQL. For purposes of illustration, we'll use a simple, contrived data set composed of a unique ID, event time, and a random integer, but it should be easy to imagine how this technique could be extended to real-world, time-based data, such as the stock ticker example described earlier.

Data Generation for Time Series Example

Let's create the data set for our time series example, again using Scala and the HDFS FileSystem API:

def main(args: Array[String]): Unit = {
  if (args.length == 0) {
    println("outputPath numberOfRecords numberOfUniqueRecords startTime")
    return
  }

  val outputPath = new Path(args(0))
  val numberOfRecords = args(1).toInt
  val numberOfUniqueRecords = args(2).toInt
  val startTime = args(3).toInt

  val fileSystem = FileSystem.get(new Configuration())
  val writer = new BufferedWriter(
    new OutputStreamWriter(fileSystem.create(outputPath)))

  val r = new Random()

  for (i <- 0 until numberOfRecords) {
    val uniqueId = r.nextInt(numberOfUniqueRecords)
    val madeUpValue = r.nextInt(1000)
    val eventTime = i + startTime

    writer.write(uniqueId + "," + eventTime + "," + madeUpValue)
    writer.newLine()
  }

  writer.close()
}

Looking at this data generator, you will note it is very close in design to the previous examples of data generation.

Code Example: Time Series in Spark

Now we move on to our Spark implementation, which updates the single partition of current time series data with the new start and stop times. You can find this code on GitHub in the SparkTimeSeriesExecution Scala object. This is the largest example in this chapter, but we'll walk through the code to explain what's going on.
object SparkTimeSeriesExecution {
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("newDataInputPath " +
        "outputPath " +
        "numberOfPartitions")
      println("or")
      println("newDataInputPath " +
        "existingTimeSeriesDataInputPath " +
        "outputPath " +
        "numberOfPartitions")
      return
    }

    val newDataInputPath = args(0)
    val existingTimeSeriesDataInputPath = if (args.length == 4) args(1) else null
    val outputPath = args(args.length - 2)
    val numberOfPartitions = args(args.length - 1).toInt

    val sparkConf = new SparkConf().setAppName("SparkTimeSeriesExecution")
    sparkConf.set("spark.cleaner.ttl", "120000")
    val sc = new SparkContext(sparkConf)

    // Load the new data from HDFS; new records start with no end time (-1)
    var unendedRecordsRDD = sc.hadoopFile(newDataInputPath,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      1).map(r => {
      val splits = r._2.toString.split(",")
      (new TimeDataKey(splits(0), splits(1).toLong),
        new TimeDataValue(-1, splits(2)))
    })

    var endedRecordsRDD: RDD[(TimeDataKey, TimeDataValue)] = null

    // Get existing records if they exist
    if (existingTimeSeriesDataInputPath != null) {
      val existingDataRDD = sc.hadoopFile(existingTimeSeriesDataInputPath,
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        1).map(r => {
        val splits = r._2.toString.split(",")
        (new TimeDataKey(splits(0), splits(1).toLong),
          new TimeDataValue(splits(2).toLong, splits(3)))
      })

      unendedRecordsRDD = unendedRecordsRDD
        .union(existingDataRDD.filter(r => r._2.endTime == -1))
      endedRecordsRDD = existingDataRDD.filter(r => r._2.endTime > -1)
    }

    // Define our partitioner
    val partitioner = new Partitioner {
      override def numPartitions: Int = numberOfPartitions

      override def getPartition(key: Any): Int = {
        Math.abs(
          key.asInstanceOf[TimeDataKey].uniqueId.hashCode() % numPartitions)
      }
    }

    // Partition by primaryKey and sort by (primaryKey, startTime)
    val partedSortedRDD =
      new ShuffledRDD[TimeDataKey, TimeDataValue, TimeDataValue](
        unendedRecordsRDD,
        partitioner).setKeyOrdering(implicitly[Ordering[TimeDataKey]])

    // Walk down each primaryKey to make sure the stop times are updated
    var updatedEndedRecords = partedSortedRDD.mapPartitions(it => {
      val results = new mutable.MutableList[(TimeDataKey, TimeDataValue)]

      var lastUniqueId = "foobar"
      var lastRecord: (TimeDataKey, TimeDataValue) = null

      it.foreach(r => {
        if (!r._1.uniqueId.equals(lastUniqueId)) {
          // New primary key: emit the previous key's open record as-is
          if (lastRecord != null) {
            results.+=(lastRecord)
          }
          lastUniqueId = r._1.uniqueId
          lastRecord = null
        } else if (lastRecord != null) {
          // Same primary key: close out the previous record with
          // the start time of the record that follows it
          lastRecord._2.endTime = r._1.startTime
          results.+=(lastRecord)
        }
        lastRecord = r
      })
      if (lastRecord != null) {
        results.+=(lastRecord)
      }
      results.iterator
    })

    // If there were existing ended records, union them back in
    if (endedRecordsRDD != null) {
      updatedEndedRecords = updatedEndedRecords.union(endedRecordsRDD)
    }

    // Format and save the results to HDFS
    updatedEndedRecords
      .map(r => r._1.uniqueId + "," +
        r._1.startTime + "," +
        r._2.endTime + "," +
        r._2.data)
      .saveAsTextFile(outputPath)
  }

  class TimeDataKey(val uniqueId: String, val startTime: Long)
      extends Serializable with Comparable[TimeDataKey] {
    override def compareTo(other: TimeDataKey): Int = {
      val compare1 = uniqueId.compareTo(other.uniqueId)
      if (compare1 == 0) {
        startTime.compareTo(other.startTime)
      } else {
        compare1
      }
    }
  }

  class TimeDataValue(var endTime: Long, val data: String) extends Serializable {}
}

Walking through the code:

• As in previous code examples, we start by reading the new data in from HDFS. Unlike previous examples in this chapter, in this case we have the possibility of two inputs: the new data and the data from the existing table in HDFS.
• We make the existing data set optional because the first time we add records we obviously won't have an existing data set. Note that we filter out any existing records that already have end times: they don't need to go through the shuffle, be transferred over the network, and be sorted. We union those values back in later.
• Just as in the peak and valley example, we need a custom partitioner and shuffle. This is a common pattern we'll use whenever we need to traverse a data set in order by a given key. The partitioning here is similar to previous examples: we partition on the primaryKey and sort on the combination of the primaryKey and the startTime.
• The mapPartitions() block is the code that traverses each primaryKey and updates records that need new stop times.
• The union() call is where we bring back in the existing records that already had end times.
• And finally, we write the formatted results out to HDFS.

Code Example: Time Series in SQL

As before, we first need to set up our source tables for Hive or Impala. In this example we are going to have two tables: one for the existing time series data and one for the new data:

CREATE EXTERNAL TABLE EXISTING_TIME_SERIES_TABLE (
  PRIMARY_KEY STRING,
  EFFECTIVE_DT BIGINT,
  EXPIRED_DT BIGINT,
  EVENT_VALUE STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'ExistingTimeSeriesData';

CREATE EXTERNAL TABLE NEW_TIME_SERIES_TABLE (
  PRIMARY_KEY STRING,
  EFFECTIVE_DT BIGINT,
  EVENT_VALUE STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'NewTimeSeriesData';

The two tables are very close, except that the new records don't have an expired date.
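The chapter's full SQL for merging these two tables is not reproduced in this excerpt, but its shape follows from the Spark version: take the union of the new records and the existing records, and use a windowing function to derive each version's expired date from the effective date of the version that follows it. A hedged sketch of what that query could look like (the column handling and the convention that the current version has a NULL EXPIRED_DT are assumptions, not the original code):

SELECT
  PRIMARY_KEY,
  EFFECTIVE_DT,
  LEAD(EFFECTIVE_DT, 1, null) OVER
    (PARTITION BY PRIMARY_KEY ORDER BY EFFECTIVE_DT) AS EXPIRED_DT,
  EVENT_VALUE
FROM
(
  SELECT PRIMARY_KEY, EFFECTIVE_DT, EVENT_VALUE
  FROM EXISTING_TIME_SERIES_TABLE
  UNION ALL
  SELECT PRIMARY_KEY, EFFECTIVE_DT, EVENT_VALUE
  FROM NEW_TIME_SERIES_TABLE
) SUB_QUERY

Note that, unlike the Spark job, this sketch recomputes the expired date for every version rather than touching only the still-open records; because each version's expired date equals the next version's effective date, the result is the same, at the cost of shuffling the full history.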
