How to design HBase schema

CecilGalot, United Kingdom, Teacher
Published Date: 06-08-2017
Data Modeling in Hadoop

At its core, Hadoop is a distributed data store that provides a platform for implementing powerful parallel processing frameworks. The reliability of this data store when it comes to storing massive volumes of data, coupled with its flexibility in running multiple processing frameworks, makes it an ideal choice for your data hub. This characteristic of Hadoop means that you can store any type of data as is, without placing any constraints on how that data is processed.

A common term one hears in the context of Hadoop is Schema-on-Read. This simply refers to the fact that raw, unprocessed data can be loaded into Hadoop, with the structure imposed at processing time based on the requirements of the processing application.

This is different from Schema-on-Write, which is generally used with traditional data management systems. Such systems require the schema of the data store to be defined before the data can be loaded. This leads to lengthy cycles of analysis, data modeling, data transformation, loading, testing, and so on before data can be accessed. Furthermore, if a wrong decision is made or requirements change, this cycle must start again. When the application or structure of data is not as well understood, the agility provided by the Schema-on-Read pattern can provide invaluable insights on data not previously accessible.

Relational databases and data warehouses are often a good fit for well-understood and frequently accessed queries and reports on high-value data. Increasingly, though, Hadoop is taking on many of these workloads, particularly for queries that need to operate on volumes of data that are not economically or technically practical to process with traditional systems.

Although being able to store all of your raw data is a powerful feature, there are still many factors that you should take into consideration before dumping your data into Hadoop.
These considerations include:

Data storage formats
There are a number of file formats and compression formats supported on Hadoop. Each has particular strengths that make it better suited to specific applications. Additionally, although Hadoop provides the Hadoop Distributed File System (HDFS) for storing data, there are several commonly used systems implemented on top of HDFS, such as HBase for additional data access functionality and Hive for additional data management functionality. Such systems need to be taken into consideration as well.

Multitenancy
It's common for clusters to host multiple users, groups, and application types. Supporting multitenant clusters involves a number of important considerations when you are planning how data will be stored and managed.

Schema design
Despite the schema-less nature of Hadoop, there are still important considerations to take into account around the structure of data stored in Hadoop. This includes directory structures for data loaded into HDFS as well as the output of data processing and analysis. This also includes the schemas of objects stored in systems such as HBase and Hive.

Metadata management
As with any data management system, metadata related to the stored data is often as important as the data itself. Understanding and making decisions related to metadata management are critical.

We'll discuss these items in this chapter. Note that these considerations are fundamental to architecting applications on Hadoop, which is why we're covering them early in the book.

Another important factor when you're making storage decisions with Hadoop, but one that's beyond the scope of this book, is security and its associated considerations. This includes decisions around authentication, fine-grained access control, and encryption, both for data on the wire and data at rest. For a comprehensive discussion of security with Hadoop, see Hadoop Security by Ben Spivey and Joey Echeverria (O'Reilly).
Data Storage Options

One of the most fundamental decisions to make when you are architecting a solution on Hadoop is determining how data will be stored in Hadoop. There is no such thing as a standard data storage format in Hadoop. Just as with a standard filesystem, Hadoop allows for storage of data in any format, whether it's text, binary, images, or something else. Hadoop also provides built-in support for a number of formats optimized for Hadoop storage and processing. This means users have complete control and a number of options for how data is stored in Hadoop. This applies not just to the raw data being ingested, but also to intermediate data generated during data processing and to derived data that's the result of data processing. This, of course, also means that there are a number of decisions involved in determining how to optimally store your data.

Major considerations for Hadoop data storage include:

File format
There are multiple formats that are suitable for data stored in Hadoop. These include plain text or Hadoop-specific formats such as SequenceFile. There are also more complex but more functionally rich options, such as Avro and Parquet. These different formats have different strengths that make them more or less suitable depending on the application and source-data types. It's possible to create your own custom file format in Hadoop, as well.

Compression
This will usually be a more straightforward task than selecting file formats, but it's still an important factor to consider. Compression codecs commonly used with Hadoop have different characteristics; for example, some codecs compress and uncompress faster but don't compress as aggressively, while other codecs create smaller files but take longer to compress and uncompress and, not surprisingly, require more CPU. The ability to split compressed files is also a very important consideration when you're working with data stored in Hadoop; we'll discuss splittability considerations further later in the chapter.

Data storage system
While all data in Hadoop rests in HDFS, there are decisions around what the underlying storage manager should be, for example, whether you should use HBase or HDFS directly to store the data. Additionally, tools such as Hive and Impala allow you to define additional structure around your data in Hadoop.

Before beginning a discussion on data storage options for Hadoop, we should note a couple of things:

• We'll cover different storage options in this chapter, but more in-depth discussions on best practices for data storage are deferred to later chapters. For example, when we talk about ingesting data into Hadoop we'll talk more about considerations for storing that data.
• Although we focus on HDFS as the Hadoop filesystem in this chapter and throughout the book, we'd be remiss in not mentioning work to enable alternate filesystems with Hadoop. This includes open source filesystems such as GlusterFS and the Quantcast File System, and commercial alternatives such as Isilon OneFS and NetApp. Cloud-based storage systems such as Amazon's Simple Storage Service (S3) are also becoming common. The filesystem might become yet another architectural consideration in a Hadoop deployment. This should not, however, have a large impact on the underlying considerations that we're discussing here.

Standard File Formats

We'll start with a discussion on storing standard file formats in Hadoop, for example, text files (such as comma-separated values (CSV) or XML) or binary file types (such as images). In general, it's preferable to use one of the Hadoop-specific container formats discussed next for storing data in Hadoop, but in many cases you'll want to store source data in its raw form.
As noted before, one of the most powerful features of Hadoop is the ability to store all of your data regardless of format. Having online access to data in its raw, source form ("full fidelity" data) means it will always be possible to perform new processing and analytics with the data as requirements change. The following discussion provides some considerations for storing standard file formats in Hadoop.

Text data

A very common use of Hadoop is the storage and analysis of logs such as web logs and server logs. Such text data, of course, also comes in many other forms: CSV files, or unstructured data such as emails. A primary consideration when you are storing text data in Hadoop is the organization of the files in the filesystem, which we'll discuss more in the section "HDFS Schema Design." Additionally, you'll want to select a compression format for the files, since text files can very quickly consume considerable space on your Hadoop cluster. Also, keep in mind that there is an overhead of type conversion associated with storing data in text format. For example, storing 1234 in a text file and using it as an integer requires a string-to-integer conversion during reading, and vice versa during writing. Text can also take up more space than a binary integer encoding: 1234567890, for instance, needs 10 bytes as text but fits in a 4-byte integer. This overhead adds up when you do many such conversions and store large amounts of data.

Selection of compression format will be influenced by how the data will be used. For archival purposes you may choose the most compact compression available, but if the data will be used in processing jobs such as MapReduce, you'll likely want to select a splittable format. Splittable formats enable Hadoop to split files into chunks for processing, which is critical to efficient parallel processing. We'll discuss compression types and considerations, including the concept of splittability, later in this chapter.
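Why splittability matters is easiest to see with plain, uncompressed, newline-delimited text: any byte offset can start a split, because a reader can always find the next record boundary by scanning for a newline. The Python sketch below is a simplified illustration of that idea, not Hadoop's own record reader, and the function name `read_line_split` is hypothetical. Each split owns the records that begin inside its byte range, skips the partial record it lands in, and reads past its end to finish its last record. A single gzip stream offers no such entry point, because decompression has to start from the beginning of the stream.

```python
def read_line_split(data: bytes, start: int, end: int) -> list[bytes]:
    """Return the newline-delimited records that begin inside the byte range [start, end)."""
    if start == 0:
        pos = 0
    else:
        # We probably landed mid-record. A record starts right after a newline,
        # so scan from start - 1 (in case start is already a record's first byte).
        newline = data.find(b"\n", start - 1)
        if newline == -1:
            return []
        pos = newline + 1

    records = []
    # Keep reading while the next record begins before `end`; the last record
    # may run past `end`, like a record that straddles a block boundary.
    while pos < min(end, len(data)):
        newline = data.find(b"\n", pos)
        if newline == -1:
            newline = len(data)
        records.append(data[pos:newline])
        pos = newline + 1
    return records


log = b"GET /a\nGET /b\nPOST /c\nGET /d\nGET /e\n"
middle = len(log) // 2
first = read_line_split(log, 0, middle)
second = read_line_split(log, middle, len(log))
print(first, second)  # each record appears in exactly one of the two splits
```

Whatever offsets are chosen, every record lands in exactly one split. The same trick does not work for pretty-printed JSON or multi-line XML, where a newline is not a record boundary.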
Note also that in many, if not most, cases, the use of a container format such as SequenceFiles or Avro will provide advantages that make it a preferred format for most file types, including text; among other things, these container formats provide functionality to support splittable compression. We'll also be covering these container formats later in this chapter.

Structured text data

A more specialized form of text files is structured formats such as XML and JSON. These types of formats can present special challenges with Hadoop, since splitting XML and JSON files for processing is tricky, and Hadoop does not provide a built-in InputFormat for either. JSON presents even greater challenges than XML, since there are no tokens to mark the beginning or end of a record. In the case of these formats, you have a couple of options:

• Use a container format such as Avro. Transforming the data into Avro can provide a compact and efficient way to store and process the data.
• Use a library designed for processing XML or JSON files. Examples of this for XML include XMLLoader in the PiggyBank library for Pig. For JSON, the Elephant Bird project provides the LzoJsonInputFormat. For more details on processing these formats, see the book Hadoop in Practice by Alex Holmes (Manning), which provides several examples for processing XML and JSON files with MapReduce.

Binary data

Although text is typically the most common source data format stored in Hadoop, you can also use Hadoop to process binary files such as images. For most cases of storing and processing binary files in Hadoop, using a container format such as SequenceFile is preferred. If the splittable unit of binary data is larger than 64 MB, you may consider putting the data in its own file, without using a container format.

Hadoop File Types

There are several Hadoop-specific file formats that were specifically created to work well with MapReduce.
These Hadoop-specific file formats include file-based data structures such as sequence files, serialization formats like Avro, and columnar formats such as RCFile and Parquet. These file formats have differing strengths and weaknesses, but all share the following characteristics that are important for Hadoop applications:

Splittable compression
These formats support common compression formats and are also splittable. We'll discuss splittability more in the section "Compression," but note that the ability to split files can be a key consideration for storing data in Hadoop because it allows large files to be split for input to MapReduce and other types of jobs. The ability to split a file for processing by multiple tasks is of course a fundamental part of parallel processing, and is also key to leveraging Hadoop's data locality feature.

Agnostic compression
The file can be compressed with any compression codec, without readers having to know the codec. This is possible because the codec is stored in the header metadata of the file format.

We'll discuss the file-based data structures in this section, and subsequent sections will cover serialization formats and columnar formats.

File-based data structures

The SequenceFile format is one of the most commonly used file-based formats in Hadoop, but other file-based formats are available, such as MapFiles, SetFiles, ArrayFiles, and BloomMapFiles. Because these formats were specifically designed to work with MapReduce, they offer a high level of integration for all forms of MapReduce jobs, including those run via Pig and Hive. We'll cover the SequenceFile format here, because that's the format most commonly employed in implementing Hadoop jobs. For a more complete discussion of the other formats, refer to Hadoop: The Definitive Guide.

SequenceFiles store data as binary key-value pairs.
There are three formats available for records stored within SequenceFiles:

Uncompressed
For the most part, uncompressed SequenceFiles don't provide any advantages over their compressed alternatives, since they're less efficient for input/output (I/O) and take up more space on disk than the same data in compressed form.

Record-compressed
This format compresses each record as it's added to the file.

Block-compressed
This format waits until data reaches block size to compress, rather than compressing each record as it's added. Block compression provides better compression ratios compared to record-compressed SequenceFiles, and is generally the preferred compression option for SequenceFiles. Also, the reference to block here is unrelated to the HDFS or filesystem block. A block in block compression refers to a group of records that are compressed together within a single HDFS block.

Regardless of format, every SequenceFile uses a common header format containing basic metadata about the file, such as the compression codec used, key and value class names, user-defined metadata, and a randomly generated sync marker. This sync marker is also written into the body of the file to allow for seeking to random points in the file, and is key to facilitating splittability. For example, in the case of block compression, this sync marker will be written before every block in the file.

SequenceFiles are well supported within the Hadoop ecosystem; however, their support outside of the ecosystem is limited. They are also only supported in Java. A common use case for SequenceFiles is as a container for smaller files. Storing a large number of small files in Hadoop can cause a couple of issues. One is excessive memory use for the NameNode, because metadata for each file stored in HDFS is held in memory. Another potential issue is in processing data in these files: many small files can lead to many processing tasks, causing excessive overhead in processing.
Because Hadoop is optimized for large files, packing smaller files into a SequenceFile makes the storage and processing of these files much more efficient. For a more complete discussion of the small files problem with Hadoop and how SequenceFiles provide a solution, refer to Hadoop: The Definitive Guide.

Figure 1-1 shows an example of the file layout for a SequenceFile using block compression. An important thing to note in this diagram is the inclusion of the sync marker before each block of data, which allows readers of the file to seek to block boundaries.

Figure 1-1. An example of a SequenceFile using block compression

Serialization Formats

Serialization refers to the process of turning data structures into byte streams either for storage or transmission over a network. Conversely, deserialization is the process of converting a byte stream back into data structures. Serialization is core to a distributed processing system such as Hadoop, since it allows data to be converted into a format that can be efficiently stored as well as transferred across a network connection. Serialization is commonly associated with two aspects of data processing in distributed systems: interprocess communication (remote procedure calls, or RPC) and data storage. For purposes of this discussion we're not concerned with RPC, so we'll focus on the data storage aspect in this section.

The main serialization format utilized by Hadoop is Writables. Writables are compact and fast, but not easy to extend or use from languages other than Java. There are, however, other serialization frameworks seeing increased use within the Hadoop ecosystem, including Thrift, Protocol Buffers, and Avro. Of these, Avro is the best suited, because it was specifically created to address limitations of Hadoop Writables. We'll examine Avro in more detail, but let's first briefly cover Thrift and Protocol Buffers.
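Before looking at those frameworks, here is serialization in its simplest form: a minimal Python sketch that turns a record into a byte stream and back. The `StockTick` record and its byte layout are hypothetical. The layout (fixed-width big-endian fields plus a length-prefixed string) loosely resembles what a hand-written Writable produces, but it is not Hadoop's actual wire format. Note that nothing in the bytes describes the fields; reader and writer must agree on the layout in code, which is part of why such formats are hard to evolve or to read from other languages.

```python
import struct
from dataclasses import dataclass


@dataclass
class StockTick:
    """Hypothetical record type used only for this illustration."""
    timestamp: int  # seconds since the epoch
    ticker: str
    price: float


# Big-endian with no padding: 8-byte signed long, then a 2-byte unsigned length prefix.
_HEADER = struct.Struct(">qH")
_PRICE = struct.Struct(">d")  # 8-byte IEEE 754 double


def serialize(tick: StockTick) -> bytes:
    """Turn the record into bytes: timestamp, length-prefixed UTF-8 ticker, price."""
    name = tick.ticker.encode("utf-8")
    return _HEADER.pack(tick.timestamp, len(name)) + name + _PRICE.pack(tick.price)


def deserialize(buf: bytes) -> StockTick:
    """Read the fields back in exactly the order serialize() wrote them."""
    timestamp, name_len = _HEADER.unpack_from(buf, 0)
    offset = _HEADER.size
    ticker = buf[offset:offset + name_len].decode("utf-8")
    (price,) = _PRICE.unpack_from(buf, offset + name_len)
    return StockTick(timestamp, ticker, price)


tick = StockTick(1_500_000_000, "AAPL", 153.25)
payload = serialize(tick)
print(len(payload))  # 22
assert deserialize(payload) == tick
```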
Thrift

Thrift was developed at Facebook as a framework for implementing cross-language interfaces to services. Thrift uses an Interface Definition Language (IDL) to define interfaces, and uses an IDL file to generate stub code to be used in implementing RPC clients and servers that can be used across languages. Using Thrift allows us to implement a single interface that can be used with different languages to access different underlying systems. The Thrift RPC layer is very robust, but for this chapter, we're only concerned with Thrift as a serialization framework. Although sometimes used for data serialization with Hadoop, Thrift has several drawbacks: it does not support internal compression of records, it's not splittable, and it lacks native MapReduce support. Note that there are externally available libraries such as the Elephant Bird project to address these drawbacks, but Hadoop does not provide native support for Thrift as a data storage format.

Protocol Buffers

The Protocol Buffer (protobuf) format was developed at Google to facilitate data exchange between services written in different languages. Like Thrift, protobuf structures are defined via an IDL, which is used to generate stub code for multiple languages. Also like Thrift, Protocol Buffers do not support internal compression of records, are not splittable, and have no native MapReduce support. But also like Thrift, the Elephant Bird project can be used to encode protobuf records, providing support for MapReduce, compression, and splittability.

Avro

Avro is a language-neutral data serialization system designed to address the major downside of Hadoop Writables: lack of language portability. Like Thrift and Protocol Buffers, Avro data is described through a language-independent schema. Unlike Thrift and Protocol Buffers, code generation is optional with Avro.
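Because an Avro schema is a plain JSON document and code generation is optional, a program can work with records generically, driven by the schema at runtime. The following simplified Python sketch shows a reader schema in JSON and a `resolve` helper that applies two of Avro's schema-resolution rules for records: fields the reader does not know about are ignored, and reader fields missing from the written data take the reader's default value (or fail if there is none). The `StockTick` schema and the `resolve` function are hypothetical, and the sketch operates on already-decoded Python dicts rather than on Avro's binary encoding; real Avro resolution also handles type promotion, aliases, and nested types.

```python
import json

# Hypothetical reader schema: a newer version that adds "exchange" with a default.
READER_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "StockTick",
  "fields": [
    {"name": "ticker",   "type": "string"},
    {"name": "price",    "type": "double"},
    {"name": "exchange", "type": "string", "default": "NYSE"}
  ]
}
""")


def resolve(written: dict, reader_schema: dict) -> dict:
    """Project a decoded record onto the reader's schema (simplified Avro-style resolution)."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in written:
            resolved[name] = written[name]      # field known to both writer and reader
        elif "default" in field:
            resolved[name] = field["default"]   # field added later: use the reader's default
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    # Fields present in `written` but absent from the reader schema are simply dropped.
    return resolved


old_record = {"ticker": "AAPL", "price": 153.25}  # written before "exchange" existed
new_record = {"ticker": "MSFT", "price": 72.5, "exchange": "NASDAQ", "volume": 900}
print(resolve(old_record, READER_SCHEMA))  # {'ticker': 'AAPL', 'price': 153.25, 'exchange': 'NYSE'}
print(resolve(new_record, READER_SCHEMA))  # {'ticker': 'MSFT', 'price': 72.5, 'exchange': 'NASDAQ'}
```

The default value is what allows a field to be added to a schema without rewriting files written with the older version.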
Since Avro stores the schema in the header of each file, it's self-describing, and Avro files can easily be read later, even from a different language than the one used to write the file. Avro also provides better native support for MapReduce, since Avro data files are compressible and splittable. Another important feature of Avro that makes it superior to SequenceFiles for Hadoop applications is support for schema evolution; that is, the schema used to read a file does not need to match the schema used to write the file. This makes it possible to add new fields to a schema as requirements change.

Avro schemas are usually written in JSON, but may also be written in Avro IDL, which is a C-like language. As just noted, the schema is stored as part of the file metadata in the file header. In addition to metadata, the file header contains a unique sync marker. Just as with SequenceFiles, this sync marker is used to separate blocks in the file, allowing Avro files to be splittable. Following the header, an Avro file contains a series of blocks containing serialized Avro objects. These blocks can optionally be compressed, and within those blocks, types are stored in their native format, providing an additional boost to compression. At the time of writing, Avro supports Snappy and Deflate compression.

While Avro defines a small number of primitive types such as Boolean, int, float, and string, it also supports complex types such as array, map, and enum.

Columnar Formats

Until relatively recently, most database systems stored records in a row-oriented fashion. This is efficient for cases where many columns of the record need to be fetched. For example, if your analysis heavily relied on fetching all fields for records that belonged to a particular time range, row-oriented storage would make sense.
This option can also be more efficient when you're writing data, particularly if all columns of the record are available at write time, because the record can be written with a single disk seek. More recently, a number of databases have introduced columnar storage, which provides several benefits over earlier row-oriented systems:

• Skips I/O and decompression (if applicable) on columns that are not a part of the query.
• Works well for queries that only access a small subset of columns. If many columns are being accessed, then row-oriented is generally preferable.
• Is generally very efficient in terms of compression on columns because entropy within a column is lower than entropy within a block of rows. In other words, data is more similar within the same column than it is in a block of rows. This can make a huge difference, especially when the column has few distinct values.
• Is often well suited for data-warehousing-type applications where users want to aggregate certain columns over a large collection of records.

Not surprisingly, columnar file formats are also being utilized for Hadoop applications. Columnar file formats supported on Hadoop include the RCFile format, which has been popular for some time as a Hive format, as well as newer formats such as Optimized Row Columnar (ORC) and Parquet, which are described next.

RCFile

The RCFile format was developed specifically to provide efficient processing for MapReduce applications, although in practice it's only seen use as a Hive storage format. The RCFile format was developed to provide fast data loading, fast query processing, and highly efficient storage space utilization. The RCFile format breaks files into row splits, then within each split uses column-oriented storage.
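The row-split-then-columns idea can be sketched in a few lines of Python. This is a hypothetical toy, not the RCFile, ORC, or Parquet on-disk format: rows are cut into groups, each column inside a group is compressed separately with zlib, and a reader that needs one column decompresses only that column's chunks. The stock-tick rows are generated for illustration. Writing the same rows in two different orders also makes the entropy point concrete: the low-cardinality `ticker` column compresses much better when equal values are stored together.

```python
import random
import zlib


def write_row_groups(rows, columns, rows_per_group):
    """Cut rows into row groups; inside each group, store and compress every column separately."""
    groups = []
    for start in range(0, len(rows), rows_per_group):
        chunk = rows[start:start + rows_per_group]
        groups.append({
            name: zlib.compress("\n".join(str(row[i]) for row in chunk).encode("utf-8"))
            for i, name in enumerate(columns)
        })
    return groups


def read_column(groups, name):
    """Decompress only the requested column; return its values (as strings) and bytes read."""
    values, bytes_read = [], 0
    for group in groups:
        chunk = group[name]
        bytes_read += len(chunk)
        values.extend(zlib.decompress(chunk).decode("utf-8").split("\n"))
    return values, bytes_read


# Hypothetical stock ticks (timestamp, ticker, price), generated deterministically.
rng = random.Random(7)
COLUMNS = ["timestamp", "ticker", "price"]
ticks = [
    (1_500_000_000 + i, rng.choice(["AAPL", "GOOG", "MSFT", "ORCL"]), round(rng.uniform(10, 500), 2))
    for i in range(10_000)
]

by_time = write_row_groups(ticks, COLUMNS, 2_500)
by_ticker = write_row_groups(sorted(ticks, key=lambda t: (t[1], t[0])), COLUMNS, 2_500)

# Column projection: a query on price never touches the other columns' bytes.
total_bytes = sum(len(chunk) for group in by_time for chunk in group.values())
prices, price_bytes = read_column(by_time, "price")
print(f"query on price reads {price_bytes} of {total_bytes} bytes")

# Ordering: the same ticker column, stored in time order versus ticker order.
_, ticker_bytes_by_time = read_column(by_time, "ticker")
_, ticker_bytes_by_ticker = read_column(by_ticker, "ticker")
print(f"ticker column: {ticker_bytes_by_time} bytes by time, {ticker_bytes_by_ticker} by ticker")
```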
Although the RCFile format provides advantages in terms of query and compression performance compared to SequenceFiles, it also has some deficiencies that prevent optimal performance for query times and compression. Newer columnar formats such as ORC and Parquet address many of these deficiencies, and for most newer applications, they will likely replace the use of RCFile. RCFile is still a fairly common format used with Hive storage.

ORC

The ORC format was created to address some of the shortcomings with the RCFile format, specifically around query performance and storage efficiency. The ORC format provides the following features and benefits, many of which are distinct improvements over RCFile:

• Provides lightweight, always-on compression through type-specific readers and writers. ORC also supports the use of zlib, LZO, or Snappy to provide further compression.
• Allows predicates to be pushed down to the storage layer so that only required data is brought back in queries.
• Supports the Hive type model, including new primitives such as decimal and complex types.
• Is a splittable storage format.

A drawback of ORC as of this writing is that it was designed specifically for Hive, and so is not a general-purpose storage format that can be used with non-Hive MapReduce interfaces such as Pig or Java, or other query engines such as Impala. Work is under way to address these shortcomings, though.

Parquet

Parquet shares many of the same design goals as ORC, but is intended to be a general-purpose storage format for Hadoop. In fact, ORC came after Parquet, so some could say that ORC is a Parquet wannabe. As a general-purpose format, Parquet aims to be suitable for different MapReduce interfaces such as Java, Hive, and Pig, and also for other processing engines such as Impala and Spark.
Parquet provides the following benefits, many of which it shares with ORC:

• Similar to ORC files, Parquet allows for returning only required data fields, thereby reducing I/O and increasing performance.
• Provides efficient compression; compression can be specified on a per-column level.
• Is designed to support complex nested data structures.
• Stores full metadata at the end of files, so Parquet files are self-documenting.
• Fully supports reading and writing with Avro and Thrift APIs.
• Uses efficient and extensible encoding schemes, for example, bit-packing and run-length encoding (RLE).

Avro and Parquet

Over time, we have learned that there is great value in having a single interface to all the files in your Hadoop cluster. And if you are going to pick one file format, you will want to pick one with a schema because, in the end, most data in Hadoop will be structured or semistructured data. So if you need a schema, Avro and Parquet are great options. However, we don't want to have to worry about making an Avro version of the schema and a Parquet version. Thankfully, this isn't an issue because Parquet can be read and written to with Avro APIs and Avro schemas. This means we can have our cake and eat it too. We can meet our goal of having one interface to interact with our Avro and Parquet files, and we can have both block-based and columnar options for storing our data.

Comparing Failure Behavior for Different File Formats

An important aspect of the various file formats is failure handling; some formats handle corruption better than others:

• Columnar formats, while often efficient, do not work well in the event of failure, since this can lead to incomplete rows.
• Sequence files will be readable to the first failed row, but will not be recoverable after that row.
• Avro provides the best failure handling; in the event of a bad record, the read will continue at the next sync point, so failures only affect a portion of a file.
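The sync-marker mechanism that SequenceFiles and Avro rely on, both for splitting and for recovering from a bad block, can be illustrated with a toy container format in Python. The layout is hypothetical and much simpler than the real SequenceFile or Avro encodings (there is no file header, codec field, or schema): each block of records is compressed independently with zlib and preceded by a random 16-byte sync marker and a 4-byte length. Even though a zlib stream on its own cannot be split, a reader assigned an arbitrary byte range can scan forward to the first marker, and if a block fails to decompress it skips ahead to the next marker, losing only that block.

```python
import os
import zlib

SYNC_SIZE = 16
LENGTH_SIZE = 4


def write_container(records, records_per_block, sync):
    """Compress records in independent blocks, each laid out as: sync | length | payload."""
    out = bytearray()
    for i in range(0, len(records), records_per_block):
        block = "\n".join(records[i:i + records_per_block]).encode("utf-8")
        payload = zlib.compress(block)
        out += sync + len(payload).to_bytes(LENGTH_SIZE, "big") + payload
    return bytes(out)


def read_split(data, start, end, sync):
    """Return the records of every block whose sync marker begins in [start, end)."""
    records = []
    pos = data.find(sync, start)  # resynchronize: jump to the first marker at or after start
    while 0 <= pos < end:
        body = pos + len(sync)
        length = int.from_bytes(data[body:body + LENGTH_SIZE], "big")
        payload = data[body + LENGTH_SIZE:body + LENGTH_SIZE + length]
        try:
            records.extend(zlib.decompress(payload).decode("utf-8").split("\n"))
            pos = body + LENGTH_SIZE + length  # the next block starts right after this one
        except (zlib.error, UnicodeDecodeError):
            pos = data.find(sync, body)  # corrupt block: skip ahead to the next sync marker
    return records


sync = os.urandom(SYNC_SIZE)  # random per file; a 16-byte collision with payload bytes is negligible
records = [f"event-{i}" for i in range(1_000)]
data = write_container(records, 100, sync)

# Two "tasks" each take half of the bytes; together they see every record exactly once.
cut = len(data) // 2
assert read_split(data, 0, cut, sync) + read_split(data, cut, len(data), sync) == records
```

Because each block is self-contained, the damage from a corrupt byte is limited to the block that contains it, which is the behavior described for Avro above.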
Compression

Compression is another important consideration for storing data in Hadoop, not just in terms of reducing storage requirements, but also to improve data processing performance. Because a major overhead in processing large amounts of data is disk and network I/O, reducing the amount of data that needs to be read and written to disk can significantly decrease overall processing time. This includes compression of source data, but also the intermediate data generated as part of data processing (e.g., MapReduce jobs). Although compression adds CPU load, for most cases this is more than offset by the savings in I/O.

Although compression can greatly optimize processing performance, not all compression formats supported on Hadoop are splittable. Because the MapReduce framework splits data for input to multiple tasks, having a nonsplittable compression format is an impediment to efficient processing. If files cannot be split, that means the entire file needs to be passed to a single MapReduce task, eliminating the advantages of parallelism and data locality that Hadoop provides. For this reason, splittability is a major consideration in choosing a compression format as well as file format. We'll discuss the various compression formats available for Hadoop, and some considerations in choosing between them.

Snappy

Snappy is a compression codec developed at Google for high compression speeds with reasonable compression. Although Snappy doesn't offer the best compression sizes, it does provide a good trade-off between speed and size. Processing performance with Snappy can be significantly better than other compression formats. It's important to note that Snappy is intended to be used with a container format like SequenceFiles or Avro, since it's not inherently splittable.

LZO

LZO is similar to Snappy in that it's optimized for speed as opposed to size. Unlike Snappy, LZO compressed files are splittable, but this requires an additional indexing step.
This makes LZO a good choice for things like plain-text files that are not being stored as part of a container format. It should also be noted that LZO's license prevents it from being distributed with Hadoop and requires a separate install, unlike Snappy, which can be distributed with Hadoop.

Gzip

Gzip provides very good compression performance (on average, about 2.5 times the compression that'd be offered by Snappy), but its write speed performance is not as good as Snappy's (on average, it's about half of Snappy's). Gzip usually performs almost as well as Snappy in terms of read performance. Gzip is also not splittable, so it should be used with a container format. Note that one reason Gzip is sometimes slower than Snappy for processing is that Gzip-compressed files take up fewer blocks, so fewer tasks are used to process the same data and less parallelism is available. For this reason, using smaller blocks with Gzip can lead to better performance.

bzip2

bzip2 provides excellent compression performance, but can be significantly slower than other compression codecs such as Snappy in terms of processing performance. Unlike Snappy and Gzip, bzip2 is inherently splittable. In the examples we have seen, bzip2 will normally compress around 9% better than Gzip, in terms of storage space. However, this extra compression comes with a significant read/write performance cost. This performance difference will vary with different machines, but in general bzip2 is about 10 times slower than Gzip. For this reason, it's not an ideal codec for Hadoop storage, unless your primary need is reducing the storage footprint. One example of such a use case would be using Hadoop mainly for active archival purposes.

Compression recommendations

In general, any compression format can be made splittable when used with container file formats (Avro, SequenceFiles, etc.) that compress blocks of records or each record individually.
If you are doing compression on the entire file without using a container file format, then you have to use a compression format that inherently supports splitting (e.g., bzip2, which inserts synchronization markers between blocks).

Here are some recommendations on compression in Hadoop:

• Enable compression of MapReduce intermediate output. This will improve performance by decreasing the amount of intermediate data that needs to be read and written to and from disk.
• Pay attention to how data is ordered. Often, ordering data so that like data is close together will provide better compression levels. Remember, data in Hadoop file formats is compressed in chunks, and it is the entropy of those chunks that will determine the final compression. For example, if you have stock ticks with the columns timestamp, stock ticker, and stock price, then ordering the data by a repeated field, such as stock ticker, will provide better compression than ordering by a unique field, such as time or stock price.
• Consider using a compact file format with support for splittable compression, such as Avro. Figure 1-2 illustrates how Avro or SequenceFiles support splittability with otherwise nonsplittable compression formats. A single HDFS block can contain multiple Avro or SequenceFile blocks. Each of the Avro or SequenceFile blocks can be compressed and decompressed individually and independently of any other Avro/SequenceFile blocks. This, in turn, means that each of the HDFS blocks can be compressed and decompressed individually, thereby making the data splittable.

Figure 1-2. An example of compression with Avro

HDFS Schema Design

As pointed out in the previous section, HDFS and HBase are two very commonly used storage managers. Depending on your use case, you can store your data in HDFS or HBase (which internally stores it on HDFS).
In this section, we will describe the considerations for good schema design for data that you decide to store in HDFS directly. As mentioned earlier, Hadoop’s Schema-on-Read model does not impose any requirements when loading data into Hadoop. Data can simply be ingested into HDFS by one of many methods (which we will discuss further in Chapter 2) without our having to associate a schema or preprocess the data.

While many people use Hadoop for storing and processing unstructured data (such as images, videos, emails, or blog posts) or semistructured data (such as XML documents and logfiles), some order is still desirable. This is especially true since Hadoop often serves as a data hub for the entire organization, and the data stored in HDFS is intended to be shared among many departments and teams. Creating a carefully structured and organized repository of your data will provide many benefits. To list a few:

• A standard directory structure makes it easier to share data between teams working with the same data sets.
• It also allows for enforcing access and quota controls to prevent accidental deletion or corruption.
• Oftentimes, you’ll want to “stage” data in a separate location until all of it is ready to be processed. Conventions regarding staging data will help ensure that partially loaded data will not get accidentally processed as if it were complete.
• Standardized organization of data allows for reuse of some code that may process the data.
• Some tools in the Hadoop ecosystem make assumptions regarding the placement of data. It is often simpler to match those assumptions when you are initially loading data into Hadoop.

The details of the data model will be highly dependent on the specific use case. For example, data warehouse implementations and other event stores are likely to use a schema similar to the traditional star schema, including structured fact and dimension tables.
Designs for unstructured and semistructured data, on the other hand, are likely to focus more on directory placement and metadata management.

The important points to keep in mind when designing the schema, regardless of the project specifics, are:

• Develop standard practices and enforce them, especially when multiple teams are sharing the data.
• Make sure your design will work well with the tools you are planning to use. For example, the version of Hive you are planning to use may only support table partitions on directories that are named a certain way. This will impact the schema design in general and how you name your table subdirectories in particular.
• Keep usage patterns in mind when designing a schema. Different data processing and querying patterns work better with different schema designs. Understanding the main use cases and data retrieval requirements will result in a schema that will be easier to maintain and support in the long term as well as improve data processing performance.

Location of HDFS Files

To talk in more concrete terms, the first decision to make when you’re designing an HDFS schema is the location of the files. Standard locations make it easier to find and share data between teams. The following is an example HDFS directory structure that we recommend. This directory structure simplifies the assignment of permissions to various groups and users:

/user/username
Data, JARs, and configuration files that belong only to a specific user. This is usually scratch-type data that the user is currently experimenting with but that is not part of a business process. The directories under /user will typically only be readable and writable by the users who own them.

/etl
Data in various stages of being processed by an ETL (extract, transform, and load) workflow. The /etl directory will be readable and writable by ETL processes (they typically run under their own user) and members of the ETL team.
The /etl directory tree will have subdirectories for the various groups that own the ETL processes, such as business analytics, fraud detection, and marketing. The ETL workflows are typically part of a larger application, such as clickstream analysis or recommendation engines, and each application should have its own subdirectory under the /etl directory. Within each application-specific directory, you would have a directory for each ETL process or workflow for that application. Within the workflow directory, there are subdirectories for each of the data sets. For example, if your Business Intelligence (BI) team has a clickstream analysis application and one of its processes is to aggregate user preferences, the recommended name for the directory that contains the data would be /etl/BI/clickstream/aggregate_preferences. In some cases, you may want to go one level further and have directories for each stage of the process: input for the landing zone where the data arrives, processing for the intermediate stages (there may be more than one processing directory), output for the final result, and bad where records or files that are rejected by the ETL process land for manual troubleshooting. In such cases, the final structure will look similar to /etl/<group>/<application>/<process>/{input,processing,output,bad}.

/tmp
Temporary data generated by tools or shared between users. This directory is typically cleaned by an automated process and does not store long-term data. This directory is typically readable and writable by everyone.

/data
Data sets that have been processed and are shared across the organization. Because these are often critical data sources for analysis that drive business decisions, there are often controls around who can read and write this data. Very often user access is read-only, and data is written by automated (and audited) ETL processes.
Since data in /data is usually business-critical, only automated ETL processes are typically allowed to write to it, so changes are controlled and audited. Different business groups will have read access to different directories under /data, depending on their reporting and processing needs. Since /data serves as the location for shared processed data sets, it will contain subdirectories for each data set. For example, if you were storing all orders of a pharmacy in a table called medication_orders, we recommend that you store this data set in a directory named /data/medication_orders.

/app
Includes everything required for Hadoop applications to run, except data. This includes JAR files, Oozie workflow definitions, Hive HQL files, and more. The application code directory /app is used for application artifacts such as JARs for Oozie actions or Hive user-defined functions (UDFs). It is not always necessary to store such application artifacts in HDFS, but some Hadoop applications such as Oozie and Hive require storing shared code and configuration on HDFS so it can be used by code executing on any node of the cluster. This directory should have a subdirectory for each group and application, similar to the structure used in /etl. For a given application (say, Oozie), you would need a directory for each version of the artifacts you decide to store in HDFS, possibly tagging, via a symlink in HDFS, the latest artifact as latest and the currently used one as current. The directories containing the binary artifacts would be present under these versioned directories. This will look similar to /app/<group>/<application>/<version>/<artifact directory>/<artifact>. To continue our previous example, the JAR for the latest build of our aggregate preferences process would be in a directory structure like /app/BI/clickstream/latest/aggregate_preferences/uber-aggregate-preferences.jar.

/metadata
Stores metadata.
While most table metadata is stored in the Hive metastore, as described later in “Managing Metadata”, some extra metadata (for example, Avro schema files) may need to be stored in HDFS. This directory would be the best location for storing such metadata. This directory is typically readable by ETL jobs but writable by the user used for ingesting data into Hadoop (e.g., the Sqoop user). For example, the Avro schema file for a data set called movie may exist at a location like this: /metadata/movielens/movie/movie.avsc. We will discuss this particular example in more detail in Chapter 10.

Advanced HDFS Schema Design

Once the broad directory structure has been decided, the next important decision is how data will be organized into files. While we have already talked about how the format of the ingested data may not be the optimal format for storing it, it’s important to note that the default organization of ingested data may not be optimal either. There are a few strategies to best organize your data. We will talk about partitioning, bucketing, and denormalizing here.

Partitioning

Partitioning a data set is a very common technique used to reduce the amount of I/O required to process the data set. When you’re dealing with large amounts of data, the savings brought by reducing I/O can be quite significant. Unlike traditional data warehouses, however, HDFS doesn’t store indexes on the data. This lack of indexes plays a large role in speeding up data ingest, but it also means that every query will have to read the entire data set even when you’re processing only a small subset of the data (a pattern called full table scan). When the data sets grow very big, and queries only require access to subsets of data, a very good solution is to break up the data set into smaller subsets, or partitions. Each of these partitions would be present in a subdirectory of the directory containing the entire data set.
This will allow queries to read only the specific partitions (i.e., subdirectories) they require, reducing the amount of I/O and improving query times significantly. For example, say you have a data set called medication_orders that stores all the orders for various pharmacies, and you’d like to check order history for just one physician over the past three months. Without partitioning, you’d need to read the entire data set and filter out all the records that don’t pertain to the query. However, if we were to partition the entire orders data set so each partition included only a single day’s worth of data, a query looking for information from the past three months would only need to read 90 or so partitions and not the entire data set.

When placing the data in the filesystem, you should use the following directory format for partitions: <data set name>/<partition_column_name=partition_column_value>/{files}. In our example, this translates to: medication_orders/date=20131101/{order1.csv, order2.csv}

This directory structure is understood by various tools, like HCatalog, Hive, Impala, and Pig, which can leverage partitioning to reduce the amount of I/O required during processing.

Bucketing

Bucketing is another technique for decomposing large data sets into more manageable subsets. It is similar to the hash partitions used in many relational databases. In the preceding example, we could partition the orders data set by date because there are a large number of orders placed daily and the partitions will contain large enough files, which is what HDFS is optimized for. However, if we tried to partition the data by physician to optimize for queries looking for specific physicians, the resulting number of partitions may be too large and the resulting files may be too small. This leads to what’s called the small files problem.
As detailed in “File-based data structures”, storing a large number of small files in Hadoop can lead to excessive memory use for the NameNode, since metadata for each file stored in HDFS is held in memory. Also, many small files can lead to many processing tasks, causing excessive overhead in processing.

A better solution is to bucket by physician, which will use a hashing function to map physicians into a specified number of buckets. This way, you can control the size of the data subsets (i.e., buckets) and optimize for query speed. Files should not be so small that you’ll need to read and manage a huge number of them, but also not so large that each query will be slowed down by having to scan through huge amounts of data. A good average bucket size is a few multiples of the HDFS block size. Having an even distribution of data when hashed on the bucketing column is important because it leads to buckets of consistent size. Also, having the number of buckets as a power of two is quite common.

An additional benefit of bucketing becomes apparent when you’re joining two data sets. The word join here is used to represent the general idea of combining two data sets to retrieve a result. Joins can be done via SQL-on-Hadoop systems but also in MapReduce, Spark, or other programming interfaces to Hadoop. When both the data sets being joined are bucketed on the join key and the number of buckets of one data set is a multiple of the other, it is enough to join corresponding buckets individually without having to join the entire data sets. This significantly reduces the cost of a reduce-side join of the two data sets, which is computationally expensive: instead of joining the entire data sets together, you can join just the corresponding buckets with each other. Of course, the buckets from both the tables can be joined in parallel.
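A minimal Python sketch of bucketing and a bucket-by-bucket join, under stated assumptions: rows are (key, ...) tuples, CRC32 stands in for the engine’s real hash function, and the physician and order data are hypothetical. One table uses 4 buckets and the other 8, to show why bucket counts that are multiples of each other still let each bucket be joined with exactly one bucket of the other table.

```python
import zlib
from collections import defaultdict


def bucket_of(key, num_buckets):
    # CRC32 is a deterministic stand-in; Hive and other engines use their own
    # hash functions, but the idea is the same: hash(key) mod num_buckets.
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def bucketize(rows, num_buckets):
    """Split (key, ...) rows into num_buckets lists by hashing the key field."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_of(row[0], num_buckets)].append(row)
    return buckets


def hash_join(left, right):
    """Inner-join two lists of (key, ...) tuples on the key using an in-memory index."""
    index = defaultdict(list)
    for row in left:
        index[row[0]].append(row)
    return [l + r[1:] for r in right for l in index[r[0]]]


def bucketed_join(coarse_buckets, fine_buckets):
    """Join bucket by bucket. Assumes the same hash on both sides and that the
    finer bucket count is a multiple of the coarser one."""
    n_coarse, n_fine = len(coarse_buckets), len(fine_buckets)
    if n_fine % n_coarse != 0:
        raise ValueError("bucket counts must be multiples of each other")
    joined = []
    for j, bucket in enumerate(fine_buckets):
        # Every key here has hash % n_fine == j, hence hash % n_coarse == j % n_coarse,
        # so exactly one bucket of the other data set can contain matching keys.
        joined.extend(hash_join(coarse_buckets[j % n_coarse], bucket))
    return joined


# Hypothetical sample data keyed by physician id.
physicians = [(f"p{i}", f"physician {i}") for i in range(10)]
orders = [(f"p{i % 10}", f"order{i}") for i in range(25)]

full = hash_join(physicians, orders)   # join of the entire data sets
per_bucket = bucketed_join(bucketize(physicians, 4), bucketize(orders, 8))
assert sorted(per_bucket) == sorted(full)
```

Each iteration of the loop in bucketed_join touches only two buckets, so in a real framework every pair could be handled by a separate task in parallel.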
Moreover, because the buckets are typically small enough to easily fit into memory, you can do the entire join in the map stage of a MapReduce job by loading the smaller of the buckets in memory. This is called a map-side join, and it improves the join performance even further compared to a reduce-side join. If you are using Hive for data analysis, it should automatically recognize that tables are bucketed and apply this optimization.

If the data in the buckets is sorted, it is also possible to use a merge join and not store the entire bucket in memory when joining. This is somewhat faster than a simple bucket join and requires much less memory. Hive supports this optimization as well. Note that it is possible to bucket any table, even when there are no logical partition keys. It is recommended to use both sorting and bucketing on all large tables that are frequently joined together, using the join key for bucketing.

As you can tell from the preceding discussion, the schema design is highly dependent on the way the data will be queried. You will need to know which columns will be used for joining and filtering before deciding on partitioning and bucketing of the data. In cases when there are multiple common query patterns and it is challenging to decide on one partitioning key, you have the option of storing the same data set multiple times, each with a different physical organization. This is considered an anti-pattern in relational databases, but with Hadoop, this solution can make sense. For one thing, in Hadoop data is typically write-once, and few updates are expected. Therefore, the usual overhead of keeping duplicated data sets in sync is greatly reduced. In addition, the cost of storage in Hadoop clusters is significantly lower than in traditional systems, so there is less concern about wasted disk space. These attributes allow us to trade space for greater query speed, which is often desirable.
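The merge join on sorted buckets mentioned above can be sketched in Python as follows. This illustrates the general sort-merge technique, not Hive’s implementation; it assumes each bucket is a sequence of (key, ...) tuples sorted by key, and that both tables use the same number of buckets and the same hash. Only the left rows for the key currently being matched are held in memory, and the sample buckets are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

_DONE = (None, None)   # sentinel returned by next() once a side is exhausted


def merge_join(left_sorted, right_sorted):
    """Inner-join two streams of (key, ...) tuples that are already sorted by key.
    Only the left rows sharing the current key are buffered; the rest streams."""
    left_groups = groupby(left_sorted, key=itemgetter(0))
    right_groups = groupby(right_sorted, key=itemgetter(0))
    lkey, lrows = next(left_groups, _DONE)
    rkey, rrows = next(right_groups, _DONE)
    while lrows is not None and rrows is not None:
        if lkey < rkey:
            lkey, lrows = next(left_groups, _DONE)    # left key has no match
        elif lkey > rkey:
            rkey, rrows = next(right_groups, _DONE)   # right key has no match
        else:
            matches = list(lrows)                     # left rows for this key
            for r in rrows:
                for l in matches:
                    yield l + r[1:]
            lkey, lrows = next(left_groups, _DONE)
            rkey, rrows = next(right_groups, _DONE)


def sort_merge_bucket_join(left_buckets, right_buckets):
    """Merge-join bucket i of one table with bucket i of the other."""
    if len(left_buckets) != len(right_buckets):
        raise ValueError("this sketch assumes equal bucket counts")
    return [row for l, r in zip(left_buckets, right_buckets) for row in merge_join(l, r)]


# Hypothetical bucket 0 of two tables bucketed and sorted on physician id.
physicians_b0 = [("p1", "Dr. A"), ("p2", "Dr. B"), ("p4", "Dr. D")]
orders_b0 = [("p1", "o1"), ("p1", "o2"), ("p3", "o3"), ("p4", "o4")]
rows = list(merge_join(physicians_b0, orders_b0))
# [("p1", "Dr. A", "o1"), ("p1", "Dr. A", "o2"), ("p4", "Dr. D", "o4")]
```

Because both inputs are consumed in key order, memory use depends on the number of rows that share a single key rather than on the size of the bucket.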
Denormalizing

Although we talked about joins in the previous subsections, another method of trading disk space for query performance is denormalizing data sets so there is less of a need to join data sets. In relational databases, data is often stored in third normal form. Such a schema is designed to minimize redundancy and provide data integrity by splitting data into smaller tables, each holding a very specific entity. This means that most queries will require joining a large number of tables together to produce final result sets.

In Hadoop, however, joins are often the slowest operations and consume the most resources from the cluster. Reduce-side joins, in particular, require sending entire tables over the network. As we’ve already seen, it is important to optimize the schema to avoid these expensive operations as much as possible. While bucketing and sorting do help there, another solution is to create data sets that are prejoined (and, where useful, preaggregated). The idea is to minimize the amount of work queries have to do by doing as much as possible in advance, especially for queries or subqueries that are expected to execute frequently. Instead of running the join operations every time a user tries to look at the data, we can join the data once and store it in this form.

Looking at the difference between a typical Online Transaction Processing (OLTP) schema and an HDFS schema of a particular use case, you will see that the Hadoop schema consolidates many of the small dimension tables into a few larger dimensions by joining them during the ETL process. In the case of our pharmacy example, we consolidate frequency, class, admin route, and units into the medications data set, to avoid repeated joining. Other types of data preprocessing, like aggregation or data type conversion, can be done to speed up processes as well.
Since data duplication is a lesser concern, almost any type of processing that occurs frequently in a large number of queries is worth doing once and reusing. In relational databases, this pattern is often known as Materialized Views.
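To make the pharmacy example concrete, here is a small Python sketch of prejoining at ETL time. The dimension values, field names, and medications are hypothetical; the point is only that the four lookups (frequency, class, admin route, units) happen once, when the medications data set is written, rather than in every query that reads it. The last line shows a simple preaggregation computed the same way.

```python
from collections import Counter
from dataclasses import dataclass

# Hypothetical dimension tables from a normalized (OLTP-style) schema, keyed by id.
FREQUENCY = {1: "once daily", 2: "twice daily"}
DRUG_CLASS = {10: "analgesic", 11: "antibiotic"}
ADMIN_ROUTE = {20: "oral", 21: "intravenous"}
UNITS = {30: "mg", 31: "ml"}


@dataclass(frozen=True)
class Medication:
    """Normalized medication row: descriptive attributes live in other tables."""
    med_id: int
    name: str
    frequency_id: int
    class_id: int
    route_id: int
    units_id: int


def denormalize(meds):
    """Resolve every dimension once, at ETL time, and emit wide rows so that
    later queries can read these attributes without joining."""
    return [
        {
            "med_id": m.med_id,
            "name": m.name,
            "frequency": FREQUENCY[m.frequency_id],
            "class": DRUG_CLASS[m.class_id],
            "admin_route": ADMIN_ROUTE[m.route_id],
            "units": UNITS[m.units_id],
        }
        for m in meds
    ]


meds = [
    Medication(1, "ibuprofen", 2, 10, 20, 30),
    Medication(2, "ceftriaxone", 1, 11, 21, 30),
]
medications = denormalize(meds)   # the wide data set written to HDFS once

# A frequently needed aggregate, also computed once and stored for reuse.
meds_per_class = Counter(row["class"] for row in medications)
```

Storing the wide rows duplicates the dimension strings in every record, which is exactly the space-for-speed trade the text describes.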
