筆記中提供了大量的代碼示例,需要說(shuō)明的是,大部分代碼示例都是本人所敲代碼并進(jìn)行測(cè)試,不足之處,請(qǐng)大家指正~
LZ 本來(lái)想先仔細(xì)寫一寫 Hadoop 偽分布式的部署安裝,然后介紹一些 HDFS 的內(nèi)容再來(lái)介紹 MapReduce,是在是沒有抽出空,今天就簡(jiǎn)單入門一下 MapReduce 吧。
一、MapReduce 概述
1.MapReduce 是一種分布式計(jì)算模型,由Google提出,主要用于搜索領(lǐng)域,解決海量數(shù)據(jù)的計(jì)算問(wèn)題.
2.MapReduce 由兩個(gè)階段組成:Map和Reduce,用戶只需要實(shí)現(xiàn)map()和reduce()兩個(gè)函數(shù),即可實(shí)現(xiàn)分布式計(jì)算
二、具體實(shí)現(xiàn)
1.先來(lái)看一下 Eclipse 中此應(yīng)用的包結(jié)構(gòu)

2.創(chuàng)建 map 的任務(wù)處理類:WCMapper
/* * 1.Mapper 類的四個(gè)泛型中,前兩個(gè)指定 mapper 輸入數(shù)據(jù)的類型,后兩個(gè)指定 mapper 輸出數(shù)據(jù)的類型 * KEYIN 是輸入的 key 的類型,VALUEIN 是輸入的 value 的類型 * KEYOUT 是輸出的 key 的類型,VALUEOUT 是輸出的 value 的類型 * 2.map 和 reduce 的數(shù)據(jù)的輸入輸出都是以 key-value 對(duì)的形式封裝的 * 3.默認(rèn)情況下,框架傳遞給我們的 mapper 的輸入數(shù)據(jù)中,key 是要處理的文本中一行的起始偏移量,為 Long 類型, * 這一行的內(nèi)容為 value,為 String 類型的 * 4.后兩個(gè)泛型的賦值需要我們結(jié)合實(shí)際情況 * 5.為了在網(wǎng)絡(luò)中傳輸時(shí)序列化更高效,Hadoop 把 Java 中的 Long 封裝為 LongWritable, 把 String 封裝為 Text */public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> { //重寫 Mapper 中的 map 方法,MapReduce 框架每讀一行數(shù)據(jù)就調(diào)用一次此方法 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //書寫具體的業(yè)務(wù)邏輯,業(yè)務(wù)要處理的數(shù)據(jù)已經(jīng)被框架傳遞進(jìn)來(lái),就是方法的參數(shù)中的 key 和 value //key 是這一行數(shù)據(jù)的起始偏移量,value 是這一行的文本內(nèi)容 //1.將 Text 類型的一行的內(nèi)容轉(zhuǎn)為 String 類型 String line = value.toString(); //2.使用 StringUtils 以空格切分字符串,返回 String[] String[] words = StringUtils.split(line, " "); //3.循環(huán)遍歷 String[],調(diào)用 context 的 writer()方法,輸出為 key-value 對(duì)的形式 //key:?jiǎn)卧~ value:1 for(String word : words) { context.write(new Text(word), new LongWritable(1)); } }}2.創(chuàng)建 reduce 的任務(wù)處理類:WCReducer:
/* * 1.Reducer 類的四個(gè)泛型中,前兩個(gè)輸入要與 Mapper 的輸出相對(duì)應(yīng)。輸出需要聯(lián)系具體情況自定義 */public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> { //框架在 map 處理完之后,將所有的 kv 對(duì)緩存起來(lái),進(jìn)行分組,然后傳遞一個(gè)分組(<key,{values}>,例如:<"hello",{1,1,1,1}>), //調(diào)用此方法 @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context)throws IOException, InterruptedException { //1.定義一個(gè)計(jì)數(shù)器 long count = 0; //2.遍歷 values的 list,進(jìn)行累加求和 for(LongWritable value : values) { //使用 LongWritable 的 get() 方法,可以將 一個(gè) LongWritable 類型轉(zhuǎn)為 Long 類型 count += value.get(); } //3.輸出這一個(gè)單詞的統(tǒng)計(jì)結(jié)果 context.write(key, new LongWritable(count)); }}3.創(chuàng)建一個(gè)類,用來(lái)描述一個(gè)特定的作業(yè):WCRunner,(此類了LZ沒有按照規(guī)范的模式寫)
/** * 此類用來(lái)描述一個(gè)特定的作業(yè) * 例:1.該作業(yè)使用哪個(gè)類作為邏輯處理中的 map,哪個(gè)作為 reduce * 2.指定該作業(yè)要處理的數(shù)據(jù)所在的路徑 * 3.指定該作業(yè)輸出的結(jié)果放到哪個(gè)路徑 */public class WCRunner { public static void main(String[] args) throws Exception { //1.獲取 Job 對(duì)象:使用 Job 靜態(tài)的 getInstance() 方法,傳入 Configuration 對(duì)象 Configuration conf = new Configuration(); Job wcJob = Job.getInstance(conf); //2.設(shè)置整個(gè) Job 所用的類的 jar 包:使用 Job 的 setJarByClass(),一般傳入 當(dāng)前類.class wcJob.setJarByClass(WCRunner.class); //3.設(shè)置本 Job 使用的 mapper 和 reducer 的類 wcJob.setMapperClass(WCMapper.class); wcJob.setReducerClass(WCReducer.class); //4.指定 reducer 輸出數(shù)據(jù)的 kv 類型 注:若 mapper 和 reducer 的輸出數(shù)據(jù)的 kv 類型一致,可以用如下兩行代碼設(shè)置 wcJob.setOutputKeyClass(Text.class); wcJob.setOutputValueClass(LongWritable.class); //5.指定 mapper 輸出數(shù)據(jù)的 kv 類型 wcJob.setMapOutputKeyClass(Text.class); wcJob.setMapOutputValueClass(LongWritable.class); //6.指定原始的輸入數(shù)據(jù)存放路徑:使用 FileInputFormat 的 setInputPaths() 方法 FileInputFormat.setInputPaths(wcJob, new Path("/wc/srcdata/")); //7.指定處理結(jié)果的存放路徑:使用 FileOutputFormat 的 setOutputFormat() 方法 FileOutputFormat.setOutputPath(wcJob, new Path("/wc/output/")); //8.將 Job 提交給集群運(yùn)行,參數(shù)為 true 表示顯示運(yùn)行狀態(tài) wcJob.waitForCompletion(true); }}4.將此項(xiàng)目導(dǎo)出為 jar 文件
步驟:右擊項(xiàng)目 ---> Export ---> Java ---> JAR file --->指定導(dǎo)出路徑(我指定的為:e:/wc.jar) ---> Finish


5.將導(dǎo)出的 jar 包上傳到 linux 上
LZ使用的方法是:在 SecureCRT 客戶端中使用 Alt + p 快捷鍵打開上傳文件的終端,輸入 put e"/wc.jar 即可上傳

6.創(chuàng)建初始測(cè)試文件:words.log
命令: vi words.log 自己輸入測(cè)試數(shù)據(jù)即可

7.在 hdfs 中創(chuàng)建存放初始測(cè)試文件 words.log 的目錄:我們?cè)?WCRunner 中指定的是 /wc/srcdata/
命令:
[hadoop@crawl ~]$ hadoop fs -mkdir /wc
[hadoop@crawl ~]$ hadoop fs -mkdir /wc/srcdata
8.將初始測(cè)試文件 words.log 上傳到 hdfs 的相應(yīng)目錄
命令:[hadoop@crawl ~]$ hadoop fs -put words.log /wc/srcdata
9.運(yùn)行 jar 文件
命令:hadoop jar wc.jar com.software.hadoop.mr.wordcount.WCRunner
此命令為 hadoop jar wc.jar 加上 WCRunner類的全類名,程序的入口為 WCRunner 內(nèi)的 main 方法,運(yùn)行完此命令便可以看到輸出日志信息:

然后前去我們之前配置的存放輸出結(jié)果的路徑(LZ之前設(shè)置的為:/wc/output/)就可以看到 MapReduce 的執(zhí)行結(jié)果了
輸入命令:hadoop fs -ls /wc/output/ 查看以下 /wc/output/ 路徑下的內(nèi)容

結(jié)果數(shù)據(jù)就在第二個(gè)文件中,輸入命令:hadoop fs -cat /wc/output/part-r-00000 即可查看:

至此我們的這個(gè)小應(yīng)用就完成了,是不是很有意思的,LZ 在實(shí)現(xiàn)的時(shí)候還是發(fā)生了一點(diǎn)小意外:

LZ 查閱資料發(fā)現(xiàn)這是由于 jdk 版本不一致導(dǎo)致的錯(cuò)誤,統(tǒng)一 jdk 版本后便沒有問(wèn)題了。
以上這篇MapReduce 入門之一步步自實(shí)現(xiàn)詞頻統(tǒng)計(jì)功能的教程就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持武林網(wǎng)。
新聞熱點(diǎn)
疑難解答
圖片精選