Moving data in and out of Hadoop

This chapter covers
■ Understanding key design considerations for data ingress and egress tools
■ Techniques for moving log files into HDFS and Hive
■ Using relational databases and HBase as data sources and data sinks

Moving data in and out of Hadoop, which I'll refer to in this chapter as data ingress and egress, is the process by which data is transported from an external system into an internal system, and vice versa. Hadoop supports ingress and egress at a low level in HDFS and MapReduce. Files can be moved in and out of HDFS, and data can be pulled from external data sources and pushed to external data sinks using MapReduce. Figure 2.1 shows some of Hadoop's ingress and egress mechanisms.

The fact that your data exists in various forms and locations throughout your environments complicates the process of ingress and egress. How do you bring in data that's sitting in an OLTP (online transaction processing) database? Or ingress log data that's being produced by tens of thousands of production servers? Or work with binary data sitting behind a firewall?

Figure 2.1 Hadoop data ingress and egress transports data to and from an external system to an internal one. Data is copied into Hadoop from sources such as log collectors, files, HBase/NoSQL stores, and OLTP databases, processed (for example, by joining different data sources together), and then copied back out to files, NoSQL stores, and OLAP systems.

Further, how do you automate your data ingress and egress process so that your data is moved at regular intervals? Automation is a critical part of the process, along with monitoring and data integrity responsibilities to ensure correct and safe transportation of data.

In this chapter we'll survey the tools that simplify the process of ferrying data in and out of Hadoop. We'll also look at how to automate the movement of log files, ubiquitous data sources for Hadoop, but which tend to be scattered throughout your environments and therefore present a collection and aggregation challenge. In addition, we'll cover using Flume for moving log data into Hadoop, and in the process we'll evaluate two competing log collection and aggregation tools, Chukwa and Scribe.

We'll also walk through how to move relational data in and out of Hadoop. This is an emerging usage pattern where you can use Hadoop to join data sitting in your databases with data ingressed from other sources, such as log files, and subsequently push result data back out to databases. Finally, we'll cover how to use Sqoop for database ingress and egress activities, and we'll look at how to ingress and egress data in HBase.

We'll cover a lot of ground in this chapter, and it's likely that you have specific types of data you need to ingress or egress. If this is the case, feel free to jump directly to a particular section that provides the details you need. In addition, if you're looking for lower-level HDFS ingress and egress options, take a look at appendix B, where I cover using tools such as WebHDFS and Hoop. Let's start things off with a look at key ingress and egress system considerations.

2.1 Key elements of ingress and egress

Moving large quantities of data in and out of Hadoop has logistical challenges that include consistency guarantees and resource impacts on data sources and destinations.

Before we dive into the techniques, however, we need to discuss the design elements to be aware of when working with data ingress and egress.

IDEMPOTENCE
An idempotent operation produces the same result no matter how many times it's executed. In a relational database the inserts typically aren't idempotent, because executing them multiple times doesn't produce the same resulting database state. Alternatively, updates often are idempotent, because they'll produce the same end result. Any time data is being written, idempotence should be a consideration, and data ingress and egress in Hadoop is no different. How well do distributed log collection frameworks deal with data retransmissions? How do you ensure idempotent behavior in a MapReduce job where multiple tasks are inserting into a database in parallel? We'll examine and answer these questions in this chapter.

AGGREGATION
The data aggregation process combines multiple data elements. In the context of data ingress this can be useful because moving large quantities of small files into HDFS potentially translates into NameNode memory woes, as well as slow MapReduce execution times. Having the ability to aggregate files or data together mitigates this problem, and is a feature to consider.

DATA FORMAT TRANSFORMATION
The data format transformation process converts one data format into another. Often your source data isn't in a format that's ideal for processing in tools such as MapReduce. If your source data is in multiline XML or JSON form, for example, you may want to consider a preprocessing step. This would convert the data into a form that can be split, such as a JSON or an XML element per line, or convert it into a format such as Avro. Chapter 3 contains more details on these data formats.

RECOVERABILITY
Recoverability allows an ingress or egress tool to retry in the event of a failed operation. Because it's unlikely that any data source, sink, or Hadoop itself can be 100 percent available, it's important that an ingress or egress action be retried in the event of failure.

CORRECTNESS
In the context of data transportation, checking for correctness is how you verify that no data corruption occurred as the data was in transit. When you work with heterogeneous systems such as Hadoop data ingress and egress tools, the fact that data is being transported across different hosts, networks, and protocols only increases the potential for problems during data transfer. Common methods for checking correctness of raw data such as storage devices include Cyclic Redundancy Checks (CRC), which are what HDFS uses internally to maintain block-level integrity.

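To make the correctness check concrete, here's a minimal sketch (it isn't part of any of the tools covered in this chapter) that compares an MD5 digest of a local file against the bytes read back through hadoop fs -cat. The paths and the choice of MD5 are illustrative assumptions; in practice you may prefer to lean on HDFS's built-in CRC checking or a tool-specific verification step.

#!/usr/bin/env python
# Illustrative only: verify that a file copied into HDFS matches the local
# original by comparing MD5 digests. The paths below are hypothetical examples.
import hashlib
import subprocess

def md5_of_local(path):
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(8192), b''):
            digest.update(chunk)
    return digest.hexdigest()

def md5_of_hdfs(path):
    # Stream the HDFS file through 'hadoop fs -cat' and digest it as it arrives.
    digest = hashlib.md5()
    proc = subprocess.Popen(['hadoop', 'fs', '-cat', path],
                            stdout=subprocess.PIPE)
    for chunk in iter(lambda: proc.stdout.read(8192), b''):
        digest.update(chunk)
    proc.wait()
    return digest.hexdigest()

if __name__ == '__main__':
    local, remote = '/tmp/ingest/app.log', '/data/ingest/app.log'
    if md5_of_local(local) == md5_of_hdfs(remote):
        print('copy verified')
    else:
        print('checksum mismatch - retry the transfer')
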
RESOURCE CONSUMPTION AND PERFORMANCE
Resource consumption and performance are measures of system resource utilization and system efficiency, respectively. Ingress and egress tools don't typically incur significant load (resource consumption) on a system, unless you have appreciable data volumes. For performance, the questions to ask include whether the tool performs ingress and egress activities in parallel, and if so, what mechanisms it provides to tune the amount of parallelism. For example, if your data source is a production database, don't use a large number of concurrent map tasks to import data.

MONITORING
Monitoring ensures that functions are performing as expected in automated systems. For data ingress and egress, monitoring breaks down into two elements: ensuring that the process(es) involved in ingress and egress are alive, and validating that source and destination data are being produced as expected.

On to the techniques. Let's start with how you can leverage Hadoop's built-in ingress and egress mechanisms.

2.2 Moving data into Hadoop

The first step in working with data in Hadoop is to make it available to Hadoop. As I mentioned earlier in this chapter, there are two primary methods that can be used for moving data into Hadoop: writing external data at the HDFS level (a data push), or reading external data at the MapReduce level (more like a pull). Reading data in MapReduce has advantages in the ease with which the operation can be parallelized and made fault tolerant. Not all data is accessible from MapReduce, however, such as in the case of log files, which is where other systems need to be relied upon for transportation, including HDFS for the final data hop.

In this section we'll look at methods to move source data into Hadoop, which I'll refer to as data ingress. I'll use the data ingress design considerations in the previous section as the criteria to examine and understand the different tools as I go through the techniques. We'll look at Hadoop data ingress across a spectrum of data sources, starting with log files, then semistructured or binary files, then databases, and finally HBase. We'll start by looking at data ingress of log files.

LOW-LEVEL HADOOP INGRESS MECHANISMS
This section will focus on high-level data ingress tools that provide easy and automated mechanisms to get data into Hadoop. All these tools use one of a finite set of low-level mechanisms, however, which Hadoop provides to get data in and out. These mechanisms include Hadoop's Java HDFS API, WebHDFS, the new Hadoop 0.23 REST API, and MapReduce. An extensive evaluation of these mechanisms and tools is outside the scope of this chapter, but I provide them for reference in appendix B.

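To give you a feel for one of these low-level mechanisms before we move on to the higher-level tools, the following sketch pushes a local file into HDFS over WebHDFS's REST interface using its two-step create call (the NameNode answers with a redirect to a DataNode, which receives the data). It assumes WebHDFS is enabled and uses the third-party requests library; the host, port, user, and paths are placeholders for your environment, and appendix B is the place to go for the full treatment.

#!/usr/bin/env python
# Minimal WebHDFS ingress sketch (illustrative): copy a local file into HDFS.
# Host, port, user, and paths are assumptions for this example.
import requests

NAMENODE = 'http://namenode.example.com:50070'
LOCAL_FILE = '/var/log/messages'
HDFS_PATH = '/tmp/ingest/messages'
USER = 'hdfs'

# Step 1: ask the NameNode where to write; it answers with a 307 redirect
# pointing at a DataNode.
url = '%s/webhdfs/v1%s?op=CREATE&user.name=%s&overwrite=true' % (
    NAMENODE, HDFS_PATH, USER)
resp = requests.put(url, allow_redirects=False)
resp.raise_for_status()
datanode_url = resp.headers['Location']

# Step 2: stream the file contents to the DataNode location.
with open(LOCAL_FILE, 'rb') as f:
    resp = requests.put(datanode_url, data=f)
resp.raise_for_status()
print('created %s (HTTP %d)' % (HDFS_PATH, resp.status_code))
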
2.2.1 Pushing log files into Hadoop

Log data has long been prevalent across all applications, but with Hadoop came the ability to process the large volumes of log data produced by production systems. Various systems produce log data, from network devices and operating systems to web servers and applications. These log files all offer the potential for valuable insights into how systems and applications operate as well as how they're used. What unifies log files is that they tend to be in text form and line-oriented, making them easy to process.

In this section we'll look at tools that can help transport log data from source to HDFS. We'll also perform a deep dive into one of these tools and look at how to transport system log files into HDFS and Hive. I'll provide what you need to know to deploy, configure, and run an automated log collection and distribution infrastructure, and kick-start your own log data-mining activities.

COMPARING FLUME, CHUKWA, AND SCRIBE
Flume, Chukwa, and Scribe are log collecting and distribution frameworks that have the capability to use HDFS as a data sink for that log data. It can be challenging to differentiate between them because they share the same features.

FLUME
Apache Flume is a distributed system for collecting streaming data. It's an Apache project in incubator status, originally developed by Cloudera. It offers various levels of reliability and transport delivery guarantees that can be tuned to your needs. It's highly customizable and supports a plugin architecture where you can add custom data sources and data sinks. Figure 2.2 shows Flume's architecture.

Figure 2.2 Flume architecture for collecting streaming data (data sources such as a log4j appender, tail-able files like Apache and syslog logs, syslog UDP/TCP, the stdout of a launched executable, and custom data sources feed Agent Nodes, which forward to Collector Nodes writing to sinks such as HDFS, HBase, flat files, and custom data sinks; a set of Masters manages the Nodes)

CHUKWA
Chukwa is an Apache subproject of Hadoop that also offers a large-scale mechanism to collect and store data in HDFS. And it's also in incubator status. Chukwa's reliability model supports two levels: end-to-end reliability, and fast-path delivery, which minimizes latencies. After writing data into HDFS, Chukwa runs a MapReduce job to demultiplex the data into separate streams. Chukwa also offers a tool called Hadoop Infrastructure Care Center (HICC), which is a web interface for visualizing system performance. Figure 2.3 shows Chukwa's architecture.

Figure 2.3 Chukwa architecture for collecting and storing data in HDFS (sources such as a log4j appender, text log files, and syslog UDP/TCP feed Agents, which forward to Collectors writing into HDFS; a MapReduce demultiplexer then routes the data to destinations such as MySQL and HBase, with HICC providing visualization)

SCRIBE
Scribe is a rudimentary streaming log distribution service, developed and used heavily by Facebook. A Scribe server that collects logs runs on every node and forwards them to a central Scribe server. Scribe supports multiple data sinks, including HDFS, regular filesystems, and NFS. Scribe's reliability comes from a file-based mechanism where the server persists to a local disk in the event it can't reach the downstream server. Unlike Flume or Chukwa, Scribe doesn't include any convenience mechanisms to pull log data. Instead the onus is on the user to stream the source data to the Scribe server running on the local system. For example, if you want to push your Apache log files, you would need to write a daemon that tails and forwards the log data to the Scribe server. Figure 2.4 shows Scribe's architecture. The Scribe project is hosted on GitHub at https://github.com/facebook/scribe.

Figure 2.4 Scribe architecture also pushes log data into HDFS (a log4j appender, text logs, and syslog data reach local Scribe servers via custom tail-and-forward mechanisms; the local servers forward to central servers, which write to sinks such as HDFS and regular filesystems)

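To make that last point concrete, here's a rough sketch of the kind of tail-and-forward daemon that would be your responsibility with Scribe. It simply follows a log file and pushes new lines over a plain TCP socket as a stand-in for the real client; an actual deployment would use Scribe's Thrift client and log categories instead, and the file path, host, and port below are assumptions.

#!/usr/bin/env python
# Illustrative tail-and-forward loop: follow a log file and push each new line
# to a downstream collector over TCP. A real Scribe setup would use the Scribe
# Thrift client rather than a raw socket; path, host, and port are assumptions.
import socket
import time

LOG_FILE = '/var/log/apache2/access.log'
COLLECTOR = ('localhost', 1463)

def follow(path):
    # Yield lines appended to the file, starting from the current end.
    with open(path, 'r') as f:
        f.seek(0, 2)  # seek to the end of the file
        while True:
            line = f.readline()
            if not line:
                time.sleep(0.5)  # nothing new yet; wait and retry
                continue
            yield line

sock = socket.create_connection(COLLECTOR)
try:
    for line in follow(LOG_FILE):
        sock.sendall(line.encode('utf-8'))
finally:
    sock.close()
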
Table 2.1 Feature comparison of log collecting projects

Flume
■ Centralized configuration: Yes
■ Reliability: Yes (best effort, disk failover, end-to-end)
■ Failover: Yes (none, manually configurable, automated)
■ Level of documentation: High
■ Commercial support: Yes (Cloudera)
■ Popularity (number of mailing list messages from 08/11/2011): High (348)

Chukwa
■ Centralized configuration: No
■ Reliability: Yes (fast, end-to-end)
■ Failover: Yes (none, manually configurable, automated)
■ Level of documentation: Low
■ Commercial support: No
■ Popularity (number of mailing list messages from 08/11/2011): Medium (85)

Scribe
■ Centralized configuration: No
■ Reliability: Yes (disk-based, not end-to-end)
■ Failover: No
■ Level of documentation: Low
■ Commercial support: No
■ Popularity (number of mailing list messages from 08/11/2011): Medium (46)

Each of these three tools can fulfill the criteria of providing mechanisms to push log data into HDFS. Table 2.1 compares the various tools based on features such as reliability and configuration. From a high-level perspective there's not much feature difference between these tools, other than that Scribe doesn't offer any end-to-end delivery guarantees. It's also clear that the main downside for Chukwa and Scribe is their limited user documentation. Their mailing list activities were also moderate.

I had to pick one of these three tools for this chapter, so I picked Flume. My reasons for selecting Flume include its centralized configuration, its flexible reliability and failover modes, and also the popularity of its mailing list.

Let's look at how you can deploy and set up Flume to collect logs, using a problem/solution scenario. I'll continue to introduce techniques in this manner throughout the rest of this chapter.

TECHNIQUE 1 Pushing system log messages into HDFS with Flume

You have a bunch of log files being produced by multiple applications and systems across multiple servers. There's no doubt there's valuable information to be mined out of these logs, but your first challenge is a logistical one of moving these logs into your Hadoop cluster so that you can perform some analysis.

Problem
You want to push all of your production server system log files into HDFS.

Solution
For this technique you'll use Flume, a data collection system, to push a Linux log file into HDFS. We will also cover configurations required to run Flume in a distributed environment, as well as an examination of Flume's reliability modes.

Discussion
Figure 2.5 shows a full-fledged Flume deployment, and its four primary components:
■ Nodes—Flume data paths that ferry data from a data source to a data sink. Agents and Collectors are simply Flume Nodes that are deployed in a way to efficiently and reliably work with a large number of data sources.
■ Agents—Collect streaming data from the local host and forward it to the Collectors.
■ Collectors—Aggregate data sent from the Agents and write that data into HDFS.
■ Masters—Perform configuration management tasks and also help with reliable data flow.

Figure 2.5 Example of Flume deployment for collecting streaming data (Agent Nodes tail data sources such as log files, syslog UDP/TCP, a syslog TCP server, the stdout of a launched executable, and custom data sources, and forward events to Collector Nodes, which write to data sinks such as HDFS, sequence files, and text files; the Masters manage configuration)

This figure also shows data sources and data sinks. Data sources are streaming data origins whose data you want to transport to a different destination. Examples include application logs and Linux system logs, as well as nontext data that can be supported with custom data sources. Data sinks are the destination of that data, which can be HDFS, flat files, and any data target that can be supported with custom data sinks.

You'll run Flume in pseudo-distributed mode, which means you'll run the Flume Collector, Agent, and Master daemons on your single host. The first step is to install the Flume, Flume Master, and Flume Node packages from the CDH3 Installation Guide. (Appendix A contains installation instructions and additional resources for working with Flume.) After you've installed these packages, start up the Master and Agent daemons:

/etc/init.d/flume-master start
/etc/init.d/flume-node start

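As an optional sanity check that the daemons actually came up, you can poll their embedded web UIs (the ports are listed in table 2.2 a little further on). The following sketch isn't part of Flume; it just uses the requests library against the assumed localhost ports.

#!/usr/bin/env python
# Optional sanity check: poll the embedded web UIs of the Flume daemons to
# confirm they started. Ports follow table 2.2; adjust for your environment.
import requests

DAEMONS = {
    'Flume Master': 'http://localhost:35871',
    'Flume Node (Agent)': 'http://localhost:35862',
}

for name, url in DAEMONS.items():
    try:
        requests.get(url, timeout=5)
        print('%s is up at %s' % (name, url))
    except requests.RequestException as e:
        print('%s is NOT reachable at %s (%s)' % (name, url, e))
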
The data source in this exercise is the file /var/log/messages, the central file in Linux for system messages, and your ultimate data destination is HDFS. Figure 2.6 shows this data flow, and the Agent and Collector configuration settings you'll use to make it work.

Figure 2.6 Data flow from /var/log/messages into HDFS
Node: Agent
  Source: tail("/var/log/messages")
  Sink: agentSink("localhost",35853)
Node: Collector
  Source: collectorSource(35853)
  Sink: collectorSink("hdfs://localhost:8020/tmp/flume/collected/%Y/%m/%d/","%host-raw")

By default, Flume will write data in Avro JSON format, which we'll discuss shortly. You'll want to preserve the original format of your syslog file, so you'll need to create and edit flume-site.xml and indicate the raw output format. The file should look like this:

cat /etc/flume/conf/flume-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>flume.collector.output.format</name>
    <value>raw</value>
  </property>
</configuration>

If you've set up your cluster with LZO compression, you'll need to create a flume-env.sh file and set the directory that contains the native compression codecs:

cp /usr/lib/flume/bin/flume-env.sh.template \
  /usr/lib/flume/bin/flume-env.sh
vi /usr/lib/flume/bin/flume-env.sh

# add the following line for 64-bit environments
export JAVA_LIBRARY_PATH=/usr/lib/hadoop/lib/native/Linux-amd64-64

# or the following line for 32-bit environments
export JAVA_LIBRARY_PATH=/usr/lib/hadoop/lib/native/Linux-i386-32

You'll also need to copy the LZO JAR into Flume's lib directory:

cp /usr/lib/hadoop/lib/hadoop-lzo-0.4.1 /usr/lib/flume/lib/

Flume runs as the flume user, so you need to make sure that it has permissions to read any data source files (such as files under /var/log, for example). Previously, when you launched the Flume Master and Node you were a Node short. Let's launch another Node, which you'll name collector, to perform the Collector duties:

flume node_nowatch -n collector

Each of the Flume daemons has an embedded web user interface. If you've followed the previous instructions, table 2.2 shows the locations where they'll be available.

Table 2.2 Flume UI endpoints
Daemon                    URL
Flume Master              http://localhost:35871
Flume Node (Agent)        http://localhost:35862
Flume Node (Collector)    http://localhost:35863

The advantage of using a Flume Master is that you can make configuration changes in a central location, and they'll be pulled by the Flume Nodes. There are two ways you can make configuration changes in the Flume Master: using the UI or the Flume shell. I'll show the configuration in the UI.

You'll need to configure the Agent and Collector Nodes according to the setup illustrated in figure 2.6. You'll connect to the Flume Master UI, and select the config menu from the top, as highlighted in figure 2.7. The drop-down box contains all of the Nodes, and you'll select the Agent node. The Agent node has the same name as the hostname that you're running on—in my case cdh.

Figure 2.7 Flume Agent Node configuration to follow (select the config menu item to get to this screen; the drop-down contains a list of all the Agent and Collector Nodes connected to the Master; 35853 is the port that the Collector listens on)

You should see one other Node in the drop-down called collector, which you'll configure next. For the Agent Node, you'll specify that the data source is a tail of the syslog file and the data sink is the port that your Collector will run on. Now select the Collector from the drop-down, and in a similar fashion, set the data source, which is the local port that you'll listen on, and the data sink, which is the final destination in HDFS for your log data. Figure 2.8 shows the configuration.

Figure 2.8 Flume Collector Node configuration to follow

The main Flume Master screen displays all of the Nodes and their configured data sources and sinks, as shown in figure 2.9.

Figure 2.9 The main screen in the Flume Master UI after the configuration of your Nodes

All the actions you just performed can be executed in the Flume shell. Here's an example of how you can view the same information that you just saw in the UI. To better identify the text you entered, the shell prompts are surrounded by square brackets. Your first command is connect ... to connect to the Flume Master on port 35873 (the default Flume Master port), and your second command is getconfigs, which dumps the current configuration for the Nodes:

flume shell
[flume (disconnected)] connect localhost:35873
[flume localhost:35873:45678] getconfigs
NODE       FLOW          SOURCE                       SINK
collector  default-flow  collectorSource(35853)       collectorSink(...)
cdh        default-flow  tail("/var/log/messages")    agentSink(...)

After you make the configuration changes in the UI, they'll be picked up by the Flume Nodes after a few seconds. The Agent will then start to pipe the syslog file, and feed the output to the Collector, which in turn will periodically write output into HDFS. Let's examine HDFS to check on the progress:

hadoop fs -lsr /tmp/flume
/tmp/flume/collected
/tmp/flume/collected/2011
/tmp/flume/collected/2011/11
/tmp/flume/collected/2011/11/20
/tmp/flume/collected/2011/11/20/cdh-raw20111120-133115126-...
/tmp/flume/collected/2011/11/20/cdh-raw20111120-134449503-...
/tmp/flume/collected/2011/11/20/cdh-raw20111120-135653300-...
...

We went through a whole lot of work without discussing some of the key concepts involved in setting up the data pipeline. Let's go back now and inspect the previous work in detail. The first thing we'll look at is the definition of the data source.

FLUME DATA SOURCES
A data source is required for both the Agent and Collector Nodes; it determines where they collect their data. The Agent Node's data source is your application or system data that you want to transfer to HDFS, and the Collector Node's data source is the Agent's data sink. Figure 2.10 shows a subset of data sources supported by the Agent Node. As you can see, Flume supports a number of Agent sources, a complete list of which can be viewed on the Cloudera website http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html_flume_source_catalog.

Figure 2.10 Flume Agent Node data sources supported by the Agent Node:
■ text: text file consumed once, one event per line
■ tail: generates events for each additional line added to a file
■ multitail: tail facility that supports multiple files
■ syslogUdp: consumes syslog UDP messages
■ syslogTcp: consumes syslog TCP messages
■ execPeriodic: arbitrary command periodically executed, entire output is one event
■ execStream: arbitrary command execution, each line is a separate event
■ exec: arbitrary command execution with options for periodic execution, and to determine an event per line or from the whole of the output
■ scribe: consumes data generated by Scribe's collection system

Figure 2.7 uses the tail data source as an example, which works well for text files that are appended and rotated, specifying the file whose output you want to capture: tail("/var/log/messages"). You can configure the tail data source to emit the complete file as events, or just start from the current end of the file. The default is to read the whole file. The multitail data source accepts a list of filenames, and the tailDir takes a directory name with a regular expression to filter files that should be tailed.

Flume also supports TCP/UDP data sources that can receive logs from syslog. Their data source names are syslogUdp for syslogd, and syslogTcp for syslog-ng. In addition, Flume can periodically execute a command and capture its output as an event for processing, via the execPeriodic data source. And the exec data source gives you more control, allowing you to specify if each line of the process's output should be considered a separate message, or if the whole output should be considered the message. It also can be run periodically. Flume supports a variety of other data sources out of the box as well, and it can be extended with a custom data source, as documented at http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html_arbitrary_data_flows_and_custom_architectures.

AGENT SINKS
Agent sinks are destinations for an Agent's data source. Flume offers three different levels of reliability guarantees, which are summarized in table 2.3.

Table 2.3 Flume Agent data sink reliability modes
■ E2E (end to end): Guarantees that once an event enters Flume, the event will make its way to the end data sink.
■ DFO (disk failover): Events are persisted on local disk in the event that a failure occurs when attempting to transmit the event to the downstream Node. Acknowledgement messages sent from downstream Nodes result in the persisted data being removed.
■ BE (best effort): Events are dropped in the event of failed communication with a downstream Node.

Flume also has three levels of availability, as described in table 2.4. Figure 2.11 shows how the Flume failover options work. These reliability and availability modes are combined to form nine separate Agent sink options, as shown in figure 2.12.

Table 2.4 Flume failover options
■ None: Configure each Agent to write to a single Collector. If the Collector goes down, the Agent waits until it comes back up again.
■ Manually specified failover: Configure each Agent with one or more Collectors in addition to the primary. If the primary Collector fails, the events will be routed to backup Collectors.
■ Automatic failover: In this mode the Flume Master will allocate failover Collectors for Agents whose Collectors fail. The advantage is that Flume balances the Agent/Collector event links to ensure that individual Collectors aren't overwhelmed. This rebalancing also occurs in normal situations where Collectors are added and removed.

Figure 2.11 Flume failover architecture shows the three levels available: no failover, manually specified failover, and automatic failover.

Figure 2.12 Flume Agent sinks that are available, by reliability mode and failover option:
■ No failover: agentBESink (best effort), agentDFOSink (disk failover), agentE2ESink (end to end); agentSink is an alias for the end-to-end sink and is the default
■ Manually specified failover: agentBEChain, agentDFOChain, agentE2EChain
■ Automatic failover: autoBEChain, autoDFOChain, autoE2EChain

All of these Agent sinks take as arguments the Collector Node host, and the port that it's listening on. In the example in figure 2.7 I utilized the default agentSink option, which guarantees end-to-end delivery, but has no failover support. The Collector is running on the same host, on port 35853:

agentSink("localhost",35853)

FLUME COLLECTOR DATA SINK
Flume contains a single Collector data sink, called collectorSink, which you'll configure to write to a directory on a local disk or HDFS. The sink takes two parameters: the directory, and the filename prefix for files written in that directory. Both of these arguments support the Flume functionality called output bucketing, which permits some macro substitutions. Let's review how to configure the Collector sink:

collectorSink("hdfs://localhost:8020/tmp/flume/collected/%Y/%m/%d/","%host-raw")

The %Y, %m, and %d are date escape sequences that are substituted with the date at which the event was received. The %host is substituted with the agent host that generated the event. A full list of the escape sequences is available at http://archive.cloudera.com/cdh/3/flume-0.9.1+1/UserGuide.html_output_bucketing.

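To see what those escape sequences produce, here's a small illustration (not Flume's own code) that expands the same bucket pattern for an event received on November 20, 2011 from the host cdh. The result matches the directory layout you saw in the earlier hadoop fs -lsr listing; Flume itself appends a timestamp and roll tag to the actual filenames.

#!/usr/bin/env python
# Illustration of Flume's output bucketing escapes: expand %Y/%m/%d from the
# event's receipt time and %host from the originating agent. This mimics the
# substitution for explanatory purposes only; Flume performs it internally.
from datetime import datetime

def expand_bucket(dir_pattern, prefix_pattern, event_time, host):
    path = event_time.strftime(dir_pattern)          # fills %Y, %m, %d
    prefix = prefix_pattern.replace('%host', host)   # fills %host
    return path + prefix

when = datetime(2011, 11, 20, 13, 31, 15)
print(expand_bucket('hdfs://localhost:8020/tmp/flume/collected/%Y/%m/%d/',
                    '%host-raw', when, 'cdh'))
# prints: hdfs://localhost:8020/tmp/flume/collected/2011/11/20/cdh-raw
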
The Collector data sink supports a variety of output formats for events, some of which are shown in figure 2.13.

Figure 2.13 Flume Collector data sink output formats (set in /etc/flume/conf/flume-site.xml):
■ syslog: outputs events in a syslog-like format
■ log4j: outputs events in a pattern similar to Hadoop's log4j pattern
■ avrojson: outputs data as JSON encoded by Avro
■ avrodata: outputs data as Avro binary encoded data
■ debug: used only for debugging
■ raw: outputs only the event body, no metadata

Earlier in this chapter you created a flume-site.xml file and specified raw as your output format. By default Flume chooses avrojson, an example of which is shown in the following code. The body field contains the raw contents of a single line from the syslog:

{
  "body":"Nov 20 13:25:40 cdh aholmes: Flume test",
  "timestamp":1321813541326,
  "pri":"INFO",
  "nanos":2748053914369042,
  "host":"cdh",
  "fields":{
    "AckTag":"20111120-132531557-0500.2748044145743042.00000145",
    "AckType":"msg",
    "AckChecksum":"\u0000\u0000\u0000\u00004\u0002?g",
    "tailSrcFile":"messages",
    "rolltag":"20111120-132532809-0500.2748045397574042.00000021"
  }
}

This approach uses Flume to show how to capture syslog appends and write them into HDFS. You can use this same approach for a variety of line-based text files.

Summary
We've used Flume on a single machine, using the default configuration settings, which assumes everything runs on the local host and on standard ports. In a fully distributed setup the Node hosts would need to specify where the Flume Master is located in flume-site.xml, as the next example demonstrates. Consult the user guide for more details at http://goo.gl/8YNsU.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>flume.master.servers</name>
    <value>flume-master1,flume-master2,flume-master3</value>
  </property>
</configuration>

How do you determine the number of Flume masters that you should run? Let's say you want to be able to support the failure of two master Flume nodes. To figure out the number of Flume masters you should run, take the number of Flume master nodes that could fail, double it, and add 1: in this case, 2 * 2 + 1 = 5 masters. Flume uses an embedded ZooKeeper in each of the Master daemons, but you can configure it to use an external ZooKeeper if one already exists in your environment.

If you're capturing Apache logs, you can configure the web server to launch a Flume ad hoc Node and pipe the log output directly to Flume. If that Node is talking to a remote Collector, then, unfortunately, the client Node can't be configured for automatic failover or end-to-end reliability, because the Node can't be managed by the Flume Master. The workaround to this is to have the ad hoc Node forward to a local Flume Agent Node, which can have these reliability and failover properties. Flume also includes a log4j appender, and you can find more details on this at http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html_logging_via_log4j_directly.

Using Flume for log distribution has many advantages over its peers, primarily because of its high level of documentation, its ease of use, and its customizable reliability modes.

We've looked at an automated way to shuttle log data into HDFS. But now imagine that the data you want to move into Hadoop isn't log data, but instead data that these tools have a harder time working with, such as semistructured or binary files.

2.2.2 Pushing and pulling semistructured and binary files

You've learned how to use log collecting tools like Flume to automate moving data into HDFS. But these tools don't support working with semistructured or binary data out of the box. This section looks at techniques to help you automate moving such files into HDFS.

Production networks typically have network silos where your Hadoop clusters are segmented away from other production applications. In such cases it's possible that your Hadoop cluster isn't able to pull data from other data sources, leaving you only the option to push data into Hadoop. You need a mechanism to automate the process of copying files of any format into HDFS, similar to the Linux tool rsync.

The mechanism should be able to compress files written in HDFS and offer a way to dynamically determine the HDFS destination for data partitioning purposes.

TECHNIQUE 2 An automated mechanism to copy files into HDFS

Existing file transportation mechanisms such as Flume, Scribe, and Chukwa are geared towards supporting log files. What if your files are in a different format, such as semistructured or binary? If the files were siloed in a way that the Hadoop slave nodes couldn't directly access, then you couldn't use Oozie to help with file ingress either.

Problem
You need to automate the process by which files on remote servers are copied into HDFS.

Solution
The HDFS File Slurper open source project can copy files of any format in and out of HDFS. This technique covers how it can be configured and used to copy data into HDFS.

Discussion
You can use the HDFS File Slurper project that I wrote to assist with your automation (see https://github.com/alexholmes/hdfs-file-slurper). The HDFS File Slurper is a simple utility that supports copying files from a local directory into HDFS and vice versa. Figure 2.14 provides a high-level overview of the Slurper (my nickname for the project), with an example of how you can use it to copy files.

The Slurper reads any files that exist in a source directory and optionally consults with a script to determine the file placement in the destination directory. It then writes the file to the destination, after which there's an optional verification step. Finally, the Slurper moves the source file to a completed folder upon successful completion of all of the previous steps.

Figure 2.14 The five-step process of the HDFS File Slurper data flow for copying files: (1) scan the source directory on the client host, (2) determine the HDFS destination, optionally via a file location script, (3) write the file to HDFS, (4) verify the copy, and (5) relocate the source file to the completed directory.

With this technique there are a few challenges you need to make sure you address:
■ How do you effectively partition your writes to HDFS so that you don't lump everything into a single directory?
■ How do you determine that your data is ready in HDFS for processing in order to avoid reading files that are mid-copy?
■ How do you automate regular execution of your utility?

The first step is to download and build the code. The following assumes that you have git, Java, and version 3.0 or newer of Maven installed locally:

git clone git://github.com/alexholmes/hdfs-file-slurper.git
cd hdfs-file-slurper/
mvn package

Next you'll need to untar the tarball that the build created under /usr/local:

sudo tar -xzf target/hdfs-slurper-version-package.tar.gz \
  -C /usr/local/
sudo ln -s /usr/local/hdfs-slurper-version /usr/local/hdfs-slurper

CONFIGURATION
Before you can run the code you'll need to edit /usr/local/hdfs-slurper/conf/slurper-env.sh and set the Java home and Hadoop home directories. The Slurper comes bundled with a /usr/local/hdfs-slurper/conf/slurper.conf file, which contains details on the source and destination directory, along with other options. The file contains the following default settings, which you can change:

DATASOURCE_NAME = test
  A name for the data being transferred. This is used for the log filename when launched via the Linux init daemon management system, which we'll cover shortly.

SRC_DIR = file:/tmp/slurper/in
  The source directory. Any files that are moved into here are automatically copied to the destination directory (with an intermediary hop to the staging directory).

WORK_DIR = file:/tmp/slurper/work
  The work directory. Files from the source directory are moved into this directory before the copy to the destination starts.

COMPLETE_DIR = file:/tmp/slurper/complete
  The complete directory. After the copy has completed, the file is moved from the work directory into this directory. Alternatively the REMOVE_AFTER_COPY setting can be used to delete the source file, in which case the COMPLETE_DIR setting shouldn't be supplied.

ERROR_DIR = file:/tmp/slurper/error
  Any errors encountered during the copy result in the source file being moved into this directory.

DEST_DIR = hdfs:/tmp/slurper/dest
  The final destination directory for source files.

DEST_STAGING_DIR = hdfs:/tmp/slurper/stage
  The staging directory. Files are first copied into this directory in the destination filesystem, and then the Slurper performs an atomic move into the destination after the file has been copied.

You'll notice that all of the directory names are HDFS URIs. HDFS distinguishes between different filesystems in this way. The file:/ URI denotes a path on the local filesystem, and the hdfs:/ URI denotes a path in HDFS. In fact, the Slurper supports any Hadoop filesystem, as long as you configure Hadoop to use it.

RUNNING
Let's use the default settings, create a local directory called /tmp/slurper/in, write an empty file into it, and run the utility. If you're running your environment on a Hadoop distribution other than CDH, the HADOOP_HOME environment variable needs to be set with the location of your Hadoop installation:

mkdir -p /tmp/slurper/in
touch /tmp/slurper/in/test-file.txt
cd /usr/local/hdfs-slurper/
bin/slurper.sh config-file conf/slurper.conf

Copying source file 'file:/tmp/slurper/work/test-file.txt' to staging destination 'hdfs:/tmp/slurper/stage/1354823335'
Moving staging file 'hdfs:/tmp/slurper/stage/1354823335' to destination 'hdfs:/tmp/slurper/dest/test-file.txt'
File copy successful, moving source file:/tmp/slurper/work/test-file.txt to completed file file:/tmp/slurper/complete/test-file.txt

hadoop fs -ls /tmp/slurper/dest
/tmp/slurper/dest/test-file.txt

A key feature in the Slurper's design is that it doesn't work with partially written files. Files must be atomically moved into the source directory (file moves in both the Linux and HDFS filesystems are atomic; note that moving files is atomic only if both the source and destination are on the same partition, so moving a file from an NFS mount to a local disk results in a copy, which isn't atomic). Alternatively, you can write to a filename that starts with a period (.), which is ignored by the Slurper, and after the file write completes, you'd rename the file to a name without the period prefix.

Another key consideration with the Slurper is the assurance that files being copied are globally unique. If they aren't, the Slurper will overwrite that file in HDFS, which is likely an undesirable outcome.

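Here's a small sketch of the dot-prefix pattern described above, from the producer's point of view: write to a hidden, dot-prefixed name in the Slurper's source directory, and rename it into place only when the write is complete, so the Slurper never sees a partial file. The directory and filenames are examples only.

#!/usr/bin/env python
# Illustrative producer-side pattern: write to a dot-prefixed temporary name,
# then atomically rename it into place so the Slurper only ever sees complete
# files. Directory and filenames are examples.
import os

SLURPER_IN = '/tmp/slurper/in'

def publish(filename, data):
    hidden = os.path.join(SLURPER_IN, '.' + filename)   # ignored by the Slurper
    final = os.path.join(SLURPER_IN, filename)
    with open(hidden, 'wb') as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())
    os.rename(hidden, final)  # atomic when both paths are on the same filesystem

publish('web-20111120.log', b'sample log line\n')
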
DYNAMIC DESTINATION PATHS
The previous approach works well if you're working with a small number of files that you're moving into HDFS on a daily basis. But if you're dealing with a large volume of files you want to think about partitioning them into separate directories. This has the benefit of giving you more finely grained control over the input data for your MapReduce jobs, as well as helping with the overall organization of your data in the filesystem (you wouldn't want all the files on your computer to reside in a single flat directory).

How can you have more dynamic control over the destination directory and the filename that the Slurper uses? The Slurper configuration file (slurper.conf) has a SCRIPT option (which is mutually exclusive with the DEST_DIR option), where you can specify a script that provides a dynamic mapping of the source files to destination files.

Let's assume that the files you're working with contain a date in the filename, and you've decided that you want to organize your data in HDFS by date. Let's write a script that can perform this mapping activity. The following example is a Python script that does this:

#!/usr/bin/python

import sys, os, re

# read the local file from standard input
input_file=sys.stdin.readline()

# extract the filename from the file
filename = os.path.basename(input_file)

# extract the date from the filename
match=re.search(r'([0-9]{4})([0-9]{2})([0-9]{2})', filename)
year=match.group(1)
mon=match.group(2)
day=match.group(3)

# construct our destination HDFS file
hdfs_dest="hdfs:/data/%s/%s/%s/%s" % (year, mon, day, filename)

# write it to standard output
print hdfs_dest,

Now you can update /usr/local/hdfs-slurper/conf/slurper.conf, set SCRIPT, and comment out DEST_DIR, which results in the following changes to the file:

# DEST_DIR = hdfs:/tmp/slurper/dest
SCRIPT = /usr/local/hdfs-slurper/bin/sample-python.py

Run the Slurper again and see what happens:

