Thursday 4 July 2013

Hadoop Streaming API Training from Big Data Hadoop Experts

Hadoop Streaming

Hadoop streaming is a utility that comes with the Hadoop distribution. The utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer. For example:
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc

How Does Streaming Work

In the above example, both the mapper and the reducer are executables that read the input from stdin (line by line) and emit the output to stdout. The utility will create a Map/Reduce job, submit the job to an appropriate cluster, and monitor the progress of the job until it completes.
When an executable is specified for mappers, each mapper task will launch the executable as a separate process when the mapper is initialized. As the mapper task runs, it converts its inputs into lines and feeds the lines to the stdin of the process. In the meantime, the mapper collects the line-oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. If there is no tab character in the line, then the entire line is considered the key and the value is null. However, this default can be customized.
When an executable is specified for reducers, each reducer task will launch the executable as a separate process when the reducer is initialized. As the reducer task runs, it converts its input key/value pairs into lines and feeds the lines to the stdin of the process. In the meantime, the reducer collects the line-oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the reducer. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. However, this default can be customized.
This is the basis for the communication protocol between the Map/Reduce framework and the streaming mapper/reducer.
You can supply a Java class as the mapper and/or the reducer. The above example is equivalent to:
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer /bin/wc
You can set stream.non.zero.exit.is.failure to true or false to make a streaming task that exits with a non-zero status count as a Failure or a Success, respectively. By default, streaming tasks exiting with non-zero status are considered to be failed tasks.
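To make the default line-to-pair rule from this section concrete, here is a minimal Python sketch. It is not Hadoop's own code: the function names split_key_value and join_key_value are made up for illustration, and None is used to stand in for a null value.

```python
def split_key_value(line, separator="\t"):
    """Split one line of task output into a (key, value) pair.

    Follows the default rule described in this section: the key is
    everything up to the first tab, the value is the rest of the line.
    None stands in for a null value (an assumption of this sketch).
    """
    line = line.rstrip("\n")
    key, found, value = line.partition(separator)
    if not found:
        # No tab at all: the whole line is the key.
        return key, None
    return key, value


def join_key_value(key, value, separator="\t"):
    """Format a (key, value) pair as one line, as fed to a reducer's stdin."""
    return "%s%s%s\n" % (key, separator, value)


# Only the first tab separates key from value; later tabs stay in the value.
print(split_key_value("apple\t3\tred\n"))
print(split_key_value("a line without any tab\n"))
```

Because only the first tab is significant, a script can emit values that themselves contain tabs without confusing the framework.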

Package Files With Job Submissions

You can specify any executable as the mapper and/or the reducer. The executables do not need to pre-exist on the machines in the cluster; however, if they don't, you will need to use the "-file" option to tell the framework to pack your executable files as a part of job submission. For example:
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myPythonScript.py \
    -reducer /bin/wc \
    -file myPythonScript.py 
The above example specifies a user-defined Python executable as the mapper. The option "-file myPythonScript.py" causes the Python executable to be shipped to the cluster machines as a part of job submission.
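The post does not show what myPythonScript.py contains, so the following is only a hypothetical sketch of such a mapper. It assumes a word-count style job that emits each word as the key and 1 as the value; the names map_line and run are invented for this example.

```python
#!/usr/bin/env python3
# Hypothetical contents for myPythonScript.py (not shown in the original post).
import io
import sys


def map_line(line):
    """Yield one 'word<TAB>1' record per whitespace-separated word."""
    for word in line.split():
        yield "%s\t1" % word


def run(stream_in, stream_out):
    """Read lines from stream_in and write key/value records to stream_out."""
    for line in stream_in:
        for record in map_line(line):
            stream_out.write(record + "\n")
    # Used as the exit status; non-zero would mark the task as failed by default.
    return 0


# In-memory demonstration. As a real mapper, the script would instead end
# with: sys.exit(run(sys.stdin, sys.stdout))
demo_out = io.StringIO()
run(io.StringIO("big data\nhadoop\n"), demo_out)
sys.stdout.write(demo_out.getvalue())
```

Keeping the per-line logic in map_line makes it possible to check the mapper locally on a few sample lines before submitting a job.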
In addition to executable files, you can also package other auxiliary files (such as dictionaries, configuration files, etc) that may be used by the mapper and/or the reducer. For example:
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myPythonScript.py \
    -reducer /bin/wc \
    -file myPythonScript.py \
    -file myDictionary.txt
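As a rough illustration of how a mapper might use such an auxiliary file, the sketch below loads myDictionary.txt and keeps only the words it lists. It assumes the shipped file holds one word per line and can be opened by its bare name from the task's working directory; load_dictionary and keep_known_words are hypothetical helper names.

```python
import os
import tempfile


def load_dictionary(path="myDictionary.txt"):
    """Return the set of non-empty lines in the file (assumed one word per line)."""
    with open(path) as handle:
        return {line.strip() for line in handle if line.strip()}


def keep_known_words(line, dictionary):
    """Return the words of the line that appear in the dictionary, in order."""
    return [word for word in line.split() if word in dictionary]


# Demonstration with a temporary file standing in for the shipped dictionary.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "myDictionary.txt")
with open(demo_path, "w") as handle:
    handle.write("hadoop\nstreaming\n")

words = load_dictionary(demo_path)
print(keep_known_words("hadoop streaming is a utility", words))
```

Loading the file once at start-up, rather than once per input line, keeps the per-record cost of the mapper low.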

Streaming Options and Usage


Mapper-Only Jobs

Often, you may want to process input data using a map function only. To do this, simply set mapred.reduce.tasks to zero. The Map/Reduce framework will not create any reducer tasks. Rather, the outputs of the mapper tasks will be the final output of the job.
To be backward compatible, Hadoop Streaming also supports the "-reduce NONE" option, which is equivalent to "-D mapred.reduce.tasks=0".

Specifying Other Plugins for Jobs

Just as with a normal Map/Reduce job, you can specify other plugins for a streaming job:
   -inputformat JavaClassName
   -outputformat JavaClassName
   -partitioner JavaClassName
   -combiner JavaClassName
The class you supply for the input format should return key/value pairs of Text class. If you do not specify an input format class, the TextInputFormat is used as the default. Since the TextInputFormat returns keys of LongWritable class, which are actually not part of the input data, the keys will be discarded; only the values will be piped to the streaming mapper.
The class you supply for the output format is expected to take key/value pairs of Text class. If you do not specify an output format class, the TextOutputFormat is used as the default.



Large files and archives in Hadoop Streaming

The -files and -archives options allow you to make files and archives available to the tasks. The argument is a URI to the file or archive that you have already uploaded to HDFS. These files and archives are cached across jobs. You can retrieve the host and fs_port values from the fs.default.name config variable.
Here are examples of the -files option:
-files hdfs://host:fs_port/user/testfile.txt#testlink
In the above example, the part of the URL after # is used as the symlink name that is created in the current working directory of tasks. So the tasks will have a symlink called testlink in the cwd that points to a local copy of testfile.txt. Multiple entries can be specified as a comma-separated list:
-files hdfs://host:fs_port/user/testfile1.txt#testlink1,hdfs://host:fs_port/user/testfile2.txt#testlink2
The -archives option allows you to copy jars locally to the cwd of tasks and automatically unjar the files. For example:
-archives hdfs://host:fs_port/user/testfile.jar#testlink3
In the example above, a symlink testlink3 is created in the current working directory of tasks. This symlink points to the directory that stores the unjarred contents of the uploaded jar file.
Here's another example of the -archives option. Here, the input.txt file has two lines specifying the names of the two files: testlink/cache.txt and testlink/cache2.txt. "testlink" is a symlink to the archived directory, which has the files "cache.txt" and "cache2.txt".
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
                  -archives 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \
                  -D mapred.map.tasks=1 \
                  -D mapred.reduce.tasks=1 \
                  -D mapred.job.name="Experiment" \
                  -input "/user/me/samples/cachefile/input.txt" \
                  -mapper "xargs cat" \
                  -reducer "cat" \
                  -output "/user/me/samples/cachefile/out"

$ ls test_jar/
cache.txt  cache2.txt

$ jar cvf cachedir.jar -C test_jar/ .
added manifest
adding: cache.txt(in = 30) (out= 29)(deflated 3%)
adding: cache2.txt(in = 37) (out= 35)(deflated 5%)

$ hadoop dfs -put cachedir.jar samples/cachefile

$ hadoop dfs -cat /user/me/samples/cachefile/input.txt
testlink/cache.txt
testlink/cache2.txt

$ cat test_jar/cache.txt 
This is just the cache string

$ cat test_jar/cache2.txt 
This is just the second cache string

$ hadoop dfs -ls /user/me/samples/cachefile/out      
Found 1 items
/user/me/samples/cachefile/out/part-00000  <r 3>   69

$ hadoop dfs -cat /user/me/samples/cachefile/out/part-00000
This is just the cache string   
This is just the second cache string
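For readers who prefer a script to "xargs cat", the mapper in this example could be approximated in Python as sketched below. This is an illustration rather than part of the original example: cat_listed_files is a made-up name, each input line is treated as exactly one file name (xargs itself splits on whitespace), and a temporary directory stands in for the task's working directory with its testlink entry.

```python
import os
import tempfile


def cat_listed_files(name_lines, base_dir="."):
    """Yield every line of each file named in name_lines, in order."""
    for name in name_lines:
        name = name.strip()
        if not name:
            continue
        with open(os.path.join(base_dir, name)) as handle:
            for line in handle:
                yield line


# Demonstration: rebuild the layout from the example in a temporary directory.
demo_dir = tempfile.mkdtemp()
os.mkdir(os.path.join(demo_dir, "testlink"))
with open(os.path.join(demo_dir, "testlink", "cache.txt"), "w") as handle:
    handle.write("This is just the cache string\n")
with open(os.path.join(demo_dir, "testlink", "cache2.txt"), "w") as handle:
    handle.write("This is just the second cache string\n")

demo_input = ["testlink/cache.txt\n", "testlink/cache2.txt\n"]
demo_output = "".join(cat_listed_files(demo_input, demo_dir))
print(demo_output, end="")
```

Inside a real task, the file names would come from sys.stdin and base_dir would stay at its default, the current working directory where the testlink symlink is created.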


Specifying Additional Configuration Variables for Jobs

You can specify additional configuration variables by using "-D <n>=<v>". For example:
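$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -D mapred.reduce.tasks=2 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer /bin/wc
The -D mapred.reduce.tasks=2 option in the above example sets the number of reducers for the job to two. Generic options such as -D are placed before the streaming options (-input, -output, and so on), which is the order Hadoop's generic option parsing expects.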
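When jobs are launched from a script, the command line can be assembled programmatically. The helper below is a sketch, not part of Hadoop: build_streaming_command is a hypothetical name, /opt/hadoop is a placeholder for $HADOOP_HOME, and the sketch assumes that -D options should be placed before the streaming options. It uses only the options that appear in this post.

```python
def build_streaming_command(hadoop_home, input_path, output_path,
                            mapper, reducer, properties=None):
    """Return the argument list for a streaming job.

    Each entry of properties becomes one '-D name=value' pair, placed
    before the streaming options (an assumption of this sketch).
    """
    command = [
        hadoop_home + "/bin/hadoop",
        "jar",
        hadoop_home + "/hadoop-streaming.jar",
    ]
    for name, value in sorted((properties or {}).items()):
        command += ["-D", "%s=%s" % (name, value)]
    command += [
        "-input", input_path,
        "-output", output_path,
        "-mapper", mapper,
        "-reducer", reducer,
    ]
    return command


# Rebuild the example from this section; /opt/hadoop is a placeholder path.
demo_command = build_streaming_command(
    "/opt/hadoop",
    "myInputDirs",
    "myOutputDir",
    "org.apache.hadoop.mapred.lib.IdentityMapper",
    "/bin/wc",
    {"mapred.reduce.tasks": 2},
)
print(" ".join(demo_command))
```

The returned list can be handed to subprocess.call as is, which avoids shell quoting problems with values that contain spaces.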

Big Data to create a new boom in job market

 
The 'Big Data' industry - the ability to access, analyze and use humungous volumes of data through specific technology - will require a whole new army of data workers globally. India itself will require a minimum of 1,00,000 data scientists in the next couple of years, in addition to scores of data managers and data analysts, to support the fast emerging Big Data space.

The exponentially decreasing costs of data storage combined with the soaring volume of data being captured present challenges and opportunities to those who work in the new frontiers of data science. Businesses, government agencies, and scientists leveraging data-based decisions are more successful than those relying on decades of trial-and-error. But taming and harnessing big data can be a herculean undertaking. The data must be collected, processed and distilled, analyzed, and presented in a manner humans can understand. Because there are no degrees in data science, data scientists must grow into their roles. If you are looking for resources to help you better understand big data and analytics, we have the knowledge and experience needed to help make your systems contribute to the success of your business. Form a tandem with us and take advantage of our capacity to manage, process and analyze big data effectively, quickly and economically.

BigDataTraining.IN has a strong focus and established thought leadership in the area of Big Data and Analytics. We use a global delivery model to help you evaluate and implement solutions tailored to your specific technical and business context.

Contact us:
#67, 2nd Floor, 1st Main Road, Gandhi Nagar, Adyar, Chennai - 600020
