国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 網站 > 建站經驗 > 正文

Hadoop streaming詳細介紹

2019-11-02 16:05:15
字體:
來源:轉載
供稿:網友

Hadoop streaming

Hadoop為MapReduce提供了不同的API,可以方便我們使用不同的編程語言來使用MapReduce框架,而不是只局限于Java。這里要介紹的就是Hadoop streaming API。Hadoop streaming 使用Unix的standard streams作為我們mapreduce程序和MapReduce框架之間的接口。所以你可以用任何語言來編寫MapReduce程序,只要該語言可以往standard input/output上進行讀寫。

streamming是天然適用于文字處理的(text processing),當然,也僅適用純文本的處理,對于需要對象和序列化的場景,hadoop streaming無能為力。它力圖使我們能夠快捷的通過各種腳本語言,快速的處理大量的文本文件。以下是steaming的一些特點:

Map函數的輸入是通過stand input一行一行的接收數據的。(不像Java API,通過InputFormat類做預處理,使得Map函數的輸入是有Key和value的) Map函數的output則必須限定為key-value pair,key和value之間用/t分開。(MapReduce框架在處理intermediate的Map輸出時,必須做sort和partition,即shuffle) Reduce函數的input是Map函數的output也是key-value pair,key和value之間用/t分開。

常用的Streaming編程語言:

bash shell ruby python

Ruby

下面是一個Ruby編寫的MapReduce程序的示例:

map

max_temperature_map.rb:

ruby #!/usr/bin/env ruby STDIN.each_line do |line| val = line year, temp, q = val[15,4], val[87,5], val[92,1] puts "#{year}/t#{temp}" if (temp != "+9999" && q =~ /[01459]/) end 
從標準輸入讀入一行data。 處理數據之后,生成一個鍵值對,用/t分隔,輸出到標準輸出

reduce

max_temperature_reduce.rb:

ruby #!/usr/bin/env ruby last_key, max_val = nil, -1000000 STDIN.each_line do |line| key, val = line.split("/t") if last_key && last_key != key puts "#{last_key}/t#{max_val}" last_key, max_val = key, val.to_i else last_key, max_val = key, [max_val, val.to_i].max end end puts "#{last_key}/t#{max_val}" if last_key 
從標準輸入讀入一行數據 數據是用/t分隔的鍵值對 數據是被MapReduce根據key排序之后順序一行一行讀入 reduce函數對數據進行處理,并輸出,輸出仍是用/t分隔的鍵值對

運行

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar /-input input/ncdc/sample.txt /-output output /-mapper ch02/src/main/ruby/max_temperature_map.rb /-reducer ch02/src/main/ruby/max_temperature_reduce.rb
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar指明了使用hadoop streaming hadoop-*-streaming.jar會將input里的文件,一行一行的輸出到標準輸出。 用-mapper指定Map函數。類似于通過管道將數據傳給rb文件: data|ch02/src/main/ruby/max_temperature_map.rb -reducer指定Reduce函數。

Python

Map

#!/usr/bin/env pythonimport reimport sysfor line in sys.stdin:val = line.strip()(year, temp, q) = (val[15:19], val[87:92], val[92:93])if (temp != "+9999" and re.match("[01459]", q)):print "%s/t%s" % (year, temp)

Reduce

#!/usr/bin/env pythonimport sys(last_key, max_val) = (None, -sys.maxint)for line in sys.stdin:(key, val) = line.strip().split("/t")if last_key and last_key != key:print "%s/t%s" % (last_key, max_val)(last_key, max_val) = (key, int(val))else:(last_key, max_val) = (key, max(max_val, int(val)))if last_key:print "%s/t%s" % (last_key, max_val)
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 伊宁市| 旌德县| 潞城市| 潼南县| 二连浩特市| 鄱阳县| 安西县| 长沙县| 如皋市| 大城县| 铜鼓县| 五河县| 图木舒克市| 怀安县| 武清区| 毕节市| 潜山县| 丰原市| 许昌市| 鞍山市| 武鸣县| 紫阳县| 珠海市| 阿克苏市| 祁东县| 台北市| 东乌| 疏勒县| 朝阳区| 射洪县| 宜君县| 富源县| 肃北| 黄平县| 建宁县| 喀喇| 喀喇沁旗| 桑日县| 乌兰浩特市| 廉江市| 仪陇县|