国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

Hadoop mapreduce自定義分組RawComparator

2019-11-14 22:41:39
字體:
來源:轉載
供稿:網友
Hadoop maPReduce自定義分組RawComparator

本文發表于本人博客。

今天接著上次【Hadoop mapreduce自定義排序WritableComparable】文章寫,按照順序那么這次應該是講解自定義分組如何實現,關于操作順序在這里不多說了,需要了解的可以看看我在博客園的評論,現在開始。

首先我們查看下Job這個類,發現有setGroupingComparatorClass()這個方法,具體源碼如下:

  /**   * Define the comparator that controls which keys are grouped together   * for a single call to    * {@link Reducer#reduce(Object, Iterable,    *                       org.apache.hadoop.mapreduce.Reducer.Context)}   * @param cls the raw comparator to use   * @throws IllegalStateException if the job is submitted   */  public void setGroupingComparatorClass(Class<? extends RawComparator> cls                                         ) throws IllegalStateException {    ensureState(JobState.DEFINE);    conf.setOutputValueGroupingComparator(cls);  }

從方法的源碼可以看出這個方法是定義自定義鍵分組功能。設置這個自定義分組類必須滿足extends RawComparator,那我們可以看下這個類的源碼:

/** * <p> * A {@link Comparator} that Operates directly on byte representations of * objects. * </p> * @param <T> * @see DeserializerComparator */public interface RawComparator<T> extends Comparator<T> {  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);}

然而這個RawComparator是泛型繼承Comparator接口的,簡單看了下那我們來自定義一個類繼承RawComparator,代碼如下:

public class MyGrouper implements RawComparator<SortAPI> {    @Override    public int compare(SortAPI o1, SortAPI o2) {        return (int)(o1.first - o2.first);    }    @Override    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {        int compareBytes = WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);        return compareBytes;    }    }

源碼中SortAPI是上節自定義排序中的定義對象,第一個方法從注釋可以看出是比較2個參數的大小,返回的是自然整數;第二個方法是在反序列化時比較,所以需要是用字節比較。接下來我們繼續看看自定義MyMapper類:

public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {        @Override    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {        String[] splied = value.toString().split("/t");        try {            long first = Long.parseLong(splied[0]);            long second = Long.parseLong(splied[1]);            context.write(new SortAPI(first,second), new LongWritable(1));        } catch (Exception e) {            System.out.println(e.getMessage());        }    }    }

自定義MyReduce類:

public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {    @Override    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {        context.write(new LongWritable(key.first), new LongWritable(key.second));    }    }

自定義SortAPI類:

public class SortAPI implements WritableComparable<SortAPI> {    public Long first;    public Long second;    public SortAPI(){            }    public SortAPI(long first,long second){        this.first = first;        this.second = second;    }    @Override    public int compareTo(SortAPI o) {        return (int) (this.first - o.first);    }    @Override    public void write(DataOutput out) throws IOException {        out.writeLong(first);        out.writeLong(second);    }    @Override    public void readFields(DataInput in) throws IOException {        this.first = in.readLong();        this.second = in.readLong();            }    @Override    public int hashCode() {        return this.first.hashCode() + this.second.hashCode();    }    @Override    public boolean equals(Object obj) {        if(obj instanceof SortAPI){            SortAPI o = (SortAPI)obj;            return this.first == o.first && this.second == o.second;        }        return false;    }        @Override    public String toString() {        return "輸出:" + this.first + ";" + this.second;    }    }

接下來準備數據,數據如下:

1       21       13       03       22       21       2

上傳至hdfs://hadoop-master:9000/grouper/input/test.txt,main代碼如下:

public class Test {    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/grouper/output/";    static final String INPUT_DIR = "hdfs://hadoop-master:9000/grouper/input/test.txt";    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        Job job = new Job(conf, Test.class.getSimpleName());            job.setJarByClass(Test.class);        deleteOutputFile(OUTPUT_DIR);        //1設置輸入目錄        FileInputFormat.setInputPaths(job, INPUT_DIR);        //2設置輸入格式化類        job.setInputFormatClass(TextInputFormat.class);        //3設置自定義Mapper以及鍵值類型        job.setMapperClass(MyMapper.class);        job.setMapOutputKeyClass(SortAPI.class);        job.setMapOutputValueClass(LongWritable.class);        //4分區        job.setPartitionerClass(HashPartitioner.class);        job.setNumReduceTasks(1);        //5排序分組        job.setGroupingComparatorClass(MyGrouper.class);        //6設置在一定Reduce以及鍵值類型        job.setReducerClass(MyReduce.class);        job.setOutputKeyClass(LongWritable.class);        job.setOutputValueClass(LongWritable.class);        //7設置輸出目錄        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));        //8提交job        job.waitForCompletion(true);    }        static void deleteOutputFile(String path) throws Exception{        Configuration conf = new Configuration();        FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf);        if(fs.exists(new Path(path))){            fs.delete(new Path(path));        }    }}

執行代碼,然后在節點上用終端輸入:hadoop fs -text /grouper/output/part-r-00000查看結果:

1       22       23       0

接下來我們修改下SortAPI類的compareTo()方法:

    @Override    public int compareTo(SortAPI o) {        long mis = (this.first - o.first) * -1;        if(mis != 0 ){            return (int)mis;        }        else{            return (int)(this.second - o.second);        }    }

再次執行并查看/grouper/output/part-r-00000文件:

3       02       21       1

這樣我們就得出了同樣的數據分組結果會受到排序算法的影響,比如排序是倒序那么分組也是先按照倒序數據源進行分組輸出。我們還可以在map函數以及reduce函數中打印記錄(過程省略)這樣經過對比也得出分組階段:鍵值對中key相同(即compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法返回0)的則為一組,當前組再按照順序選擇第一個往緩沖區輸出(也許會存儲到硬盤)。其它的相同key的鍵值對就不會再往緩沖區輸出了。在百度上檢索到這邊文章,其中它的分組是把map函數輸出的value全部迭代到同一個key中,就相當于上面{key,value}:{1,{2,1,2}},這個結果跟最開始沒有自定義分組時是一樣的,我們可以在reduce函數輸出Iterable<LongWritable> values進行查看,其實我覺得這樣的才算是分組吧就像數據查詢一樣。

在這里我們應該要弄懂分組與分區的區別。分區是對輸出結果文件進行分類拆分文件以便更好查看,比如一個輸出文件包含所有狀態的http請求,那么為了方便查看通過分區把請求狀態分成幾個結果文件。分組就是把一些相同鍵的鍵值對進行計算減少輸出;分區之后數據全部還是照樣輸出到reduce端,而分組的話就有所減少了;當然這2個步驟也是不同的階段執行。

這次先到這里。堅持記錄點點滴滴!


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 疏附县| 赣州市| 绥滨县| 东兴市| 南汇区| 香河县| 萝北县| 岑溪市| 民勤县| 横山县| 长岭县| 上犹县| 昌江| 大港区| 沙湾县| 东海县| 白沙| 黄浦区| 荥经县| 黎平县| 全椒县| 深州市| 武功县| 平顶山市| 托里县| 玛沁县| 克山县| 镇赉县| 曲麻莱县| 曲水县| 灵山县| 江阴市| 马尔康县| 筠连县| 灵石县| 德阳市| 永寿县| 洛扎县| 雅安市| 建德市| 屏南县|