Thursday 4 October 2012

Hadoop Distributed File System (HDFS) in MapReduce - Hadoop Online Training

The HDFS is a powerful companion to Hadoop MapReduce. By setting the fs.default.name configuration option to point to the NameNode (as was done above), Hadoop MapReduce jobs will automatically draw their input files from HDFS. Using the regular FileInputFormat subclasses, Hadoop reads its input from file paths within HDFS and distributes the work over the cluster so as to exploit block locality where possible. The mechanics of Hadoop MapReduce are discussed in much greater detail in Module 4.

Hadoop Distributed File System (HDFS) Admin Command Reference - Hadoop Online Training


While the dfs module for bin/hadoop provides common file and directory manipulation commands, they all work with objects within the file system. The dfsadmin module manipulates or queries the file system as a whole. The operation of the commands in this module is described in this section.
Getting overall status: A brief status report for HDFS can be retrieved with bin/hadoop dfsadmin -report. This returns basic information about the overall health of the HDFS cluster, as well as some per-server metrics.
More involved status: If you need to know more details about what the state of the NameNode's metadata is, the command bin/hadoop dfsadmin -metasave filename will record this information in filename. The metasave command will enumerate lists of blocks which are under-replicated, in the process of being replicated, and scheduled for deletion. NB: The help for this command states that it "saves NameNode's primary data structures," but this is a misnomer; the NameNode's state cannot be restored from this information. However, it will provide good information about how the NameNode is managing HDFS's blocks.
Safemode: Safemode is an HDFS state in which the file system is mounted read-only; no replication is performed, nor can files be created or deleted. This is automatically entered as the NameNode starts, to allow all DataNodes time to check in with the NameNode and announce which blocks they hold, before the NameNode determines which blocks are under-replicated, etc. The NameNode waits until a specific percentage of the blocks are present and accounted-for; this is controlled in the configuration by the dfs.safemode.threshold.pct parameter. After this threshold is met, safemode is automatically exited, and HDFS allows normal operations. The bin/hadoop dfsadmin -safemode what command allows the user to manipulate safemode based on the value of what, described below:
  • enter - Enters safemode
  • leave - Forces the NameNode to exit safemode
  • get - Returns a string indicating whether safemode is ON or OFF
  • wait - Waits until safemode has exited and returns
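The threshold check described above can be illustrated with a small Python sketch. This is not Hadoop's actual implementation: the function name is hypothetical, and the default of 0.999 is an assumption made for illustration, so consult your own configuration for the real value of dfs.safemode.threshold.pct.

```python
def may_leave_safemode(reported_blocks, total_blocks, threshold_pct=0.999):
    """Return True once enough blocks have been reported by DataNodes.

    reported_blocks: blocks that DataNodes have announced so far
    total_blocks:    blocks the NameNode's metadata says should exist
    threshold_pct:   assumed stand-in for dfs.safemode.threshold.pct
    """
    if total_blocks == 0:
        # An empty file system has nothing to wait for.
        return True
    return reported_blocks / total_blocks >= threshold_pct
```

With these assumed numbers, a NameNode tracking 1000 blocks would stay in safemode while only 990 have been reported, and would leave once all 1000 are accounted for.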
Changing HDFS membership - When decommissioning nodes, it is important to disconnect nodes from HDFS gradually to ensure that data is not lost. See the section on decommissioning later in this document for an explanation of the use of the -refreshNodes dfsadmin command.
Upgrading HDFS versions - When upgrading from one version of Hadoop to the next, the file formats used by the NameNode and DataNodes may change. When you first start the new version of Hadoop on the cluster, you need to tell Hadoop to change the HDFS version (or else it will not mount), using the command: bin/start-dfs.sh -upgrade. It will then begin upgrading the HDFS version. The status of an ongoing upgrade operation can be queried with the bin/hadoop dfsadmin -upgradeProgress status command. More verbose information can be retrieved with bin/hadoop dfsadmin -upgradeProgress details. If the upgrade is blocked and you would like to force it to continue, use the command: bin/hadoop dfsadmin -upgradeProgress force. (Note: be sure you know what you are doing if you use this last command.)
When HDFS is upgraded, Hadoop retains backup information allowing you to downgrade to the original HDFS version in case you need to revert Hadoop versions. To back out the changes, stop the cluster, re-install the older version of Hadoop, and then use the command: bin/start-dfs.sh -rollback. It will restore the previous HDFS state.
Only one such archival copy can be kept at a time. Thus, after a few days of operation with the new version (when it is deemed stable), the archival copy can be removed with the command bin/hadoop dfsadmin -finalizeUpgrade. The rollback command cannot be issued after this point. This must be performed before a second Hadoop upgrade is allowed.
Getting help - As with the dfs module, typing bin/hadoop dfsadmin -help cmd will provide more usage information about the particular command.

Hadoop Distributed File System (HDFS) Command Reference - Hadoop Online Training


There are many more commands in bin/hadoop dfs than were demonstrated here, although these basic operations will get you started. Running bin/hadoop dfs with no additional arguments will list all commands which can be run with the FsShell system. Furthermore, bin/hadoop dfs -help commandName will display a short usage summary for the operation in question, if you are stuck.
A table of all operations is reproduced below. The following conventions are used for parameters:
  • italics denote variables to be filled out by the user.
  • "path" means any file or directory name.
  • "path..." means one or more file or directory names.
  • "file" means any filename.
  • "src" and "dest" are path names in a directed operation.
  • "localSrc" and "localDest" are paths as above, but on the local file system. All other file and path names refer to objects inside HDFS.
  • Parameters in [brackets] are optional.
Command | Operation
-ls path | Lists the contents of the directory specified by path, showing the names, permissions, owner, size and modification date for each entry.
-lsr path | Behaves like -ls, but recursively displays entries in all subdirectories of path.
-du path | Shows disk usage, in bytes, for all files which match path; filenames are reported with the full HDFS protocol prefix.
-dus path | Like -du, but prints a summary of disk usage of all files/directories in the path.
-mv src dest | Moves the file or directory indicated by src to dest, within HDFS.
-cp src dest | Copies the file or directory identified by src to dest, within HDFS.
-rm path | Removes the file or empty directory identified by path.
-rmr path | Removes the file or directory identified by path. Recursively deletes any child entries (i.e., files or subdirectories of path).
-put localSrc dest | Copies the file or directory from the local file system identified by localSrc to dest within the DFS.
-copyFromLocal localSrc dest | Identical to -put
-moveFromLocal localSrc dest | Copies the file or directory from the local file system identified by localSrc to dest within HDFS, then deletes the local copy on success.
-get [-crc] src localDest | Copies the file or directory in HDFS identified by src to the local file system path identified by localDest.
-getmerge src localDest [addnl] | Retrieves all files that match the path src in HDFS, and copies them to a single, merged file in the local file system identified by localDest.
-cat filename | Displays the contents of filename on stdout.
-copyToLocal [-crc] src localDest | Identical to -get
-moveToLocal [-crc] src localDest | Works like -get, but deletes the HDFS copy on success.
-mkdir path | Creates a directory named path in HDFS. Creates any parent directories in path that are missing (e.g., like mkdir -p in Linux).
-setrep [-R] [-w] rep path | Sets the target replication factor for files identified by path to rep. (The actual replication factor will move toward the target over time)
-touchz path | Creates a file at path containing the current time as a timestamp. Fails if a file already exists at path, unless the file is already size 0.
-test -[ezd] path | Returns 1 if path exists; has zero length; or is a directory, or 0 otherwise.
-stat [format] path | Prints information about path. format is a string which accepts file size in blocks (%b), filename (%n), block size (%o), replication (%r), and modification date (%y, %Y).
-tail [-f] file | Shows the last 1KB of file on stdout.
-chmod [-R] mode,mode,... path... | Changes the file permissions associated with one or more objects identified by path.... Performs changes recursively with -R. mode is a 3-digit octal mode, or {augo}+/-{rwxX}. Assumes a if no scope is specified and does not apply a umask.
-chown [-R] [owner][:[group]] path... | Sets the owning user and/or group for files or directories identified by path.... Sets owner recursively if -R is specified.
-chgrp [-R] group path... | Sets the owning group for files or directories identified by path.... Sets group recursively if -R is specified.
-help cmd | Returns usage information for one of the commands listed above. You must omit the leading '-' character in cmd.

Shutting Down Hadoop Distributed File System (HDFS) - Hadoop Online Training


If you want to shut down the HDFS functionality of your cluster (either because you do not want Hadoop occupying memory resources when it is not in use, or because you want to restart the cluster for upgrading, configuration changes, etc.), then this can be accomplished by logging in to the NameNode machine and running:
  someone@namenode:hadoop$ bin/stop-dfs.sh
This command must be performed by the same user who started HDFS with bin/start-dfs.sh.

Retrieving data from Hadoop Distributed File System (HDFS) - Hadoop Online Training


There are multiple ways to retrieve files from the distributed file system. One of the easiest is to use cat to display the contents of a file on stdout. (It can, of course, also be used to pipe the data into other applications or destinations.)
Step 1: Display data with cat.
If you have not already done so, upload some files into HDFS. In this example, we assume that a file named "foo" has been loaded into your home directory on HDFS.
  someone@anynode:hadoop$ bin/hadoop dfs -cat foo
  (contents of foo are displayed here)
  someone@anynode:hadoop$
Step 2: Copy a file from HDFS to the local file system.
The get command is the inverse operation of put; it will copy a file or directory (recursively) from HDFS into the target of your choosing on the local file system. A synonymous operation is called -copyToLocal.
  someone@anynode:hadoop$ bin/hadoop dfs -get foo localFoo
  someone@anynode:hadoop$ ls
  localFoo
  someone@anynode:hadoop$ cat localFoo
  (contents of foo are displayed here)
Like the put command, get will operate on directories in addition to individual files.

Listing files - Interacting With Hadoop Distributed File System (HDFS) - Hadoop Online Training


If we attempt to inspect HDFS, we will not find anything interesting there:
  someone@anynode:hadoop$ bin/hadoop dfs -ls
  someone@anynode:hadoop$
The "-ls" command returns silently. Without any arguments, -ls will attempt to show the contents of your "home" directory inside HDFS. Don't forget, this is not the same as /home/$USER (e.g., /home/someone) on the host machine (HDFS keeps a separate namespace from the local files). There is no concept of a "current working directory" or cd command in HDFS.
If you provide -ls with an argument, you may see some initial directory contents:
  someone@anynode:hadoop$ bin/hadoop dfs -ls /
  Found 2 items
  drwxr-xr-x   - hadoop supergroup          0 2008-09-20 19:40 /hadoop
  drwxr-xr-x   - hadoop supergroup          0 2008-09-20 20:08 /tmp
These entries are created by the system. This example output assumes that "hadoop" is the username under which the Hadoop daemons (NameNode, DataNode, etc) were started. "supergroup" is a special group whose membership includes the username under which the HDFS instances were started (e.g., "hadoop"). These directories exist to allow the Hadoop MapReduce system to move necessary data to the different job nodes; this is explained in more detail in Module 4.
So we need to create our home directory, and then populate it with some files.
Inserting data into the cluster
Whereas a typical UNIX or Linux system stores individual users' files in /home/$USER, the Hadoop DFS stores these in /user/$USER. For some commands like ls, if a directory name is required and is left blank, this is the default directory name assumed. (Other commands require explicit source and destination paths.) Any relative paths used as arguments to HDFS, Hadoop MapReduce, or other components of the system are assumed to be relative to this base directory.
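The path rule above can be sketched in a few lines of Python. This is only an illustration of the convention described in this section, not code taken from Hadoop; the function name resolve_hdfs_path is hypothetical.

```python
import posixpath

def resolve_hdfs_path(path, user):
    """Resolve an HDFS path argument the way this section describes.

    Relative paths (and a blank path) are taken relative to /user/<user>;
    absolute paths are used unchanged.
    """
    home = posixpath.join("/user", user)
    if not path:
        # A blank directory name falls back to the HDFS home directory.
        return home
    # posixpath.join discards `home` when `path` is already absolute.
    return posixpath.normpath(posixpath.join(home, path))
```

For the example user "someone", the relative name foo would resolve to /user/someone/foo, while /tmp stays /tmp.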
Step 1: Create your home directory if it does not already exist.
  someone@anynode:hadoop$ bin/hadoop dfs -mkdir /user
If there is no /user directory, create that first. It will be automatically created later if necessary, but for instructive purposes, it makes sense to create it manually ourselves this time.
Then we are free to add our own home directory:
  someone@anynode:hadoop$ bin/hadoop dfs -mkdir /user/someone
Of course, replace /user/someone with /user/yourUserName.
Step 2: Upload a file. To insert a single file into HDFS, we can use the put command like so:
  someone@anynode:hadoop$ bin/hadoop dfs -put /home/someone/interestingFile.txt /user/yourUserName/
This copies /home/someone/interestingFile.txt from the local file system into /user/yourUserName/interestingFile.txt on HDFS.
Step 3: Verify the file is in HDFS. We can verify that the operation worked with either of the two following (equivalent) commands:
  someone@anynode:hadoop$ bin/hadoop dfs -ls /user/yourUserName
  someone@anynode:hadoop$ bin/hadoop dfs -ls
You should see a listing that starts with Found 1 items and then includes information about the file you inserted.
The following table demonstrates example uses of the put command, and their effects:
Command | Assuming | Outcome
bin/hadoop dfs -put foo bar | No file/directory named /user/$USER/bar exists in HDFS | Uploads local file foo to a file named /user/$USER/bar
bin/hadoop dfs -put foo bar | /user/$USER/bar is a directory | Uploads local file foo to a file named /user/$USER/bar/foo
bin/hadoop dfs -put foo somedir/somefile | /user/$USER/somedir does not exist in HDFS | Uploads local file foo to a file named /user/$USER/somedir/somefile, creating the missing directory
bin/hadoop dfs -put foo bar | /user/$USER/bar is already a file in HDFS | No change in HDFS, and an error is returned to the user.
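The four cases in the table above can be modeled with a short Python sketch. It only mirrors the outcomes listed in the table; it does not talk to a cluster, and the function name and the two sets used to stand in for HDFS state are hypothetical.

```python
import posixpath

def put_destination(local_name, dest, hdfs_files, hdfs_dirs):
    """Return the HDFS path a put of `local_name` to `dest` would create.

    dest:       absolute HDFS destination path
    hdfs_files: set of existing HDFS file paths (stand-in for real state)
    hdfs_dirs:  set of existing HDFS directory paths (stand-in for real state)
    """
    if dest in hdfs_files:
        # Destination is already a file: nothing changes, an error is raised.
        raise FileExistsError(dest)
    if dest in hdfs_dirs:
        # Destination is a directory: the file keeps its local name inside it.
        return posixpath.join(dest, posixpath.basename(local_name))
    # Destination does not exist: it becomes the new file's name
    # (missing parent directories are created, per the table).
    return dest
```

Calling put_destination("foo", "/user/someone/bar", set(), {"/user/someone/bar"}) would return /user/someone/bar/foo, matching the second row.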
When the put command operates on a file, it is all-or-nothing. Uploading a file into HDFS first copies the data onto the DataNodes. Only when they have all acknowledged receiving the data and the file handle is closed is the file made visible to the rest of the system. Thus, based on the return value of the put command, you can be confident that a file has either been successfully uploaded or has "fully failed"; you will never reach a state where an upload was interrupted partway yet the partial contents are visible externally. If an upload fails, it will be as though no upload took place.
Step 4: Uploading multiple files at once. The put command is more powerful than moving a single file at a time. It can also be used to upload entire directory trees into HDFS.
Create a local directory and put some files into it using the cp command. Our example user may have a situation like the following:
  someone@anynode:hadoop$ ls -R myfiles
  myfiles:
  file1.txt  file2.txt  subdir/

  myfiles/subdir:
  anotherFile.txt
  someone@anynode:hadoop$
This entire myfiles/ directory can be copied into HDFS like so:
  someone@anynode:hadoop$ bin/hadoop dfs -put myfiles /user/someone
  someone@anynode:hadoop$ bin/hadoop dfs -ls
  Found 1 items
  /user/someone/myfiles   <dir>    2008-06-12 20:59    rwxr-xr-x    someone    supergroup
  someone@anynode:hadoop$ bin/hadoop dfs -ls myfiles
  Found 3 items
  /user/someone/myfiles/file1.txt   <r 1>   186731  2008-06-12 20:59        rw-r--r--       someone   supergroup
  /user/someone/myfiles/file2.txt   <r 1>   168     2008-06-12 20:59        rw-r--r--       someone   supergroup
  /user/someone/myfiles/subdir      <dir>           2008-06-12 20:59        rwxr-xr-x       someone   supergroup
This demonstrates that the tree was correctly uploaded recursively. You'll note that in addition to the file path, ls also reports the number of replicas of each file that exist (the "1" in <r 1>), the file size, upload time, permissions, and owner information.
Another synonym for -put is -copyFromLocal. The syntax and functionality are identical.

Interacting With Hadoop Distributed File System (HDFS) - Hadoop Online Training


This section will familiarize you with the commands necessary to interact with HDFS, loading and retrieving data, as well as manipulating files. This section makes extensive use of the command-line.
The bulk of commands that communicate with the cluster are performed by a monolithic script named bin/hadoop. This will load the Hadoop system with the Java virtual machine and execute a user command. The commands are specified in the following form:
  user@machine:hadoop$ bin/hadoop moduleName -cmd args...
The moduleName tells the program which subset of Hadoop functionality to use. -cmd is the name of a specific command within this module to execute. Its arguments follow the command name.
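If you script these commands, the moduleName / -cmd / args layout described above maps directly onto an argument list. The following Python helper is a hypothetical convenience, not part of Hadoop; it only assembles the arguments and assumes the script lives at bin/hadoop relative to the current directory.

```python
def hadoop_command(module, cmd, *args, hadoop_bin="bin/hadoop"):
    """Build the argument list for `bin/hadoop moduleName -cmd args...`.

    module:     the Hadoop module, e.g. "dfs" or "dfsadmin"
    cmd:        the command name, with or without its leading '-'
    args:       any arguments that follow the command name
    hadoop_bin: assumed location of the hadoop script
    """
    return [hadoop_bin, module, "-" + cmd.lstrip("-"), *args]
```

The resulting list can be handed to subprocess.run on a machine where Hadoop is installed; the sketch itself never contacts a cluster.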
Two such modules are relevant to HDFS: dfs and dfsadmin. Their use is described in the sections below.

COMMON EXAMPLE OPERATIONS

The dfs module, also known as "FsShell," provides basic file manipulation operations. Their usage is introduced here.
A cluster is only useful if it contains data of interest. Therefore, the first operation to perform is loading information into the cluster. For purposes of this example, we will assume an example user named "someone" -- but substitute your own username where it makes sense. Also note that any operation on files in HDFS can be performed from any node with access to the cluster, whose conf/hadoop-site.xml is configured to set fs.default.name to your cluster's NameNode. We will call the fictional machine on which we are operating anynode. Commands are being run from the "hadoop" directory where you installed Hadoop. This may be /home/someone/src/hadoop on your machine, or /home/foo/hadoop on someone else's. These initial commands are centered around loading information into HDFS, checking that it's there, and getting information back out of HDFS.

Configuring Hadoop Distributed File System (HDFS) - Hadoop Online Training


The HDFS for your cluster can be configured in a very short amount of time. First we will fill out the relevant sections of the Hadoop configuration file, then format the NameNode.

CLUSTER CONFIGURATION

These instructions for cluster configuration assume that you have already downloaded and unzipped a copy of Hadoop. Module 3 discusses getting started with Hadoop for this tutorial. Module 7 discusses how to set up a larger cluster and provides preliminary setup instructions for Hadoop, including downloading prerequisite software.
The HDFS configuration is located in a set of XML files in the Hadoop configuration directory, conf/, under the main Hadoop install directory (where you unzipped Hadoop to). The conf/hadoop-default.xml file contains default values for every parameter in Hadoop. This file is considered read-only. You override this configuration by setting new values in conf/hadoop-site.xml. This file should be replicated consistently across all machines in the cluster. (It is also possible, though not advisable, to host it on NFS.)
Configuration settings are a set of key-value pairs of the format:
  <property>
    <name>property-name</name>
    <value>property-value</value>
  </property>
Adding the line <final>true</final> inside the property body will prevent properties from being overridden by user applications. This is useful for most system-wide configuration options.
The following settings are necessary to configure HDFS:
key | value | example
fs.default.name | protocol://servername:port | hdfs://alpha.milkman.org:9000
dfs.data.dir | pathname | /home/username/hdfs/data
dfs.name.dir | pathname | /home/username/hdfs/name
These settings are described individually below:
fs.default.name - This is the URI (protocol specifier, hostname, and port) that describes the NameNode for the cluster. Each node in the system on which Hadoop is expected to operate needs to know the address of the NameNode. The DataNode instances will register with this NameNode, and make their data available through it. Individual client programs will connect to this address to retrieve the locations of actual file blocks.
dfs.data.dir - This is the path on the local file system in which the DataNode instance should store its data. It is not necessary that all DataNode instances store their data under the same local path prefix, as they will all be on separate machines; it is acceptable that these machines are heterogeneous. However, it will simplify configuration if this directory is standardized throughout the system. By default, Hadoop will place this under /tmp. This is fine for testing purposes, but is an easy way to lose actual data in a production system, and thus must be overridden.
dfs.name.dir - This is the path on the local file system of the NameNode instance where the NameNode metadata is stored. It is only used by the NameNode instance to find its information, and does not exist on the DataNodes. The caveat above about /tmp applies to this as well; this setting must be overridden in a production system.
Another configuration parameter, not listed above, is dfs.replication. This is the default replication factor for each block of data in the file system. For a production cluster, this should usually be left at its default value of 3. (You are free to increase your replication factor, though this may be unnecessary and use more space than is required. Fewer than three replicas impact the high availability of information, and possibly the reliability of its storage.)
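The space cost of replication is simple arithmetic, sketched below in Python. The function names are hypothetical, and the model deliberately ignores everything except the replication factor.

```python
def raw_storage_bytes(logical_bytes, replication=3):
    """Disk space consumed across the cluster for `logical_bytes` of file data."""
    return logical_bytes * replication

def tolerated_replica_losses(replication=3):
    """How many copies of a block can be lost while one copy still survives."""
    return max(replication - 1, 0)
```

Under this model, 1 TB of files at the default factor of 3 occupies 3 TB of raw disk, and each block survives the loss of two of its replicas.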
The following information can be pasted into the hadoop-site.xml file for a single-node configuration:
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://your.server.name.com:9000</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>/home/username/hdfs/data</value>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>/home/username/hdfs/name</value>
  </property>
</configuration>
Of course, your.server.name.com needs to be changed, as does username. Using port 9000 for the NameNode is arbitrary.
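Before copying the file around the cluster, it can be worth checking that the three required settings are actually present. The Python sketch below does this with the standard library's XML parser; the function names are hypothetical, and it checks only for the three keys named in this section.

```python
import xml.etree.ElementTree as ET

# The three settings this section lists as necessary for HDFS.
REQUIRED_KEYS = ("fs.default.name", "dfs.data.dir", "dfs.name.dir")

def read_site_properties(xml_text):
    """Parse hadoop-site.xml content into a {name: value} dictionary."""
    root = ET.fromstring(xml_text)
    props = {}
    for prop in root.findall("property"):
        name = prop.findtext("name")
        value = prop.findtext("value")
        if name is not None:
            props[name.strip()] = (value or "").strip()
    return props

def missing_required_keys(xml_text):
    """Return the required keys that are absent or have an empty value."""
    props = read_site_properties(xml_text)
    return [key for key in REQUIRED_KEYS if not props.get(key)]
```

An empty list from missing_required_keys means all three settings were found; it says nothing about whether the values themselves are correct for your cluster.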
After copying this information into your conf/hadoop-site.xml file, copy this to the conf/ directories on all machines in the cluster.
The master node needs to know the addresses of all the machines to use as DataNodes; the startup scripts depend on this. Also in the conf/ directory, edit the file slaves so that it contains a list of fully-qualified hostnames for the slave instances, one host per line. On a multi-node setup, the master node (e.g., localhost) is not usually present in this file.
Then make the directories necessary:
  user@EachMachine$ mkdir -p $HOME/hdfs/data

  user@namenode$ mkdir -p $HOME/hdfs/name
The user who owns the Hadoop instances will need to have read and write access to each of these directories. It is not necessary for all users to have access to these directories. Set permissions with chmod as appropriate. In a large-scale environment, it is recommended that you create a user named "hadoop" on each node for the express purpose of owning and running Hadoop tasks. For a single individual's machine, it is perfectly acceptable to run Hadoop under your own username. It is not recommended that you run Hadoop as root.

STARTING HDFS

Now we must format the file system that we just configured:
  user@namenode:hadoop$ bin/hadoop namenode -format
This process should only be performed once. When it is complete, we are free to start the distributed file system:
  user@namenode:hadoop$ bin/start-dfs.sh
This command will start the NameNode server on the master machine (which is where the start-dfs.sh script was invoked). It will also start the DataNode instances on each of the slave machines. In a single-machine "cluster," this is the same machine as the NameNode instance. On a real cluster of two or more machines, this script will ssh into each slave machine and start a DataNode instance.

Hadoop Distributed File System Basics - Hadoop Online Training


A distributed file system is designed to hold a large amount of data and provide access to this data to many clients distributed across a network. There are a number of distributed file systems that solve this problem in different ways.

NFS, the Network File System, is the most ubiquitous distributed file system. It is one of the oldest still in use. While its design is straightforward, it is also very constrained. NFS provides remote access to a single logical volume stored on a single machine. An NFS server makes a portion of its local file system visible to external clients. The clients can then mount this remote file system directly into their own Linux file system, and interact with it as though it were part of the local drive.
One of the primary advantages of this model is its transparency. Clients do not need to be particularly aware that they are working on files stored remotely. The existing standard library methods like open(), close(), fread(), etc. will work on files hosted over NFS.
But as a distributed file system, it is limited in its power. The files in an NFS volume all reside on a single machine. This means that it will only store as much information as can be stored in one machine, and does not provide any reliability guarantees if that machine goes down (e.g., by replicating the files to other servers). Finally, as all the data is stored on a single machine, all the clients must go to this machine to retrieve their data. This can overload the server if a large number of clients must be handled. Clients must also always copy the data to their local machines before they can operate on it.
HDFS is designed to be robust to a number of the problems that other DFS's such as NFS are vulnerable to. In particular:
  • HDFS is designed to store a very large amount of information (terabytes or petabytes). This requires spreading the data across a large number of machines. It also supports much larger file sizes than NFS.
  • HDFS should store data reliably. If individual machines in the cluster malfunction, data should still be available.
  • HDFS should provide fast, scalable access to this information. It should be possible to serve a larger number of clients by simply adding more machines to the cluster.
  • HDFS should integrate well with Hadoop MapReduce, allowing data to be read and computed upon locally when possible.
But while HDFS is very scalable, its high performance design also restricts it to a particular class of applications; it is not as general-purpose as NFS. There are a large number of additional decisions and trade-offs that were made with HDFS. In particular:
  • Applications that use HDFS are assumed to perform long sequential streaming reads from files. HDFS is optimized to provide streaming read performance; this comes at the expense of random seek times to arbitrary positions in files.
  • Data will be written to the HDFS once and then read several times; updates to files after they have already been closed are not supported. (An extension to Hadoop will provide support for appending new data to the ends of files; it is scheduled to be included in Hadoop 0.19 but is not available yet.)
  • Due to the large size of files, and the sequential nature of reads, the system does not provide a mechanism for local caching of data. The overhead of caching is great enough that data should simply be re-read from HDFS source.
  • Individual machines are assumed to fail on a frequent basis, both permanently and intermittently. The cluster must be able to withstand the complete failure of several machines, possibly many happening at the same time (e.g., if a rack fails all together). While performance may degrade proportional to the number of machines lost, the system as a whole should not become overly slow, nor should information be lost. Data replication strategies combat this problem.
The design of HDFS is based on the design of GFS, the Google File System. Its design was described in a paper published by Google.
HDFS is a block-structured file system: individual files are broken into blocks of a fixed size. These blocks are stored across a cluster of one or more machines with data storage capacity. Individual machines in the cluster are referred to as DataNodes. A file can be made of several blocks, and they are not necessarily stored on the same machine; the target machines which hold each block are chosen randomly on a block-by-block basis. Thus access to a file may require the cooperation of multiple machines, but supports file sizes far larger than a single-machine DFS; individual files can require more space than a single hard drive could hold.
If several machines must be involved in the serving of a file, then a file could be rendered unavailable by the loss of any one of those machines. HDFS combats this problem by replicating each block across a number of machines (3, by default).
Figure 2.1: DataNodes holding blocks of multiple files with a replication factor of 2. The NameNode maps the filenames onto the block ids.
Most block-structured file systems use a block size on the order of 4 or 8 KB. By contrast, the default block size in HDFS is 64MB -- orders of magnitude larger. This allows HDFS to decrease the amount of metadata storage required per file (the list of blocks per file will be smaller as the size of individual blocks increases). Furthermore, it allows for fast streaming reads of data, by keeping large amounts of data sequentially laid out on the disk. The consequence of this decision is that HDFS expects to have very large files, and expects them to be read sequentially. Unlike a file system such as NTFS or EXT, which see many very small files, HDFS expects to store a modest number of very large files: hundreds of megabytes, or gigabytes each. After all, a 100 MB file is not even two full blocks. Files on your computer may also frequently be accessed "randomly," with applications cherry-picking small amounts of information from several different locations in a file which are not sequentially laid out. By contrast, HDFS expects to read a block start-to-finish for a program. This makes it particularly useful to the MapReduce style of programming described in Module 4. That having been said, attempting to use HDFS as a general-purpose distributed file system for a diverse set of applications will be suboptimal.
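The effect of block size on the length of the per-file block list can be checked with a little arithmetic. The Python sketch below is only an illustration; the function name is hypothetical, and it uses the 64 MB default and the 4 KB figure quoted above.

```python
MB = 1024 * 1024
KB = 1024

def block_count(file_size, block_size=64 * MB):
    """Number of blocks needed to hold file_size bytes (last block may be partial)."""
    # Ceiling division using integers only, so large sizes stay exact.
    return -(-file_size // block_size)

# A 100 MB file needs one full 64 MB block plus a partial second block.
small = block_count(100 * MB)

# A 1 GB file: 16 block entries at 64 MB, versus 262144 at a 4 KB block size.
large_hdfs = block_count(1024 * MB)
large_4k = block_count(1024 * MB, block_size=4 * KB)
```

The difference between 16 and 262144 entries for the same file is the metadata saving that the large block size buys.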
Because HDFS stores files as a set of large blocks across several machines, these files are not part of the ordinary file system. Typing ls on a machine running a DataNode daemon will display the contents of the ordinary Linux file system being used to host the Hadoop services -- but it will not include any of the files stored inside the HDFS. This is because HDFS runs in a separate namespace, isolated from the contents of your local files. The files inside HDFS (or more accurately: the blocks that make them up) are stored in a particular directory managed by the DataNode service, but the files will be named only with block ids. You cannot interact with HDFS-stored files using ordinary Linux file modification tools (e.g., ls, cp, mv, etc). However, HDFS does come with its own utilities for file management, which act very similarly to these familiar tools. A later section in this tutorial will introduce you to these commands and their operation.
It is important for this file system to store its metadata reliably. Furthermore, while the file data is accessed in a write once and read many model, the metadata structures (e.g., the names of files and directories) can be modified by a large number of clients concurrently. It is important that this information is never desynchronized. Therefore, it is all handled by a single machine, called the NameNode. The NameNode stores all the metadata for the file system. Because of the relatively low amount of metadata per file (it only tracks file names, permissions, and the locations of each block of each file), all of this information can be stored in the main memory of the NameNode machine, allowing fast access to the metadata.
To open a file, a client contacts the NameNode and retrieves a list of locations for the blocks that comprise the file. These locations identify the DataNodes which hold each block. Clients then read file data directly from the DataNode servers, possibly in parallel. The NameNode is not directly involved in this bulk data transfer, keeping its overhead to a minimum.
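The read path just described can be sketched as a toy, in-memory simulation. Everything here is hypothetical: the classes NameNode and DataNode, the method names, and the block ids are invented for illustration and are not the real Hadoop API. The point is only that the metadata lookup and the bulk data transfer go to different places.

```python
class DataNode:
    """Toy DataNode: holds raw block contents keyed by block id."""

    def __init__(self, name):
        self.name = name
        self.blocks = {}

    def read_block(self, block_id):
        return self.blocks[block_id]


class NameNode:
    """Toy NameNode: metadata only, no file data.

    Maps a path to an ordered list of (block id, names of DataNodes holding it).
    """

    def __init__(self):
        self.files = {}

    def get_block_locations(self, path):
        return self.files[path]


def read_file(path, namenode, datanodes):
    """Client-side read: one metadata lookup, then direct reads from DataNodes."""
    data = b""
    for block_id, holders in namenode.get_block_locations(path):
        # Any replica will do; this sketch simply takes the first one listed.
        data += datanodes[holders[0]].read_block(block_id)
    return data


# Two DataNodes and one file made of two blocks.
dn = {"dn1": DataNode("dn1"), "dn2": DataNode("dn2")}
dn["dn1"].blocks["blk_1"] = b"hello "
dn["dn2"].blocks["blk_1"] = b"hello "
dn["dn2"].blocks["blk_2"] = b"world"

nn = NameNode()
nn.files["/demo.txt"] = [("blk_1", ["dn1", "dn2"]), ("blk_2", ["dn2"])]

contents = read_file("/demo.txt", nn, dn)
print(contents)  # b'hello world'
```

Note that the NameNode object never touches the block contents; it only answers the question of where the blocks live.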
Of course, NameNode information must be preserved even if the NameNode machine fails; there are multiple redundant systems that allow the NameNode to preserve the file system's metadata even if the NameNode itself crashes irrecoverably. NameNode failure is more severe for the cluster than DataNode failure. While individual DataNodes may crash and the entire cluster will continue to operate, the loss of the NameNode will render the cluster inaccessible until it is manually restored. Fortunately, as the NameNode's involvement is relatively minimal, the odds of it failing are considerably lower than the odds of an arbitrary DataNode failing at any given point in time.
A more thorough overview of the architectural decisions involved in the design and implementation of HDFS is given in the official Hadoop HDFS documentation. Before continuing in this tutorial, it is advisable that you read and understand the information presented there.

Hadoop Distributed File System - Hadoop Online Training


Outline

  1. Introduction
  2. Goals for this Module
  3. Distributed File System Basics
  4. Configuring HDFS
  5. Interacting With HDFS
    1. Common Example Operations
    2. HDFS Command Reference
    3. DFSAdmin Command Reference
  6. Using HDFS in MapReduce
  7. Using HDFS Programmatically
  8. HDFS Permissions and Security
  9. Additional HDFS Tasks
    1. Rebalancing Blocks
    2. Copying Large Sets of Files
    3. Decommissioning Nodes
    4. Verifying File System Health
    5. Rack Awareness
  10. HDFS Web Interface
  11. References

Introduction

HDFS, the Hadoop Distributed File System, is a distributed file system designed to hold very large amounts of data (terabytes or even petabytes), and provide high-throughput access to this information. Files are stored in a redundant fashion across multiple machines to ensure their durability to failure and high availability to very parallel applications. This module introduces the design of this distributed file system and instructions on how to operate it.

Goals for this Module:

  • Understand the basic design of HDFS and how it relates to basic distributed file system concepts
  • Learn how to set up and use HDFS from the command line
  • Learn how to use HDFS in your applications

Introduction - Hadoop Online Training


Outline

  1. Introduction
  2. Goals for this Module
  3. Problem Scope
    1. Challenges at Large Scale
    2. Moore's Law
  4. The Hadoop Approach
    1. Comparison to Existing Techniques
    2. Data Distribution
    3. MapReduce: Isolated Processes
    4. Flat Scalability
  5. The Rest of the Tutorial

Introduction

Welcome to the Hadoop tutorial! This series of tutorial documents will walk you through many aspects of the Apache Hadoop system. You will be shown how to set up simple and advanced cluster configurations, use the distributed file system, and develop complex Hadoop MapReduce applications. Other related systems are also reviewed.

Goals for this Module:

  • Understand the scope of problems applicable to Hadoop
  • Understand how Hadoop addresses these problems differently from other distributed systems

Problem Scope

Hadoop is a large-scale distributed batch processing infrastructure. While it can be used on a single machine, its true power lies in its ability to scale to hundreds or thousands of computers, each with several processor cores. Hadoop is also designed to efficiently distribute large amounts of work across a set of machines.
How large an amount of work? Orders of magnitude larger than many existing systems work with. Hundreds of gigabytes of data constitute the low end of Hadoop-scale. Hadoop is built to process "web-scale" data on the order of hundreds of gigabytes to terabytes or petabytes. At this scale, it is likely that the input data set will not even fit on a single computer's hard drive, much less in memory. So Hadoop includes a distributed file system which breaks up input data and sends fractions of the original data to several machines in your cluster to hold. As a result, the problem can be processed in parallel using all of the machines in the cluster, and output results are computed as efficiently as possible.

CHALLENGES AT LARGE SCALE

Performing large-scale computation is difficult. To work with this volume of data requires distributing parts of the problem to multiple machines to handle in parallel. Whenever multiple machines are used in cooperation with one another, the probability of failures rises. In a single-machine environment, failure is not something that program designers explicitly worry about very often: if the machine has crashed, then there is no way for the program to recover anyway.
In a distributed environment, however, partial failures are an expected and common occurrence. Networks can experience partial or total failure if switches and routers break down. Data may not arrive at a particular point in time due to unexpected network congestion. Individual compute nodes may overheat, crash, experience hard drive failures, or run out of memory or disk space. Data may be corrupted, or maliciously or improperly transmitted. Multiple implementations or versions of client software may speak slightly different protocols from one another. Clocks may become desynchronized, lock files may not be released, parties involved in distributed atomic transactions may lose their network connections part-way through, etc. In each of these cases, the rest of the distributed system should be able to recover from the component failure or transient error condition and continue to make progress. Of course, actually providing such resilience is a major software engineering challenge.
Different distributed systems specifically address certain modes of failure, while worrying less about others. Hadoop provides no security model, nor safeguards against maliciously inserted data. For example, it cannot detect a man-in-the-middle attack between nodes. On the other hand, it is designed to handle hardware failure and data congestion issues very robustly. Other distributed systems make different trade-offs, as they intend to be used for problems with other requirements (e.g., high security).
In addition to worrying about these sorts of bugs and challenges, there is also the fact that the compute hardware has finite resources available to it. The major resources include:
  • Processor time
  • Memory
  • Hard drive space
  • Network bandwidth
Individual machines typically only have a few gigabytes of memory. If the input data set is several terabytes, then this would require a thousand or more machines to hold it in RAM -- and even then, no single machine would be able to process or address all of the data.
Hard drives are much larger; a single machine can now hold multiple terabytes of information on its hard drives. But intermediate data sets generated while performing a large-scale computation can easily fill up several times more space than what the original input data set had occupied. During this process, some of the hard drives employed by the system may become full, and the distributed system may need to route this data to other nodes which can store the overflow.
Finally, bandwidth is a scarce resource even on an internal network. While a set of nodes directly connected by gigabit Ethernet may generally experience high throughput between them, if all of the machines are transmitting multi-gigabyte data sets, they can easily saturate the switch's bandwidth capacity. Additionally, if the machines are spread across multiple racks, the bandwidth available for the data transfer will be much less. Furthermore, RPC requests and other data transfer requests using this channel may be delayed or dropped.
To be successful, a large-scale distributed system must be able to manage the above mentioned resources efficiently. Furthermore, it must allocate some of these resources toward maintaining the system as a whole, while devoting as much time as possible to the actual core computation.
Synchronization between multiple machines remains the biggest challenge in distributed system design. If nodes in a distributed system can explicitly communicate with one another, then application designers must be cognizant of risks associated with such communication patterns. It becomes very easy to generate more remote procedure calls (RPCs) than the system can satisfy! Performing multi-party data exchanges is also prone to deadlock or race conditions. Finally, the ability to continue computation in the face of failures becomes more challenging. For example, if 100 nodes are present in a system and one of them crashes, the other 99 nodes should be able to continue the computation, ideally with only a small penalty proportionate to the loss of 1% of the computing power. Of course, this will require re-computing any work lost on the unavailable node. Furthermore, if a complex communication network is overlaid on the distributed infrastructure, then determining how best to restart the lost computation and propagating this information about the change in network topology may be nontrivial to implement.

MOORE'S LAW

So why use a distributed system at all? Such systems seem like more trouble than they're worth. And with the fast pace of computer hardware design, it seems inevitable that single-chip hardware will be able to "grow up" to handle the larger volumes of data. After all, Moore's Law (named after Gordon Moore, a co-founder of Intel) states that the number of transistors that can be placed in a processor will double approximately every two years, for half the cost. But trends in chip design are changing to face new realities. While we can still double the number of transistors per unit area at this pace, this does not necessarily result in faster single-threaded performance. New processors such as those based on the Intel Core 2 and Itanium 2 architectures now focus on embedding many smaller CPUs or "cores" onto the same physical device. This allows multiple threads to process twice as much data in parallel, but at the same speed at which they operated previously.
Even if hundreds or thousands of CPU cores are placed on a single machine, it would not be possible to deliver input data to these cores fast enough for processing. Individual hard drives can only sustain read speeds between 60-100 MB/second. These speeds have been increasing over time, but not at the same breakneck pace as processors. Optimistically assuming the upper limit of 100 MB/second, and assuming four independent I/O channels are available to the machine, that provides 400 MB of data every second. A 4 terabyte data set would thus take over 10,000 seconds to read--about three hours just to load the data! With 100 separate machines each with two I/O channels on the job, this drops to three minutes.
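The arithmetic in this paragraph can be reproduced in a few lines. This is only a back-of-the-envelope sketch: the function name read_time_seconds is made up for illustration, the 100 MB/second per-channel rate is the optimistic figure from the text, and the data set size is taken in binary units (1 TB = 1024 * 1024 MB), which is an assumption.

```python
def read_time_seconds(dataset_mb, machines, channels_per_machine, mb_per_sec=100):
    """Time to stream a data set once, assuming every I/O channel runs flat out."""
    aggregate_mb_per_sec = machines * channels_per_machine * mb_per_sec
    return dataset_mb / aggregate_mb_per_sec


FOUR_TB_IN_MB = 4 * 1024 * 1024  # assumes binary units

# One machine with four independent I/O channels: 400 MB/second in total.
single = read_time_seconds(FOUR_TB_IN_MB, machines=1, channels_per_machine=4)
# One hundred machines with two channels each: 20,000 MB/second in total.
cluster = read_time_seconds(FOUR_TB_IN_MB, machines=100, channels_per_machine=2)

print("single machine: %.0f seconds (%.1f hours)" % (single, single / 3600))
print("100 machines:   %.0f seconds (%.1f minutes)" % (cluster, cluster / 60))
```

The single machine needs a little over 10,000 seconds, just under three hours, while the cluster finishes in a bit more than three minutes. The speedup is simply the ratio of aggregate bandwidth: 20,000 MB/second against 400 MB/second, a factor of 50.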

The Hadoop Approach

Hadoop is designed to efficiently process large volumes of information by connecting many commodity computers together to work in parallel. The theoretical 1000-CPU machine described earlier would cost a very large amount of money, far more than 1,000 single-CPU or 250 quad-core machines. Hadoop will tie these smaller and more reasonably priced machines together into a single cost-effective compute cluster.

COMPARISON TO EXISTING TECHNIQUES

Performing computation on large volumes of data has been done before, usually in a distributed setting. What makes Hadoop unique is its simplified programming model, which allows the user to quickly write and test distributed systems, and its efficient, automatic distribution of data and work across machines, which in turn exploits the underlying parallelism of the CPU cores.
Grid scheduling of computers can be done with existing systems such as Condor. But Condor does not automatically distribute data: a separate SAN must be managed in addition to the compute cluster. Furthermore, collaboration between multiple compute nodes must be managed with a communication system such as MPI. This programming model is challenging to work with and can lead to the introduction of subtle errors.

DATA DISTRIBUTION

In a Hadoop cluster, data is distributed to all the nodes of the cluster as it is being loaded in. The Hadoop Distributed File System (HDFS) will split large data files into chunks which are managed by different nodes in the cluster. In addition, each chunk is replicated across several machines, so that a single machine failure does not result in any data being unavailable. An active monitoring system then re-replicates the data in response to system failures that leave some chunks with fewer copies than intended. Even though the file chunks are replicated and distributed across several machines, they form a single namespace, so their contents are universally accessible.
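To see why replication keeps data available after a machine failure, consider the following minimal sketch. It is not how HDFS actually chooses replica locations (the real placement policy is more sophisticated and takes racks into account); the round-robin scheme, the function names, and the node names are all assumptions made for illustration. The replication factor of 3 matches the usual HDFS default.

```python
def place_replicas(num_chunks, nodes, replication=3):
    """Assign each chunk to `replication` distinct nodes, round-robin."""
    if replication > len(nodes):
        raise ValueError("need at least as many nodes as replicas")
    placement = {}
    for chunk in range(num_chunks):
        placement[chunk] = [
            nodes[(chunk + r) % len(nodes)] for r in range(replication)
        ]
    return placement


def available_chunks(placement, failed_nodes):
    """Chunks that still have at least one replica on a healthy node."""
    return {
        chunk
        for chunk, holders in placement.items()
        if any(node not in failed_nodes for node in holders)
    }


nodes = ["node1", "node2", "node3", "node4", "node5"]
placement = place_replicas(8, nodes)

# Fail each node in turn: every chunk should remain readable.
survives_any_single_failure = all(
    len(available_chunks(placement, {failed})) == len(placement)
    for failed in nodes
)
print(survives_any_single_failure)  # True
```

With three copies per chunk, even two simultaneous node failures leave every chunk with at least one healthy replica; data only becomes unavailable when all holders of some chunk are lost at once.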
Data is conceptually record-oriented in the Hadoop programming framework. Individual input files are broken into lines or into other formats specific to the application logic. Each process running on a node in the cluster then processes a subset of these records. The Hadoop framework then schedules these processes in proximity to the location of data/records using knowledge from the distributed file system. Since files are spread across the distributed file system as chunks, each compute process running on a node operates on a subset of the data. The data that a node operates on is chosen based on its locality to the node: most data is read from the local disk straight into the CPU, alleviating strain on network bandwidth and preventing unnecessary network transfers. This strategy of moving computation to the data, instead of moving the data to the computation, allows Hadoop to achieve high data locality, which in turn results in high performance.
Figure 1.1: Data is distributed across nodes at load time.
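The idea of moving computation to the data can be illustrated with a small greedy scheduler. This is a simplified sketch and not Hadoop's actual scheduling algorithm; the function schedule, the notion of a fixed number of task slots per node, and the chunk and node names are all hypothetical.

```python
def schedule(chunk_locations, nodes, slots_per_node=2):
    """Assign each chunk to a node, preferring nodes that already hold it.

    Returns {chunk: (node, is_local)} where is_local is True when the chosen
    node has a replica of the chunk on its own disk.
    """
    free = {node: slots_per_node for node in nodes}
    assignment = {}
    for chunk, holders in chunk_locations.items():
        local = [n for n in holders if free.get(n, 0) > 0]
        if local:
            node, is_local = local[0], True
        else:
            # No holder has a free slot: fall back to a remote read.
            candidates = [n for n in nodes if free[n] > 0]
            if not candidates:
                raise RuntimeError("no free task slots")
            node, is_local = candidates[0], False
        free[node] -= 1
        assignment[chunk] = (node, is_local)
    return assignment


chunk_locations = {
    "chunk0": ["node1", "node2"],
    "chunk1": ["node1", "node3"],
    "chunk2": ["node1", "node2"],
    "chunk3": ["node3"],
}
plan = schedule(chunk_locations, ["node1", "node2", "node3"])
for chunk, (node, is_local) in plan.items():
    print(chunk, "->", node, "(local)" if is_local else "(remote)")
```

In this example every task lands on a node that already stores its chunk, so no chunk has to cross the network. Because each chunk is replicated, the scheduler has more than one local choice; here chunk2 goes to node2 once node1's slots are full.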

MAPREDUCE: ISOLATED PROCESSES

Hadoop limits the amount of communication which can be performed by the processes, as each individual record is processed by a task in isolation from the others. While this sounds like a major limitation at first, it makes the whole framework much more reliable. Hadoop will not run just any program and distribute it across a cluster. Programs must be written to conform to a particular programming model, named "MapReduce."
In MapReduce, records are processed in isolation by tasks called Mappers. The output from the Mappers is then brought together into a second set of tasks called Reducers, where results from different mappers can be merged together.
Figure 1.2: Mapping and reducing tasks run on nodes where individual records of data are already present.
Separate nodes in a Hadoop cluster still communicate with one another. However, in contrast to more conventional distributed systems where application developers explicitly marshal byte streams from node to node over sockets or through MPI buffers, communication in Hadoop is performed implicitly. Pieces of data can be tagged with key names which inform Hadoop how to send related bits of information to a common destination node. Hadoop internally manages all of the data transfer and cluster topology issues.
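The following single-process sketch shows the shape of this model using a word count. It does not use the Hadoop API (which Module 4 covers); the names mapper, reducer, and run_job are hypothetical stand-ins, and the dictionary that groups values by key plays the role of the data transfer that Hadoop performs between nodes.

```python
from collections import defaultdict


def mapper(record):
    """Process one record in isolation, emitting (key, value) pairs."""
    for word in record.split():
        yield word.lower(), 1


def reducer(key, values):
    """Merge all values that were tagged with the same key."""
    return key, sum(values)


def run_job(records):
    # Stand-in for the framework: route every value to the group for its key.
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return dict(reducer(key, values) for key, values in groups.items())


counts = run_job(["the quick brown fox", "the lazy dog", "The end"])
print(counts["the"])  # 3
print(counts["fox"])  # 1
```

Neither mapper nor reducer knows where other records are processed or how pairs travel between tasks. The key is the only routing information the user code supplies, which is what lets the framework restart a failed task elsewhere without involving the others.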
By restricting the communication between nodes, Hadoop makes the distributed system much more reliable. Individual node failures can be worked around by restarting tasks on other machines. Since user-level tasks do not communicate explicitly with one another, no messages need to be exchanged by user programs, nor do nodes need to roll back to pre-arranged checkpoints to partially restart the computation. The other workers continue to operate as though nothing went wrong, leaving the challenging aspects of partially restarting the program to the underlying Hadoop layer.

FLAT SCALABILITY

One of the major benefits of using Hadoop in contrast to other distributed systems is its flat scalability curve. Executing Hadoop on a limited amount of data on a small number of nodes may not demonstrate particularly stellar performance as the overhead involved in starting Hadoop programs is relatively high. Other parallel/distributed programming paradigms such as MPI (Message Passing Interface) may perform much better on two, four, or perhaps a dozen machines. Though the effort of coordinating work among a small number of machines may be better-performed by such systems, the price paid in performance and engineering effort (when adding more hardware as a result of increasing data volumes) increases non-linearly.
A program written in distributed frameworks other than Hadoop may require large amounts of refactoring when scaling from ten to one hundred or one thousand machines. This may involve having the program be rewritten several times; fundamental elements of its design may also put an upper bound on the scale to which the application can grow.
Hadoop, however, is specifically designed to have a very flat scalability curve. After a Hadoop program is written and functioning on ten nodes, very little--if any--work is required for that same program to run on a much larger amount of hardware. Orders of magnitude of growth can be managed with little re-work required for your applications. The underlying Hadoop platform will manage the data and hardware resources and provide dependable performance growth proportionate to the number of machines available.

The Rest of the Tutorial

This module of the tutorial has highlighted the major benefits of using a system such as Hadoop. The rest of the tutorial is designed to show you how to effectively use it.
  • In Module 2, you'll learn how the Hadoop Distributed File System (HDFS) stores vast quantities of information, how to configure HDFS, and how to use it to store and retrieve your data.
  • Module 3 shows you how to get started setting up a Hadoop environment to experiment with. It reviews how to install a Hadoop virtual machine (included in this resource CD) so that you can run Hadoop regardless of what operating system you are running.
  • Module 4 explains the Hadoop MapReduce programming model itself, and how to write some MapReduce programs.
  • Module 5 goes into further detail about the specifics of Hadoop MapReduce, and how to use advanced features for more powerful control over a program's execution.
  • Module 6 describes some other components of the Hadoop ecosystem which can add further capabilities to your distributed system.
  • Module 7 describes how to configure Hadoop clusters of different sizes. It describes what particular parameters of Hadoop need to be tuned for setting up clusters of various sizes. In addition it describes the various performance monitoring tools available in Hadoop for monitoring the health of your cluster.
  • And to expand upon the Pig section described in Module 6, a separate Pig Tutorial is included in this package at the end as Module 8.
Good luck!