
Finding common friends: chaining multiple MapReduce jobs

Given each user's friend list, compute the friends every pair of users has in common. The computation takes two passes, so it is split into two MapReduce jobs chained together with JobControl.
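For the three sample records used in the code comments below, the end-to-end result looks like this (output is tab-separated; the order of names within each value list may vary, and pairs with no common friends never appear):

Input (user:friends)        Final output (pair, common friends)
A:B,C,D,F,E,O               A-B    C,E
B:A,C,E,K                   A-C    D,F
C:F,A,D,I                   B-C    A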

package bd1805day09;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ManyJob { // chaining multiple jobs to compute common friends
    /**
     * Input, one user per line followed by that user's friend list:
     *   A:B,C,D,F,E,O
     *   B:A,C,E,K
     *   C:F,A,D,I
     * Step 1: for each friend, collect the users who have that friend.
     * Step 2: for every pair of users, derive their common friends.
     * Splitting the work across two MR jobs makes both steps easy.
     */

    // The first MapReduce
    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Input line, e.g. A:B,C,D,F,E,O
            String line = value.toString();
            String[] user_friends = line.split(":");
            // All of this user's friends
            String[] friends = user_friends[1].split(",");
            // For each friend, emit (friend, user)
            for (String f : friends) {
                context.write(new Text(f), new Text(user_friends[0]));
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // values holds every user who has this friend; concatenate them,
            // producing lines like:  A    F,I,O,K,G,D,C,H,B
            StringBuilder sb = new StringBuilder();
            for (Text v : values) {
                sb.append(v.toString()).append(",");
            }
            context.write(key, new Text(sb.substring(0, sb.length() - 1))); // trim the trailing comma
        }
    }
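Tracing job 1 on the three sample records from the comment above: the mapper inverts each record into (friend, user) pairs, and the reducer groups them, yielding one line per friend listing every user who has that friend:

A    B,C
B    A
C    A,B
D    A,C
E    A,B
F    A,C
I    C
K    B
O    A

Job 2 consumes exactly these lines.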

The second MapReduce

    static class MyMapper2 extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Input line (job 1 output), e.g.  A    F,I,O,K,G,D,C,H,B
            String line = value.toString();
            String[] friend_users = line.split("\t");
            String[] users = friend_users[1].split(",");
            // Pair the users two by two; ordering matters, so emit each
            // unordered pair exactly once (a-b, never b-a)
            for (String ul : users) {       // left side of the pair
                for (String ur : users) {   // right side of the pair
                    // compareTo orders whole names, so this also works
                    // when user names are longer than one character
                    if (ul.compareTo(ur) < 0) {
                        String uu = ul + "-" + ur;
                        context.write(new Text(uu), new Text(friend_users[0])); // e.g. A-E    C
                    }
                }
            }
        }
    }
    static class MyReducer2 extends Reducer<Text, Text, Text, Text> {
        // Each key groups one pair of users; the values are their common friends
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text v : values) {
                sb.append(v.toString()).append(",");
            }
            context.write(key, new Text(sb.substring(0, sb.length() - 1))); // e.g. A-E    C,D,B
        }
    }

// Build the driver and wire up the jobs

    public static void main(String[] args) throws IllegalArgumentException, IOException, URISyntaxException, InterruptedException {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf=new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        Job job1 = Job.getInstance(conf);           // create the first job

        job1.setJarByClass(bd1805day09.ManyJob.class);

        job1.setMapperClass(MyMapper.class);
        job1.setReducerClass(MyReducer.class);

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job1, new Path("hdfs://hadoop02:9000/friendin"));
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop02:9000"), conf); // get a handle on the HDFS file system
        Path path = new Path("/friendout_01");
        if (fs.exists(path)) {      // clear any previous output, or the job would fail
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job1, path);


        Job job2 = Job.getInstance(conf);           // create the second job

        job2.setJarByClass(bd1805day09.ManyJob.class);

        job2.setMapperClass(MyMapper2.class);
        job2.setReducerClass(MyReducer2.class);

        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job2, new Path("/friendout_01"));

        Path path1=new Path("/friendout_03");
        if (fs.exists(path1)) {     // clear any previous output at this path
            fs.delete(path1, true);
        }
        FileOutputFormat.setOutputPath(job2,path1);
        // Group the two jobs so they run together; the group name is arbitrary
        JobControl jc = new JobControl("wc_sort");
        // Wrap each job's configuration in a ControlledJob
        ControlledJob ajob = new ControlledJob(job1.getConfiguration());
        ControlledJob bjob = new ControlledJob(job2.getConfiguration());

        // Declare the dependency between the jobs: bjob only starts after ajob succeeds
        bjob.addDependingJob(ajob);
        //bjob.addDependingJob(cjob);


        jc.addJob(ajob);
        jc.addJob(bjob);

        // Submit the group: JobControl implements Runnable, so start it on its own thread
        new Thread(jc).start();
        // The thread should only be stopped after every job has run to completion;
        // allFinished() returns true once all jobs in jc have finished
        while(!jc.allFinished()){
            Thread.sleep(500);
        }
        jc.stop();

    }
}
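The pair-generation loop in MyMapper2 is the subtle step: emitting each unordered pair exactly once (a-b but never b-a) is what lets the reducer collect both users' shared friends under a single key. A minimal local sketch of just that loop, runnable without Hadoop (the sample line is job 1's output for friend C above):

public class PairDemo {
    public static void main(String[] args) {
        String line = "C\tA,B";                     // job 1 output: friend C is shared by users A and B
        String[] friendUsers = line.split("\t");
        String[] users = friendUsers[1].split(",");
        for (String ul : users) {
            for (String ur : users) {
                // compareTo keeps each pair in sorted order and emits it only once
                if (ul.compareTo(ur) < 0) {
                    System.out.println(ul + "-" + ur + "\t" + friendUsers[0]); // prints: A-B    C
                }
            }
        }
    }
}

Assuming the class is packaged into a jar (the jar name below is a placeholder), the whole chain can be submitted in the usual way; JobControl starts job2 automatically once job1 succeeds:

hadoop jar manyjob.jar bd1805day09.ManyJob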