Hadoop streaming details
Hadoop streaming
Hadoop provides APIs for MapReduce beyond Java, making it possible to use the MapReduce framework from other programming languages. One of these is the Hadoop Streaming API. Hadoop Streaming uses standard Unix streams as the interface between your MapReduce program and the MapReduce framework, so you can write MapReduce programs in any language, as long as that language can read from standard input and write to standard output.
Streaming is a natural fit for text processing, and only for plain text; Hadoop Streaming is not suitable for scenarios that require objects and serialization. Its goal is to let us quickly process large numbers of text files in a variety of scripting languages. Here are some characteristics of streaming:
The Map function receives its input line by line from standard input (unlike the Java API, where an InputFormat preprocesses the input so that the Map function receives a key and a value).
The output of the Map function must be key-value pairs, with the key and value separated by \t (the MapReduce framework sorts and partitions, i.e. shuffles, this intermediate Map output).
The input of the Reduce function is the output of the Map function: key-value pairs with the key and value separated by \t.
Common Streaming programming languages:
bash shell, ruby, python
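Because the contract is just standard input and output, even existing Unix commands can serve as mapper and reducer. A quick sanity check in the spirit of the examples shipped with Hadoop Streaming (the input and output paths here are only placeholders) might look like this; each reducer simply runs wc over the sorted lines it receives:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/some_text \
-output output_wc \
-mapper /bin/cat \
-reducer /usr/bin/wc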
Ruby
Here is an example of a MapReduce program written in Ruby:
map
max_temperature_map.rb:
#!/usr/bin/env ruby
STDIN.each_line do |line|
  val = line
  year, temp, q = val[15,4], val[87,5], val[92,1]
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
Reads one line of data at a time from standard input.
After processing the data, it produces a key-value pair separated by \t and writes it to standard output.
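Because the mapper only reads standard input and writes standard output, it can be tested locally without Hadoop at all. Assuming a local copy of the NCDC sample data at input/ncdc/sample.txt, a quick check could be:
% cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb
Each output line should be a year and a temperature separated by a tab.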
reduce
max_temperature_reduce.rb:
#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
puts "#{last_key}\t#{max_val}" if last_key
Reads one line of data at a time from standard input.
Each line is a key-value pair separated by \t.
The lines arrive sorted by key, because MapReduce has already sorted the Map output.
The reduce function processes the data and writes its output, which is again a key-value pair separated by \t.
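The whole job can also be simulated locally with a Unix pipeline: the mapper's output is passed through sort (standing in for the shuffle) and then fed to the reducer. Assuming the same local sample file:
% cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb | \
sort | ch02/src/main/ruby/max_temperature_reduce.rb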
run
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar specifies that Hadoop Streaming is being used.
hadoop-*-streaming.jar feeds the files under -input, line by line, to the standard input of the mapper.
-mapper specifies the Map function. It is similar to piping the data into the script: cat data | ch02/src/main/ruby/max_temperature_map.rb
-reducer specifies the Reduce function.
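This invocation assumes the scripts are available locally (for example in local or pseudo-distributed mode). When running on a real cluster, the scripts also have to be shipped to the other nodes with -file, as discussed later; a sketch of the same job with the scripts attached:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb \
-file ch02/src/main/ruby/max_temperature_map.rb \
-file ch02/src/main/ruby/max_temperature_reduce.rb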
Python
Map
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
  val = line.strip()
  (year, temp, q) = (val[15:19], val[87:92], val[92:93])
  if (temp != "+9999" and re.match("[01459]", q)):
    print "%s\t%s" % (year, temp)
Reduce
#!/usr/bin/env python
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
  (key, val) = line.strip().split("\t")
  if last_key and last_key != key:
    print "%s\t%s" % (last_key, max_val)
    (last_key, max_val) = (key, int(val))
  else:
    (last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
  print "%s\t%s" % (last_key, max_val)
run
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.py \
-reducer ch02/src/main/ruby/max_temperature_reduce.py
Bash shell
Map
#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI
read offset s3file
# Retrieve file from S3 to local disk
echo "reporter:status:Retrieving $s3file" >&2
$HADOOP_INSTALL/bin/hadoop fs -get $s3file .
# Un-bzip and un-tar the local file
target=`basename $s3file .tar.bz2`
mkdir -p $target
echo "reporter:status:Un-tarring $s3file to $target" >&2
tar jxf `basename $s3file` -C $target
# Un-gzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*/*
do
  gunzip -c $file >> $target.all
  echo "reporter:status:Processed $file" >&2
done
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz
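The echo ... >&2 lines use the streaming reporter protocol: anything written to standard error in the form reporter:status:<message> updates the task's status, which also helps keep a long-running task from being killed by the timeout. Counters can be updated the same way; an illustrative line (the group and counter names here are made up) would be:
echo "reporter:counter:NCDC,FilesProcessed,1" >&2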
run
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-D mapred.reduce.tasks=0 \
-D mapred.map.tasks.speculative.execution=false \
-D mapred.task.timeout=12000000 \
-input ncdc_files.txt \
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
-output output \
-mapper load_ncdc_map.sh \
-file load_ncdc_map.sh
Here, -D mapred.reduce.tasks=0 turns off the reduce phase, so there is no need to specify -reducer.
Using only a mapper, MapReduce can parallelize shell scripts that would otherwise have to run serially.
Note the -file option: when running on a cluster, -file is needed to ship the script to the other nodes.
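As an aside, the properties above are the old (pre-MapReduce 2) names. On newer Hadoop releases the streaming jar typically lives under share/hadoop/tools/lib, the equivalent properties are mapreduce.job.reduces, mapreduce.map.speculative and mapreduce.task.timeout, and the generic -files option (which must come before the streaming-specific options) replaces -file. A sketch, not tied to any particular release:
% hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-D mapreduce.job.reduces=0 \
-D mapreduce.map.speculative=false \
-D mapreduce.task.timeout=12000000 \
-files load_ncdc_map.sh \
-input ncdc_files.txt \
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
-output output \
-mapper load_ncdc_map.sh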
Combiner
In streaming mode, a Combiner can still be used, in two ways:
Write a combiner function in Java and pass it with the -combiner option.
Do the combiner's work with a command-line pipeline in the mapper.
The second method is explained here:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/all \
-output output \
-mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |
ch02/src/main/ruby/max_temperature_reduce.rb" \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb \
-file ch02/src/main/ruby/max_temperature_map.rb \
-file ch02/src/main/ruby/max_temperature_reduce.rb
Note the -mapper line: it uses a pipeline, so the mapper's intermediate output is piped through sort and then into the reduce script, which here plays the role of a combiner. Only this combined output is then shuffled, i.e. grouped by key and sent over the network to the real Reduce.
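On newer Streaming releases the -combiner option can also be given a command rather than a Java class, so the same effect can be had more directly. A sketch, assuming a release that supports non-Java combiners:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/all \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-combiner ch02/src/main/ruby/max_temperature_reduce.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb \
-file ch02/src/main/ruby/max_temperature_map.rb \
-file ch02/src/main/ruby/max_temperature_reduce.rb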