Hadoop MapReduce多輸出
FileOutputFormat及其子類產生的文件放在輸出目錄下。每個reducer一個文件,并且文件由分區號命名:part-r-00000,part-r-00001,等等。有時可能要對輸出的文件名進行控制,或讓每個reducer輸出多個文件。MapReduce為此提供了MultipleOutputs類。
MultipleOutputs類可以將數據寫到多個文件,這些文件的名稱源于輸出的鍵和值或者任意字符串。這允許每個reducer(或者只有map作業的mapper)創建多個文件。采用name-m-nnnnn形式的文件名用于map輸出,name-r-nnnnn形式的文件名用于reduce輸出,其中name是由程序設定的任意名字,nnnnn是一個指明塊號的整數(從0開始)。塊號保證從不同塊(mapper或者reducer)寫的輸出在相同名字情況下不會沖突。
1. 重定義輸出文件名
我們可以對輸出的文件名進行控制。考慮這樣一個需求:按男女性別來區分度假訂單數據。這需要運行一個作業,作業的輸出是男女各一個文件,每個文件包含對應性別的所有數據記錄。
這個需求可以使用MultipleOutputs來實現:
package com.sjf.open.test;

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.sjf.open.utils.ConfigUtil;

/**
 * MapReduce job that groups vacation order records by the value of their "sex" field and
 * writes each group through {@link MultipleOutputs}, using the field value as the base
 * output name.
 *
 * <p>Usage: {@code ./run <input> <output>}
 *
 * <p>Created by xiaosi on 16-11-7.
 */
public class VacationOrderBySex extends Configured implements Tool {

    /**
     * Runs the job through {@link ToolRunner} and exits with the status returned by
     * {@link #run(String[])}.
     *
     * @param args command-line arguments: input path and output path
     * @throws Exception if the job fails to start or run
     */
    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new VacationOrderBySex(), args);
        System.exit(status);
    }

    /**
     * Emits {@code (lower-cased sex, original record)} for every record that comes from an
     * input file whose path contains {@code vacation_hot_country_order}.
     */
    public static class VacationOrderBySexMapper extends Mapper<LongWritable, Text, Text, Text> {

        /** Only splits whose file path contains this marker are processed. */
        private static final String INPUT_PATH_MARKER = "vacation_hot_country_order";

        /** Field delimiter of an input record (a tab character). */
        private static final String FIELD_SEPARATOR = "\t";

        /** Zero-based index of the "sex" field within a record. */
        private static final int SEX_FIELD_INDEX = 2;

        /** Path of the file backing the current input split; set in {@link #setup}. */
        public String fInputPath = "";

        /** Reused output key, to avoid allocating a new Text per record. */
        private final Text outputKey = new Text();

        /**
         * Records the path of the file behind the current input split.
         *
         * @param context task context; its input split is cast to {@link FileSplit}
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            fInputPath = ((FileSplit) context.getInputSplit()).getPath().toString();
        }

        /**
         * Extracts the sex field from one record and emits the record keyed by it.
         * Records from other input files, records with too few fields, and records with
         * a blank sex field are skipped.
         *
         * @param key     key supplied by the input format (unused)
         * @param value   one input record
         * @param context task context used to emit the output pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (!fInputPath.contains(INPUT_PATH_MARKER)) {
                return;
            }

            // The original code split on the literal string "/t", which never matches a tab.
            String[] params = value.toString().split(FIELD_SEPARATOR);
            if (params.length <= SEX_FIELD_INDEX) {
                // Malformed/short record: skip it rather than fail the whole task.
                return;
            }

            String sex = params[SEX_FIELD_INDEX];
            if (StringUtils.isBlank(sex)) {
                return;
            }

            outputKey.set(sex.toLowerCase());
            context.write(outputKey, value);
        }
    }

    /**
     * Writes every record of a key through {@link MultipleOutputs}, passing the key as the
     * base output path so that each distinct key gets its own set of output files.
     */
    public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        /** Creates the {@link MultipleOutputs} writer bound to this task's context. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        /**
         * Writes all records of {@code key} under the base output name {@code key.toString()}.
         *
         * @param key     lower-cased sex value emitted by the mapper
         * @param values  the original records that carry this sex value
         * @param context task context (records are written via {@link MultipleOutputs} instead)
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String baseOutputPath = key.toString();
            for (Text value : values) {
                multipleOutputs.write(NullWritable.get(), value, baseOutputPath);
            }
        }

        /** Closes the {@link MultipleOutputs} writer when the task ends. */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args exactly two arguments: the input path and the output path
     * @return 0 if the job succeeded; 1 if it failed or the arguments were invalid
     * @throws Exception if the job cannot be submitted or is interrupted
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("./run <input> <output>");
            // Return the error status instead of calling System.exit() here;
            // main() turns the returned value into the process exit status.
            return 1;
        }

        String inputPath = args[0];
        String outputPath = args[1];
        int numReduceTasks = 16;

        // Compress the job output with gzip.
        Configuration conf = this.getConf();
        conf.setBoolean("mapred.output.compress", true);
        conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf);
        job.setJobName("vacation_order_by_jifeng.si");
        job.setJarByClass(VacationOrderBySex.class);

        job.setMapperClass(VacationOrderBySexMapper.class);
        job.setReducerClass(VacationOrderBySexReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.setNumReduceTasks(numReduceTasks);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
新聞熱點
疑難解答