
MapReduce API Programming Exercise (Friend Recommendation)
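The exercise: each input line names a user followed by that user's direct friends, and the job recommends "friend of friend" (FOF) pairs, that is, two users who are not yet direct friends, scored by how many friends they have in common. Only the first sample line below appears in the original code comments; the second is illustrative:

tom hello hadoop cat
hello tom hadoop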

1. The main method (driver)

package com.bjsxt.FOF;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyFOF {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf=new Configuration(true);
		Job job=Job.getInstance(conf);
		job.setJarByClass(MyFOF.class);
		job.setJobName("fof");
		
		// Input directory on HDFS (adjust to wherever the friend-list file lives)
		Path filein=new Path("/user/local");
		FileInputFormat.addInputPath(job, filein);
		
		// Remove the output directory if it already exists, otherwise the job fails to start
		Path fileout=new Path("/data");
		if(fileout.getFileSystem(conf).exists(fileout)) {
			fileout.getFileSystem(conf).delete(fileout, true);	// recursive delete
		}
		FileOutputFormat.setOutputPath(job, fileout);
		
//		job.setInputFormatClass(cls);
		
		// Map output: key "a:b" (names in sorted order), value 0 = direct friends, 1 = one shared friend
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
//		job.setPartitionerClass(MyPartitioner.class);
//		job.setSortComparatorClass(MyComparator.class);
//		job.setGroupingComparatorClass(MyGrouping.class);
		
//		job.setCombinerClass(MyCombiner.class);
		
		job.setReducerClass(MyReducer.class);
		job.setNumReduceTasks(2);	// two reduce tasks; the default HashPartitioner still sends equal keys to the same one
		
		job.waitForCompletion(true);
	}
}
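With two reduce tasks, a successful run leaves the results under /data as one file per reducer, part-r-00000 and part-r-00001, each line holding a pair key and its common-friend count.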

2. The map method

package com.bjsxt.FOF;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	Text mkey=new Text();
	IntWritable mval=new IntWritable();
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		
		// Equal keys form one group; each group triggers one call to the reduce method
		// value example: tom hello hadoop cat  (a user followed by that user's direct friends)
		String[] split = StringUtils.split(value.toString(),' ');
		for(int i=1;i<split.length;i++) {
			// user and friend: a direct friendship, marked 0
			mkey.set(getfof(split[0],split[i]));
			mval.set(0);
			context.write(mkey, mval);
			
			// two friends of split[0]: a friend-of-friend pair sharing split[0], marked 1
			for(int j=i+1;j<split.length;j++) {
				mkey.set(getfof(split[i],split[j]));
				mval.set(1);
				context.write(mkey, mval);
			}
		}
	}
	// Normalize the pair so "a:b" and "b:a" produce the same key (names in lexicographic order)
	public static String getfof(String s1,String s2) {
		if(s1.compareTo(s2)<0) {
			return s1+":"+s2;
		}else {
			return s2+":"+s1;
		}
	}

}
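A quick sanity check of the key normalization (a throwaway sketch, not part of the job; the class name GetfofDemo is made up):

package com.bjsxt.FOF;

public class GetfofDemo {
	public static void main(String[] args) {
		// Both orderings collapse to the same lexicographically sorted key,
		// so all counts for a pair land in a single reduce group
		System.out.println(MyMapper.getfof("hello", "hadoop"));	// hadoop:hello
		System.out.println(MyMapper.getfof("hadoop", "hello"));	// hadoop:hello
	}
}

Without this normalization, hello:hadoop and hadoop:hello would fall into different reduce groups and the common-friend count would be split between them.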

3. The reduce method

package com.bjsxt.FOF;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	
	IntWritable rval=new IntWritable();
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		// Equal keys form one group; each group triggers one call to the reduce method,
		// which iterates over the group's values
		// e.g. key hadoop:hello with values 1,0
		int flag=0;
		int sum=0;
		for(IntWritable v:values) {
			if(v.get()==0) {
				// a 0 means the pair are already direct friends
				flag=1;
			}
			sum+=v.get();
		}
		// Emit only after the whole group has been scanned, so a single 0 anywhere
		// in the group suppresses the pair; sum is then the number of common friends
		if(flag==0) {
			rval.set(sum);
			context.write(key, rval);
		}
	}
}
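For the two sample input lines shown at the top (tom hello hadoop cat and hello tom hadoop), the shuffled groups and the reducer's decisions work out as follows (illustrative trace):

hello:tom	values 0,0	-> suppressed (already direct friends)
hadoop:hello	values 1,0	-> suppressed (already direct friends)
hadoop:tom	values 0,1	-> suppressed (already direct friends)
cat:tom	values 0	-> suppressed (already direct friends)
cat:hadoop	values 1	-> cat:hadoop	1
cat:hello	values 1	-> cat:hello	1

cat and hadoop (and cat and hello) each share tom as their one common friend, so both pairs are emitted with a count of 1.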