1. 程式人生 > >mapreduce學習筆記三:平均值

mapreduce學習筆記三:平均值

求平均數是MapReduce比較常見的演算法,求平均數的演算法也比較簡單,一種思路是Map端讀取資料,在資料輸入到Reduce之前先經過shuffle,將map函式輸出的key值相同的所有的value值形成一個集合value-list,然後將輸入到Reduce端,Reduce端彙總並且統計記錄數,然後作商即可。

package mapreduce;  
import java.io.IOException;  
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class MyAverage{ public static class Map extends Mapper<Object , Text , Text , IntWritable>{
private static Text newKey=new Text(); public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ String line=value.toString(); System.out.println(line); String arr[]=line.split(" "); newKey.set(arr[0]); System.out.println(arr[0]); System.out.println(arr[1]); int click=Integer.parseInt(arr[1]); context.write(newKey, new IntWritable(click)); } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{ public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{ int num=0; int count=0; for(IntWritable val:values){ num+=val.get(); count++; } int avg=num/count; context.write(key,new IntWritable(avg)); } } @SuppressWarnings("deprecation") public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf=new Configuration(); conf.set("dfs.client.use.datanode.hostname", "true"); System.out.println("start"); Job job =new Job(conf,"MyAverage"); job.setJarByClass(MyAverage.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path in=new Path("hdfs://*:9000/user/hadoop/input/c.txt"); System.out.println("in執行完畢"); Path out=new Path("hdfs://*:9000/user/hadoop/output"); System.out.println("out執行完畢"); Path path = new Path("hdfs://*:9000/user/hadoop/output");// 取第1個表示輸出目錄引數(第0個引數是輸入目錄) FileSystem fileSystem = path.getFileSystem(conf);// 根據path找到這個檔案 if (fileSystem.exists(path)) { fileSystem.delete(path, true);// true的意思是,就算output有東西,也一帶刪除 } FileInputFormat.addInputPath(job,in); FileOutputFormat.setOutputPath(job,out); System.exit(job.waitForCompletion(true) ? 0 : 1); } }