1. 程式人生 > hadoop求共同好友一個MapReduce搞定

hadoop求共同好友一個MapReduce搞定

Map端

package com.yd.CommFriend;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CommMapper extends Mapper<LongWritable, Text, Text, Text>{
	
	@Override
	protected void map
(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //A:N,K,L資料格式 A 和他的朋友 String[] split = value.toString().split(":"); //key表示本人,value是他的朋友們 context.write(new Text(split[0]), new Text(split[1])); } }

Reduce端

package com.yd.CommFriend;

import java.
io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import MR.OrderBean; public
class CommReducer extends Reducer<Text, Text, Text, Text>{ List<List<String>> personAndFriends = new ArrayList<>(); @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { //其實迭代器中只有一個值 for(Text t : values) { String[] split = t.toString().split(","); List<String> friends = new ArrayList<>(); //第一個是本人,其餘的都是他的好友 friends.add(0,key.toString()); for(String s: split) { friends.add(s); } System.out.println(friends); personAndFriends.add(friends); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { personAndFriends.sort((ls1,ls2)->{return ls1.get(0).compareTo(ls2.get(0));}); Map<String,String> map = new HashMap<>(); for(int i=0;i< personAndFriends.size();i++) { //取出一個人和他的好友 List<String> begin = personAndFriends.get(i); String beginPerson = begin.get(0);// 取出本人 System.out.println(beginPerson+":"+begin); //迴圈其他集合,看看是不是有共同好友 for(int j=i+1 ; j< personAndFriends.size();j++) { List<String> back = personAndFriends.get(j); String backPerson = back.get(0); //Comparator<OrderBean> c = (a,b)->{return 0;}; //begin.sort((s1,s2)->{return s1.compareTo(s2);}); //back.sort((s1,s2)->{return s1.compareTo(s2);}); for(String friend : back) { //A:N,K,L //N:P,A //上面這種情況是不算共同好友的 if(begin.contains(friend)&&!friend.equals(beginPerson)&&!friend.equals(backPerson)) { String pp = beginPerson+"-"+backPerson; if(map.containsKey(pp)) { String value = map.get(pp)+","+friend; map.remove(pp); map.put(pp, value); }else { map.put(pp, friend); } } } } } //把結果輸出 Set<Entry<String,String>> entrySet = map.entrySet(); for(Entry<String,String> es: entrySet) { context.write(new Text(es.getKey()), new Text(es.getValue())); } } }

Driver端

package com.yd.CommFriend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
 * Configures and runs the common-friends job ({@link CommMapper} followed by
 * {@link CommReducer}).
 *
 * <p>Usage: {@code CommDriver [inputPath [outputPath]]}. When an argument is omitted the
 * corresponding default path is used.
 */
public class CommDriver {
	/** Input path used when no first argument is given. */
	private static final String DEFAULT_INPUT = "D:\\friend";
	/** Output path used when no second argument is given. */
	private static final String DEFAULT_OUTPUT = "D:\\outFriends";

	/**
	 * Builds the job, waits for it to finish and exits with status 1 if it failed.
	 *
	 * @param args optional input path ({@code args[0]}) and output path ({@code args[1]})
	 * @throws Exception if the job cannot be configured or submitted
	 */
	public static void main(String[] args) throws Exception {
		String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
		String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(CommDriver.class);
		job.setInputFormatClass(CombineTextInputFormat.class);
		job.setMapperClass(CommMapper.class);
		job.setReducerClass(CommReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		// CommReducer compares all persons with each other in cleanup(), which only sees
		// the keys routed to that reducer instance; with more than one reduce task, pairs
		// split across tasks would be silently missed.
		job.setNumReduceTasks(1);

		FileInputFormat.setInputPaths(job, new Path(input));
		FileOutputFormat.setOutputPath(job, new Path(output));

		// Surface a failed job to the caller instead of always exiting with status 0.
		if (!job.waitForCompletion(true)) {
			System.exit(1);
		}
	}

}