国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

hadoop之Kmeans數據挖掘算法實現

2019-11-06 06:37:42
字體:
來源:轉載
供稿:網友

Kmeans是十分常見的數據挖掘算法,其邏輯較為簡單,應用范圍廣。通過百度搜索java實現的Kmeans算法,可參考的版本很多,比如: http://blog.csdn.net/jdplus/article/details/23960127 還有: http://www.cnblogs.com/chaoku/p/3748456.html

雖然作者都表示親測有效,不會有任何問題,然而在實際應用中每個人的環境不同,尤其是hadoop版本的不同,總會出現這樣或者那樣的問題。不過他們的算法給了很好的參考,按照他們的邏輯照虎畫貓,也是可行的。 我的hadoop版本較為老舊,其中最為突出的問題就是在老版本的hadoop中并沒有

org.apache.hadoop.maPReduce.Job;

這個包,這個版本上的差別照成了并不能直接拿大牛們的代碼復制過來就用。隨后在參考了hadoop官網中的案例重新對Kmeans算法進行了實現,代碼參考“瀟灑子弦”較多,也容納我的思考,主要有耽三個大方面的不同:

實現的版本不同,基于低級版本的hadoop予以實現。

在計算距離上有了變化,采用了歐式距離,按照原來的實現方案并不能有效聚類成需要的組別數呢。

將中心點寫入新文件中語句也有變動,按照原始的寫法,似乎會覆蓋掉。

以下是主要代碼:

