
MapReduce Join Intermediate Optimization: Hadoop's Built-in datajoin Package

Continuing from the previous post, "MapReduce Join Operations: Basic Optimization", this post shows how to handle joins in MapReduce with the datajoin package that ships with Hadoop. On Hadoop 1.x the package lives under ${HADOOP_HOME}/contrib/datajoin; on Hadoop 2.x it is under ${HADOOP_HOME}/share/hadoop/tools/lib. Add the jar to your project and you can use it directly.
Below is the data this post works with. To make the usage comparable with the previous two posts, the same data set is used:
uid,name,phoneid
1,tom,40
2,jack,20
3,seven,30
4,lee,10
5,smith,20
6,張三,10
7,李四,30
8,王五,20

goodid,name
10,蘋果
20,三星
30,LG
40,華為

Expected output:
lee 蘋果
張三 蘋果
jack 三星
smith 三星
王五 三星
seven LG
李四 LG
tom 華為
Now for the basic usage of the datajoin package.
First, the map side:
The mapper must extend the DataJoinMapperBase class
public abstract class DataJoinMapperBase extends JobBase 

and implement the following abstract methods:

/**
   * Determine the source tag based on the input file name.
   * 
   * @param inputFile
   * @return the source tag computed from the given file name.
   */
  protected abstract Text generateInputTag(String inputFile);

  /**
   * Generate a tagged map output value. The user code can also perform
   * projection/filtering. If it decides to discard the input record when
   * certain conditions are met, it can simply return a null.
   * 
   * @param value
   * @return an object of TaggedMapOutput computed from the given value.
   */
  protected abstract TaggedMapOutput generateTaggedMapOutput(Object value);

  /**
   * Generate a map output key. The user code can compute the key
   * programmatically, not just selecting the values of some fields. In this
   * sense, it is more general than the joining capabilities of SQL.
   * 
   * @param aRecord
   * @return the group key for the given record
   */
  protected abstract Text generateGroupKey(TaggedMapOutput aRecord);
Next, look at the execution flow of the configure() and map() methods:
public void configure(JobConf job) {
    super.configure(job);
    this.job = job;
    this.inputFile = job.get(MRJobConfig.MAP_INPUT_FILE);
    //Generate the tag for this map's input data
    this.inputTag = generateInputTag(this.inputFile);
  }

  public void map(Object key, Object value,
                  OutputCollector output, Reporter reporter) throws IOException {
    if (this.reporter == null) {
      this.reporter = reporter;
    }
    //Count the total number of input records
    addLongValue("totalCount", 1);
    //Wrap the raw input line in a TaggedMapOutput object
    TaggedMapOutput aRecord = generateTaggedMapOutput(value);
    if (aRecord == null) {
      //Count discarded (invalid) records
      addLongValue("discardedCount", 1);
      return;
    }
    Text groupKey = generateGroupKey(aRecord);
    if (groupKey == null) {
      //Count records whose group key is null
      addLongValue("nullGroupKeyCount", 1);
      return;
    }
    //Emit the group key and the TaggedMapOutput object
    output.collect(groupKey, aRecord);
    addLongValue("collectedCount", 1);
  }
  //Add inc to the counter identified by name in the longCounters map
  protected Long addLongValue(Object name, long inc) {
    Long val = this.longCounters.get(name);
    Long retv = null;
    if (val == null) {
      retv = Long.valueOf(inc);
    } else {
      retv = Long.valueOf(val.longValue() + inc);
    }
    this.longCounters.put(name, retv);
    return retv;
  }
Now that the processing flow inside map() is clear,
the first thing we need is an entity class used to carry each record; it must extend TaggedMapOutput. Reference code with comments follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class TaggedWritable extends TaggedMapOutput {
	/**
	 * If data is left uninitialized and simply deserialized, the following
	 * exception is thrown:
	 * Error: java.lang.NullPointerException
	 *   at com.seven.mapreduce.join.TaggedWritable.readFields(TaggedWritable.java:32)
	 *   org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106)
	 * See http://stackoverflow.com/questions/10201500/hadoop-reduce-side-join-using-datajoin
	 * for the fix applied in readFields() below.
	 */
	private Writable data;

	public TaggedWritable() {
		this.tag = new Text();
	}
	public TaggedWritable(Writable data) { 
		this.tag = new Text("");
        this.data = data;  
    }
	public void setData(Writable data) {
		this.data = data;
	}
	public void readFields(DataInput arg0) throws IOException {
		this.tag.readFields(arg0);
		String dataClz = arg0.readUTF();
		/**
		 * Deserialize data using the concrete type recorded during serialization
		 */
        if (this.data == null
                || !this.data.getClass().getName().equals(dataClz)) {
            try {
				this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
			} catch (ClassNotFoundException e) {
				e.printStackTrace();
			}
        }
		this.data.readFields(arg0);
	}
	public void write(DataOutput arg1) throws IOException {
		this.tag.write(arg1);
		/**
		 * Write the class name so readFields() knows which type to instantiate
		 */
		arg1.writeUTF(this.data.getClass().getName());
		this.data.write(arg1);
	}
	@Override
	public Writable getData() {
		return data;
	}
}
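Before wiring this class into a job, it can be worth sanity-checking the custom serialization. The round-trip test below is not from the original post; it is a minimal sketch that assumes the TaggedWritable class above and uses Hadoop's in-memory DataOutputBuffer/DataInputBuffer:
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

public class TaggedWritableRoundTrip {
	public static void main(String[] args) throws Exception {
		// Build a tagged record the same way the mapper would
		TaggedWritable original = new TaggedWritable(new Text("1,tom,40"));
		original.setTag(new Text("12"));
		// Serialize it into an in-memory buffer
		DataOutputBuffer out = new DataOutputBuffer();
		original.write(out);
		// Deserialize it back through the no-arg constructor path Hadoop uses
		DataInputBuffer in = new DataInputBuffer();
		in.reset(out.getData(), out.getLength());
		TaggedWritable copy = new TaggedWritable();
		copy.readFields(in);
		System.out.println(copy.getTag() + " -> " + copy.getData());  // 12 -> 1,tom,40
	}
}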
Now let's write the map-side program:
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class JoinMapper extends DataJoinMapperBase {
	@Override
	protected Text generateInputTag(String inputFile) {
		/**
		 * Generate the tag for this mapper's input (here, the input file name)
		 */
		String tagTmp = inputFile.substring(inputFile.lastIndexOf("/") + 1);
		return new Text(tagTmp); 
	}
	@Override
	protected TaggedMapOutput generateTaggedMapOutput(Object value) {
		TaggedWritable retv = new TaggedWritable((Text) value);
		/**
		 * inputTag comes from the parent class DataJoinMapperBase and is
		 * initialized from the file name in configure()
		 */
        retv.setTag(this.inputTag);  
        return retv;
	}
	@Override
	protected Text generateGroupKey(TaggedMapOutput aRecord) {
		/**
		 * Generate the group key. With multiple input files whose join key sits
		 * in different columns, use inputTag here to pick the right column:
		 * "12" is the user-info file name, "122" is the phone-info file name.
		 */
		String line = ((Text) aRecord.getData()).toString();  
        String[] tokens = line.split(","); 
        String groupKey = null;
        if(this.inputTag.toString().equals("12")){
        	groupKey = tokens[2];
        } else if (this.inputTag.toString().equals("122")){
        	groupKey = tokens[0];
		}
        return new Text(groupKey); 
	}
}
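One note on generateInputTag(): the code above tags records by the bare input file name ("12" and "122"). If each table lives in its own directory instead, a variant like the sketch below (an assumption, not part of the original code) tags by the parent directory name, which is unaffected by part-file names such as part-00000:
	@Override
	protected Text generateInputTag(String inputFile) {
		// e.g. /input/users/part-00000 -> tag "users"; assumes one directory per table
		String[] parts = inputFile.split("/");
		return new Text(parts.length >= 2 ? parts[parts.length - 2] : inputFile);
	}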
Next, the reduce-side code. This post does not go into the internals of DataJoinReducerBase; the next post will analyze how the whole package executes.
The approach here is simply to extend DataJoinReducerBase and implement the combine() method.
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
public class JoinReducer extends DataJoinReducerBase {
	/**
	 * combine() filters out unwanted combinations to implement the desired join
	 * type (inner join, left join, etc.) and shapes the result into the proper
	 * output format (field ordering, de-duplication, etc.).
	 */
	@Override
	protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        /**
         * Implement an inner join: a group key must appear in both sources
         */
        if (tags.length < 2) return null;
        String joinedStr = "";   
        for (int i=0; i<values.length; i++) {  
            if (i > 0) joinedStr += ",";  
            TaggedWritable tw = (TaggedWritable) values[i];
            String line = ((Text) tw.getData()).toString();
            String[] tokens = line.split(",");
            /**
             * Pick the field to join based on the tag: "12" is the user-info file
             * name, "122" is the phone-info file name. For this data set both
             * files keep the wanted name in tokens[1]. (Note: getTag() returns a
             * Text, so it must be converted with toString() before comparing.)
             */
            if (tw.getTag().toString().equals("12")) {
            	joinedStr += tokens[1];   // user name
            } else {
            	joinedStr += tokens[1];   // phone brand name
            }
        }  
        TaggedWritable retv = new TaggedWritable(new Text(joinedStr));  
        retv.setTag((Text) tags[0]);   
        return retv;  
	}
}
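Because combine() returns null whenever a group key does not appear in both sources, the result above is an inner join. As a hedged sketch (not in the original post), a left outer join keyed on the user-info file "12" could relax that check and emit an empty phone field when no phone record matches:
	@Override
	protected TaggedMapOutput combine(Object[] tags, Object[] values) {
		String name = null;
		String phone = "";
		for (int i = 0; i < values.length; i++) {
			TaggedWritable tw = (TaggedWritable) values[i];
			String[] tokens = ((Text) tw.getData()).toString().split(",");
			if (tw.getTag().toString().equals("12")) {
				name = tokens[1];   // user name from the user-info file
			} else {
				phone = tokens[1];  // brand name from the phone-info file
			}
		}
		// No user record in this combination: drop it (the join is keyed on the user file)
		if (name == null) return null;
		TaggedWritable retv = new TaggedWritable(new Text(name + "," + phone));
		retv.setTag((Text) tags[0]);
		return retv;
	}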
The driver program:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();  
        JobConf job = new JobConf(conf, JobMain.class);  
        Path in = new Path(args[0]);  
        Path out = new Path(args[1]);  
        FileInputFormat.setInputPaths(job, in);
        /**
         * To join files under several directories, set multiple input paths instead:
         */
        //FileInputFormat.setInputPaths(job, args[0]+ "," + args[1]);
        FileOutputFormat.setOutputPath(job, out);  
        job.setJobName("DataJoin");  
        job.setMapperClass(JoinMapper.class);  
        job.setReducerClass(JoinReducer.class);  
        job.setInputFormat(TextInputFormat.class);  
        job.setOutputFormat(TextOutputFormat.class);  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(TaggedWritable.class);  
        job.set("mapred.textoutputformat.separator", ",");  
        JobClient.runJob(job);   
        return 0;
	}
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(
				new Configuration(),  
                new JobMain(),  
                args);  
		System.exit(res); 
	}
}
(Screenshots in the original post: the user info table and the phone info table)

Run command:

 ./hadoop jar mr.jar com.seven.mapreduce.join.JobMain /input/eight /output/night00
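On Hadoop 2.x the datajoin classes are usually not on the task classpath unless they were bundled into mr.jar; if they were not, the jar can be shipped with -libjars (the exact jar name below is an assumption, check ${HADOOP_HOME}/share/hadoop/tools/lib):

 ./hadoop jar mr.jar com.seven.mapreduce.join.JobMain \
     -libjars ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-datajoin-<version>.jar \
     /input/eight /output/night00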

Run result: (screenshot in the original post)


Summary:

This is how to use the join support that ships with Hadoop. It is a general-purpose join approach: once you are familiar with it, you can implement join functionality quickly, but there is still room to improve execution efficiency. The next post will describe the optimized implementation of this feature from 《Hadoop硬實戰》 (Hadoop in Practice).