国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

Hadoop mapreduce自定義排序WritableComparable

2019-11-14 22:22:34
字體:
來源:轉載
供稿:網友
Hadoop maPReduce自定義排序WritableComparable

本文發表于本人博客。

今天繼續寫練習題,上次對分區稍微理解了一下,那根據那個步驟分區、排序、分組、規約來的話,今天應該是要寫個排序有關的例子了,那好現在就開始!

說到排序我們可以查看下hadoop源碼里面的WordCount例子中對LongWritable類型定義,它實現抽象接口WritableComparable,代碼如下:

public interface WritableComparable<T> extends Writable, Comparable<T> {}public interface Writable {  void write(DataOutput out) throws IOException;  void readFields(DataInput in) throws IOException;}

其中Writable抽象接口定義了write以及readFields方法,分別是寫入數據流以及讀取數據流。而Comparable中又有compareTo方法定義比較。竟然hadoop的內置類型有比較大小功能,那么它使用這個內置類型作為map端輸出的話是怎么樣去排序的,這個問題我們先來查看下map任務類MapTask源代碼,內部有內置MapOutputBuffer類,在spill accounting注釋下面有個排序字段:

private final IndexedSorter sorter;

這個字段是由:

sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);

可以看出,這個排序算法可以在配置文件中指定,不過默認是快速排序QuickSort。這個QuickSort內部有幾個重要的方法:

public void sort(final IndexedSortable s, int p, int r,final Progressable rep);private static void sortInternal(final IndexedSortable s, int p, int r,final Progressable rep, int depth);

其中在傳遞參數IndexSortable的時候是用MapOutputBuffer當前來傳遞,因為這個MapOutputBuffer也繼承IndexedSortable.這樣在QuickSort排序sort中就會使用MapOutputBuffer類中的compare方法進行比較,可以看下面源代碼:

    public int compare(int i, int j) {      final int ii = kvoffsets[i % kvoffsets.length];      final int ij = kvoffsets[j % kvoffsets.length];      // sort by partition      if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];      }      // sort by key      return comparator.compare(kvbuffer,          kvindices[ii + KEYSTART],          kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],          kvbuffer,          kvindices[ij + KEYSTART],          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);    }

然而這個方法中comparator默認是由節點“mapred.output.key.comparator.class”決定,也可以看源碼:

  public RawComparator getOutputKeyComparator() {    Class<? extends RawComparator> theClass = getClass("mapred.output.key.comparator.class",            null, RawComparator.class);    if (theClass != null)      return ReflectionUtils.newInstance(theClass, this);    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));  }

就是這樣把排序以及比較方法關聯起來了!那現在我們可以按照LongWritable的思路實現自己的自定義類型并且讀取、寫入、比較。下面寫寫代碼加深下記憶,既然是排序那我們準備下數據,如下有2列數據要求按照第一列升序,第二列降序排序:

1    21    13    03    22    21    2

先自定義類型SortAPI:

public class SortAPI implements WritableComparable<SortAPI> {    /**     * 第一列數據     */    public Long first;    /**     * 第二列數據     */    public Long second;        public SortAPI(){}    public SortAPI(long first,long second){        this.first = first;        this.second = second;    }    /**     * 排序就在這里當:this.first - o.first > 0 升序,小于0倒序     */    @Override    public int compareTo(SortAPI o) {        long mis = (this.first - o.first);        if(mis != 0 ){            return (int)mis;        }        else{            return (int)(this.second - o.second);        }    }    @Override    public void write(DataOutput out) throws IOException {        out.writeLong(first);        out.writeLong(second);    }    @Override    public void readFields(DataInput in) throws IOException {        this.first = in.readLong();        this.second = in.readLong();            }    @Override    public int hashCode() {        return this.first.hashCode() + this.second.hashCode();    }    @Override    public boolean equals(Object obj) {        if(obj instanceof SortAPI){            SortAPI o = (SortAPI)obj;            return this.first == o.first && this.second == o.second;        }        return false;    }    @Override    public String toString() {        return "first:" + this.first + "second:" + this.second;    }}

這類型重寫compareTo(SortAPI o)、write(DataOutput out)、readFields(DataInput in),既然是有比較那么以前說的就一定要重寫hashCode()、equals(Object obj)方法了,這點不要忘記!還需要主要在write方法以及readFields方法中讀寫是有順序:先write什么字段就先read什么字段。其次這個compareTo(SortAPI o)方法中返回是整型大于0、0、以及小于0代表大于、等于、小于。至于怎么判斷2行數據是不是相等,不相等怎么比較著邏輯可以慢慢看下。

下面寫個自定義Mapper、Reducer類以及main函數:

public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {            @Override    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {        String[] splied = value.toString().split("/t");        try {            long first = Long.parseLong(splied[0]);            long second = Long.parseLong(splied[1]);            context.write(new SortAPI(first,second), new LongWritable(1));        } catch (Exception e) {            System.out.println(e.getMessage());        }    }}
public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {    @Override    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {        context.write(new LongWritable(key.first), new LongWritable(key.second));    }    }
    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/sort/output/";    static final String INPUT_DIR = "hdfs://hadoop-master:9000/sort/input/test.txt";        public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        Job job = new Job(conf, Test.class.getSimpleName());                deleteOutputFile(OUTPUT_DIR);                //1設置輸入目錄        FileInputFormat.setInputPaths(job, INPUT_DIR);        //2設置輸入格式化類        job.setInputFormatClass(TextInputFormat.class);        //3設置自定義Mapper以及鍵值類型        job.setMapperClass(MyMapper.class);        job.setMapOutputKeyClass(SortAPI.class);        job.setMapOutputValueClass(LongWritable.class);        //4分區        job.setPartitionerClass(HashPartitioner.class);        job.setNumReduceTasks(1);                //5排序分組        //6設置在一定Reduce以及鍵值類型        job.setReducerClass(MyReduce.class);        job.setOutputKeyClass(LongWritable.class);        job.setOutputValueClass(LongWritable.class);        //7設置輸出目錄        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));        //8提交job        job.waitForCompletion(true);    }        static void deleteOutputFile(String path) throws Exception{        Configuration conf = new Configuration();        FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf);        if(fs.exists(new Path(path))){            fs.delete(new Path(path));        }    }

這樣在eclipse下就可以直接運行查看結果:

1       11       22       23       03       2

這結果正確,那如果要求第一列倒敘第二列升序呢,怎么辦,這只需要修改下compareTo(SortAPI o):

    @Override    public int compareTo(SortAPI o) {        long mis = (this.first - o.first) * -1 ;        if(mis != 0 ){            return (int)mis;        }        else{            return (int)(this.second - o.second);        }    }

這樣保存在運行,結果:

3       03       22       21       11       2

也正確吧符合自己的這個要求。

留個小問題:這個compareTo(SortAPI o)方法在什么時候調用了,總共調用了幾次?

這次先到這里。堅持記錄點點滴滴!


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 江陵县| 屏东市| 康马县| 巴林左旗| 府谷县| 青州市| 镇宁| 西贡区| 保德县| 龙川县| 泰安市| 多伦县| 虹口区| 沾化县| 调兵山市| 新野县| 水城县| 丰台区| 龙门县| 茌平县| 苗栗市| 大田县| 屯门区| 阿鲁科尔沁旗| 三明市| 建阳市| 达日县| 社会| 改则县| 台安县| 宝清县| 麻城市| 衡东县| 十堰市| 丰县| 上思县| 志丹县| 海门市| 六安市| 绥阳县| 郧西县|