This article was first published on my blog.
Continuing with the practice exercises: last time I worked through partitioning, so following the pipeline of partition, sort, group, and reduce, today's example is about sorting. Let's get started!
Speaking of sorting, look at the LongWritable type used in Hadoop's WordCount example. It implements the interface WritableComparable:
```java
public interface WritableComparable<T> extends Writable, Comparable<T> {}

public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
```
The Writable interface defines write and readFields, which serialize to and deserialize from a data stream, while Comparable contributes compareTo for comparisons. Since Hadoop's built-in types know how to compare themselves, how does the framework actually sort when such a type is used as the map output key? To find out, open the MapTask source: its inner MapOutputBuffer class declares a sorter field just below the "spill accounting" comment:
```java
private final IndexedSorter sorter;
```
This field is initialized by:
```java
sorter = ReflectionUtils.newInstance(
    job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
```
As you can see, the sort algorithm can be overridden in the configuration, but it defaults to QuickSort. QuickSort has two important methods:
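As a side note, here is a minimal sketch of swapping in a different sorter via the configuration (assuming the built-in org.apache.hadoop.util.HeapSort, the other stock IndexedSorter, suits your workload):

```java
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// "map.sort.class" must name a class implementing IndexedSorter;
// QuickSort is the default, HeapSort is the other built-in option.
conf.set("map.sort.class", "org.apache.hadoop.util.HeapSort");
```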
```java
public void sort(final IndexedSortable s, int p, int r, final Progressable rep);

private static void sortInternal(final IndexedSortable s, int p, int r,
        final Progressable rep, int depth);
```
The IndexedSortable argument passed in is the current MapOutputBuffer itself, because MapOutputBuffer also implements IndexedSortable. As a result, QuickSort's sort calls back into MapOutputBuffer's compare method, shown below:
```java
public int compare(int i, int j) {
    final int ii = kvoffsets[i % kvoffsets.length];
    final int ij = kvoffsets[j % kvoffsets.length];
    // sort by partition
    if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
    }
    // sort by key
    return comparator.compare(kvbuffer,
        kvindices[ii + KEYSTART],
        kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
        kvbuffer,
        kvindices[ij + KEYSTART],
        kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
}
```
Records are ordered first by partition and then by key. The comparator used here is resolved from the configuration property mapred.output.key.comparator.class by default; see the source:
```java
public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
        "mapred.output.key.comparator.class", null, RawComparator.class);
    if (theClass != null)
        return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(
        getMapOutputKeyClass().asSubclass(WritableComparable.class));
}
```
This is how the sorter and the comparison method get wired together: when no comparator is configured, Hadoop falls back to a WritableComparator for the map output key class. You could also register a comparator of your own here, as the sketch below shows.
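A minimal sketch of plugging in a custom comparator (assumptions: the Hadoop 1.x WritableComparator API, and the SortAPI key type we are about to define):

```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Registered via job.setSortComparatorClass(SortAPIComparator.class),
// which populates mapred.output.key.comparator.class.
public class SortAPIComparator extends WritableComparator {
    protected SortAPIComparator() {
        super(SortAPI.class, true); // true: deserialize keys before comparing
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((SortAPI) a).compareTo((SortAPI) b);
    }
}
```

If no such class is configured, as in this exercise, sorting simply delegates to the key's own compareTo.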
Now we can follow LongWritable's lead and implement a custom type with its own read, write, and compare logic. Let's write some code to make it stick. Since the topic is sorting, we first need data: two tab-separated columns, to be sorted ascending by the first column and, when the first values are equal, ascending by the second:

```
1	2
1	1
3	0
3	2
2	2
1	2
```
First, the custom type SortAPI:
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SortAPI implements WritableComparable<SortAPI> {

    /** First column. */
    public Long first;
    /** Second column. */
    public Long second;

    public SortAPI() {}

    public SortAPI(long first, long second) {
        this.first = first;
        this.second = second;
    }

    /**
     * The sort order is decided here: returning a positive value when
     * this.first > o.first gives ascending order; negate it for descending.
     */
    @Override
    public int compareTo(SortAPI o) {
        long mis = this.first - o.first;
        if (mis != 0) {
            return (int) mis;
        }
        return (int) (this.second - o.second);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    @Override
    public int hashCode() {
        return this.first.hashCode() + this.second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof SortAPI) {
            SortAPI o = (SortAPI) obj;
            // compare values, not boxed Long references
            return this.first.equals(o.first) && this.second.equals(o.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return "first:" + this.first + " second:" + this.second;
    }
}
```
This type overrides compareTo(SortAPI o), write(DataOutput out), and readFields(DataInput in). Since comparison is involved, remember the earlier rule: you must also override hashCode() and equals(Object obj); don't forget this! Also note that write and readFields must agree on field order: whatever is written first must be read first. Finally, compareTo returns a positive number, zero, or a negative number, meaning greater than, equal to, or less than respectively; walk through the logic to see how two rows are judged equal and how unequal rows are ordered.
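One caveat with the subtraction trick: casting a long difference to int can overflow and flip the sign when keys are far apart. A safer variant of compareTo (just a sketch, assuming Java 7+ for Long.compare) would be:

```java
@Override
public int compareTo(SortAPI o) {
    int byFirst = Long.compare(this.first, o.first); // ascending on column 1
    if (byFirst != 0) {
        return byFirst;
    }
    return Long.compare(this.second, o.second);      // then ascending on column 2
}
```

For the small sample values used here the subtraction version behaves identically.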
Next, the custom Mapper, the Reducer, and the driver with its main function (each top-level class lives in its own source file):
```java
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] splied = value.toString().split("\t"); // note: "\t", not "/t"
        try {
            long first = Long.parseLong(splied[0]);
            long second = Long.parseLong(splied[1]);
            context.write(new SortAPI(first, second), new LongWritable(1));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {
    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(new LongWritable(key.first), new LongWritable(key.second));
    }
}

public class Test {
    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/sort/output/";
    static final String INPUT_DIR = "hdfs://hadoop-master:9000/sort/input/test.txt";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, Test.class.getSimpleName());
        deleteOutputFile(OUTPUT_DIR);
        // 1. set the input path
        FileInputFormat.setInputPaths(job, INPUT_DIR);
        // 2. set the input format class
        job.setInputFormatClass(TextInputFormat.class);
        // 3. set the custom Mapper and its key/value types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SortAPI.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 4. partitioning
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        // 5. sorting and grouping happen here, driven by SortAPI.compareTo
        // 6. set the custom Reducer and its key/value types
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        // 7. set the output path
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));
        // 8. submit the job
        job.waitForCompletion(true);
    }

    static void deleteOutputFile(String path) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI(INPUT_DIR), conf);
        if (fs.exists(new Path(path))) {
            fs.delete(new Path(path), true);
        }
    }
}
```
Note that step 4 uses HashPartitioner, which relies on the key's hashCode(); that is one more reason SortAPI overrides it, as the sketch below recalls.
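For reference, HashPartitioner's logic is essentially the following (paraphrased from the Hadoop source):

```java
import org.apache.hadoop.mapreduce.Partitioner;

// Every record with the same key hash lands in the same reduce partition;
// the mask keeps the modulo argument non-negative.
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

With a single reducer, as here, every record goes to partition 0 anyway, but the hashCode() contract starts to matter as soon as numReduceTasks grows.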
With that in place you can run the job directly from Eclipse and inspect the result:

```
1	1
1	2
2	2
3	0
3	2
```
The result is correct. Notice that the duplicate input row 1 2 appears only once: the reducer is invoked once per distinct key and writes that key a single time. Now, what if the requirement were first column descending and second column ascending? We only need to modify compareTo(SortAPI o):
```java
@Override
public int compareTo(SortAPI o) {
    long mis = (this.first - o.first) * -1; // negated: descending on column 1
    if (mis != 0) {
        return (int) mis;
    }
    return (int) (this.second - o.second);  // ascending on column 2
}
```
Save and run again; the result:
```
3	0
3	2
2	2
1	1
1	2
```
That is also correct and matches the requirement.
A small question to leave you with: when exactly is compareTo(SortAPI o) called during the job, and how many times in total?
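One hypothetical way to investigate is to count the calls yourself. The subclass below is an illustration only (assumptions: Java 8 for the lambda, and that you set it as the map output key class and emit it from the mapper; the counter is per JVM, so each task prints its own total):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical instrumentation: every comparison during spill sorting,
// merging, and grouping bumps the counter, and a shutdown hook prints
// the per-JVM total when the task exits.
public class CountingSortAPI extends SortAPI {
    static final AtomicLong COMPARE_CALLS = new AtomicLong();

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(
            () -> System.out.println("compareTo calls: " + COMPARE_CALLS.get())));
    }

    public CountingSortAPI() {}

    public CountingSortAPI(long first, long second) {
        super(first, second);
    }

    @Override
    public int compareTo(SortAPI o) {
        COMPARE_CALLS.incrementAndGet();
        return super.compareTo(o);
    }
}
```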
That's all for this time. Keep recording things, bit by bit!