Hadoop streaming details

  • 2020-05-27 07:57:06
  • OfStack

Hadoop streaming

Hadoop provides an API for MapReduce that lets you write map and reduce functions in languages other than Java: the Hadoop streaming API. Hadoop streaming uses Unix standard streams as the interface between your program and the MapReduce framework, so you can write a MapReduce program in any language that can read standard input and write to standard output.

Streaming is a natural fit for text processing, and it works best with plain text; Hadoop streaming is of little use for scenarios that require objects and serialization. Its aim is to let you process large volumes of text files quickly in a variety of scripting languages. Here are some of the features of streaming:

  • The Map function reads its input line by line from standard input. (This differs from the Java API, where InputFormat preprocesses the input so that the Map function receives a key and a value.)
  • The output of the Map function must be key-value pairs, with the key and the value separated by \t. (The MapReduce framework has to sort and partition the intermediate Map output, that is, perform the shuffle.)
  • The input of the Reduce function is the output of the Map function: key-value pairs, with the key and the value separated by \t.
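As a minimal sketch of the key-value convention above, the following Python function splits a record at the first tab: everything before it is the key and the rest is the value. The name `split_key_value` is hypothetical and exists only for illustration, and treating a line without a tab as a key with an empty value is an assumption about the default behaviour, not something the listings in this article rely on.

```python
def split_key_value(line, sep="\t"):
    """Split one streaming record into (key, value) at the first separator."""
    line = line.rstrip("\n")
    key, _, value = line.partition(sep)
    # Without a separator the whole line becomes the key and the value is empty.
    return key, value


# Illustrative records only; they are not taken from a real data set.
for record in ["1950\t+0022\n", "1950\t+0022\textra\n", "no-tab-line\n"]:
    print(split_key_value(record))
```

Splitting only at the first tab means a value may itself contain tabs without being broken apart.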

Common Streaming programming languages:

Bash shell, Ruby, Python


Here is an example of a MapReduce program written in Ruby:


max_temperature_map.rb:

#!/usr/bin/env ruby
# Emit "year<TAB>temperature" for every valid reading.
STDIN.each_line do |line|
  val = line
  # Fixed-width fields: year, air temperature, quality code
  year, temp, q = val[15,4], val[87,5], val[92,1]
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

The script reads its data one line at a time from standard input. For each valid record it writes one key-value pair, separated by \t, to standard output.


max_temperature_reduce.rb:

#!/usr/bin/env ruby
# Input arrives sorted by key, so a change of key ends the current group.
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
# Flush the last group
puts "#{last_key}\t#{max_val}" if last_key

The script reads its data one line at a time from standard input. Each line is a key-value pair separated by \t, and MapReduce delivers the lines sorted by key. The reduce function processes the data and writes its result, again as key-value pairs separated by \t.
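Because the reducer receives its input sorted by key, the "remember the last key" bookkeeping can also be expressed as grouping adjacent lines. The following is a sketch of that idea in Python using `itertools.groupby`; the function name `max_per_key` and the sample lines are made up for illustration and are not part of the original scripts.

```python
from itertools import groupby


def max_per_key(lines):
    """Yield (key, max value) for input lines already sorted by key."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    # groupby only merges adjacent equal keys, which is exactly what
    # the sorted reducer input guarantees.
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        yield key, max(int(value) for _, value in group)


# Hypothetical reducer input, already sorted by key.
sorted_lines = ["1949\t111\n", "1949\t78\n", "1950\t0\n", "1950\t22\n", "1950\t-11\n"]
for key, value in max_per_key(sorted_lines):
    print("%s\t%s" % (key, value))
```

If the input were not sorted, the same key could show up in several groups, which is why the sort step performed by the framework matters.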


% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar indicates that Hadoop streaming is being used. The streaming jar feeds the files under -input, line by line, to the standard input of the map script. -mapper specifies the Map function; the effect is similar to piping the data into the rb file: data | ch02/src/main/ruby/max_temperature_map.rb. -reducer specifies the Reduce function.
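To get a feel for what the job does end to end, the map, sort, and reduce stages can be imitated in a single process. The sketch below is only a local simulation under stated assumptions: `fake_record` builds synthetic fixed-width lines in which only the year, temperature, and quality fields are meaningful, `sorted()` stands in for the shuffle, and all function names are hypothetical.

```python
import re


def map_line(line):
    """Mimic the mapper: return 'year<TAB>temp', or None for an invalid reading."""
    val = line.strip()
    year, temp, q = val[15:19], val[87:92], val[92:93]
    if temp != "+9999" and re.match("[01459]", q):
        return "%s\t%s" % (year, temp)
    return None


def reduce_sorted(lines):
    """Mimic the reducer: emit the maximum value for each run of equal keys."""
    out, last_key, max_val = [], None, None
    for line in lines:
        key, val = line.split("\t")
        if last_key is not None and key != last_key:
            out.append("%s\t%s" % (last_key, max_val))
            max_val = None
        last_key = key
        max_val = int(val) if max_val is None else max(max_val, int(val))
    if last_key is not None:
        out.append("%s\t%s" % (last_key, max_val))
    return out


def fake_record(year, temp, quality):
    """Build a synthetic fixed-width line; only the three fields are meaningful."""
    return "x" * 15 + year + "x" * 68 + temp + quality


def run_local(records):
    mapped = [m for m in map(map_line, records) if m is not None]
    return reduce_sorted(sorted(mapped))  # sorted() stands in for the shuffle


records = [
    fake_record("1950", "+0022", "1"),
    fake_record("1950", "-0011", "1"),
    fake_record("1949", "+0111", "1"),
    fake_record("1949", "+9999", "1"),  # missing reading, dropped by the mapper
    fake_record("1949", "+0078", "2"),  # quality code 2, dropped by the mapper
]
print(run_local(records))
```

Testing the logic this way before submitting a job is cheap, although it says nothing about how the work is split across nodes on a real cluster.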



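Here is the same example written in Python:

max_temperature_map.py:

#!/usr/bin/env python
import re
import sys

# Emit "year<TAB>temperature" for every valid reading.
for line in sys.stdin:
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if temp != "+9999" and re.match("[01459]", q):
        print("%s\t%s" % (year, temp))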


max_temperature_reduce.py:

#!/usr/bin/env python
import sys

# Input arrives sorted by key, so a change of key ends the current group.
(last_key, max_val) = (None, -sys.maxsize)
for line in sys.stdin:
    (key, val) = line.strip().split("\t")
    if last_key and last_key != key:
        print("%s\t%s" % (last_key, max_val))
        (last_key, max_val) = (key, int(val))
    else:
        (last_key, max_val) = (key, max(max_val, int(val)))

# Flush the last group
if last_key:
    print("%s\t%s" % (last_key, max_val))


% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.py \
-reducer ch02/src/main/ruby/max_temperature_reduce.py

Bash shell


#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI
read offset s3file
# Retrieve file from S3 to local disk
echo "reporter:status:Retrieving $s3file" >&2
$HADOOP_INSTALL/bin/hadoop fs -get $s3file .
# Un-bzip and un-tar the local file
target=`basename $s3file .tar.bz2`
mkdir -p $target
echo "reporter:status:Un-tarring $s3file to $target" >&2
tar jxf `basename $s3file` -C $target
# Un-gzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*/*
do
  gunzip -c $file >> $target.all
  echo "reporter:status:Processed $file" >&2
done
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz


% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-D mapred.reduce.tasks=0 \
-D mapred.map.tasks.speculative.execution=false \
-D mapred.task.timeout=12000000 \
-input ncdc_files.txt \
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
-output output \
-mapper load_ncdc_map.sh \
-file load_ncdc_map.sh
Here, -D mapred.reduce.tasks=0 sets the number of reduce tasks to zero, so there is no need to set -reducer. With a map-only job, MapReduce can run in parallel shell scripts that would otherwise have to run serially. Notice the -file option: when the job runs in parallel in cluster mode, -file is needed to ship the script to the other nodes.
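The shell script above relies on two conventions: each input record arrives as an offset and a value separated by a tab, and lines of the form reporter:status:... written to standard error are picked up as status updates. The following Python sketch shows both pieces in isolation. The helper names and the example URI are hypothetical, and the script does not download anything.

```python
import sys


def parse_nline_record(line):
    """Split an 'offset<TAB>value' record into (int offset, value)."""
    offset, value = line.rstrip("\n").split("\t", 1)
    return int(offset), value


def status_line(message):
    """Format a status update in the reporter:status: form used by the script above."""
    return "reporter:status:%s" % message


# Hypothetical input record; the bucket and file name are placeholders.
offset, uri = parse_nline_record("0\ts3n://example-bucket/1901.tar.bz2\n")
sys.stderr.write(status_line("Retrieving %s" % uri) + "\n")
print(uri)
```

Reporting status regularly is a reasonable habit for long-running map tasks like this one, which is presumably also why the job above raises mapred.task.timeout.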


In streaming mode, a combiner can still be used in two ways:

  • Write a combiner function in Java and pass it with the -combiner option.
  • Do the combiner's work inside a command-line pipeline.

The second method is explained here:

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/all \
-output output \
-mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |
ch02/src/main/ruby/max_temperature_reduce.rb" \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb \
-file ch02/src/main/ruby/max_temperature_map.rb \
-file ch02/src/main/ruby/max_temperature_reduce.rb

Note the -mapper line. The pipeline takes the output of the map script (the intermediate Map output), sends it to sort for sorting, and then passes it to the reduce script, which does the job of a combiner. Only after that is the output handed to the shuffle as input and sent over the network to Reduce.
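The reason this works for the maximum-temperature job is that taking a maximum can be applied in stages: the maximum of per-mapper maxima equals the overall maximum. The sketch below illustrates the effect of local combining on a made-up list of map output pairs; `combine_max` is a hypothetical helper, not part of Hadoop.

```python
def combine_max(pairs):
    """Collapse (key, value) pairs to one (key, max value) pair per key."""
    best = {}
    for key, value in pairs:
        if key not in best or value > best[key]:
            best[key] = value
    return sorted(best.items())


# Hypothetical output of a single map task.
map_output = [("1950", 0), ("1950", 22), ("1950", -11), ("1949", 111), ("1949", 78)]
combined = combine_max(map_output)
print(len(map_output), "records before,", len(combined), "after:", combined)
```

Fewer records leave each map task, so less data crosses the network during the shuffle. The same trick would not be valid for a function such as the mean, which cannot simply be recomputed from partial results.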