package mykmeans;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.Text;import org.apache.hadoop.util.LineReader;public class CopyOfUtils { //讀取中心文件的數據 public static ArrayList<ArrayList<Double>> getCentersFromHDFS(String centersPath,boolean isDirectory) throws IOException{ ArrayList<ArrayList<Double>> result = new ArrayList<ArrayList<Double>>(); Path path = new Path(centersPath); Configuration conf = new Configuration(); FileSystem fileSystem = path.getFileSystem(conf); if(isDirectory){ FileStatus[] listFile = fileSystem.listStatus(path); for (int i = 0; i < listFile.length; i++) { result.addAll(getCentersFromHDFS(listFile[i].getPath().toString(),false)); } return result; } FSDataInputStream fsis = fileSystem.open(path); LineReader lineReader = new LineReader(fsis, conf); Text line = new Text(); while(lineReader.readLine(line) > 0){ //ArrayList<Double> tempList = textToArray(line); ArrayList<Double> tempList = new ArrayList<Double>(); String[] fields = line.toString().replaceAll("/t", "").split(","); for(int i=0;i<fields.length;i++){ tempList.add(Double.parseDouble(fields[i])); } result.add(tempList); } lineReader.close(); return result; } //刪掉文件 public static void deletePath(String pathStr) throws IOException{ Configuration conf = new Configuration(); Path path = new Path(pathStr); FileSystem hdfs = path.getFileSystem(conf); hdfs.delete(path ,true); } public static ArrayList<Double> textToArray(Text text){ ArrayList<Double> list = new ArrayList<Double>(); String[] fileds = text.toString().replaceAll("/t", "").split("/,"); for(int i=0;i<fileds.length;i++){ list.add(Double.parseDouble(fileds[i])); } return list; } public static boolean compareCenters(String centerPath,String newPath) throws IOException{ System.out.println("比較兩個中心點是否相等"); List<ArrayList<Double>> oldCenters = CopyOfUtils.getCentersFromHDFS(centerPath,false); List<ArrayList<Double>> newCenters = CopyOfUtils.getCentersFromHDFS(newPath,true); int size = oldCenters.size(); int fildSize = oldCenters.get(0).size(); double distance = 0; for(int i=0;i<size;i++){ for(int j=0;j<fildSize;j++){ double t1 = Math.abs(oldCenters.get(i).get(j)); double t2 = Math.abs(newCenters.get(i).get(j)); distance += Math.pow((t1 - t2) / (t1 + t2), 2); } } if(distance <= 0.00001){ //刪掉新的中心文件以便最后依次歸類輸出 CopyOfUtils.deletePath(newPath); return true; }else{ //先清空中心文件,將新的中心文件復制到中心文件中,再刪掉中心文件 CopyOfUtils.deletePath(centerPath); Configuration conf = new Configuration(); Path outPath = new Path(centerPath); FileSystem fileSystem = outPath.getFileSystem(conf); FSDataOutputStream out = fileSystem.create(outPath); //out. //將newCenter的內容寫到文件里面 Path inPath = new Path(newPath); FileStatus[] listFiles = fileSystem.listStatus(inPath); for (int i = 0; i < listFiles.length; i++) { FSDataInputStream in = fileSystem.open(listFiles[i].getPath()); int byteRead = 0; byte[] buffer = new byte[256]; while ((byteRead = in.read(buffer)) > 0) { out.write(buffer, 0, byteRead); } in.close(); } out.close(); //刪掉新的中心文件以便第二次任務運行輸出 CopyOfUtils.deletePath(newPath); } return false; }}package mykmeans;import java.io.*;import java.text.DecimalFormat;import java.util.*;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.filecache.DistributedCache;import org.apache.hadoop.conf.*;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;import org.apache.hadoop.util.*;public class CopyOfNewMapReduce extends Configured implements Tool{ public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text>{ // private String centerpath; public ArrayList<ArrayList<Double>> centers = null; public int k = 3; public void configure(JobConf job){ String center = job.get("map.center.file"); try { centers = Utils.getCentersFromHDFS(center,false); k = centers.size(); System.out.println("centers point is: "+centers.toString()); } catch (IOException e) { System.err.println("cannot find the map center file!"); e.printStackTrace(); } } @Override public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output, Reporter report) throws IOException { //讀取一行數據 ArrayList<Double> fileds = new ArrayList<Double>(); String[] temp = value.toString().replaceAll("/t", "").split(","); for(int i = 0; i<temp.length;i++){ fileds.add(Double.parseDouble(temp[i])); } int sizeOfFileds = fileds.size(); double minDistance = 99999999; int centerIndex = 0; //依次取出k個中心點與當前讀取的記錄做計算 for(int i=0;i<k;i++){ double currentDistance = 0; for(int j=0;j<sizeOfFileds;j++){ double centerPoint = centers.get(i).get(j); double filed = fileds.get(j); currentDistance += (centerPoint-filed)*(centerPoint-filed); } currentDistance = Math.sqrt(currentDistance); //循環找出距離該記錄最接近的中心點的ID if(currentDistance<minDistance){ minDistance = currentDistance; centerIndex = i; } } //以中心點為Key 將記錄原樣輸出 output.collect(new IntWritable(centerIndex+1), value); } } public static class Reduce extends MapReduceBase implements Reducer<IntWritable, Text, Text, Text> { @Override public void reduce(IntWritable key, Iterator<Text> value, OutputCollector<Text, Text> output, Reporter report) throws IOException {ArrayList<ArrayList<Double>> filedsList = new ArrayList<ArrayList<Double>>();DecimalFormat df0 = new DecimalFormat("###.000000"); //依次讀取記錄集,每行為一個ArrayList<Double> System.out.println(key+": "+value.toString()); while(value.hasNext()){ ArrayList<Double> tempList = new ArrayList<Double>(); String[] temp0 = value.next().toString().replaceAll("/t", "").split(","); for(int i = 0; i< temp0.length; i++){ tempList.add(Double.parseDouble(df0.format(Double.parseDouble(temp0[i])))); } filedsList.add(tempList); } //計算新的中心 //每行的元素個數 int filedSize = filedsList.get(0).size(); double[] avg = new double[filedSize]; for(int i=0;i<filedSize;i++){ //求沒列的平均值 double sum = 0; int size = filedsList.size(); for(int j=0;j<size;j++){ sum += filedsList.get(j).get(i); } avg[i] = sum / size; avg[i] = Double.parseDouble(df0.format(avg[i])); } output.collect(new Text("") , new Text(Arrays.toString(avg).replace("[", "").replace("]", "").replaceAll("/t", ""))); } } @Override public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), CopyOfNewMapReduce.class); conf.setJobName("kmeans"); conf.setMapperClass(Map.class); conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(Text.class); if(!"false".equals(args[3])||"true".equals(args[3])){ conf.setReducerClass(Reduce.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); } FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.set("map.center.file", args[2]); JobClient.runJob(conf); return 0; } public static void main(String[] args)throws Exception{ int count = 0; int res = 0; while(true){ res = ToolRunner.run(new Configuration(), new CopyOfNewMapReduce(), args); System.out.println(" 第 " + ++count + " 次計算 "); if(Utils.compareCenters(args[2],args[1] )){ String lastarg[] = new String[args.length]; for(int i=0; i < args.length-1; i++){ lastarg[i] = args[i]; } lastarg[args.length-1] = "false"; res = ToolRunner.run(new Configuration(), new CopyOfNewMapReduce(), lastarg); break; } } System.exit(res); }}

編譯后,打成jar包,注意java版本的一致性。 在hadoop客戶端執行:

~hadoop>bin/hadoop jar MyKmeans.jar mykmeans.CopyOfNewMapReduce /xxxx/kmeans/input*.file /xxx/output /xxx/kmeans/cluster.file true

需要保證輸入的數據內容以逗號“,”隔開,初始中心點需要自行設置,而不是隨機取的,同樣以逗號“,”隔開。 最后實現的效果在output文件夾下,每個聚類在一個文件中,輸出的數據格式截圖如下: 代碼結果截圖

如此便實現了低版本的hadoop上kmeans算法的實現,有問題歡迎交流。

引用:MapReduce Kmeans聚類算法.http://www.cnblogs.com/chaoku/p/3748456.html


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 利川市| 囊谦县| 宁河县| 乐亭县| 十堰市| 长丰县| 阜新市| 武定县| 鄂州市| 曲松县| 曲麻莱县| 丽水市| 鲁山县| 四川省| 青铜峡市| 泉州市| 濮阳市| 西安市| 宝兴县| 弋阳县| 乌拉特后旗| 南部县| 合山市| 体育| 永丰县| 沽源县| 邛崃市| 普洱| 白朗县| 许昌县| 沙坪坝区| 宁武县| 金华市| 铅山县| 蛟河市| 湘潭县| 太保市| 龙海市| 法库县| 乐业县| 唐山市|