What Is Hadoop? (Tutorial 2019)
A Hadoop distribution is made up of a number of separate frameworks that are designed to work together. Both the individual frameworks and the Hadoop platform itself are extensible.
Hadoop has evolved to support fast data as well as big data. Big data was initially about large-scale batch processing of data.
Now organizations also need to make business decisions in real time or near real time as the data arrives. Fast data involves the capability to act on the data as it arrives.
Hadoop’s flexible framework architecture supports the processing of data with different run-time characteristics.
This tutorial discusses the Hadoop ecosystem, including the Hadoop Distributed File System (HDFS), the HBase NoSQL database, the Hive data warehouse solution, and the Pig query language for ad hoc analytical requirements.
The tutorial provides a snapshot overview of other components of the Hadoop environment, namely, Kafka, Flume, Sqoop, Whirr, Impala, and Drill.
A software framework is an abstraction in which software providing generic functionality can be selectively changed by additional user-written code, thus providing application-specific software.
A software framework is a universal and reusable software platform to develop applications, products, and solutions.
Software frameworks include support programs, compilers, code libraries, tool sets, and application programming interfaces (APIs) that bring together all the different components to enable development of a project or solution.
Frameworks contain the following key distinguishing features that separate them from normal libraries:
Inversion of control: In a framework, unlike in libraries or normal user applications, the overall program’s flow of control is not dictated by the caller but by the framework.
Default behavior: A framework has a default behavior. This default behavior must be some useful behavior and not a series of no-ops.
Non-modifiable framework code: The framework code, in general, is not supposed to be modified. Users can extend the framework but should not modify its code.
Extensibility: A framework can be extended by the user usually by selective overriding or specialized by user code to provide specific functionality.
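The four features above can be illustrated with a toy sketch (this is not Hadoop code; the class names are hypothetical). The framework owns the control flow, ships a useful default, is left unmodified, and is specialized by selective overriding:

```python
class JobFramework:
    """Framework code: users extend it but do not modify it."""

    def run(self, records):
        # Inversion of control: the framework, not the caller,
        # drives the processing loop.
        return [self.process(r) for r in records]

    def process(self, record):
        # Default behavior: a useful pass-through, not a no-op loop.
        return record


class UpperCaseJob(JobFramework):
    # Extensibility: user code selectively overrides one hook
    # to provide application-specific functionality.
    def process(self, record):
        return record.upper()


print(UpperCaseJob().run(["alpha", "beta"]))  # ['ALPHA', 'BETA']
```

Note that `run` is never overridden: the user supplies only the hook, and the framework decides when it is called.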
The main daemons for Hadoop frameworks are Java applications that run in JVMs. There are also separate frameworks that are not part of a standard distribution that can be added to Hadoop to add additional functionality.
Popular open source frameworks can be added to new Hadoop distributions. The significance of using Java for these core essentials is its property of write once and run everywhere: Java enables portability. The current release of Hadoop is a single merged Java code set for both Linux and Windows.
A Hadoop cluster is made up of a number of daemons and software processes that work together to provide a complete solution to meet today’s data needs.
The Hadoop master and worker daemons all run as Java processes in their individual JVMs. Like any high-performance computing environment, tuning of the JVMs and their garbage collection is important.
There are three types of Hadoop processes:
1. Master servers: Primary management daemons that coordinate the slave servers. Master servers manage the framework for which they are responsible. The master servers include the following:
NameNode: Responsible for all I/O and storage metadata.
ResourceManager: The scheduler responsible for all processing.
Zookeeper processes: Provides coordination service.
Standby NameNode: Provides failover capability for the NameNode. Unlike the secondary NameNode, it receives a journal stream of file system edits from the NameNode rather than downloading fsimage or edit files; this stream is persisted to disk and applied to memory immediately.
Secondary NameNode: Cleans out the edit log of the NameNode's metadata when a standby NameNode is not used. The cluster can have a standby NameNode (HA) or a secondary NameNode (to clean the edit log) but not both. If there is a standby NameNode, it will clean the edit log, so a secondary NameNode is not needed.
Standby ResourceManager: Provides failover capability for the ResourceManager.
Oozie server: Schedules jobs and manages the workflow of submitted jobs.
HiveServer2: Provides the JDBC or ODBC interface for Hive queries.
HBase Master(s): Manages the HBase environment.
Ambari server: Used for provisioning, managing, and monitoring a Hadoop cluster.
Ganglia server: Manages the Ganglia environment, which collects performance metrics from the cluster.
Nagios server: Collects information for alerts.
JobHistoryServer: Collects and manages job history for MapReduce jobs.
WebHCatServer: Processes HTTP requests for running YARN applications such as MapReduce, Hive, and Pig, as well as HCatalog DDL commands.
2. Slave servers: Responsible for all data storage and computational activities. I/O operations occur on the data nodes where the slave server processes run.
All data are stored on the worker nodes (slave servers). Typically, data processing occurs on the worker nodes.
The worker node servers are also referred to as slave servers or DataNodes. DataNode is the term most commonly used. A slave server can contain a DataNode, NodeManager, or both.
Slave servers include the following:
DataNodes: Handle all I/O locally on their own nodes. These are the slave servers for the NameNode.
NodeManagers: Manage all YARN processes running on their slave servers. These are the slave servers for the ResourceManager.
HBase region servers: Responsible for all local processing of HBase applications. These are slave servers for the HBase masters. These processes run on the DataNodes.
Other frameworks, such as Storm, Kafka, Spark, Accumulo, and Cassandra, can have processes running in the Hadoop cluster or in a different cluster. Each framework has separate processes for its operations.
3. Clients: Responsible for launching applications.
Clients run the applications in the cluster.
The Hadoop client launches Tez and MapReduce applications.
The Pig client launches Pig scripts, which are converted into Tez or MapReduce jobs.
The Hive client launches Hive SQL statements, which are converted into Tez or MapReduce jobs.
The Hadoop execution environment supports a constellation of distributed data processing capabilities that are designed to run using the Hadoop MapReduce architecture.
Several of these have become official Hadoop subprojects within the Apache Software Foundation. These include a distributed file system called Hadoop Distributed File System (HDFS), which is analogous to GFS in the Google MapReduce implementation.
HBase is a distributed column-oriented database that provides random-access read/write capabilities similar to, and is modeled after, Bigtable as implemented by Google.
HBase is not relational and does not support SQL, but provides a Java API and a command-line shell for table management.
Hive is a data warehouse system built on top of Hadoop that provides SQL-like query capabilities for data summarization, ad hoc queries, and analysis of large datasets.
Other Apache-sanctioned projects for Hadoop include Pig—a high-level data-flow language and execution framework for parallel computation, and ZooKeeper—a high-performance coordination service for distributed applications.
The Hadoop ecosystem consists of the following:
HDFS: A distributed file system that provides high-throughput access to application data
MapReduce: A software framework for distributed processing of large data sets on compute clusters
HBase: A scalable, distributed database that supports structured data storage for large tables
ZooKeeper: A high-performance coordination service for distributed applications
Oozie: A scalable workflow system that is used to coordinate the execution of multiple MapReduce jobs
Pig: A high-level data-flow language and execution framework for parallel computation
Hive: A data warehouse infrastructure that provides data summarization and ad hoc querying
Mahout: A scalable machine learning and data mining library
The Hadoop ecosystem also contains several frameworks for integration with the rest of the enterprise:
Sqoop: A connectivity tool for moving data between relational databases and data warehouses, and Hadoop
Flume: A distributed, reliable, and highly available service for efficiently collecting, aggregating, and moving large amounts of data from individual machines to HDFS
Hadoop’s ecosystem is growing beyond the core components listed above to provide newer capabilities and components, such as the following:
Mahout: A machine-learning and data-mining library that provides MapReduce implementations for popular algorithms used for clustering, regression, and statistical modeling
BigTop: A formal process and framework for packaging and interoperability testing of Hadoop’s subprojects and related components
Whirr: A set of libraries that allows users to easily spin up Hadoop clusters on top of Amazon EC2, Rackspace, or any virtual infrastructure
Ambari: A project aimed at simplifying Hadoop management by providing support for provisioning, managing, and monitoring Hadoop clusters.
The time scale ranges from the batch, through ad hoc, to streaming and real time. Batch processing is done typically on all the data that accumulate over a period of minutes to days or even a month or longer.
The emphasis in batch processing is on total throughput, and it usually does not matter how long it takes to process any single input record as long as many records are processed quickly.
In ad hoc processing, the emphasis is on a quick (in human terms) response, but the data being processed is very much like the input for a batch process in that it typically consists of all the data available or all the data for a recent time period.
In fact, ad hoc processing can be thought of as batch processing that is initiated by some user action instead of being based on a schedule.
Ad hoc processing is sometimes mislabeled as real-time because there is user-visible response time, but this is a serious misnomer that could lead to some confusion about which tool is appropriate to use.
With streaming or real-time processing, records are processed as they arrive or in very small batches known as micro batches. Real-time processing adds the additional constraint that records must not only be processed as they arrive, but that processing must also be completed before a prespecified deadline passes.
Requiring that records be processed one at a time is more expensive than processing large batches of records since it does not allow certain economies of scale that are possible in batch processing.
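The micro-batch idea above can be sketched in a few lines (an illustrative toy, not a streaming framework API): records arriving on a stream are grouped into small fixed-size batches, so that per-batch work is amortized over several records rather than paid once per record.

```python
from itertools import islice

def micro_batches(stream, batch_size):
    """Group an incoming record stream into small fixed-size batches."""
    it = iter(stream)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break
        yield batch

# Processing per micro batch recovers some economies of scale that
# strict one-record-at-a-time processing gives up.
counts = [sum(b) for b in micro_batches(range(10), 4)]
print(counts)  # [6, 22, 17]
```

A real-time constraint would additionally bound how long a batch may wait before being processed; that deadline logic is omitted here.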
The general purpose of usage is spread across ingestion, data processing or transformation, persistence, and extraction. Ingestion is the process of getting data into a system with minimal processing or transformation applied during ingestion. Processing is where all significant computing and transformation is done.
The most common operation in processing raw data is that it is aggregated into summaries or arranged into profiles.
Data is commonly persisted after processing, but in Hadoop systems data is also commonly persisted in nearly raw form as it is ingested, before it is processed. Retaining relatively raw data makes it possible to recover from an error in stream processing by reprocessing the original input.
Different forms of persistence lend themselves to different kinds of processing. For example, files can be used to achieve very high scan rates that are particularly useful in batch programming, while HBase is very useful in real-time or stream processing where updates may have to be made each time a record is processed.
Hadoop’s MapReduce computing platform consists of two components: the JobTracker, which acts as the master in the Hadoop cluster, and the TaskTrackers, which act as workers.
A typical activity flow in Hadoop is as follows:
1. A server, called JobTracker, accepts jobs (MapReduce programs) from clients. A job is associated with a mapper method, a reducer method, and a set of input files and output files.
2. The JobTracker contacts the NameNode to get the locations of the input files. The JobTracker applies an appropriate scheduling algorithm to assign tasks to a set of computing nodes, called TaskTrackers. The scheduling algorithm takes data locality into account to minimize data movement.
3. The JobTracker distributes the mapper and reducer methods among the scheduled TaskTrackers. In addition to the mapper and reducer methods, the JobTracker also distributes the job configuration so that the mapper and reducer methods run based on the provided configuration.
4. The TaskTrackers perform the assigned mapper tasks. Each TaskTracker reads input from the DataNodes and applies the given method to the input.
5. The map task writes intermediate key-value pairs to a file on the local file system of the TaskTracker.
6. The partitioner reads the results from the map task and finds the appropriate TaskTracker to run each reduce task. The intermediate results emitted by map tasks are then propagated to the reduce tasks.
7. The TaskTrackers that are scheduled to run reduce tasks apply the operations programmed in the reduce function to the data elements streamed from the map tasks and emit key-value pairs as output. The output data elements are then written to HDFS.
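Steps 4 through 7 above can be simulated in a single process (a hypothetical sketch for intuition; real Hadoop distributes each phase across machines): map tasks emit intermediate key-value pairs, a partitioner routes each key to a reduce task by hash, and each reducer aggregates its keys.

```python
from collections import defaultdict

def map_task(line):
    # Map phase: emit (word, 1) for every word in the input record.
    return [(word, 1) for word in line.split()]

def partition(key, num_reducers):
    # Partitioner: records with the same key go to the same reducer.
    return hash(key) % num_reducers

def run_job(lines, num_reducers=2):
    # Shuffle: group intermediate pairs by target reduce task, then by key.
    shuffled = [defaultdict(list) for _ in range(num_reducers)]
    for line in lines:
        for key, value in map_task(line):
            shuffled[partition(key, num_reducers)][key].append(value)
    # Reduce phase: sum the values for each key and emit the result.
    output = {}
    for groups in shuffled:
        for key, values in groups.items():
            output[key] = sum(values)
    return output

print(run_job(["a b a", "b a"]))  # {'a': 3, 'b': 2}
```

In real Hadoop the shuffled data moves across the network between TaskTrackers, and the final output is written to HDFS rather than returned in memory.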
The JobTracker is a server that implements the user interfaces needed to submit and run a MapReduce job. Once a MapReduce job is submitted to a Hadoop cluster, the JobTracker of the cluster engineers a scheme to run the submitted job.
The scheme involves identifying TaskTrackers in the cluster to perform map operations, triggering the mappers on Tasktrackers, monitoring the task while it is running, etc.
The JobTracker runs in listening mode to take requests from clients. When a client submits a job, the JobTracker communicates with the NameNode to obtain a list of machines that carry the input data for the job.
The list is used in an optimization algorithm, and the JobTracker comes up with an optimized scheme to run the job on TaskTrackers.
As mentioned earlier, the scheme attempts to reduce network bandwidth utilization within the cluster by exploiting data locality. By data locality, the preference, in decreasing priority, for running a task on a data chunk is:
The machine where the data is located
The rack where the data is located
A computing machine in the cluster
It is not always possible to find a node that can run a map task on local data; in that case, the next best possibility is a node in the same rack. If no node on the same rack is available to take the task, then any machine in the cluster that can handle the task performs it, even if not optimally.
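The locality preference just described can be sketched as a simple selection function (an illustrative toy; the tuple layout and names are hypothetical, not the JobTracker's actual data structures):

```python
def pick_tasktracker(block_node, block_rack, trackers):
    """Choose a TaskTracker for a map task with decreasing locality
    preference: same node, then same rack, then any available machine.
    `trackers` is a list of (node, rack, has_capacity) tuples."""
    available = [(n, r) for n, r, free in trackers if free]
    for node, rack in available:
        if node == block_node:          # 1. the machine where the data is
            return node
    for node, rack in available:
        if rack == block_rack:          # 2. the rack where the data is
            return node
    return available[0][0] if available else None  # 3. any machine

# n1 holds the data but is at capacity, so a rack-local node is chosen.
trackers = [("n1", "r1", False), ("n2", "r1", True), ("n3", "r2", True)]
print(pick_tasktracker("n1", "r1", trackers))  # n2
```

The real scheduler also weighs task queue lengths and failures, but the falling-back-through-locality structure is the same.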
One of the reasons a TaskTracker may not be available to take a task is that the TaskTracker may have been running the tasks up to its maximum capacity.
Whenever the JobTracker identifies a TaskTracker to run a task, it monitors the task until it terminates. If a task fails on a TaskTracker, the JobTracker finds another optimal TaskTracker to handle the task.
By restarting a task on another TaskTracker, the JobTracker ensures that the job does not terminate even if there is a single task that has failed even once. A job cannot be deemed to have been completed unless all the tasks of the job complete without errors.
The responsibilities of a JobTracker can be summarized as follows:
1. Manage TaskTrackers and their resources as jobs are being submitted
2. Schedule tasks among available resources
3. Monitor the tasks of jobs and restart failed tasks for the configured number of attempts
TaskTracker is a daemon running on the computing nodes of a Hadoop cluster. The TaskTracker receives instructions to perform map and/or reduce tasks from the JobTracker.
The TaskTracker posts the available resources on its node, along with a heartbeat message, to the JobTracker at a specific interval.
The JobTracker performs bookkeeping of the TaskTracker and the corresponding resource and exploits the information in job scheduling. The TaskTracker performs the assigned tasks (map, reduce) on the given data.
The JobTracker tracks the TaskTrackers for the tasks that are running and instructs the TaskTrackers where to send the output data.
A TaskTracker performs a mapper method or a reduce method.
MapReduce Enhancements and Extensions
While the basic implementation of MapReduce is very useful for handling data processing and data loading in a heterogeneous system with many different storage systems, this basic architecture suffers from some limitations.
In this section, we present an overview of some of these limitations, as well as the workarounds, enhancements, and extensions to MapReduce that have been devised to address them.
Supporting Iterative Processing
Many data analysis techniques (e.g., PageRank algorithm, recursive relational queries, social network analysis) require iterative computations.
These techniques have a common requirement, which is that data are processed iteratively until the computation satisfies a convergence or stopping condition.
The basic MapReduce framework does not directly support these iterative data analysis applications. Instead, programmers must implement iterative programs by manually issuing multiple MapReduce jobs and orchestrating their execution using a driver program.
In practice, there are two key problems with manually orchestrating an iterative program in MapReduce:
1. Even though much of the data may be unchanged from iteration to iteration, the data must be reloaded and reprocessed at each iteration, wasting I/O, network bandwidth, and CPU resources.
2. The termination condition may involve the detection of when a fixpoint has been reached. This condition may itself require an extra MapReduce job on each iteration, again incurring overhead in terms of scheduling extra tasks, reading extra data from disk, and moving data across the network.
To accommodate the requirements of iterative data analysis applications, the HaLoop system incorporates the following changes into the basic Hadoop MapReduce framework:
It exposes a new application programming interface that simplifies the expression of iterative MapReduce programs.
The master node contains a new loop control module that repeatedly starts new map-reduce steps composing the loop body until a user-specified stopping condition is met.
It uses a new task scheduler that leverages data locality.
It caches and indexes application data on slave nodes. In principle, the TaskTracker manages not only task execution but also the caches and indices on the slave node, and redirects each task’s cache and index accesses to the local file system.
The HaLoop system achieves this by extending the basic MapReduce framework with two main functionalities:
1. Caching the invariant data in the first iteration and then reusing it in later iterations
2. Caching the reducer outputs, which makes checking for a fixpoint more efficient, without an extra MapReduce job
HaLoop relies on the same file system and has the same task queue structure as Hadoop, but the task scheduler and TaskTracker modules are modified, and the loop control, caching, and indexing modules are newly introduced to the architecture.
In the basic MapReduce framework, each map or reduce task contains its portion of the input data, and the task runs by performing the map/reduce function on its input data records; the life cycle of the task ends when the processing of all its input data records has been completed.
The extended framework supports iterative processing by keeping each map and reduce task alive during the entire iterative process. In particular, when all the input data of a persistent task have been parsed and processed, the task becomes dormant, waiting for newly updated input data:
For a map task, it waits for the results from the reduce tasks and is activated to work on the new input records when the required data from the reduce tasks arrive.
For the reduce tasks, they wait for the map tasks’ output and are activated synchronously as in MapReduce.
The MapReduce runtime system does the termination check after each iteration. To terminate the iterations by a fixed number of iterations, the persistent map/reduce task records its iteration number and terminates itself when the number exceeds a threshold.
To bound the distance between the output from two consecutive iterations, the reduce tasks can save the output from two consecutive iterations and compute the distance.
If the termination condition is satisfied, the master will notify all the map and reduce tasks to terminate their execution. Jobs can terminate their iterative process in one of two ways:
1. Defining a fixed number of iterations: The iterative algorithm stops after it has iterated n times.
2. Bounding the distance between two consecutive iterations: The iterative algorithm stops when the distance is less than a threshold.
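A driver loop embodying both termination conditions can be sketched as follows (a hypothetical single-process illustration; in Hadoop each `step` would be a full MapReduce job and the distance check would run on the reducer outputs):

```python
def iterate(step, state, max_iters=100, epsilon=1e-6, distance=None):
    """Run `step` repeatedly, stopping either after a fixed number of
    iterations or when the distance between two consecutive outputs
    falls below the threshold `epsilon`."""
    for i in range(max_iters):
        new_state = step(state)
        if distance is not None and distance(state, new_state) < epsilon:
            return new_state, i + 1          # converged: fixpoint reached
        state = new_state
    return state, max_iters                  # fixed-iteration cutoff

# Example: a damped fixed-point iteration that converges to 2.0.
step = lambda x: 0.5 * x + 1.0
result, iters = iterate(step, 0.0, distance=lambda a, b: abs(a - b))
print(round(result, 3))  # 2.0
```

Caching the previous iteration's output, as HaLoop does, is exactly what makes the `distance(state, new_state)` comparison cheap: both operands are already at hand, so no extra MapReduce job is needed.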
One main limitation of the MapReduce framework is that it does not support the joining of multiple data sets in one task. However, this can still be achieved with additional MapReduce steps such as the following:
1. Standard repartition joins: The two input relations are dynamically partitioned on the join key and the corresponding pairs of partitions are joined using the standard partitioned sort-merge join approach.
2. Improved repartition join: One potential problem with the standard repartition join is that all the records for a given join key from both input relations have to be buffered.
Therefore, when the key cardinality is small or when the data is highly skewed, all the records for a given join key may not fit in memory. The improved repartition join strategy fixes the buffering problem by introducing the following key changes:
a. In the map function, the output key is changed to a composite of the join key and the table tag. The table tags are generated in a way that ensures records from one input relation will be sorted ahead of those from the other input relation on a given join key.
b. The partitioning function is customized so that the hash code is computed from just the join-key part of the composite key. This way, records with the same join key are still assigned to the same reduce task.
c. As records from the smaller input are guaranteed to be ahead of those from the larger input for a given join key, only the records from the smaller input are buffered and the records of the larger input are streamed to generate the join output.
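The key changes (a) through (c) can be sketched in one process (an illustrative toy; in Hadoop the sort and grouping happen in the shuffle, and each join key's group is handled by a reduce task). Records from the smaller relation R are tagged 0 and those from the larger relation L are tagged 1, so within each join key R sorts first and only R needs buffering:

```python
from itertools import groupby

def improved_repartition_join(R, L):
    # (a) composite output key: (join_key, table_tag); tag 0 sorts first.
    tagged = [((k, 0), v) for k, v in R] + [((k, 1), v) for k, v in L]
    tagged.sort(key=lambda kv: kv[0])
    output = []
    # (b) grouping by the join key alone stands in for the customized
    # partitioner that hashes only the join-key part of the composite.
    for join_key, group in groupby(tagged, key=lambda kv: kv[0][0]):
        buffered_r = []                        # (c) buffer only the small side
        for (key, tag), value in group:
            if tag == 0:
                buffered_r.append(value)
            else:                              # stream the large side through
                output.extend((join_key, r, value) for r in buffered_r)
    return output

R = [("k1", "r1"), ("k2", "r2")]
L = [("k1", "l1"), ("k1", "l2")]
print(improved_repartition_join(R, L))
# [('k1', 'r1', 'l1'), ('k1', 'r1', 'l2')]
```

Because L's records are never held in memory, the per-key memory footprint depends only on the smaller relation, which is what fixes the buffering problem of the standard repartition join.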
3. Broadcast join: Instead of moving both input relations across the network as in the repartition-based joins, the broadcast join approach moves only the smaller input relation so that it avoids the preprocessing sorting requirement of both input relations and more importantly avoids the network overhead for moving the larger relation.
4. Semijoin: This join approach tries to avoid the problem of the broadcast join approach where it is possible to send many records of the smaller input relation across the network while they may not be actually referenced by any records in the other relation.
It achieves this goal at the cost of an extra scan of the smaller input relation: it determines the set of unique join keys in the smaller relation, sends them to the other relation to identify the actually referenced join keys, and then sends only those records across the network to execute the real join operation.
5. Per-split semijoin: This join approach tries to improve the semijoin approach with a further step to address the fact that not every record in the filtered version of the smaller relation will join with a particular split of the larger relation.
Therefore, an extra process step is executed to determine the target split(s) of each filtered join key.
Based on statistics, such as the relative data size and the fraction of the join key referenced, the tradeoffs of the join strategies can be surmised as follows:
If the data is not preprocessed, the right join strategy depends on the size of the data transferred via the network.
If the net cost of broadcasting an input relation R to every node is less expensive than transferring both R and projected L, then the broadcast join algorithm should be used.
When preprocessing is allowed, semijoin, per-split semijoin, and directed join with sufficient partitions are the best choices.
Semijoin and per-split semijoin offer further flexibility, since their preprocessing steps are insensitive to how the log table is organized and thus are suitable for any number of reference tables.
In addition, the preprocessing steps of these two algorithms are cheaper since there is no shuffling of the log data.
One of the main limitations of the original implementation of the MapReduce framework is that it is designed in a way that jobs can scan the input data only in a sequential fashion.
Hence, the query-processing performance of the MapReduce framework is unable to match the performance of a well-configured parallel DBMS.
The Hadoop++ system aims to boost the query performance of the Hadoop system without changing any of the system internals, by injecting its changes through user-defined functions (UDFs), which affect Hadoop only from the inside, without any external effect:
1. Trojan index: The original Hadoop implementation does not provide index access, due to the lack of a priori knowledge of the schema and of the MapReduce jobs being executed.
The Hadoop++ system is based on the assumption that if we know the schema and the anticipated MapReduce jobs, then we can create appropriate indices for the Hadoop tasks. These indices are created during the data-loading time and thus have no penalty at query time.
Each Trojan index provides an optional index access path, which can be used for selective MapReduce jobs. These indices are created by injecting appropriate UDFs inside the Hadoop implementation.
The main features of Trojan indices can be summarized as follows:
a. Optional access path: They provide an optional index access path that can be used for selective MapReduce jobs. However, the scan access path can still be used for other MapReduce jobs.
b. Seamless splitting: Data indexing adds an index overhead for each data split. Therefore, the logical split includes the data as well as the index, as it automatically splits the indexed data at logical split boundaries.
c. Partial index: A Trojan index need not be built on the entire split. However, it can be built on any contiguous subset of the split as well.
d. Multiple indices: Several Trojan indices can be built on the same split. However, only one of them can be the primary index. During query processing, an appropriate index can be chosen for data access based on the logical query plan and the cost model.
e. Noninvasive: They do not change the existing Hadoop framework. The index structure is implemented by providing the right UDFs.
f. No external library or engine: Trojan indices integrate indexing capability natively into the Hadoop framework without imposing a distributed SQL query engine on top of it.
2. Trojan joins: The Hadoop++ system assumes that if we know the schema and the expected workload, then we can co-partition the input data during loading time.
In particular, given any two input relations, they apply the same partitioning function on the join attributes of both the relations at data loading time and place the co-group pairs, having the same join key from the two relations, on the same split and, hence, on the same node.
As a result, join operations can be then processed locally within each node at query time. Implementing the Trojan joins does not require any changes to be made to the existing implementation of the Hadoop framework.
The only changes are made to the internal management of the data splitting process. In addition, Trojan indices can be freely combined with Trojan joins.
This is related to the design and implementation of a column-oriented and binary back-end storage format for Hadoop. In general, a straightforward way to implement a column-oriented storage format for Hadoop is to store each column of the input data set in a separate file. However, this raises two main challenges:
It requires generating roughly equal-sized splits so that a job can be effectively parallelized over the cluster.
It needs to ensure that the corresponding values from different columns in the data set are co-located on the same node running the map task.
The first challenge can be tackled by horizontally partitioning the data set and storing each partition in a separate subdirectory.
The second challenge is harder to tackle because of the default three-way block-level replication strategy of HDFS that provides fault tolerance on commodity servers but does not provide any co-location guarantees.
But this can be tackled by implementing a modified HDFS block placement policy that guarantees that the files corresponding to the different columns of a split are always co-located across replicas.
Hence, when reading a data set, the column input format can actually assign one or more split directories to a single split, and the column files of a split directory are scanned sequentially and the records are reassembled using values from corresponding positions in the files.
A lazy record construction technique is used to mitigate the deserialization overhead in Hadoop, as well as eliminate unnecessary disk I/O. The basic idea behind lazy record construction is to deserialize only those columns of a record that are actually accessed in a map function.
Each column of the input data set can be compressed using one of the following compression schemes:
1. Compressed blocks: This scheme uses a standard compression algorithm to compress a block of contiguous column values. Multiple compressed blocks may fit into a single HDFS block.
A header indicates the number of records in a compressed block and the block’s size. This allows the block to be skipped if no values are accessed in it. However, when a value in the block is accessed, the entire block needs to be decompressed.
2. Dictionary compressed skip list: This scheme is tailored for map-typed columns. It takes advantage of the fact that the keys used in maps are often strings that are drawn from a limited universe. Such strings are well suited for dictionary compression.
A dictionary is built of keys for each block of map values and stores the compressed keys in a map using a skip-list format. The main advantage of this scheme is that a value can be accessed without having to decompress an entire block of values.
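The compressed-blocks scheme can be sketched as follows (an illustrative toy using `zlib` as a stand-in for the column format's codec; the function names are hypothetical). Each block carries a header with its record count and compressed size, so a reader can skip whole blocks whose values are never accessed, at the cost of decompressing an entire block whenever any value inside it is needed:

```python
import zlib

def write_blocks(values, block_size):
    """Compress contiguous runs of column values; prepend each block
    with a (record count, compressed size) header."""
    out = []
    for i in range(0, len(values), block_size):
        block = values[i:i + block_size]
        payload = zlib.compress("\n".join(block).encode())
        out.append((len(block), len(payload), payload))
    return out

def read_column(blocks, wanted_positions):
    pos, result = 0, []
    for count, size, payload in blocks:
        if any(pos <= p < pos + count for p in wanted_positions):
            # Accessing any value forces decompressing the whole block.
            block = zlib.decompress(payload).decode().split("\n")
            result += [block[p - pos] for p in sorted(wanted_positions)
                       if pos <= p < pos + count]
        pos += count          # header lets untouched blocks be skipped
    return result

blocks = write_blocks([f"v{i}" for i in range(10)], 4)
print(read_column(blocks, {1, 9}))  # ['v1', 'v9']
```

Here the middle block (values 4 through 7) is never decompressed, which is the scheme's main saving; the dictionary-compressed skip list avoids even the whole-block decompression at the cost of a more specialized layout.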
One advantage of this approach is that adding a column to a data set is not an expensive operation. This can be done by simply placing an additional file for the new column in each of the split directories.
However, a potential disadvantage of this approach is that the available parallelism may be limited for smaller data sets. Maximum parallelism is achieved for a MapReduce job when the number of splits is at least equal to the number of map tasks.
The fundamental idea of YARN (Yet Another Resource Negotiator) is to split the two major responsibilities of the JobTracker, resource management and job scheduling/monitoring, into separate daemons:
A global Resource Manager
A per-application ApplicationMaster (AM)
The Resource Manager and per-node slave, the NodeManager (NM), form the new, and generic, operating system for managing applications in a distributed manner. The Resource Manager is the ultimate authority that arbitrates division of resources among all the applications in the system.
The per-application ApplicationMaster is a framework-specific entity and is tasked with negotiating for resources from the Resource Manager and working with the NodeManager(s) to execute and monitor the component tasks.
The per-application ApplicationMaster is responsible for negotiating appropriate resource containers from the Scheduler, tracking their status, and monitoring their progress. From the system perspective, the ApplicationMaster runs as a normal container.
A container is a resource lease that grants the privilege of using CPU and memory on a specific worker node. Containers are allocated to the NodeManagers for running distributed processes on the DataNodes. The model is flexible: storage and networking resources may be added as container dimensions in the future.
A container is a logical definition of resources (for example, 4GB of RAM, 1 CPU core) for a specific worker node. A job may run multiple containers spread across the worker nodes in the cluster.
Application Masters can work with Node Managers to launch containers that are written in C, Java, Python, Scala, and so on.
A container will have information, such as the command line to start the process or JVM in the container, defined environmental variables, security tokens, jars, libraries, data files, or any additional objects required to run the code in the container.
Following is an example of determining how many containers per node can be allocated:
# of containers = min(2 × CORES, 1.8 × DISKS, Total available RAM / MIN_CONTAINER_SIZE), where MIN_CONTAINER_SIZE is the minimum container size in GB of RAM.
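The rule of thumb above can be sketched directly; the node specifications in the example are illustrative, not taken from the tutorial:

```python
# Sketch of the container-sizing rule of thumb:
# min(2*CORES, 1.8*DISKS, total RAM / MIN_CONTAINER_SIZE)
def containers_per_node(cores, disks, total_ram_gb, min_container_gb):
    """Return the number of whole containers a node can host."""
    return int(min(2 * cores, 1.8 * disks, total_ram_gb / min_container_gb))

# An illustrative node with 12 cores, 12 disks, 48 GB of RAM, and a
# 2 GB minimum container size: min(24, 21.6, 24) -> 21 containers.
print(containers_per_node(cores=12, disks=12, total_ram_gb=48, min_container_gb=2))  # 21
```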
The Resource Manager has a pluggable scheduler component, which is responsible for allocating resources to the various running applications subject to the familiar constraints of capacity, queues, and other factors.
The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of application status; it offers no guarantees on restarting tasks that fail, whether through application failure or hardware failure.
The scheduler performs its scheduling function based on the resource requirements of an application by using the abstract notion of a resource container, which incorporates resource dimensions such as memory, CPU, disk, and network.
The NodeManager is the per-machine slave, which is responsible for launching the applications’ containers, monitoring their resource usage (CPU, memory, disk, network), and reporting the same to the Resource Manager.
Hadoop Distributed File System (HDFS)
The distributed file system in Hadoop is designed to run on commodity hardware. Although it has many similarities with other distributed file systems, the differences are significant.
It is highly fault tolerant and is designed to run on low-cost hardware. It also provides high-throughput access to stored data and hence can be used to store and process large datasets.
To enable this streaming of data, it relaxes some POSIX standards. HDFS was originally built for the Apache Nutch project and later forked into an individual project under Apache.
HDFS, by design, is able to provide reliable storage to large datasets, allowing high- bandwidth data streaming to user applications. By distributing the storage and computation across several servers, the resource can scale up and down with demand while remaining economical.
Characteristics of HDFS
1. Hardware failure: Hardware failure is fairly common in clusters. A Hadoop cluster consists of thousands of machines, each of which stores a block of data.
HDFS consists of a huge number of components, and with that, there is a good chance of failure among them at any point in time. The detection of these failures and the ability to recover quickly is part of the core architecture.
2. Streaming data access: Applications that run on the Hadoop HDFS need access to streaming data.
These applications cannot be run on general-purpose file systems. HDFS is designed to enable large-scale batch processing, which is enabled by the high-throughput data access. Several POSIX requirements are relaxed to enable these special needs of high throughput rates.
3. Large data sets: HDFS-based applications work with large datasets; typical file sizes range from high gigabytes to low terabytes. HDFS should provide high aggregate data bandwidth and support millions of files across hundreds of nodes in a single cluster.
4. Heterogeneous hardware and software portability: HDFS is designed to run on commodity hardware and across multiple platforms. This portability helps the widespread adoption of the platform for large-scale computations.
5. Simple coherency model: The write-once-read-many access model of files enables high-throughput data access, as data once written need not be changed, thus simplifying data coherency issues. A MapReduce-based application takes advantage of this model.
6. Moving computation instead of data: Any computation is more efficient when it executes near its data because this avoids the network transfer bottleneck.
Migrating the computation closer to the data is a cornerstone of HDFS-based programming. HDFS provides all the necessary application interfaces to move the computation close to the data prior to execution.
HDFS is analogous to GFS in the Google MapReduce implementation. A block in HDFS is equivalent to a chunk in GFS and is also very large: 64 MB by default, though 128 MB is used in some installations.
The large block size is intended to reduce the number of seeks and improve data transfer times. Each block is an independent unit stored as a dynamically allocated file in the Linux local file system in a DataNode directory.
If the node has multiple disk drives, multiple DataNode directories can be specified.
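As a quick illustration of the block model, a file is stored as a sequence of fixed-size blocks, with only the last block possibly shorter:

```python
# Illustrative only: how many fixed-size HDFS blocks a file occupies.
import math

def blocks_for_file(file_size_mb, block_size_mb=128):
    """Number of blocks needed to store a file of the given size."""
    return math.ceil(file_size_mb / block_size_mb)

# A 1 GB file needs 8 full 128 MB blocks; a 300 MB file needs 2 full
# blocks plus a final block holding only the remaining 44 MB.
print(blocks_for_file(1024))  # 8
print(blocks_for_file(300))   # 3
```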
An additional local file per block stores metadata for the block. HDFS also follows a master-slave architecture, with a single master server, called the NameNode, that manages the distributed file system namespace and regulates access to files by clients.
In addition, there are multiple DataNodes, one per node in the cluster, which manage the disk storage attached to the nodes and assigned to Hadoop. The NameNode determines the mapping of blocks to DataNodes.
The DataNodes are responsible for serving read and write requests from file system clients such as MapReduce tasks, and they also perform block creation, deletion, and replication based on commands from the NameNode.
HDFS is a file system, and like any other file system architecture, it needs to manage consistency, recoverability, and concurrency for reliable operations. These requirements have been addressed in the architecture by creating image, journal, and checkpoint files:
1. NameNode (master node): The NameNode is a single master server that manages the file system namespace and regulates access to files by clients.
Additionally, the NameNode manages all the operations such as opening, closing, moving, naming, and renaming of files and directories. It also manages the mapping of blocks to DataNodes.
2. DataNodes (slave nodes): DataNodes represent the slaves in the architecture that manage data and the storage attached to the data.
A typical HDFS cluster can have thousands of DataNodes and tens of thousands of HDFS clients per cluster since each DataNode may execute multiple application tasks simultaneously.
The DataNodes are responsible for serving read and write requests from the file system’s clients, maintaining blocks, and performing replication as directed by the NameNode. Block management in HDFS differs from that of a normal file system: the size of the data file on disk equals the actual length of the block.
This means that if a block is half full, it needs only half the space of a full block on the local drive, thereby optimizing storage space; unlike in a regular file system, no extra space is consumed by the block.
3. Image: An image represents the metadata of the namespace (inodes and lists of blocks). On startup, the NameNode pins the entire namespace image in memory. The in-memory persistence enables the NameNode to service multiple client requests concurrently.
4. Journal: The journal represents the modification log of the image in the local host’s native file system.
During normal operations, each client transaction is recorded in the journal, and the journal file is flushed and synced before the acknowledgment is sent to the client. The NameNode can replay this journal upon startup or during recovery.
5. Checkpoint: To enable recovery, a persistent record of the image, called a checkpoint, is also stored in the local host’s native file system. Once the system starts up, the NameNode never modifies or updates the checkpoint file.
A new checkpoint file can be created during the next startup, on a restart, or on demand when requested by the administrator or by the CheckpointNode.
The data flow in a write operation: an HDFS client that wants to write a data file to HDFS contacts the NameNode for a list of DataNodes to which it can connect and write the contents of the file.
The NameNode updates its metadata for the request and responds with a block ID and DataNode details.
The client uploads the contents of the file to a DataNode, which copies the received content into the block specified by the NameNode. The NameNode then finds other DataNodes to satisfy the replication factor.
The NameNode instructs the DataNode to copy the block to the other DataNodes, and replication continues among the DataNodes until the system satisfies the replication factor.
A similar approach, in reverse, is involved in reading the contents of a file from HDFS.
A client node that wants to read a file contacts the NameNode for a list of DataNodes where the contents of the file reside. The NameNode responds with a list of DataNodes (three DataNodes when the replication factor is three).
The client chooses one DataNode from the list and contacts it to be served the requested data.
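The write and read flows above can be sketched as a toy NameNode that maps blocks to DataNodes; the node names and the random placement policy are invented for illustration (the real NameNode also considers rack topology):

```python
import random

class ToyNameNode:
    """Toy NameNode: tracks which DataNodes hold each block."""
    def __init__(self, datanodes, replication=3):
        self.datanodes = list(datanodes)
        self.replication = replication
        self.block_map = {}   # block id -> list of DataNode names

    def allocate_block(self, block_id):
        # Choose `replication` distinct DataNodes for the new block.
        replicas = random.sample(self.datanodes, self.replication)
        self.block_map[block_id] = replicas
        return replicas

    def locate_block(self, block_id):
        # A reader asks for the DataNodes holding the block, then
        # contacts one of them directly for the data.
        return self.block_map[block_id]

nn = ToyNameNode(["dn1", "dn2", "dn3", "dn4", "dn5"], replication=3)
nn.allocate_block("file1-blk0")
replicas = nn.locate_block("file1-blk0")
reader_target = replicas[0]   # the client picks one replica to read from
```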
HBase is an open-source, nonrelational, column-oriented, multidimensional, distributed database developed on Google’s BigTable architecture. It is designed with high availability and high performance as drivers to support the storage and processing of large data sets on the Hadoop framework.
HBase is not a database in the purest definition of a database. It provides unlimited scalability and performance and supports certain features of an ACID- compliant database.
HBase is classified as a NoSQL database because its architecture and design align closely with the BASE model (Basically Available, Soft state, Eventually consistent). Why do we need HBase when the data is stored in HDFS, the core data storage layer within Hadoop?
HBase is very useful for operations beyond MapReduce execution, for access patterns that are not easy to implement on HDFS alone, and in particular when you need random access to data.
First, it provides a database-style interface to Hadoop, which enables developers to deploy programs that can quickly read or write to specific subsets of data in an extremely voluminous data set, without having to search and process through the entire data set.
Second, it provides a transactional platform for running high-scale, real-time applications as an ACID-compliant database (meeting standards for Atomicity, Consistency, Isolation, and Durability) while handling the incredible volume, variety, and complexity of data encountered on the Hadoop platform. HBase supports the following properties of ACID compliance:
1. Atomicity: All mutations are atomic within a row; a write operation will either wholly succeed or wholly fail.
2. Consistency: All rows returned for any execution will consist of a complete row that existed or exists in the table.
3. Isolation: The isolation level corresponds to what a traditional DBMS calls “read committed.”
4. Durability: All visible data in the system is durable data; a read will never return data that has not been made durable on disk.
Data is organized in HBase as rows and columns, and tables, very similar to a database; however, here is where the similarity ends.
HBase architecture is described as follows:
1. Tables and cells:
a. Tables are made of rows and columns.
b. Table cells are the intersection of row and column coordinates. Each cell is versioned, by default with a timestamp. The contents of a cell are treated as an uninterpreted array of bytes.
c. A table row has a sortable row key and an arbitrary number of columns.
2. Rows:
a. Table row keys are also byte arrays. In this configuration, anything can serve as the row key, as opposed to the strongly typed data types of a traditional database.
b. Table rows are sorted byte-ordered by row key, the table’s primary key, and all table accesses are via the table’s primary key.
c. Columns are grouped into families, and a row can have as many columns as needed.
3. Columns and column families:
a. In HBase, row columns are grouped into column families.
b. All column family members share a common prefix; for example, the columns person:name and person:comments are both members of the person column family, whereas email:identifier belongs to the email family.
c. A table’s column families must be specified up front as part of the table schema definition.
d. New column family members can be added on demand.
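The row key / family:qualifier / timestamp addressing described above can be pictured as a nested sorted map. The sketch below models it with plain Python dictionaries; the row, columns, and values are invented for illustration, and this is not the HBase client API:

```python
# Toy model of HBase's data layout: a nested map keyed by
# row key -> "family:qualifier" -> timestamp -> value.
table = {
    b"row-001": {
        "person:name":      {1001: b"Alice", 1000: b"Alyce"},
        "person:comments":  {1000: b"first load"},
        "email:identifier": {1000: b"alice@example.com"},
    },
}

def get_latest(table, row, column):
    """Return the most recent version of a cell, as HBase does by default."""
    versions = table[row][column]
    latest_ts = max(versions)           # highest timestamp wins
    return versions[latest_ts]

print(get_latest(table, b"row-001", "person:name"))  # b'Alice'
```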
Apache ZooKeeper is a distributed, consensus-based coordination system used to support distributed applications.
Distributed applications that require leader election, locking, group membership, service location, and configuration services can use ZooKeeper rather than reimplement the complex coordination and error handling that comes with these functions.
In fact, many projects within the Hadoop ecosystem, especially HBase, use ZooKeeper for exactly this purpose.
Apache Hive is a data warehouse infrastructure built on top of Hadoop, originally developed by Facebook. Similar to Pig, Hive was initially designed as an in-house solution for large-scale data analysis.
As the company expanded, the parallel RDBMS infrastructure originally deployed at Facebook began to choke at the amount of data that had to be processed on a daily basis.
Following the decision to switch to Hadoop to overcome these scalability problems in 2008, the Hive project was developed internally to provide the high-level interface required for quick adoption of the new warehouse infrastructure inside the company.
Since 2009, Hive has also been available to the general public as an open-source project under the Apache umbrella.
Inside Facebook, Hive runs thousands of jobs per day on different Hadoop clusters, ranging from 300 nodes to 1,200 nodes, to perform a wide range of tasks including periodical reporting of click counts, ad hoc analysis, and training machine learning models for ad optimization.
Other companies working with data in the petabyte magnitude like Netflix are reportedly using Hive for the analysis of website streaming logs and catalog metadata information.
The fundamental goals of designing Hive were as follows:
Build a system for managing and querying data using structured techniques on Hadoop
Use native MapReduce for execution at HDFS and Hadoop layers
Use HDFS for storage of Hive data
Store key metadata in an RDBMS
Extend SQL interfaces, a familiar data warehousing tool in use at enterprises
Provide high extensibility: user-defined types, user-defined functions, formats, and scripts
Leverage extreme scalability and performance of Hadoop
Ensure interoperability with other platforms
The main difference between Hive and the other languages discussed above comes from the fact that Hive’s design is more influenced by classic relational warehousing systems, which is evident both at the data model and at the query language level.
Hive thinks of its data in relational terms—data sources are stored in tables, consisting of a fixed number of rows with predefined data types.
Similar to Pig and Jaql, Hive’s data model provides support for semistructured and nested data in the form of complex data types such as associative arrays (maps), lists and structs, which facilitate the use of de-normalized inputs.
On the other hand, Hive differs from the other higher-level languages for Hadoop in that it uses a catalog to hold metadata about its input sources.
This means that the table schema must be declared and the data loaded before any queries involving the table are submitted to the system (which mirrors the standard RDBMS process).
The schema definition language extends the classic DDL CREATE TABLE syntax. Currently, Hive does not provide support for updates, which means that any data load statement will enforce the removal of any old data in the specified target table or partition.
The standard way to append data to an existing table in Hive is to create a new partition for each append set.
Since appends in an OLAP environment are typically performed periodically in a batch manner, this strategy is a good fit for most real-world scenarios.
The Hive Query Language (HiveQL) is a SQL dialect with various syntax extensions.
HiveQL supports many traditional SQL features such as from clause subqueries, various join types, group bys, and aggregations, as well as many useful built-in data processing functions that provide an intuitive syntax for writing Hive queries to all users familiar with the SQL basics.
In addition, HiveQL provides native support for inline MapReduce job specification. The semantics of the mapper and the reducer are specified in external scripts, which communicate with the parent Hadoop task through the standard input and output streams (similar to the Streaming API for user-defined functions (UDFs) in Pig).
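The external-script mechanism described above exchanges rows with the parent Hadoop task through standard input and output. The sketch below shows a mapper in that style; the two-column (url, hits) row layout is an invented example, and in-memory streams stand in for the real pipes:

```python
# A minimal streaming-style mapper: rows arrive on standard input as
# tab-separated fields, and transformed rows are emitted on standard
# output. The (url, hits) layout is invented for illustration.
import io

def transform(line):
    url, hits = line.rstrip("\n").split("\t")
    host = url.split("/")[0]          # keep only the host part of the URL
    return f"{host}\t{hits}"

def run(stdin, stdout):
    for line in stdin:
        stdout.write(transform(line) + "\n")

# In a real job the framework wires the script's stdin/stdout;
# here we simulate that with in-memory streams.
out = io.StringIO()
run(io.StringIO("example.com/index.html\t42\n"), out)
print(out.getvalue())
```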
As data volumes and processing complexities increase, analyzing large data sets introduces dataflow complexities that become harder to implement in a MapReduce program.
There was a need for an abstraction layer over MapReduce: a high-level language that is more user-friendly, is SQL-like in terms of expressing dataflows, has the flexibility to manage multistep data transformations, and handles joins with simplicity and easy program flow.
Apache Pig was the first system to provide a higher-level language on top of Hadoop.
Pig started as an internal research project at Yahoo! (one of the early adopters of Hadoop), but due to its popularity, subsequently was promoted to a production level system and adopted as an open-source project by the Apache Software Foundation.
Pig is widely used both inside and outside Yahoo! for a wide range of tasks including ad hoc data analytics, ETL tasks, log processing, and training collaborative filtering models for recommendation systems.
The fundamental goals of designing Pig were as follows:
Programming flexibility: Complex tasks comprising multiple steps and interrelated data transformations should be encodable as data flow sequences that are easy to design, develop, and maintain.
Automatic optimization: Tasks are encoded in a way that lets the system optimize their execution automatically, allowing the user to focus on semantics rather than efficiency.
Extensibility: Users can develop user-defined functions (UDFs) for more complex processing requirements.
Pig queries are expressed in a declarative scripting language called Pig Latin, which provides SQL-like functionality tailored for big data-specific needs.
Most notably from the syntax point of view, Pig Latin enforces explicit specification of the data flow as a sequence of expressions chained together through the use of variables.
This style of programming is different from SQL, where the order of computation is not reflected at the language level and is better suited to the ad hoc nature of Pig as it makes query development and maintenance easier due to the increased readability of the code.
Unlike traditional SQL systems, the data does not have to be stored in a system- specific format before it can be used by a query. Instead, the input and output formats are specified through storage functions inside the load and store expressions.
In addition to ASCII and binary storage, users can implement their own storage functions to add support for other custom formats. Pig uses a dynamic type system to provide native support for nonnormalized data models.
In addition to the simple data types used by relational databases, Pig defines three complex types—tuple, bag, and map—which can be nested arbitrarily to reflect the semi-structured nature of the processed data.
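As a rough illustration of how the three complex types can nest, the sketch below models a Pig tuple, bag, and map with their closest Python equivalents; the sample record is invented:

```python
# Pig's complex types modeled with Python equivalents:
# tuple -> tuple, bag (collection of tuples) -> list of tuples,
# map -> dict. The record below is invented for illustration.
record = (
    "u123",                                   # simple field
    [("page1", 3), ("page2", 1)],             # bag of (page, visits) tuples
    {"country": "DE", "browser": "firefox"},  # map of string attributes
)

user_id, visits, attrs = record
total_visits = sum(count for _, count in visits)
print(user_id, total_visits, attrs["country"])  # u123 4 DE
```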
For better support of ad hoc queries, Pig does not maintain a catalog with schema information about the source data.
Instead, input schema is defined at the query level either explicitly by the user or implicitly through type inference. At the top level, all input sources are treated as bags of tuples; the tuple schema can be optionally supplied as part of the load expression.
Apache Kafka is a robust pub-sub (publish, subscribe) framework that allows highly available, dependable message streaming.
Paradoxically, a key feature of Kafka is its small number of features. It has far fewer features and is much less configurable than Flume, which is described later.
All Kafka does is store messages relatively reliably and at high volumes and rates. All computational considerations are outside of Kafka’s scope.
Such computation can be implemented using a variety of computational frameworks such as Spark Streaming, Apache Storm, or Apache Samza. This simplicity and focus of Kafka have helped make it very good at what it does.
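Kafka’s focus on storing messages can be pictured as an append-only log that each consumer reads from its own offset. The toy sketch below illustrates that idea only; it is not the Kafka API:

```python
class ToyLog:
    """In-memory stand-in for a single topic partition: producers
    append messages; each consumer tracks its own read offset."""
    def __init__(self):
        self.messages = []

    def publish(self, message):
        self.messages.append(message)
        return len(self.messages) - 1   # offset of the stored message

    def consume(self, offset):
        # Consumers read independently from their own positions, so
        # the same message can be delivered to many subscribers.
        return self.messages[offset:]

log = ToyLog()
log.publish(b"event-1")
log.publish(b"event-2")

fresh_consumer = log.consume(0)   # sees both messages
caught_up = log.consume(2)        # sees nothing new yet
```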
Apache Flume is a streaming data collection and aggregation system designed to transport massive volumes of data into systems such as Hadoop.
It offers native connectivity for writing directly to HDFS and simplifies reliable, streaming data delivery from a variety of sources including RPC services, log4j appenders, Syslog, and even the output from OS commands.
Data can be routed, load-balanced, replicated to multiple destinations and aggregated from thousands of hosts by a tier of agents.
Flume is a complex tool that allows processing units to be strung together to transport data, typically with the aim of doing minimal ETL on the fly and then storing the data in HDFS files.
Flume is nominally stream based, but a de facto batch orientation is often imposed by storing data in HDFS files. If data is pushed instead into a pub-sub framework like Kafka, then true streaming operation can be achieved.
Flume has limited and complex provisions for high availability and guaranteed delivery.
Flume was originally limited to processing textual data arranged one record per line as is normally done in log files, and there are still echoes of this limitation in various parts of the framework.
In general, the complexity of Flume makes the use of Kafka plus either Storm or Spark Streaming a preferable option.
Sqoop is a batch-oriented program to import data from a database or export data back to a database. Sqoop can create files in a variety of formats in a Hadoop cluster.
Sqoop is a very useful tool due to the wide range of databases that it supports, but Sqoop is, by design, entirely batch oriented. Sqoop, short for “SQL to Hadoop,” performs bidirectional data transfer between Hadoop and almost any database with a JDBC driver.
For even greater performance, Sqoop supports database-specific plug-ins that use native features of the RDBMS rather than incurring the overhead of JDBC.
Many of these connectors are open source, while others are free or available from commercial vendors at a cost. Today, Sqoop includes native connectors (called direct support) for MySQL and PostgreSQL.
Free connectors exist for Teradata, Netezza, SQL Server, and Oracle (from Quest Software), and are available for download from their respective company websites.
Impala is a system that is designed to scan flat files at a very high rate to compute the result of aggregation queries written in SQL.
The preferred format for data to be queried by Impala is Parquet, and Impala is able to use the features of Parquet to great advantage to accelerate the scanning of large files.
For the most part, Impala uses the table management capabilities of Hive to define tables and their schema. The data model used by Impala is currently very similar to the model used by relational systems.
Impala differs from most of the Hadoop ecosystem projects that existed at the time in that it is not based on MapReduce.
Designed to optimize latency, its architecture is similar to that of traditional massively parallel processing (MPP) data warehouses, such as Netezza, Greenplum, and Teradata.
Impala delivers query latency and concurrency similar to traditional data warehouses and significantly lower than that of Hive running on MapReduce.
To avoid creating silos of data within Hadoop, Impala uses the same SQL dialect as Hive and uses the Hive metastore. This allows users to define tables once and use them in Hive, Pig, and Impala.
Similar to Hive, Impala supports both HDFS and HBase as data sources, and most of the popular data formats (delimited text, SequenceFiles, Avro, and Parquet).
This allows Impala to query all data in Hadoop without requiring special transformations. Impala has a shared-nothing architecture, which allows for system-level fault tolerance and huge scalability that allows Impala to remain performant as the number of users and concurrent queries increases.
Impala’s architecture includes the Impala daemons (impalad), the catalog service, and the statestore. Impala daemons run on every node in the cluster, and each daemon is capable of acting as the query planner, the query coordinator, and a query execution engine.
To connect to Impala, a client uses JDBC, ODBC, impala-shell, or Apache Thrift directly to reach one of the Impala daemons.
All Impala daemons are identical and interchangeable, so the client will typically connect to a load balancer that will direct the connection to an active daemon. The daemon the client connects to will act as a query planner and coordinator for this query.
Note that unlike other database management systems such as an RDBMS, Impala does not have to implement the underlying data store, because this is off-loaded to HDFS and HBase.
Impala also does not have to implement the table and database management solution since this is implemented by the Hive metastore. This allows Impala to focus on its core functionality, which is executing queries as fast as possible.
Apache Drill is a newly graduated top-level Apache project that offers the user an unusual level of flexibility. It provides standard SQL (not SQL-like) query capabilities that can access a surprisingly diverse range of data sources and formats, including nested formats such as Parquet and JSON. Drill queries can be schema-less, allowing flexibility in data exploration.
The Drill optimizer is a sophisticated cost-based optimizer that can radically restructure queries based on characteristics of the input files. Drill also offers useful extensibility, so it is a useful tool for business analysts as well as for developers.
Apache Whirr was developed to simplify the creation and deployment of ephemeral clusters in cloud environments such as Amazon’s AWS. Run as a command line tool either locally or within the cloud, Whirr can spin up instances, deploy Hadoop, configure the software, and tear it down on demand.
Under the hood, Whirr uses the powerful jclouds library so that it is cloud-provider neutral. Whirr supports both Amazon EC2 and Rackspace Cloud. In addition to Hadoop, Whirr understands how to provision Apache Cassandra, Apache ZooKeeper, Apache HBase, ElasticSearch, Voldemort, and Apache Hama.
Big Data Analysis Languages, Tools, and Environments
Success with Hadoop requires a new way of thinking as well as a sense of urgency. Organizations now want the capability of batch processing and interactive and real-time queries with their big data platforms.
This requires building the right combination of software frameworks, tools, in-memory software, distributed search, and NoSQL databases around Hadoop, and leveraging the existing software from proprietary software firms.
The analytical systems that maximize business value are those systems that allow data from multiple sources of different types to be correlated to find new data patterns that provide significantly increased accuracy.
The world of relational databases and data warehouses that require deleting, ignoring, aggregating, and summarizing data because of the high costs of storage is a losing formula for descriptive and predictive analytics. It is the detailed data that contains the golden information (insights) for success.
Big data platforms bring together a number of very important components required for fast and accurate analytics to address the key goals of big data, such as
Be able to make business decisions faster than your competition with a higher degree of confidence and less risk.
Increase the type and number of questions you can ask of your data for more business insight and value.
Increase the level of efficiency and competitiveness of an organization.
Create an environment that provides new business insights through data.
Spark is a fault-tolerant and distributed data analytics tool capable of implementing large-scale data-intensive applications on commodity hardware.
Hadoop and other technologies have already popularized acyclic data flow techniques for building data-intensive applications on commodity clusters, but these are not suitable for applications that reuse a working dataset across multiple parallel operations, such as iterative machine-learning algorithms and interactive data analysis tools.
Spark not only is scalable and fault tolerant but also addresses these problems by introducing a data storage and processing abstraction called Resilient Distributed Datasets (RDDs). An RDD is a read-only collection of objects partitioned across a set of machines that can be rebuilt if a partition is lost.
MapReduce revolutionized computation over huge datasets by offering a simple model for writing programs that could execute in parallel across hundreds to thousands of machines.
The MapReduce engine achieves near linear scalability—as the data size increases, one can throw more computers at it and see jobs complete in the same amount of time—and is resilient to the fact that failures that occur rarely on a single machine occur all the time on clusters of thousands.
It breaks up work into small tasks and can gracefully accommodate task failures without compromising the job to which they belong.
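The map/shuffle/reduce pattern described above can be shown in miniature with the canonical word-count example; this single-process sketch omits the distribution and task retry that Hadoop provides:

```python
# The map -> shuffle -> reduce pattern in miniature. Real Hadoop
# spreads each phase across many machines and retries failed tasks.
from collections import defaultdict

def map_phase(lines):
    for line in lines:
        for word in line.split():
            yield (word, 1)

def shuffle(pairs):
    groups = defaultdict(list)          # group values by key
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    return {key: sum(values) for key, values in groups.items()}

counts = reduce_phase(shuffle(map_phase(["big data", "big deal"])))
print(counts)  # {'big': 2, 'data': 1, 'deal': 1}
```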
Hadoop users have a challenge in using MapReduce to address the following:
Iterative jobs: Gradient descent is a classic example of an algorithm that is repeatedly applied to the same dataset to optimize a parameter. While it is easy to represent each iteration as a MapReduce job, the data for each iteration has to be reloaded from disk, incurring a significant performance penalty.
Interactive analytics: Interfaces such as Pig and Hive are commonly used for running SQL-like queries on large data sets using Hadoop.
Ideally, such a dataset would be loaded into memory and queried repeatedly, but with Hadoop, every query is executed as a MapReduce job that incurs significant latency from disk reads.
Distributed Shared Memory (DSM) is a mechanism by which processes can access shared data without interprocess communication. The challenges of implementing a DSM system include addressing problems such as data location, data access, sharing and locking data, and data coherence.
These problems have connections with transactional models, data migration, concurrent programming, distributed systems, and so on. RDDs are an important abstraction in Spark: a read-only collection of objects whose lost partitions can be rebuilt across the cluster.
These RDDs can be reused in multiple parallel operations through memory caching. RDDs use lineage information about the lost partition in the rebuilding process.
Systems such as MapReduce provide locality-aware scheduling, fault tolerance, and load- balancing features that simplify distributed programming, allowing users to analyze large datasets using commodity clusters.
However, RDDs are a distributed memory abstraction that lets users write in-memory computations while retaining the advantages of data flow systems such as MapReduce.
RDDs perform exceptionally well on iterative algorithms, machine learning, and interactive data mining, where other data flow models fall short. This is possible because RDDs are read-only datasets, avoiding the need for the checkpointing that is common in other shared-memory techniques.
RDDs are easy to program and capable of efficiently expressing computations. Fault tolerance is perhaps the most difficult property to support. Checkpointing datasets has a major disadvantage: it requires replicating datasets across the cluster, which can slow down the machines due to bandwidth and memory storage restrictions.
RDDs ensure fault tolerance by recording their stages of transformation. At any point in time, a lost partition can be recovered by repeating the transformation steps on its parent (its lineage).
RDDs are well suited for applications that require executing a single function on many data records such as graph and machine learning algorithms.
Spark maintains MapReduce’s linear scalability and fault tolerance but extends it in a few important ways. First, rather than relying on a rigid map-then-reduce format, its engine can execute a more general directed acyclic graph (DAG) of operators.
This means that, in situations where MapReduce must write out intermediate results to the distributed file system, Spark can pass them directly to the next step in the pipeline.
Spark extends its predecessors (such as MapReduce) with in-memory processing. RDD enables developers to materialize any point in a processing pipeline into memory across the cluster, meaning that future steps that want to deal with the same dataset need not recompute it or reload it from disk.
This capability opens up use cases that distributed processing engines could not previously approach. Spark is well suited for highly iterative algorithms that require multiple passes over a dataset, as well as reactive applications that quickly respond to user queries by scanning large in-memory datasets.
Components of Spark are as follows:
1. Driver: The driver defines the RDD objects and their relationships; it is the code that includes the "main" function and defines the RDDs and their transformations.
2. DAG scheduler: The DAG scheduler optimizes the code and arrives at an efficient DAG that represents the data processing steps in the application. The resulting DAG is sent to the cluster manager.
3. Cluster manager: The cluster manager is responsible for assigning specific processing tasks to workers. It has information about the workers, assigned threads, and the location of data blocks.
The cluster manager is also the service that handles DAG replay in the case of worker failure. The cluster manager can be YARN, Mesos, or Spark's standalone cluster manager.
4. Worker: The worker receives units of work and data to manage. The worker executes its specific task without knowledge of the entire DAG, and its results are sent back to the driver application.
Spark includes two types of variables that allow sharing of information between the execution nodes, which are as follows:
Broadcast variables: Broadcast variables are sent to all the remote execution nodes, where they can be used for data processing. This is similar to the role that configuration objects play in MapReduce.
Accumulator variables: Accumulators are also sent to the remote execution nodes, but unlike broadcast variables, they also can be “added” by the executors. Accumulators are somewhat similar to MapReduce counters.
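A rough analogue of these two sharing patterns can be sketched in plain Scala, rather than the actual Spark API (the partition layout and names such as broadcastLookup and badRecords are made up for illustration): a read-only value consulted by every "executor" plays the broadcast role, and a counter that workers only add to plays the accumulator role.

```scala
// Illustrative sketch of broadcast-style and accumulator-style sharing.
// The "cluster" is simulated as a list of partitions processed in turn.
object SharedVarsSketch {
  val partitions: Seq[Seq[String]] =
    Seq(Seq("a", "b"), Seq("b", "x"), Seq("a", "x", "x"))

  // Broadcast analogue: a read-only value every "executor" can consult.
  val broadcastLookup: Map[String, Int] = Map("a" -> 1, "b" -> 2)

  def process(): (Seq[Int], Int) = {
    var badRecords = 0 // accumulator analogue: workers only "add" to it
    val results = partitions.flatMap { partition =>
      partition.flatMap { record =>
        broadcastLookup.get(record) match {
          case Some(v) => Some(v)
          case None    => badRecords += 1; None // count records missing a key
        }
      }
    }
    (results, badRecords)
  }
}
```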
SparkContext is an object that represents the connection to a Spark cluster. It is used to create RDDs, broadcast data, and initialize accumulators.
Resilient Distributed Datasets
RDDs are collections of serializable elements that are partitioned so that they can be stored on multiple nodes. An RDD may reside in memory or on disk.
Spark uses RDDs to reduce I/O and maintain the processed data set in memory while still tolerating node failures without having to restart the entire job. RDDs are typically created from either of the following:
A Hadoop input format (e.g., a file on HDFS): Spark determines the number of partitions by the input format, very similar to the way splits are determined in MapReduce jobs.
Transformations applied on existing RDDs: Spark can shuffle the data and repartition it to any number of partitions.
RDDs store their "lineage," that is, the set of transformations used to create the current state, starting from the first input format that was used to create the RDD. If data is lost, Spark replays the lineage to rebuild the lost RDDs so the job can continue.
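The lineage idea can be sketched as follows. This is an illustrative model in plain Scala, not Spark's actual classes: each dataset records its parent and the function that derives it, so a lost result can always be rebuilt by replaying the chain from the source.

```scala
// Minimal lineage sketch (illustrative, not Spark's real classes):
// each dataset remembers its parent and the deriving function.
object LineageSketch {
  sealed trait Lineage[A] { def compute(): Seq[A] }

  // The "input format": the original source data.
  final case class Source[A](data: Seq[A]) extends Lineage[A] {
    def compute(): Seq[A] = data
  }

  // A transformation: derived from a parent by a pure function.
  final case class Transformed[A, B](parent: Lineage[A], f: Seq[A] => Seq[B])
      extends Lineage[B] {
    def compute(): Seq[B] = f(parent.compute()) // replay the parent first
  }

  def pipeline(): Seq[Int] = {
    val lines   = Source(Seq("spark", "hadoop", "rdd"))
    val lengths = Transformed[String, Int](lines, _.map(_.length))
    val long    = Transformed[Int, Int](lengths, _.filter(_ > 3))
    // Even if a computed result is "lost", compute() rebuilds it from lineage.
    long.compute()
  }
}
```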
Transformations are functions that take one RDD and return another. RDDs are "immutable," so transformations never modify their input; they only return a new, modified RDD.
Transformations in Spark are always "lazy" in that they do not compute their results immediately. Instead, calling a transformation function only creates a new RDD with this specific transformation as part of its lineage;
the complete set of transformations is executed only when an "action" is called. This improves Spark's efficiency and allows it to cleverly optimize the execution graph.
There are many transformations in Spark, which are as follows:
map(): Applies a function to every element of an RDD to produce a new RDD. This is similar to the way the MapReduce map() method is applied to every element in the input data.
For example, lines.map(s=>s.length) takes an RDD of Strings ("lines") and returns an RDD with the lengths of the strings.
filter(): Takes a Boolean function as a parameter, executes this function on every element of the RDD, and returns a new RDD containing only the elements for which the function returned true.
For example, lines.filter(s=>(s.length>35)) returns an RDD containing only the lines with more than 35 characters.
keyBy(): Takes every element in an RDD and turns it into a key-value pair in a new RDD.
For example, lines.keyBy(s=>s.length) returns an RDD of key-value pairs with the length of the line as the key and the line as the value.
join(): Joins two key-value RDDs by their keys.
For example, let us assume we have two RDDs: lines and more_lines. Each entry in both RDDs contains the line length as the key and the line as the value.
lines.join(more_lines) will return for each line length a pair of Strings, one from the lines RDD and one from the more_lines RDD.
Each resulting element looks like <length,<line,more_line>>.
groupByKey(): Performs a group-by operation on a key-value RDD by the keys.
For example, applied to a key-value RDD of <length,line> pairs, groupByKey() will return an RDD where each element has a length as the key and a collection of lines with that length as the value.
sortByKey(): Performs a sort on a key-value RDD and returns a sorted RDD.
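These transformations have direct analogues on plain Scala collections, which is a convenient way to check the expected shapes on a small sample before running on a cluster. The sample lines below are illustrative, and the code uses ordinary collections, not Spark RDDs.

```scala
// Plain Scala collection analogues of the RDD transformations above.
object TransformationsSketch {
  val lines: Seq[String] = Seq("to be", "or not to be", "that is", "the question")

  val lengths: Seq[Int]              = lines.map(_.length)           // map
  val long:    Seq[String]           = lines.filter(_.length > 7)    // filter
  val keyed:   Seq[(Int, String)]    = lines.map(s => (s.length, s)) // keyBy
  val grouped: Map[Int, Seq[String]] = keyed.groupMap(_._1)(_._2)    // groupByKey
  val sorted:  Seq[String]           = lines.sorted                  // sort
}
```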
"Actions" are methods that take an RDD, perform a computation, and return the result to the driver application. Recall that transformations are "lazy" and are not executed when called.
Actions trigger the computation of the transformations. The result of the computation can be a collection, values printed to the screen, values saved to a file, or similar; however, an action never returns an RDD.
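The lazy-transformation/eager-action contrast can be mimicked in plain Scala with LazyList: the mapped function runs only when a result is demanded, just as Spark executes transformations only when an action is called. The counter below exists only to make the deferred evaluation visible.

```scala
// Lazy "transformation" vs. eager "action", mimicked with Scala's LazyList.
object LazinessSketch {
  var evaluations = 0 // counts how many elements were actually computed

  val source: LazyList[Int] = LazyList(1, 2, 3, 4)

  // "Transformation": recorded in the pipeline, but nothing is computed yet.
  val doubled: LazyList[Int] = source.map { x => evaluations += 1; x * 2 }

  // "Action": forcing the collection triggers the deferred work.
  def collect(): List[Int] = doubled.toList
}
```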
Benefits of Spark
1. Interactive shell (REPL): Spark jobs can be easily deployed as an application, similar to how MapReduce jobs are executed. In addition, Spark includes a shell called the REPL (Read-Eval-Print Loop).
This allows for fast interactive experimentation with the data and easy validation of code. REPL enables quick access to data as well as interactive queries.
2. Reduced disk I/O: MapReduce writes to the local disk at the end of the map phase and to HDFS at the end of the reduce phase.
This means that while processing 1 TB of data, the system may write 4 TB of data to disk and send 2 TB of data over the network. When the application strings multiple MapReduce jobs together, the situation is even worse.
Spark’s RDDs can be stored in memory and processed in multiple steps or iterations without additional I/O. Because there are no special map and reduce phases, data is typically read from disk when processing starts and written to disk only when there is a need to persist results.
3. Storage: Spark gives developers the flexibility to store RDDs:
a. In memory on a single node
b. In memory but replicated to multiple nodes
c. Persisted to disk
An RDD can go through multiple stages of transformation (equivalent to multiple map and reduce phases) without storing anything to disk.
4. Multilanguage: While Spark itself is developed in Scala, Spark APIs are implemented for Java, Scala, and Python. This allows developers to use Spark in the language in which they are most productive.
Hadoop developers often use Java APIs, whereas data scientists often prefer the Python implementation so that they can use Spark combined with Python’s powerful numeric processing libraries.
5. Resource manager independence: Spark supports both YARN and Mesos as resource managers, and also runs in a standalone mode. Since each resource manager has its own strengths and limitations, Spark enables developers to use the resource manager of their choice. This also keeps Spark open to changes in resource managers in the future.
6. Simplicity: Spark APIs are significantly cleaner and simpler than those of MapReduce. The APIs are so usable that they obviate the need for the high-level abstractions on top of Spark like those that are essential for MapReduce, such as Hive or Pig.
Consequently, Spark code is significantly shorter than its MapReduce equivalent and is easily readable even by someone not familiar with Spark.
7. Versatility: Spark was built from the ground up to be an extensible, general-purpose parallel-processing framework. It is generic enough to support a stream-processing framework called Spark Streaming and a graph-processing engine called GraphX.
With this flexibility, Spark can be expected to see many new special-purpose libraries in the future.