

Hadoop Notes (4)---MapReduce Programming Examples (Basics)


Preface

The previous article used WordCount to explain the code structure and execution model of a MapReduce program. This article walks through several simple examples to build further familiarity with MapReduce.

1. Text search

Problem description

Given a large amount of text, find the sentences that contain a given string.

Solution

This one is straightforward. In map, take the name of the file currently being read as the key, split the input text on the sentence delimiter "。", and check each sentence; if it contains the target string, emit it as the value. In reduce, concatenate the sentences belonging to the same file and write them out.

Test data

Input: in1.txt:

潯陽江頭夜送客,楓葉荻花秋瑟瑟。主人下馬客在船,舉酒欲飲無管弦。醉不成歡慘將別,別時茫茫江浸月。忽聞水上琵琶聲,主人忘歸客不發。尋聲暗問彈者誰?琵琶聲停欲語遲。移船相近邀相見,添酒回燈重開宴。千呼萬喚始出來,猶抱琵琶半遮面。轉軸撥弦三兩聲,未成曲調先有情。弦弦掩抑聲聲思,似訴平生不得志。低眉信手續續彈,說盡心中無限事。輕攏慢捻抹復挑,初為霓裳后六幺。大弦嘈嘈如急雨,小弦切切如私語。嘈嘈切切錯雜彈,大珠小珠落玉盤。間關鶯語花底滑,幽咽泉流冰下難。冰泉冷澀弦凝絕,凝絕不通聲暫歇。別有幽愁暗恨生,此時無聲勝有聲。銀瓶乍破水漿迸,鐵騎突出刀槍鳴。曲終收撥當心畫,四弦一聲如裂帛。東船西舫悄無言,唯見江心秋月白。沉吟放撥插弦中,整頓衣裳起斂容。自言本是京城女,家在蝦蟆陵下住。十三學得琵琶成,名屬教坊第一部。曲罷曾教善才服,妝成每被秋娘妒。五陵年少爭纏頭,一曲紅綃不知數。鈿頭銀篦擊節碎,血色羅裙翻酒污。今年歡笑復明年,秋月春風等閑度。弟走從軍阿姨死,暮去朝來顏色故。門前冷落鞍馬稀,老大嫁作商人婦。商人重利輕別離,前月浮梁買茶去。去來江口守空船,繞船月明江水寒。夜深忽夢少年事,夢啼妝淚紅闌干。我聞琵琶已嘆息,又聞此語重唧唧。同是天涯淪落人,相逢何必曾相識!我從去年辭帝京,謫居臥病潯陽城。潯陽地僻無音樂,終歲不聞絲竹聲。住近湓江地低濕,黃蘆苦竹繞宅生。其間旦暮聞何物?杜鵑啼血猿哀鳴。春江花朝秋月夜,往往取酒還獨傾。豈無山歌與村笛?嘔啞嘲哳難為聽。今夜聞君琵琶語,如聽仙樂耳暫明。莫辭更坐彈一曲,為君翻作《琵琶行》。感我此言良久立,卻坐促弦弦轉急。凄凄不似向前聲,滿座重聞皆掩泣。座中泣下誰最多?江州司馬青衫濕。

in2.txt:

漢皇重色思傾國,御宇多年求不得。楊家有女初長成,養在深閨人未識。天生麗質難自棄,一朝選在君王側。回眸一笑百媚生,六宮粉黛無顏色。春寒賜浴華清池,溫泉水滑洗凝脂。侍兒扶起嬌無力,始是新承恩澤時。云鬢花顏金步搖,芙蓉帳暖度春宵。春宵苦短日高起,從此君王不早朝。承歡侍宴無閑暇,春從春游夜專夜。后宮佳麗三千人,三千寵愛在一身。金屋妝成嬌侍夜,玉樓宴罷醉和春。姊妹弟兄皆列土,可憐光彩生門戶。遂令天下父母心,不重生男重生女。驪宮高處入青云,仙樂風飄處處聞。緩歌謾舞凝絲竹,盡日君王看不足。漁陽鼙鼓動地來,驚破霓裳羽衣曲。九重城闕煙塵生,千乘萬騎西南行。翠華搖搖行復止,西出都門百余里。六軍不發無奈何,宛轉蛾眉馬前死。花鈿委地無人收,翠翹金雀玉搔頭。君王掩面救不得,回看血淚相和流。黃埃散漫風蕭索,云棧縈紆登劍閣。峨嵋山下少人行,旌旗無光日色薄。蜀江水碧蜀山青,聖主朝朝暮暮情。行宮見月傷心色,夜雨聞鈴腸斷聲。天旋地轉回龍馭,到此躊躇不能去。馬嵬坡下泥土中,不見玉顏空死處。君臣相顧盡沾衣,東望都門信馬歸。歸來池苑皆依舊,太液芙蓉未央柳。芙蓉如面柳如眉,對此如何不淚垂。春風桃李花開日,秋雨梧桐葉落時。西宮南內多秋草,落葉滿階紅不掃。梨園弟子白發新,椒房阿監青娥老。夕殿螢飛思悄然,孤燈挑盡未成眠。遲遲鐘鼓初長夜,耿耿星河欲曙天。鴛鴦瓦冷霜華重,翡翠衾寒誰與共。悠悠生死別經年,魂魄不曾來入夢。臨邛道士鴻都客,能以精誠致魂魄。為感君王輾轉思,遂教方士殷勤覓。排空馭氣奔如電,升天入地求之遍。上窮碧落下黃泉,兩處茫茫皆不見。忽聞海上有仙山,山在虛無縹渺間。樓閣玲瓏五云起,其中綽約多仙子。中有一人字太真,雪膚花貌參差是。金闕西廂叩玉扃,轉教小玉報雙成。聞道漢家天子使,九華帳里夢魂驚。攬衣推枕起徘徊,珠箔銀屏迤邐開。云鬢半偏新睡覺,花冠不整下堂來。風吹仙袂飄飄舉,猶似霓裳羽衣舞。玉容寂寞淚闌干,梨花一枝春帶雨。含情凝睇謝君王,一別音容兩渺茫。昭陽殿里恩愛絕,蓬萊宮中日月長。回頭下望人寰處,不見長安見塵霧。惟將舊物表深情,鈿合金釵寄將去。釵留一股合一扇,釵擘黃金合分鈿。但教心似金鈿堅,天上人間會相見。臨別殷勤重寄詞,詞中有誓兩心知。七月七日長生殿,夜半無人私語時。在天愿作比翼鳥,在地愿為連理枝。天長地久有時盡,此恨綿綿無絕期。

in3.txt:

春江潮水連海平,海上明月共潮生。滟滟隨波千萬里,何處春江無月明!江流宛轉繞芳甸,月照花林皆似霰;空里流霜不覺飛,汀上白沙看不見。江天一色無纖塵,皎皎空中孤月輪。江畔何人初見月?江月何年初照人?人生代代無窮已,江月年年只相似。不知江月待何人,但見長江送流水。白云一片去悠悠,青楓浦上不勝愁。誰家今夜扁舟子?何處相思明月樓?可憐樓上月徘徊,應照離人妝鏡臺。玉戶簾中卷不去,搗衣砧上拂還來。此時相望不相聞,愿逐月華流照君。鴻雁長飛光不度,魚龍潛躍水成文。昨夜閑潭夢落花,可憐春半不還家。江水流春去欲盡,江潭落月復西斜。斜月沉沉藏海霧,碣石瀟湘無限路。不知乘月幾人歸,落月搖情滿江樹。

預期結(jié)果:

in1.txt 春江花朝秋月夜,往往取酒還獨傾---|---去來江口守空船,繞船月明江水寒---|---商人重利輕別離,前月浮梁買茶去---|---今年歡笑復明年,秋月春風等閑度---|---東船西舫悄無言,唯見江心秋月白---|---醉不成歡慘將別,別時茫茫江浸月---|---
in2.txt 七月七日長生殿,夜半無人私語時---|---昭陽殿里恩愛絕,蓬萊宮中日月長---|---行宮見月傷心色,夜雨聞鈴腸斷聲---|---
in3.txt 不知乘月幾人歸,落月搖情滿江樹---|---斜月沉沉藏海霧,碣石瀟湘無限路---|---江水流春去欲盡,江潭落月復西斜---|---此時相望不相聞,愿逐月華流照君---|---可憐樓上月徘徊,應照離人妝鏡臺---|---誰家今夜扁舟子?何處相思明月樓?---|---不知江月待何人,但見長江送流水---|---人生代代無窮已,江月年年只相似---|---江畔何人初見月?江月何年初照人?---|---江天一色無纖塵,皎皎空中孤月輪---|---江流宛轉繞芳甸,月照花林皆似霰;---|---滟滟隨波千萬里,何處春江無月明!---|---春江潮水連海平,海上明月共潮生---|---

The example above retrieves the lines of verse that contain the character "月" (moon).

Diagram

The flow of the job: [figure: map/shuffle/reduce data flow (image missing)]

Code

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Find the sentences that contain a given string.
 * @author hadoop
 */
public class Search {

    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static final String word = "月";
        private FileSplit fileSplit;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // The input split tells us which file this record came from.
            fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            // Split on the sentence delimiter "。".
            StringTokenizer st = new StringTokenizer(value.toString(), "。");
            while (st.hasMoreTokens()) {
                String line = st.nextToken();
                if (line.indexOf(word) >= 0) {
                    context.write(new Text(fileName), new Text(line));
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all matching sentences from the same file.
            String lines = "";
            for (Text value : values) {
                lines += value.toString() + "---|---";
            }
            context.write(key, new Text(lines));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9001");
        // Hardcoded for local testing; normally these come from the command line.
        args = new String[] {
            "hdfs://localhost:9000/user/hadoop/input/search_in",
            "hdfs://localhost:9000/user/hadoop/output/search_out" };
        // Check the command-line arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: search <in> <out>");
            System.exit(2);
        }
        // Set the job name.
        Job job = new Job(conf, "search");
        // Wire up the job's classes.
        job.setJarByClass(Search.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

In map, context.getInputSplit() identifies the file the current record came from. The text is then split on "。" and each sentence is checked; if it contains the target character "月", the file name is written as the key and the sentence as the value.

The reduce step is a simple concatenation.
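The search term is fixed at compile time in the constant word. A more flexible variant, sketched below, passes it through the job Configuration instead; this is a common Hadoop pattern rather than code from the original article, and the property name search.word is made up for illustration. The class is a drop-in replacement for the Map class above (imports unchanged):

public static class Map extends Mapper<Object, Text, Text, Text> {
    private String word;

    @Override
    protected void setup(Context context) {
        // Read the term the driver stored with conf.set("search.word", "月");
        // fall back to "月" if it was never set.
        word = context.getConfiguration().get("search.word", "月");
    }

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        StringTokenizer st = new StringTokenizer(value.toString(), "。");
        while (st.hasMoreTokens()) {
            String sentence = st.nextToken();
            if (sentence.contains(word)) {
                context.write(new Text(fileName), new Text(sentence));
            }
        }
    }
}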

2. Maximum, minimum, and average

Problem description

Given a set of numbers, find the maximum and minimum and compute the average.

Solution

Also simple: in map, read and tokenize the input, and emit every number under one fixed key (the constant 1), so that all of them reach a single reduce call. In reduce, iterate over the values, counting and summing while tracking the largest and smallest, and finally compute the average.

Test data

Input

in1.txt

1 1 1 1 1 1 1 1 1 1 5 5 5 5 5 5 5 5 5 5

in2.txt

5 8 10 17 32
8 9 13 32 21

預期結(jié)果

平均數 11
最大值 32
最小值 1

Diagram

[figure: job data flow (image missing)]

Code

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Compute the maximum, minimum and average of a set of numbers.
 * @author hadoop
 */
public class Average1 {

    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        // A single fixed key, so every number reaches the same reduce call.
        private static IntWritable no = new IntWritable(1);
        // Holds each number cut from the input.
        private Text number = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                number.set(st.nextToken());
                context.write(no, new IntWritable(Integer.parseInt(number.toString())));
            }
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable, Text, IntWritable> {
        // Running totals, kept as instance fields.
        int count = 0; // how many numbers
        int sum = 0;   // their sum
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                if (val.get() > max) {
                    max = val.get();
                }
                if (val.get() < min) {
                    min = val.get();
                }
                count++;
                sum += val.get();
            }
            int average = sum / count; // integer average
            context.write(new Text("平均數"), new IntWritable(average));
            context.write(new Text("最大值"), new IntWritable(max));
            context.write(new Text("最小值"), new IntWritable(min));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.addResource("config.xml");
        // Hardcoded for local testing; normally these come from the command line.
        args = new String[] {
            "hdfs://localhost:9000/user/hadoop/input/average1_in",
            "hdfs://localhost:9000/user/hadoop/output/average1_out" };
        // Check the command-line arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: average1 <in> <out>");
            System.exit(2);
        }
        // Set the job name.
        Job job = new Job(conf, "average1");
        // Wire up the job's classes.
        job.setJarByClass(Average1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // The mapper's output types.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
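Note that the integer division in the reducer truncates the mean. If the fractional part matters, the reducer can emit DoubleWritable values instead. The sketch below is not the original code: it assumes an extra import of org.apache.hadoop.io.DoubleWritable, and the driver would then declare the map output types explicitly (job.setMapOutputKeyClass(IntWritable.class), job.setMapOutputValueClass(IntWritable.class)) and set the final output value class to DoubleWritable.class:

public static class Reduce extends Reducer<IntWritable, IntWritable, Text, DoubleWritable> {
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        long sum = 0; // long guards against overflow on large inputs
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;
        for (IntWritable val : values) {
            int v = val.get();
            if (v > max) max = v;
            if (v < min) min = v;
            count++;
            sum += v;
        }
        // Floating-point division keeps the fractional part of the mean.
        context.write(new Text("平均數"), new DoubleWritable((double) sum / count));
        context.write(new Text("最大值"), new DoubleWritable(max));
        context.write(new Text("最小值"), new DoubleWritable(min));
    }
}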

3. Average grades

Problem description

Given three input files holding each student's math, English, and Chinese scores respectively, compute every student's average over the three subjects.

Solution

Equally simple: in map, parse each line and emit the student's name as the key and the score as the value. In reduce, the scores arrive grouped by name; sum them, divide by the count, and write out the average.

Test data

Input:

in1.txt

張三 80
李四 83
王五 91
趙六 88

in2.txt

張三 92
李四 100
王五 94
趙六 88

in3.txt

張三 89
李四 98
王五 84
趙六 93

預期結(jié)果

張三 87
李四 93
王五 89
趙六 89

Diagram

[figure: job data flow (image missing)]

Code

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Compute each student's average score.
 * @author hadoop
 */
public class Average2 {

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the input into lines ("\n"; TextInputFormat already
            // delivers one line per record, so this is belt-and-braces).
            StringTokenizer st = new StringTokenizer(value.toString(), "\n");
            while (st.hasMoreTokens()) {
                // Split each line on whitespace: name, then score.
                StringTokenizer stl = new StringTokenizer(st.nextToken());
                String name = stl.nextToken();
                String score = stl.nextToken();
                context.write(new Text(name), new IntWritable(Integer.parseInt(score)));
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0; // number of scores
            int sum = 0;   // their sum
            for (IntWritable val : values) {
                count++;
                sum += val.get();
            }
            int average = sum / count; // integer average
            context.write(key, new IntWritable(average));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.addResource("config.xml");
        // Hardcoded for local testing; normally these come from the command line.
        args = new String[] {
            "hdfs://localhost:9000/user/hadoop/input/average2_in",
            "hdfs://localhost:9000/user/hadoop/output/average2_out" };
        // Check the command-line arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: average2 <in> <out>");
            System.exit(2);
        }
        // Set the job name.
        Job job = new Job(conf, "average2");
        // Wire up the job's classes.
        job.setJarByClass(Average2.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // The mapper's (and reducer's) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4. Deduplication

Problem description

Given several sets of records, remove the duplicates and output the distinct records.

Solution

During the shuffle stage, records are grouped by key, so by the time they reach the reduce method each key is unique. It is therefore enough to emit each input record as the key with an empty value; reduce then writes each key through exactly once.
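The empty Text value does the job, but Hadoop ships NullWritable for exactly this key-only pattern; it serializes to zero bytes, so it also trims the shuffle a little. Below is a sketch of the same mapper and reducer using it; this is a common variant rather than the article's code, and the driver would have to declare NullWritable.class as the output value class (plus an import of org.apache.hadoop.io.NullWritable):

public static class Map extends Mapper<Object, Text, Text, NullWritable> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // The record itself is the key; NullWritable carries no payload.
        context.write(value, NullWritable.get());
    }
}

public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
    public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // After the shuffle each distinct record arrives exactly once.
        context.write(key, NullWritable.get());
    }
}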

Test data

Input: in1.txt

Etoak-001
Etoak-002
Etoak-003
Etoak-002
Etoak-004
Etoak-005
Etoak-006
Etoak-001
Etoak-007
Etoak-008

in2.txt

Etoak-009
Etoak-010
Etoak-011
Etoak-012
Etoak-013
Etoak-009
Etoak-014
Etoak-015
Etoak-011
Etoak-016

預期結(jié)果:

Etoak-001
Etoak-002
Etoak-003
Etoak-004
Etoak-005
Etoak-006
Etoak-007
Etoak-008
Etoak-009
Etoak-010
Etoak-011
Etoak-012
Etoak-013
Etoak-014
Etoak-015
Etoak-016

Diagram

[figure: job data flow (image missing)]

Code

package train;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Deduplicate records.
 * @author hadoop
 */
public class Duplicate {

    // Output key: Text; output value: Text (left empty).
    public static class Map extends Mapper<Object, Text, Text, Text> {
        // Emit each input record as the key, with an empty value.
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, new Text(""));
        }
    }

    // After the shuffle, each distinct record reaches reduce exactly once;
    // write the key through and leave the value empty.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9001");
        // Hardcoded for local testing; normally these come from the command line.
        args = new String[] {
            "hdfs://localhost:9000/user/hadoop/input/duplicate_in",
            "hdfs://localhost:9000/user/hadoop/output/duplicate_out" };
        // Check the command-line arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: duplicate <in> <out>");
            System.exit(2);
        }
        // Set the job name.
        Job job = new Job(conf, "duplicate");
        // Wire up the job's classes.
        job.setJarByClass(Duplicate.class);
        job.setMapperClass(Map.class);
        // Deduplication is idempotent, so the reducer can double as a combiner.
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

5. Sorting

Problem description

Sort a given set of numbers in ascending order, and output each number's rank.

Solution

Rely on MapReduce's default sort rule: IntWritable keys are ordered by value during the shuffle, so emitting each number as a key is enough.
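For reference, the ascending order comes from the comparator registered for IntWritable. If a descending sort were wanted instead, the usual trick (sketched here, not part of this example) is to register a comparator that negates the byte-level comparison:

// Register in the driver with:
//   job.setSortComparatorClass(DescendingIntComparator.class);
public static class DescendingIntComparator extends IntWritable.Comparator {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Flip the natural (ascending) ordering.
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }
}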

Test data

Input: in1.txt:

9
0
14
999
15
88
9

in2.txt:

65
54
32
21
10

in3.txt:

1
0
9
21
8

預期結(jié)果:

1 0
1 0
2 1
3 8
4 9
4 9
4 9
5 10
6 14
7 15
8 21
8 21
9 32
10 54
11 65
12 88
13 999

Diagram

[figure: job data flow (image missing)]

Code

package train;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Ascending sort, relying on MapReduce's default sort rule:
 * IntWritable keys are ordered by value during the shuffle.
 * @author hadoop
 */
public class Sort {

    // Convert each input value to an int and emit it as the key.
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable number = new IntWritable();
        private static final IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            number.set(Integer.parseInt(line));
            context.write(number, one);
        }
    }

    // The field num tracks each distinct number's rank;
    // iterating over values emits the number once per occurrence.
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable num = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable value : values) {
                context.write(num, key);
            }
            num = new IntWritable(num.get() + 1);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9001");
        // Hardcoded for local testing; normally these come from the command line.
        args = new String[] {
            "hdfs://localhost:9000/user/hadoop/input/sort_in",
            "hdfs://localhost:9000/user/hadoop/output/sort_out" };
        // Check the command-line arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: sort <in> <out>");
            System.exit(2);
        }
        // Set the job name.
        Job job = new Job(conf, "sort");
        // Wire up the job's classes.
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note: this job must not set a combiner, or the result will differ. Running the Reduce class as a combiner would add an extra merge pass on the map side, emitting (rank, number) pairs before the real shuffle, so the reducer would receive already-collapsed data and the ranks and repeat counts would come out wrong.

6. Inverted index

Problem description

Given many records, group the data by some attribute value; for example, group a set of sentences by the words they contain.
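Solution

In map, emit word:filename as the key and 1 as the value. The combine stage sums the counts for each word:filename key, then re-keys the record so that the word alone becomes the key and filename:count the value. Finally, reduce concatenates all of a word's filename:count entries into one line.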

Test data

Input: in1.txt

Life is brief , and then you die, you know ?

in2.txt:

Innovation distinguishes between a leader and a follower

in3.txt

We're here to put a dent in the universe . Otherwise why else even be here ?

預期結(jié)果:

, in1.txt:1;
. in3.txt:1;
? in3.txt:1;
Innovation in2.txt:1;
Life in1.txt:1;
Otherwise in3.txt:1;
We're in3.txt:1;
a in3.txt:1;in2.txt:2;
and in2.txt:1;in1.txt:1;
be in3.txt:1;
between in2.txt:1;
brief in1.txt:1;
dent in3.txt:1;
die, in1.txt:1;
distinguishes in2.txt:1;
else in3.txt:1;
even in3.txt:1;
follower in2.txt:1;
here in3.txt:2;
in in3.txt:1;
is in1.txt:1;
know in1.txt:1;
leader in2.txt:1;
put in3.txt:1;
the in3.txt:1;
then in1.txt:1;
to in3.txt:1;
universe in3.txt:1;
why in3.txt:1;
you in1.txt:2;
? in1.txt:1;

Diagram

[figure: job data flow (image missing)]

Code

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Inverted index.
 * @author hadoop
 */
public class InvertedIndex {

    // Output: key is word + file name, value is the count 1.
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private Text keyStr = new Text();
        private Text valueStr = new Text();
        private FileSplit fileSplit;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Identify the input file this record came from.
            fileSplit = (FileSplit) context.getInputSplit();
            // Split on whitespace.
            StringTokenizer st = new StringTokenizer(value.toString().trim());
            while (st.hasMoreTokens()) {
                String filePath = fileSplit.getPath().getName();
                keyStr.set(st.nextToken() + ":" + filePath);
                valueStr.set("1");
                context.write(keyStr, valueStr);
            }
        }
    }

    // Combine the counts.
    // Output: key is the word, value is file name + count.
    public static class Combine extends Reducer<Text, Text, Text, Text> {
        private Text newValue = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts for this word:file pair.
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            // Split the old key: the word becomes the new key,
            // file name + count become the new value.
            int index = key.toString().indexOf(":");
            String word = key.toString().substring(0, index);
            String filePath = key.toString().substring(index + 1);
            key.set(word);
            newValue.set(filePath + ":" + sum);
            context.write(key, newValue);
        }
    }

    // Merge each word's file/count entries into a single line.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        Text newValue = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String files = "";
            for (Text value : values) {
                files += value + ";";
            }
            newValue.set(files);
            context.write(key, newValue);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9001");
        // Hardcoded for local testing; normally these come from the command line.
        args = new String[] {
            "hdfs://localhost:9000/user/hadoop/input/invertedIndex_in",
            "hdfs://localhost:9000/user/hadoop/output/invertedIndex_out" };
        // Check the command-line arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: invertedIndex <in> <out>");
            System.exit(2);
        }
        // Set the job name.
        Job job = new Job(conf, "invertedIndex");
        // Wire up the job's classes.
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
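One detail makes the combiner legal here: Hadoop may run a combiner zero, one, or several times between map and reduce, so its input and output types must both match the map output types. That is why Combine keeps (Text, Text) on both sides and re-keys the record internally instead of changing the declared types.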