How to Transfer Data from Oracle to Hadoop

Data Movement

Now that we've discussed considerations around storing and modeling data in Hadoop, we'll move to the equally important subject of moving data between external systems and Hadoop. This includes ingesting data into Hadoop from systems such as relational databases or logs, and extracting data from Hadoop for ingestion into external systems. We'll spend a good part of this chapter talking about considerations and best practices around data ingestion into Hadoop, and then dive more deeply into specific tools for data ingestion, such as Flume and Sqoop. We'll then discuss considerations and recommendations for extracting data from Hadoop.

Data Ingestion Considerations

Just as important as decisions around how to store data in Hadoop, which we discussed in Chapter 1, are the architectural decisions on getting that data into your Hadoop cluster. Although Hadoop provides a filesystem client that makes it easy to copy files in and out of Hadoop, most applications implemented on Hadoop involve ingestion of disparate data types from multiple sources and with differing requirements for frequency of ingestion. Common data sources for Hadoop include:

• Traditional data management systems such as relational databases and mainframes
• Logs, machine-generated data, and other forms of event data
• Files being imported from existing enterprise data storage systems

There are a number of factors to take into consideration when you're importing data into Hadoop from these different systems. As organizations move more and more data into Hadoop, these decisions will become even more critical. Here are the considerations we'll cover in this chapter:

Timeliness of data ingestion and accessibility
What are the requirements around how often data needs to be ingested? How soon does data need to be available to downstream processing?

Incremental updates
How will new data be added? Does it need to be appended to existing data? Or overwrite existing data?

Data access and processing
Will the data be used in processing? If so, will it be used in batch processing jobs? Or is random access to the data required?

Source system and data structure
Where is the data coming from? A relational database? Logs? Is it structured, semistructured, or unstructured data?

Partitioning and splitting of data
How should data be partitioned after ingest? Does the data need to be ingested into multiple target systems (e.g., HDFS and HBase)?

Storage format
What format will the data be stored in?

Data transformation
Does the data need to be transformed in flight?

We'll start by talking about the first consideration: the timeliness requirements for the data being ingested.

Timeliness of Data Ingestion

When we talk about timeliness of data ingestion, we're referring to the time lag from when data is available for ingestion to when it's accessible to tools in the Hadoop ecosystem. The time classification of an ingestion architecture will have a large impact on the storage medium and on the method of ingestion. For purposes of this discussion we're not concerned with streaming processing or analytics, which we'll discuss separately in Chapter 7, but only with when the data is stored and available for processing in Hadoop.

In general, it's recommended to use one of the following classifications before designing the ingestion architecture for an application:

Macro batch
This is normally anything over 15 minutes to hours, or even a daily job.
Microbatch
This is normally something that is fired off every 2 minutes or so, but no more than 15 minutes in total.

Near-Real-Time Decision Support
This is considered to be "immediately actionable" by the recipient of the information, with data delivered in less than 2 minutes but greater than 2 seconds.

Near-Real-Time Event Processing
This is considered under 2 seconds, and can be as fast as a 100-millisecond range.

Real Time
This term tends to be very overused and has many definitions. For purposes of this discussion, it will be anything under 100 milliseconds.

It's important to note that as the implementation moves toward real time, the complexity and cost of the implementation increases substantially. Starting off at batch (e.g., using simple file transfers) is always a good idea; start simple before moving to more complex ingestion methods.

With more lenient timeliness requirements, HDFS will likely be the preferred choice for the primary storage location, and a simple file transfer or Sqoop jobs will often be suitable tools to ingest data. We'll talk more about these options later, but these ingestion methods are well suited for batch ingestion because of their simple implementations and the validation checks that they provide out of the box. For example, the hadoop fs -put command will copy a file over and do a full checksum to confirm that the data is copied over correctly.

One consideration when using the hadoop fs -put command or Sqoop is that the data will land on HDFS in a format that might not be optimal for long-term storage and processing, so using these tools might require an additional batch process to get the data into the desired format. An example of where such an additional batch process would be required is loading Gzip files into HDFS. Although Gzip files can easily be stored in HDFS and processed with MapReduce or other processing frameworks on Hadoop, as we discussed in the previous chapter Gzip files are not splittable. This will greatly impact the efficiency of processing these files, particularly as the files get larger. In this case a good solution is to store the files in Hadoop using a container format that supports splittable compression, such as SequenceFiles or Avro.

As your requirements move from batch processing to more frequent updates, you should consider tools like Flume or Kafka. Sqoop and file transfers are not going to be a good selection as the delivery requirements get shorter than two minutes. Further, as the requirements become shorter than two minutes, the storage layer may need to change to HBase or Solr for more granular insertions and read operations. As the requirements move toward the real-time category, we need to think about memory first and permanent storage second. All of the parallelism in the world isn't going to help for response requirements under 500 milliseconds as long as hard drives remain in the process. At this point, we start to enter the realm of stream processing, with tools like Storm or Spark Streaming. It should be emphasized that these tools are actually focused on data processing, as opposed to data ingest like Flume or Sqoop. Again, we'll talk more about near-real-time streaming with tools like Storm and Spark Streaming in Chapter 7. Flume and Kafka will be discussed further in this chapter.
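As a simple illustration of a batch ingest with the built-in validation just described, the following commands copy a local export file into HDFS and then inspect the result. The paths and filenames are hypothetical; adjust them to your environment.

    # Create a landing directory and copy the file in (put verifies checksums as it copies)
    hadoop fs -mkdir -p /data/landing/sales
    hadoop fs -put /local/exports/sales_2017-06-08.csv /data/landing/sales/

    # Confirm the file arrived and display its stored checksum
    # (the -checksum subcommand is available in recent Hadoop 2.x releases)
    hadoop fs -ls /data/landing/sales/
    hadoop fs -checksum /data/landing/sales/sales_2017-06-08.csv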
Incremental Updates

This decision point focuses on whether the new data will append an existing data set or modify it.

If the requirements are for append only, for example when you're logging user activity on a website, then HDFS will work well for the vast majority of implementations. HDFS has high read and write rates because of its ability to parallelize I/O to multiple drives. The downside to HDFS is the inability to do appends or random writes to files after they're created. Within the requirements of an "append only" use case, this is not an issue. In this case it's possible to "append" data by adding it as new files or partitions, which will enable MapReduce and other processing jobs to access the new data along with the old data. Note that Chapter 1 provides a full discussion of partitioning and organizing data stored in HDFS.

A very important thing to note when you're appending to directories with additional files is that HDFS is optimized for large files. If the requirements call for a two-minute append process that ends up producing lots of small files, then a periodic process to combine smaller files will be required to get the benefits from larger files. There are a number of reasons to prefer large files, one of the major reasons being how data is read from disk. Using a long consecutive scan to read a single file is faster than performing many seeks to read the same information from multiple files.

We briefly discuss methods to manage small files later in this chapter, but a full discussion is beyond the scope of this book. See Hadoop: The Definitive Guide or Hadoop in Practice for detailed discussions on techniques to manage small files.

If the requirements also include modifying existing data, then additional work is required to land the data properly in HDFS. HDFS is read only; you can't update records in place as you would with a relational database. In this case we first write a "delta" file that includes the changes that need to be made to the existing file. A compaction job is required to handle the modifications. In a compaction job, the data is sorted by a primary key. If the row is found twice, then the data from the newer delta file is kept and the data from the older file is not. The results of the compaction process are written to disk, and when the process is complete the resulting compaction data will replace the older, uncompacted data. Note that this compaction job is a batch process, for example a MapReduce job that's executed as part of a job flow. It may take several minutes depending on the data size, so it will only work for multi-minute timeliness intervals.

Table 2-1 is an example of how a compaction job works. Note in this example that the first column will be considered the primary key for the data.

Table 2-1. Compaction

    Original data    New data      Resulting data
    A,Blue,5         B,Yellow,3    A,Blue,5
    B,Red,6          D,Gray,0      B,Yellow,3
    C,Green,2                      C,Green,2
                                   D,Gray,0

There are many ways to implement compaction with very different performance results. We'll provide some examples of implementing this in Chapter 4.
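To make the compaction rule concrete, here is a minimal local sketch of the same "newest record wins" logic using standard shell tools. The filenames are hypothetical, and a real implementation would typically be a distributed job (MapReduce, Hive, or similar) as described above rather than a local script.

    # Recreate the data from Table 2-1 locally (hypothetical files keyed on column 1)
    cat > original.csv <<'EOF'
    A,Blue,5
    B,Red,6
    C,Green,2
    EOF

    cat > new_delta.csv <<'EOF'
    B,Yellow,3
    D,Gray,0
    EOF

    # List the newer delta file first and keep only the first record seen per key,
    # so any key present in the delta overrides the older record.
    cat new_delta.csv original.csv | awk -F, '!seen[$1]++' | sort > compacted.csv

    cat compacted.csv
    # A,Blue,5
    # B,Yellow,3
    # C,Green,2
    # D,Gray,0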
Another option to consider beyond HDFS and file compactions is to use HBase instead. HBase handles compactions in the background and has the architecture to support deltas that take effect upon the completion of an HBase put, which typically occurs in milliseconds. Note that HBase requires additional administration and application development knowledge. Also, HBase has much different access patterns than HDFS that should be considered, for example scan rates. HBase scan rates are about 8 to 10 times slower than HDFS. Another difference is random access; HBase can access a single record in milliseconds, whereas HDFS doesn't support random access other than file seeking, which is expensive and often complex.

Access Patterns

This decision point requires deep understanding of the underlying requirements for information delivery. How is the data going to be used once it is in Hadoop? For example, if the requirements call for random row access, HDFS may not be the best fit, and HBase might be a better choice. Conversely, if scans and data transformations are required, HBase may not be a good selection. Even though there can be many variables to consider, we suggest this basic guiding principle: for cases where simplicity, best compression, and highest scan rate are called for, HDFS is the default selection. In addition, in newer versions of HDFS (Hadoop 2.3.0 and later), caching data into memory is supported. This allows tools to read data directly from memory for files loaded into the cache, and it moves Hadoop toward a massively parallel in-memory database accessible to tools in the Hadoop ecosystem (a brief example of enabling this appears at the end of this section). When random access is of primary importance, HBase should be the default, and for search processing you should consider Solr.

For a more detailed look, Table 2-2 includes common access patterns for these storage options.

Table 2-2. Access patterns for Hadoop storage options (tool, use cases, preferred storage)

• MapReduce (large batch processes): HDFS is preferred; HBase can be used but is less preferred.
• Hive (batch processing with a SQL-like language): HDFS is preferred; HBase can be used but is less preferred.
• Pig (batch processing with a data flow language): HDFS is preferred; HBase can be used but is less preferred.
• Spark (fast interactive processing): HDFS is preferred; HBase can be used but is less preferred.
• Giraph (batch graph processing): HDFS is preferred; HBase can be used but is less preferred.
• Impala (MPP-style SQL): HDFS is preferred for most cases, but HBase will make sense for some use cases even though the scan rates are slower, namely near-real-time access of newly updated data and very fast access to records by primary key.
• HBase API (atomic puts, gets, and deletes on record-level data): HBase.
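As a side note on the in-memory caching mentioned above, HDFS centralized cache management is driven by the hdfs cacheadmin command. The following is a minimal sketch; the pool name and paths are hypothetical, and DataNodes must have dfs.datanode.max.locked.memory configured for caching to take effect.

    # Create a cache pool and pin a frequently read data set into memory
    hdfs cacheadmin -addPool analytics_pool
    hdfs cacheadmin -addDirective -path /data/hot/lookup_tables -pool analytics_pool

    # Confirm the directive was registered
    hdfs cacheadmin -listDirectives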
Original Source System and Data Structure

When ingesting data from a filesystem, you should consider the following items:

Read speed of the devices on source systems
Disk I/O is often a major bottleneck in any processing pipeline. It may not be obvious, but optimizing an ingestion pipeline often requires looking at the system from which the data is being retrieved. Generally, with Hadoop we'll see read speeds of anywhere from 20 MBps to 100 MBps, and there are limitations on the motherboard or controller for reading from all the disks on the system. To maximize read speeds, make sure to take advantage of as many disks as possible on the source system. On some network attached storage (NAS) systems, additional mount points can increase throughput. Also note that a single reading thread may not be able to maximize the read speed of a drive or device. Based on our experience, on a typical drive three threads are normally required to maximize throughput, although this number will vary.

Original file type
Data can come in any format: delimited, XML, JSON, Avro, fixed length, variable length, copybooks, and many more. Hadoop can accept any file format, but not all formats are optimal for particular use cases. For example, consider a CSV file. CSV is a very common format, and a simple CSV file can generally be easily imported into a Hive table for immediate access and processing. However, for many tasks, converting the underlying storage of this CSV file will allow more optimal processing; for example, many analytical workloads will be processed much more efficiently with Parquet as the storage format, which also reduces the storage size of the file.

Another consideration is that not all file formats can work with all tools in the Hadoop ecosystem. An example of this would be variable-length files. Variable-length files are similar to flat files in that columns are defined with a fixed length. The difference between a fixed-length file and a variable-length file is that in the variable-length file one of the leftmost columns can decide the rules to read the rest of the file. An example of this is if the first two columns are an 8-byte ID followed by a 3-byte type. The ID is just a global identifier and reads very much like a fixed-length file. The type column, however, will set the rules for the rest of the record. If the value of the type column is car, the record might contain columns like max speed, mileage, and color; however, if the value is pet, then there might be columns in the record such as size and breed. These different columns will have different lengths, hence the name "variable length." With this understanding we can see that a variable-length file may not be a good fit for Hive, but can still be effectively processed by one of the processing frameworks available for Hadoop, such as a Java MapReduce job, Crunch, Pig, or Spark.

Compression
There is a pro and a con to compressing data on the original filesystem. The pro is that transferring a compressed file over the network requires less I/O and network bandwidth. The con is that most compression codecs applied outside of Hadoop are not splittable (e.g., Gzip), although most of these compression codecs are splittable in Hadoop if you use them with a splittable container format like SequenceFiles, Parquet files, or Avro files, as we discussed in Chapter 1. Normally the way to do this is to copy the compressed file to Hadoop and convert the files in a post-processing step. It's also possible to do the conversion as the data is streamed to Hadoop, but normally it makes more sense to use the distributed processing power of the Hadoop cluster to convert files, rather than just the edge nodes that are normally involved in moving data to the cluster. We discussed compression considerations with HDFS in Chapter 1, and we'll look at concrete examples of ingestion and post-processing in the case studies later in the book.
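As one possible illustration of this post-processing step, the sketch below uses Hive (one of several tools that could do the conversion) to turn a landing directory of gzipped CSV files into a Parquet table. The paths, table names, and column list are hypothetical, and STORED AS PARQUET assumes a reasonably recent Hive release (0.13 or later).

    # Hive reads gzipped text files transparently; the rewrite to Parquet happens on the cluster
    hive -e "
    CREATE EXTERNAL TABLE IF NOT EXISTS sales_raw (id INT, item STRING, amount DOUBLE)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      LOCATION '/data/landing/sales';

    CREATE TABLE IF NOT EXISTS sales_parquet (id INT, item STRING, amount DOUBLE)
      STORED AS PARQUET;

    INSERT OVERWRITE TABLE sales_parquet SELECT * FROM sales_raw;
    "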
Relational database management systems
It is common for Hadoop applications to integrate data from RDBMS vendors like Oracle, Netezza, Greenplum, Vertica, Teradata, Microsoft, and others. The tool of choice here is almost always Apache Sqoop. Sqoop is a very rich tool with lots of options, but at the same time it is simple and easy to learn compared to many other Hadoop ecosystem projects. These options will control which data is retrieved from the RDBMS, how the data is retrieved, which connector to use, how many map tasks to use, split patterns, and final file formats. Sqoop is a batch process, so if the timeliness of the data load into the cluster needs to be faster than batch, you'll likely have to find an alternate method. One alternative for certain use cases is to split data on ingestion, with one pipeline landing data in the RDBMS and one landing data in HDFS. This can be enabled with tools like Flume or Kafka, but this is a complex implementation that requires code at the application layer.

Note that with the Sqoop architecture the DataNodes, not just the edge nodes, are connecting to the RDBMS. In some network configurations this is not possible, and Sqoop will not be an option. Examples of network issues are bottlenecks between devices and firewalls. In these cases, the best alternative to Sqoop is an RDBMS file dump and import into HDFS. Most relational databases support creating a delimited file dump, which can then be ingested into Hadoop via a simple file transfer. Figure 2-1 shows the difference between an export with Sqoop and an RDBMS file export.

Figure 2-1. Sqoop versus RDBMS file export

We'll discuss more considerations and best practices for using Sqoop later in this chapter.

Streaming data
Examples of streaming input data include Twitter feeds, a Java Message Service (JMS) queue, or events firing from a web application server. In this situation a tool like Flume or Kafka is highly recommended. Both systems offer levels of guarantees and provide similar functionality, with some important differences. Later in this chapter we will drill down deeper into both Flume and Kafka.

Logfiles
Logfiles get their own section because they're a cross between filesystem and streaming input. An anti-pattern is to read the logfiles from disk as they are written, because this is almost impossible to implement without losing data. The correct way of ingesting logfiles is to stream the logs directly to a tool like Flume or Kafka, which will write directly to Hadoop instead.
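For readers who have not seen a Flume pipeline before, the following is one possible agent configuration that picks up completed, rotated logfiles from a local spool directory and streams them into HDFS. The agent, source, channel, and sink names, as well as all paths, are hypothetical, and a production configuration would differ (durable channels, batching, and so on).

    # Write a minimal Flume agent definition (names and paths are hypothetical)
    cat > log-agent.conf <<'EOF'
    a1.sources  = r1
    a1.channels = c1
    a1.sinks    = k1

    # Read completed, rotated logfiles dropped into a local spool directory
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /var/log/app/spool
    a1.sources.r1.channels = c1

    # Buffer events in memory between source and sink
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000

    # Write the events to date-partitioned directories in HDFS
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /data/landing/applogs/%Y/%m/%d
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.channel = c1
    EOF

    # Start the agent (--conf points at Flume's configuration directory)
    flume-ng agent --conf conf --conf-file log-agent.conf --name a1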
Transformations
In general, transformation refers to making modifications on incoming data, distributing the data into partitions or buckets, or sending the data to more than one store or location. Here are some simple examples of each option:

Transformation
XML or JSON is converted to delimited data.

Partitioning
Incoming data is stock trade data and partitioning by ticker is required.

Splitting
The data needs to land in HDFS and HBase for different access patterns.

Making a decision on how to transform data will depend on the timeliness of the requirements. If the timeliness of the data is batch, then the preferred solution will most likely be a file transfer followed by a batch transformation. Note that these transformations can be accomplished with tools like Hive, Pig, or Java MapReduce, or newer tools like Spark. The use of post-processing for this batch transformation step is preferred because of the checkpoints for failure provided by this processing, including the checksum on the file transfer and the all-or-nothing success/failure behavior of MapReduce. An important thing to note: MapReduce can be configured to transfer, partition, and split data beyond all-or-nothing processing. You would do so by configuring MapReduce with two output directories: one for records that were processed successfully and one for failures.

In some cases, the timeliness of the data requirements will not allow for the simplicity of a file transfer followed by MapReduce processing. Sometimes the work has to be done as the data is in flight to the cluster with a stream ingestion tool like Flume. When using Flume, this can be done using interceptors and selectors. We'll cover Flume in more detail later in this chapter, but we'll briefly cover the roles of interceptors and selectors here.

Interceptors
A Flume interceptor is a Java class that allows for in-flight enrichment of event data. Since it's implemented in Java, it provides great flexibility in the functionality that can be implemented. The interceptor has the capability to transform an event or a batch of events, including functionality to add or remove events from a batch. You must take special care when implementing it because it is part of a streaming framework, so the implementation should not cause the pipe to be slowed by things like calls to external services or garbage collection issues. Also remember that when transforming data with interceptors, you have a limit on processing power because normally Flume is only installed on a subset of nodes in a cluster.

Selectors
In Flume a selector object acts as a "fork in the road." It will decide which of the roads (if any) an event will go down. We'll provide an example of this in the following discussion of Flume patterns; a brief configuration sketch also follows below.
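The sketch below shows what interceptors and a selector look like in a Flume agent configuration. Rather than a custom Java interceptor, it uses Flume's built-in timestamp and regex_extractor interceptors for brevity; the agent, channel, and header names, the regex, and the routing values are all hypothetical, and the channel and sink definitions are omitted.

    # Append routing logic to a hypothetical agent definition
    cat > routing-agent.conf <<'EOF'
    a1.sources = r1
    a1.channels = c_hdfs c_hbase
    a1.sources.r1.channels = c_hdfs c_hbase

    # Interceptors: stamp each event, then extract a "type" header from the event body
    a1.sources.r1.interceptors = i1 i2
    a1.sources.r1.interceptors.i1.type = timestamp
    a1.sources.r1.interceptors.i2.type = regex_extractor
    a1.sources.r1.interceptors.i2.regex = ^(\\w+),
    a1.sources.r1.interceptors.i2.serializers = s1
    a1.sources.r1.interceptors.i2.serializers.s1.name = type

    # Selector: the "fork in the road", routing events by the "type" header
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = type
    a1.sources.r1.selector.mapping.trade = c_hdfs
    a1.sources.r1.selector.mapping.quote = c_hbase
    a1.sources.r1.selector.default = c_hdfs
    EOF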
Network Bottlenecks
When you're ingesting into Hadoop, the bottleneck is almost always the source system or the network between the source system and Hadoop. If the network is the bottleneck, it will be important to either add more network bandwidth or compress the data over the wire. You can do this by compressing the files before sending them over the wire, or by using Flume to compress the data between agents on different sides of the network bottleneck.

The easiest way to know if the network is the bottleneck is to look at your throughput numbers and at the network configuration: if you are using 1 Gb Ethernet, the expected throughput is around 100 MBps. If this is the throughput you are seeing with Flume, then you have indeed maximized the network capacity and need to consider compressing the data. For a more accurate diagnostic, you can use a network monitoring tool to determine your network utilization.

Network Security
Sometimes you need to ingest data from sources that you can access only by going outside the company's firewalls. Depending on the data, it may be important to encrypt the data while it goes over the wire. You can do so by simply encrypting the files before sending them, for example using OpenSSL. Alternatively, Flume includes support for encrypting data sent between Flume agents. Note that Kafka does not currently support encryption of data within a Kafka data flow, so you'll have to do additional work to encrypt and decrypt the data outside of Kafka.

Push or Pull
All the tools discussed in this chapter can be classified as either pushing or pulling tools. The important thing to note is the actor in the architecture, because in the end that actor will have additional requirements to consider, such as:

• Keeping track of what has been sent
• Handling retries or failover options in case of failure
• Being respectful of the source system that data is being ingested from
• Access and security

We'll cover these requirements in more detail in this chapter, but first we'll discuss two common Hadoop tools, Sqoop and Flume, to help clarify the distinction between push and pull in the context of data ingestion and extraction with Hadoop.

Sqoop
Sqoop is a pull solution that is used to extract data from an external storage system such as a relational database and move that data into Hadoop, or to extract data from Hadoop and move it into an external system. It must be provided various parameters about the source and target systems, such as connection information for the source database, one or more tables to extract, and so on. Given these parameters, Sqoop will run a set of jobs to move the requested data.

In the example where Sqoop is extracting (pulling) data from a relational database, we have considerations such as ensuring that we extract data from the source database at a defined rate, because Hadoop can easily consume too many resources on the source system. We also need to ensure that Sqoop jobs are scheduled so as not to interfere with the source system's peak load time. We'll provide further details and considerations for Sqoop later in this chapter and provide specific examples in the case studies later in the book.

Flume
Note that Flume can be bucketed in both descriptions depending on the source used. In the case of the commonly used Log4J appender, Flume is pushing events through a pipeline. There are also several Flume sources, such as the spooling directory source or the JMS source, where events are being pulled. Here again, we'll get into more detail on Flume in this section and in our case studies later in the book.

Failure Handling
Failure handling is a major consideration when you're designing an ingestion pipeline; how to recover in case of failure is a critical question. With large distributed systems, failure is not a question of if, but of when. We can put in many layers of protection when designing an ingestion flow (sometimes at great cost to performance), but there is no silver bullet that can prevent all possible failures. Failure scenarios need to be documented, including failure delay expectations and how data loss will be handled.

The simplest example of a failure scenario is with file transfers, for example performing a hadoop fs -put filename command. When the put command has finished, the command will have validated that the file is in HDFS, replicated three times, and passed a checksum check. But what if the put command fails? The normal way of handling the file transfer failure is to have multiple local filesystem directories that represent different bucketing in the life cycle of the file transfer process. Let's look at an example using this approach. In this example, there are four local filesystem directories: ToLoad, InProgress, Failures, and Successful. The workflow in this case is as simple as the following (a shell sketch of this workflow appears after the list):

1. Move the file into ToLoad.
2. When the put command is ready to be called, move the file into InProgress.
3. Call the put command.
4. If the put command fails, move the file into the Failures directory.
5. If the put was successful, move the file into the Successful directory.

Note that a failure in the middle of a file put will require a complete resend. Consider this carefully when doing a hadoop fs -put with very large files. If a failure on the network happens five hours into a file transfer, the entire process has to start again.
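Below is a minimal sketch of the directory-based workflow just described. The staging root and HDFS target path are hypothetical, and a production version would add logging, locking, and retry logic.

    #!/bin/bash
    # Sketch of the ToLoad / InProgress / Failures / Successful life cycle (paths are hypothetical)
    STAGING=/data/staging
    HDFS_TARGET=/data/landing/files

    mkdir -p "$STAGING"/{ToLoad,InProgress,Failures,Successful}

    for f in "$STAGING"/ToLoad/*; do
      [ -e "$f" ] || continue                          # nothing to load
      name=$(basename "$f")
      mv "$f" "$STAGING/InProgress/$name"              # step 2: mark as in progress
      if hadoop fs -put "$STAGING/InProgress/$name" "$HDFS_TARGET/"; then
        mv "$STAGING/InProgress/$name" "$STAGING/Successful/$name"   # step 5: success
      else
        mv "$STAGING/InProgress/$name" "$STAGING/Failures/$name"     # step 4: failure
      fi
    done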
File transfers are pretty simple, so let's take an example with streaming ingestion and Flume. In Flume, there are many areas for failure, so to keep things simple let's just focus on the following three:

• A "pull" source such as the spooling directory source could fail to load events to the channel because the channel is full. In this case, the source must pause before retrying the channel, as well as retain the events.
• An event-receiving source could fail to load the event to the channel because the channel is full. In this case, the source will tell the Flume client it was unsuccessful in accepting the last batch of events, and then the Flume client is free to resend to this source or another source.
• The sink could fail to load the event into the final destination. A sink will take a number of events from the channel and then try to commit them to the final destination, but if that commit fails, then the batch of events needs to be returned to the channel.

The big difference between Flume and file transfers is that with Flume, in the case of a failure there is a chance of duplicate records getting created in Hadoop. This is because in the case of failure the event is always returned to the last step, so if the batch was half successful we will get duplicates. There are methods that try to address this issue with Flume and other streaming solutions like Kafka, but there is a heavy performance cost in trying to remove duplicates at the time of streaming. We'll see examples of processing to handle deduplication later in the book.

Level of Complexity
The last design factor that needs to be considered is complexity for the user. A simple example of this is if users need to move data into Hadoop manually. For this use case, using a simple hadoop fs -put or a mountable HDFS solution, like FUSE (Filesystem in Userspace) or the new NFS (Network File System) gateway, might provide a solution. We'll discuss these options next.

Data Ingestion Options

Now that we've discussed considerations around designing data ingestion pipelines, we'll dive more deeply into specific tools and methods for moving data into Hadoop. These range from simply loading files into HDFS to using more powerful tools such as Flume and Sqoop. As we've noted before, we're not attempting to provide in-depth introductions to these tools, but rather to provide specific information on effectively leveraging these tools as part of an application architecture. The suggested references will provide more comprehensive and in-depth overviews for these tools.

We'll start by discussing basic file transfers in the next section, and then move on to discussing Flume, Sqoop, and Kafka as components in your Hadoop architectures.

File Transfers
The simplest and sometimes fastest way to get data into (and out of) Hadoop is by doing file transfers, in other words using the hadoop fs -put and hadoop fs -get commands. For this reason file transfers should be considered as the first option when you are designing a new data processing pipeline with Hadoop. Before going into more detail, let's review the characteristics of file transfers with Hadoop:

• It's an all-or-nothing batch processing approach, so if an error occurs during file transfer no data will be written or read. This should be contrasted with ingestion methods such as Flume or Kafka, which provide some level of failure handling and guaranteed delivery.
• By default file transfers are single-threaded; it's not possible to parallelize file transfers.
• File transfers are from a traditional filesystem to HDFS.
• Applying transformations to the data is not supported; data is ingested into HDFS as is. Any processing of the data needs to be done after it lands in HDFS, as opposed to in-flight transformations that are supported with a system like Flume.
• It is a byte-by-byte load, so any type of file can be transferred (text, binary, images, etc.).
HDFS client commands
As noted already, the hadoop fs -put command is a byte-by-byte copy of a file from a filesystem into HDFS. When the put job is completed, the file will be on HDFS as one or more blocks, with each block replicated across different Hadoop DataNodes. The number of replicas is configurable, but the normal default is to replicate three times. To make sure that the blocks don't get corrupted, a checksum file accompanies each block.

When you use the put command there are normally two approaches: the double-hop and the single-hop. The double-hop, shown in Figure 2-2, is the slower option because it involves an additional write and read to and from the disks on the Hadoop edge node. Sometimes, though, this is the only option because the external source filesystem isn't available to be mounted from the Hadoop cluster.

Figure 2-2. Double-hop file uploads

The alternative is the single-hop approach, as shown in Figure 2-3, which requires that the source device is mountable, for example a NAS or SAN. The external source can be mounted, and the put command can read directly from the device and write the file directly to HDFS. This approach has the benefit of improved performance. It also lessens the requirement to have edge nodes with large local drives.

Figure 2-3. Single-hop file uploads

Mountable HDFS
In addition to using the commands available as part of the Hadoop client, there are several options available that allow HDFS to be mounted as a standard filesystem. These options allow users to interact with HDFS using standard filesystem commands such as ls, cp, mv, and so forth. Although these options can facilitate access to HDFS for users, they're of course still subject to the same limitations as HDFS:

• Just as with HDFS, none of these options offer full POSIX semantics.[1]
• Random writes are not supported; the underlying filesystem is still "write once, read many."

[1] POSIX refers to a set of standards that operating systems should adhere to in order to support portability, including filesystem specifications. While HDFS is "POSIX-like," it does not fully support all the features expected of a POSIX-compliant filesystem.

Another pitfall with mountable HDFS is the potential for misuse. Although it makes it easier for users to access HDFS and ingest files, this ease of access may encourage the ingestion of many small files. Since Hadoop is tuned for a relatively small number of large files, this scenario should be guarded against. Note that "many small files" could mean millions of files in a reasonably sized cluster. Still, for storage and processing efficiency it's better to store fewer large files in Hadoop. If there is a need to ingest many small files into Hadoop, there are several approaches that can be used to mitigate this:

• Use Solr for storing and indexing the small files. We'll discuss Solr in more detail in Chapter 7.
• Use HBase to store the small files, using the path and filename as the key. We'll also discuss HBase in more detail in Chapter 7.
• Use a container format such as SequenceFiles or Avro to consolidate small files.

There are several projects providing a mountable interface to Hadoop, but we'll focus on two of the more common choices, Fuse-DFS and NFS:

Fuse-DFS
Fuse-DFS is built on the FUSE project, which was designed to facilitate the creation of filesystems on UNIX/Linux systems without the need to modify the kernel. Fuse-DFS makes it easy to mount HDFS on a local filesystem, but as a user space module, it involves a number of hops between client applications and HDFS, which can significantly impact performance. Fuse-DFS also has a poor consistency model. For these reasons you should carefully consider it before deploying it as a production solution.
NFSv3
A more recent project adds support for the NFSv3 protocol to HDFS (see Figure 2-4). This provides a scalable solution with a minimal performance hit. The design involves an NFS gateway server that streams files to HDFS using the DFSClient. You can scale this solution by adding multiple NFS gateway nodes.

Figure 2-4. The HDFS NFSv3 gateway

Note that with the NFS gateway, writes are sent by the kernel out of order. This requires the NFS server to buffer and reorder the writes before sending them to HDFS, which can have an impact with high data volumes, both in performance and in disk consumption.

Before you deploy any mountable HDFS solution, it's highly recommended that you perform testing with sufficient data volumes to ensure that the solution will provide suitable performance. In general, these options are recommended only for small, manual data transfers, not for ongoing movement of data in and out of your Hadoop cluster.
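If you do experiment with the NFS gateway, mounting it from a Linux client typically looks something like the following. The gateway hostname and mount point are hypothetical, and the exact mount options depend on your Hadoop version and operating system, so consult the HDFS NFS gateway documentation for your distribution.

    # Mount HDFS through the NFS gateway (hostname and paths are hypothetical)
    sudo mkdir -p /hdfs_nfs
    sudo mount -t nfs -o vers=3,proto=tcp,nolock,sync nfs-gateway.example.com:/ /hdfs_nfs

    # HDFS now appears as a regular filesystem to standard tools
    ls /hdfs_nfs
    cp /local/exports/sales_2017-06-08.csv /hdfs_nfs/data/landing/sales/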
Considerations for File Transfers versus Other Ingest Methods
Simple file transfers are suitable in some cases, particularly when you have an existing set of files that needs to be ingested into HDFS and keeping the files in their source format is acceptable. Otherwise, the following are some considerations to take into account when you're trying to determine whether a file transfer is acceptable or whether you should use a tool such as Flume:

• Do you need to ingest data into multiple locations? For example, do you need to ingest data into both HDFS and Solr, or into HDFS and HBase? In this case using a file transfer will require additional work after the files are ingested, and using Flume is likely more suitable.
• Is reliability important? If so, remember that an error mid-transfer will require a restart of the file transfer process. Here again, Flume is likely a more appropriate solution.
• Is transformation of the data required before ingestion? In that case, Flume is almost certainly the correct tool.

One option to consider if you have files to ingest is using the Flume spooling directory source. This allows you to ingest files by simply placing them into a specific directory on disk. This provides a simple and reliable way to ingest files, as well as the option to perform in-flight transformations of the data if required.

Sqoop: Batch Transfer Between Hadoop and Relational Databases

We've already discussed Sqoop quite a bit in this chapter, so let's get into more detail on considerations and best practices for it. As we've discussed, Sqoop is a tool used to import data in bulk from a relational database management system to Hadoop and vice versa. When used for importing data into Hadoop, Sqoop generates map-only MapReduce jobs where each mapper connects to the database using a Java Database Connectivity (JDBC) driver, selects a portion of the table to be imported, and writes the data to HDFS. Sqoop is quite flexible and allows not just importing full tables, but also adding where clauses to filter the data we import and even supplying our own query.

For example, here is how you can import a single table:

    sqoop import --connect jdbc:oracle:thin:@localhost:1521/oracle \
      --username scott --password tiger \
      --table HR.employees --target-dir /etl/HR/landing/employee \
      --input-fields-terminated-by "\t" \
      --compression-codec org.apache.hadoop.io.compress.SnappyCodec --compress

And here is how to import the result of a join:

    sqoop import \
      --connect jdbc:oracle:thin:@localhost:1521/oracle \
      --username scott --password tiger \
      --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' \
      --split-by a.id --target-dir /user/foo/joinresults

Note that in this example, $CONDITIONS is a literal value to type in as part of the command line. The $CONDITIONS placeholder is used by Sqoop to control the parallelism of the job, and will be replaced by Sqoop with generated conditions when the command is run.

In this section we'll outline some patterns of using Sqoop as a data ingestion method.

Choosing a split-by column
When importing data, Sqoop will use multiple mappers to parallelize the data ingest and increase throughput. By default, Sqoop will use four mappers and will split work between them by taking the minimum and maximum values of the primary key column and dividing the range equally among the mappers. The --split-by parameter lets you specify a different column for splitting the table between mappers, and --num-mappers lets you control the number of mappers. As we'll discuss shortly, one reason to specify a parameter for --split-by is to avoid data skew. Note that each mapper will have its own connection to the database, and each will retrieve its portion of the table when we specify its portion limits in a where clause. It is important to choose a split column that has an index or is a partition key, to avoid each mapper having to scan the entire table. If no such key exists, specifying only one mapper is the preferred solution.

Using database-specific connectors whenever available
Different RDBMSs support different dialects of the SQL language. In addition, they each have their own implementation and their own methods of optimizing data transfers. Sqoop ships with a generic JDBC connector that will work with any database that supports JDBC, but there are also vendor-specific connectors that translate across different dialects and optimize data transfer. For example, the Teradata connector will use Teradata's FastExport utility to optimally execute data import, while the Oracle connector will disable parallel queries to avoid bottlenecks on the query coordinator.

Using the Goldilocks method of Sqoop performance tuning
In most cases, Hadoop cluster capacity will vastly exceed that of the RDBMS. If Sqoop uses too many mappers, Hadoop will effectively run a denial-of-service attack against your database. If we use too few mappers, data ingest rates may be too slow to meet requirements. It is important to tune the Sqoop job to use a number of mappers that is "just right"; that is, one that adheres to the Goldilocks principle (which states that something must fall within certain margins, as opposed to reaching extremes). Since the risk of overloading the database is much greater than the risk of a slower ingest, we typically start with a very low number of mappers and gradually increase it to achieve a balance between Sqoop ingest rates and keeping the database and network responsive to all users.
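As a concrete, hedged illustration of starting small and scaling up, the commands below import the same table twice with an explicit split column and an increasing mapper count; the connection details are carried over from the earlier examples, and the EMPLOYEE_ID split column and output directories are assumptions for illustration.

    # Start conservatively with a couple of mappers and an indexed split column...
    sqoop import --connect jdbc:oracle:thin:@localhost:1521/oracle \
      --username scott --password tiger \
      --table HR.employees --target-dir /etl/HR/landing/employee_run1 \
      --split-by EMPLOYEE_ID --num-mappers 2

    # ...then, if the database and network stay responsive, rerun with more mappers
    # and compare ingest rates before settling on a final value.
    sqoop import --connect jdbc:oracle:thin:@localhost:1521/oracle \
      --username scott --password tiger \
      --table HR.employees --target-dir /etl/HR/landing/employee_run2 \
      --split-by EMPLOYEE_ID --num-mappers 4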
Loading many tables in parallel with fair scheduler throttling
There is a very common use case of having to ingest many tables from the same RDBMS. There are two different approaches to implementing fair scheduler throttling (fair schedulers are a method of sharing resources between jobs that allocates, on average, an equal share of the resources to each job; to read more on the Hadoop fair scheduler and how it can be used, see Chapter 7, "Resource Management," in Hadoop Operations by Eric Sammer, O'Reilly):

Load the tables sequentially
This solution is by far the simplest, but it has the drawback of not optimizing the bandwidth between the RDBMS and the Hadoop cluster. To illustrate this, imagine that we have five tables. Each table is ingested using a different number of mappers, because not all table sizes require allocating the full number of allowed mappers. The result of this example will look like Figure 2-5.

Figure 2-5. Sqoop tasks executing sequentially

As you can see from the figure, there are mappers left idle during certain jobs. This leads to the idea of running these jobs in parallel so we can optimally leverage the available mappers and decrease the processing and network time.

Load the tables in parallel
Running Sqoop jobs in parallel will allow you to use resources more effectively, but it adds the complexity of managing the total number of mappers being run against the RDBMS at the same time. You can solve the problem of managing the total number of mappers by using the fair scheduler to make a pool that has the maximum mappers set to the maximum number of mappers Sqoop should be allowed to use to interact with the RDBMS. If done correctly, the execution of the five jobs illustrated in the sequential solution will look like Figure 2-6.

Figure 2-6. Sqoop tasks executing in parallel with the number of tasks limited to four by the fair scheduler

Diagnosing bottlenecks
Sometimes we discover that as we increase the number of mappers, the ingest rate does not increase proportionally. Ideally, adding 50% more mappers would result in 50% more rows ingested per minute. While this is not always realistic, if adding more mappers has little impact on ingest rates, there is a bottleneck somewhere in the pipe. Here are a few likely bottlenecks:
