MapReduce Implementations of Some Algorithms: BFS Traversal of a Graph

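This driver runs one map/reduce pass of parallel breadth-first search per iteration and keeps iterating while the reducers still see GRAY (frontier) nodes, using a Hadoop counter as the termination signal. Each input line is assumed to describe one node in the adjacency-list layout of the referenced article: node id, edge list, distance from the source, and colour. The exact format depends on the Node helper class, which is not included in this post; a sample graph in that assumed layout looks like this:

1	2,5|0|GRAY|
2	1,3,4,5|Integer.MAX_VALUE|WHITE|
3	2,4|Integer.MAX_VALUE|WHITE|
4	2,3,5|Integer.MAX_VALUE|WHITE|
5	1,2,4|Integer.MAX_VALUE|WHITE|

BFSearchDriver.java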
package com.joey.mapred.graph;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;

import com.joey.mapred.BaseDriver;
import com.joey.mapred.graph.TraverseGraph.TraverseMapper;
import com.joey.mapred.graph.TraverseGraph.TraverseReducer;
import com.joey.mapred.graph.utils.Node;

/**
 * Description : MapReduce program to solve the single-source shortest path
 * problem using parallel breadth-first search. This program also illustrates
 * how to perform iterative map-reduce.
 * 
 * The single-source shortest path is computed by repeating a breadth-first
 * expansion until no frontier (GRAY) nodes remain.
 * 
 * Reference :
 * http://www.johnandcailin.com/blog/cailin/breadth-first-graph-search
 * -using-iterative-map-reduce-algorithm
 * 
 */

public class BFSearchDriver extends BaseDriver {

  static class SearchMapperSSSP extends TraverseMapper {

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {

      Node inNode = new Node(value.toString());
      // calls the map method of the superclass TraverseMapper
      super.map(key, value, context, inNode);

    }
  }
  
  
  static class SearchReducerSSSP extends TraverseReducer {

    // the parameters are the input key, the values associated with that key,
    // and the Context object through which the Reducer communicates with the
    // Hadoop framework

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

      // create a new out node and set its values
      Node outNode = new Node();

      // call the reduce method of the superclass TraverseReducer
      outNode = super.reduce(key, values, context, outNode);

      // if the color of the node is gray, the execution has to continue, this
      // is done by incrementing the counter
      if (outNode.getColor() == Node.Color.GRAY)
        context.getCounter(MoreIterations.numberOfIterations).increment(1L);
    }
  }
  
  public int run(String[] args) throws Exception {
    int iterationCount = 0; // counter to set the ordinal number of the
                            // intermediate outputs
    Job job;
    long terminationValue = 1;
    
    // while there are more gray nodes to process
    while (terminationValue > 0) {
      job = getJobConf(args); // get the job configuration
      String input, output;

      // set the input and output paths for this iteration: the first
      // iteration reads the user-supplied input, while every later iteration
      // reads the output of the previous iteration
      if (iterationCount == 0) {
        // for the first iteration the input is the first command-line argument
        input = args[0];
      } else {
        // for the remaining iterations, the input will be the output of the
        // previous iteration
        input = args[1] + iterationCount;
      }
      output = args[1] + (iterationCount + 1); // setting the output file

      FileInputFormat.setInputPaths(job, new Path(input)); 
      FileOutputFormat.setOutputPath(job, new Path(output)); 

      if (!job.waitForCompletion(true)) {
        return 1; // stop iterating if a job fails
      }

      Counters jobCntrs = job.getCounters();
      terminationValue = jobCntrs
          .findCounter(MoreIterations.numberOfIterations).getValue();
      iterationCount++;
    }

    return 0;
  }
  
  static enum MoreIterations {
    numberOfIterations
  }
  
  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: <input path> <output path prefix>");
      System.exit(2);
    }
    int res = ToolRunner.run(new Configuration(), new BFSearchDriver(), args);
    System.exit(res);
  }


  @Override
  protected Job getJobConf(String[] args) throws Exception {
    JobInfo jobInfo = new JobInfo() {

      @Override
      public Class<?> getJarByClass() {
        return BFSearchDriver.class;
      }

      @Override
      public Class<? extends Mapper> getMapperClass() {
        return SearchMapperSSSP.class;
      }

      @Override
      public Class<? extends Reducer> getCombinerClass() {
        return null;
      }

      @Override
      public Class<? extends Reducer> getReducerClass() {
        return SearchReducerSSSP.class;
      }

      @Override
      public Class<?> getOutputKeyClass() {
        return Text.class;
      }

      @Override
      public Class<?> getOutputValueClass() {
        return Text.class;
      }

      @Override
      public Class<? extends InputFormat> getInputFormatClass() {
        return TextInputFormat.class;
      }

      @Override
      public Class<? extends OutputFormat> getOutputFormatClass() {
        return TextOutputFormat.class;
      }
      
    };
    return setupJob("BFSSearch", jobInfo);
  }

}
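
The driver relies on com.joey.mapred.graph.utils.Node, which is not included in the post. Below is a minimal sketch of what that helper might look like, assuming the id&lt;TAB&gt;edges|distance|colour record layout shown at the top; the field names and methods here are guesses, not the original class.

package com.joey.mapred.graph.utils;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;

public class Node {

  public static enum Color { WHITE, GRAY, BLACK }

  private String id;                                     // node identifier
  private int distance = Integer.MAX_VALUE;              // distance from the source
  private List<String> edges = new ArrayList<String>();  // ids of adjacent nodes
  private Color color = Color.WHITE;                     // WHITE = unseen, GRAY = frontier, BLACK = done

  public Node() {
  }

  // parses a record of the form "id<TAB>edge,edge,...|distance|color"
  public Node(String line) {
    String[] parts = line.split("\t");
    id = parts[0];
    String[] fields = parts[1].split("\\|");
    if (fields[0].length() > 0) {
      for (String edge : fields[0].split(",")) {
        edges.add(edge);
      }
    }
    if (!"Integer.MAX_VALUE".equals(fields[1])) {
      distance = Integer.parseInt(fields[1]);
    }
    color = Color.valueOf(fields[2]);
  }

  // serialises everything except the id, i.e. the value side of a record
  public Text getNodeInfo() {
    StringBuilder sb = new StringBuilder();
    for (String edge : edges) {
      sb.append(edge).append(",");
    }
    sb.append("|");
    sb.append(distance == Integer.MAX_VALUE ? "Integer.MAX_VALUE" : String.valueOf(distance));
    sb.append("|").append(color).append("|");
    return new Text(sb.toString());
  }

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }
  public int getDistance() { return distance; }
  public void setDistance(int distance) { this.distance = distance; }
  public List<String> getEdges() { return edges; }
  public void setEdges(List<String> edges) { this.edges = edges; }
  public Color getColor() { return color; }
  public void setColor(Color color) { this.color = color; }
}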

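The actual BFS map/reduce logic lives in TraverseGraph.TraverseMapper and TraverseGraph.TraverseReducer, also not shown in the post. The following is a sketch of what they presumably do, following the parallel-BFS pattern from the referenced article and the Node sketch above; the four-argument map/reduce signatures are inferred from the way the driver's subclasses call them.

package com.joey.mapred.graph;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import com.joey.mapred.graph.utils.Node;

public class TraverseGraph {

  public static class TraverseMapper extends Mapper<Object, Text, Text, Text> {

    // Expands the frontier: every GRAY node emits its neighbours at distance + 1
    // and turns BLACK; the node itself is always re-emitted so no data is lost.
    protected void map(Object key, Text value, Context context, Node inNode)
        throws IOException, InterruptedException {
      if (inNode.getColor() == Node.Color.GRAY) {
        for (String neighbour : inNode.getEdges()) {
          Node adjacent = new Node();
          adjacent.setId(neighbour);
          adjacent.setDistance(inNode.getDistance() + 1);
          adjacent.setColor(Node.Color.GRAY);
          context.write(new Text(adjacent.getId()), adjacent.getNodeInfo());
        }
        inNode.setColor(Node.Color.BLACK);
      }
      context.write(new Text(inNode.getId()), inNode.getNodeInfo());
    }
  }

  public static class TraverseReducer extends Reducer<Text, Text, Text, Text> {

    // Merges all records for one node id: keep the full edge list, the minimum
    // distance and the darkest colour seen among the values.
    protected Node reduce(Text key, Iterable<Text> values, Context context, Node outNode)
        throws IOException, InterruptedException {
      outNode.setId(key.toString());
      for (Text value : values) {
        Node inNode = new Node(key.toString() + "\t" + value.toString());
        if (!inNode.getEdges().isEmpty()) {
          outNode.setEdges(inNode.getEdges());
        }
        if (inNode.getDistance() < outNode.getDistance()) {
          outNode.setDistance(inNode.getDistance());
        }
        if (inNode.getColor().ordinal() > outNode.getColor().ordinal()) {
          outNode.setColor(inNode.getColor());
        }
      }
      context.write(key, outNode.getNodeInfo());
      return outNode;
    }
  }
}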
BaseDriver.java

package com.joey.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;

public abstract class BaseDriver extends Configured implements Tool {

  // method to set the configuration for the job and
  // the mapper and the reducer classes
  protected Job setupJob(String jobName, JobInfo jobInfo) throws Exception {
    Configuration conf = getConf();

    if (conf == null) {
      throw new RuntimeException("Configuration should not be null");
    }
  
    Job job = new Job(conf, jobName);
    // set the several classes
    job.setJarByClass(jobInfo.getJarByClass());
    // set the mapper class
    job.setMapperClass(jobInfo.getMapperClass());

    // the combiner class is optional, so set it only if it
    // is required by the program
    if (jobInfo.getCombinerClass() != null)
      job.setCombinerClass(jobInfo.getCombinerClass());

    // set the reducer class
    job.setReducerClass(jobInfo.getReducerClass());

    // the number of reducers is set to 3; adjust it to match the cluster
    // and the size of the graph
    job.setNumReduceTasks(3);

    // set the type of the output key and value for the
    // Map & Reduce functions
    job.setOutputKeyClass(jobInfo.getOutputKeyClass());
    job.setOutputValueClass(jobInfo.getOutputValueClass());
    if (jobInfo.getInputFormatClass() != null)
      job.setInputFormatClass(jobInfo.getInputFormatClass());
    if (jobInfo.getOutputFormatClass() != null)
      job.setOutputFormatClass(jobInfo.getOutputFormatClass());
    return job;
  }

  protected abstract Job getJobConf(String[] args) throws Exception;
  
  protected abstract class JobInfo {
    public abstract Class<?> getJarByClass();

    public abstract Class<? extends Mapper> getMapperClass();

    public abstract Class<? extends Reducer> getCombinerClass();

    public abstract Class<? extends Reducer> getReducerClass();

    public abstract Class<?> getOutputKeyClass();

    public abstract Class<?> getOutputValueClass();
    
    public abstract Class<? extends InputFormat> getInputFormatClass();
    
    public abstract Class<? extends OutputFormat> getOutputFormatClass();
  }
}
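
For completeness, here is how the driver might be launched once the classes are packaged into a jar; the jar name and HDFS paths are placeholders, not from the original post. The first iteration reads the supplied input directory, and each further iteration reads the previous output, writing directories named &lt;output prefix&gt;1, &lt;output prefix&gt;2, ... until the reducers no longer emit GRAY nodes:

hadoop jar bfs-example.jar com.joey.mapred.graph.BFSearchDriver /user/joey/graph/input /user/joey/graph/out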