1. 程式人生 > >Hadoop—MapReduce練習(資料去重、資料排序、平均成績、倒排索引)

Hadoop—MapReduce練習(資料去重、資料排序、平均成績、倒排索引)

1.  wordcount程式

先以簡單的wordcount為例。

Mapper:

package cn.nuc.hadoop.mapreduce.wordcount;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//4個泛型中,前兩個是指定mapper輸入資料的型別,KEYIN是輸入的key的型別,VALUEIN是輸入的value的型別
//map 和 reduce 的資料輸入輸出都是以 key-value對的形式封裝的
//預設情況下,Map框架傳遞給我們的mapper的輸入資料中,key是要處理的文字中一行的起始偏移量(選用LongWritable),value是這一行的內容(VALUEIN選用Text)
//在wordcount中,經過mapper處理資料後,得到的是<單詞,1>這樣的結果,所以KEYOUT選用Text,VAULEOUT選用IntWritable
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	// MapReduce框架每讀一行資料就呼叫一次map方法
	@Override
	protected void map(LongWritable k1, Text v1,
			Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		// 將這一行的內容轉換成string型別
		String line = v1.toString();
		// 對這一行的文字按特定分隔符切分
		// String[] words = line.split(" ");
		String[] words = StringUtils.split(line, " ");
		// 遍歷這個單詞陣列,輸出為key-value形式 key:單詞 value : 1
		for (String word : words) {
			context.write(new Text(word), new IntWritable(1));
		}

	}

}

Reducer:
package cn.nuc.hadoop.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//經過mapper處理後的資料會被reducer拉取過來,所以reducer的KEYIN、VALUEIN和mapper的KEYOUT、VALUEOUT一致
//經過reducer處理後的資料格式為<單詞,頻數>,所以KEYOUT為Text,VALUEOUT為IntWritable
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	// 當mapper框架將相同的key的資料處理完成後,reducer框架會將mapper框架輸出的資料<key,value>變成<key,values{}>。
	// 例如,在wordcount中會將mapper框架輸出的所有<hello,1>變為<hello,{1,1,1...}>,即這裡的<k2,v2s>,然後將<k2,v2s>作為reduce函式的輸入
	@Override
	protected void reduce(Text k2, Iterable<IntWritable> v2s,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		int count = 0;
		// 遍歷v2的list,進行累加求和
		for (IntWritable v2 : v2s) {
			count = v2.get();
		}
		// 輸出這一個單詞的統計結果
		context.write(k2, new IntWritable(count));
	}

}

驅動類:
package cn.nuc.hadoop.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 用來描述一個特定的作業 比如,該作業使用哪個類作為邏輯處理中的map,哪個作為reduce 還可以指定該作業要處理的資料所在的路徑
 * 還可以指定改作業輸出的結果放到哪個路徑
 * 
 * @author Oner.wv
 *
 */
public class WCRunner {
	public static void main(String[] args) throws ClassNotFoundException,
			InterruptedException, IOException {

		Configuration conf = new Configuration();
		Job wcJob = Job.getInstance(conf);

		// 設定job所在的類在哪個jar包
		wcJob.setJarByClass(WCRunner.class);

		// 指定job所用的mappe類和reducer類
		wcJob.setMapperClass(WCMapper.class);
		wcJob.setReducerClass(WCReducer.class);

		// 指定mapper輸出型別和reducer輸出型別
		// 由於在wordcount中mapper和reducer的輸出型別一致,
		// 所以使用setOutputKeyClass和setOutputValueClass方法可以同時設定mapper和reducer的輸出型別
		// 如果mapper和reducer的輸出型別不一致時,可以使用setMapOutputKeyClass和setMapOutputValueClass單獨設定mapper的輸出型別
		// wcJob.setMapOutputKeyClass(Text.class);
		// wcJob.setMapOutputValueClass(IntWritable.class);
		wcJob.setOutputKeyClass(Text.class);
		wcJob.setOutputValueClass(IntWritable.class);

		// 指定job處理的資料路徑
		FileInputFormat.setInputPaths(wcJob, new Path(
				"hdfs://master:9000/user/exe_mapreduce/wordcount/input"));
		// 指定job處理資料輸出結果的路徑
		FileOutputFormat.setOutputPath(wcJob, new Path(
				"hdfs://master:9000/user/exe_mapreduce/wordcount/output"));

		// 將job提交給叢集執行
		wcJob.waitForCompletion(true);
	}
}


2. 統計手機流量資訊

    從下面的資料中的得到每個手機號的上行流量、下行流量、總流量。

源資料:

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	視訊網站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	資訊保安	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站點統計	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜尋引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站點統計	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	綜合門戶	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	綜合門戶	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜尋引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜尋引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985066 	13726238888	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200


資料格式為:


想要的到的資料格式為:

手機號	上行流量	下行流量	總流量
13726230503		2481	24681	27162
13826544101		264		0	264
13926435656  	132		1512	1644
...				...		...

2.1 引入和Hadoop序列化機制相關的mapreduce

     由於源資料中每一個手機號可能存在多條上網記錄,最後要得到的輸出格式是一個手機號的所有上行流量、下行流量和總流量。所以可以考慮利用MapReduce框架的特性,將每個手機號作為map的輸出key,該手機號上網資訊作為map的輸出value,經過shuffle,則在reduce端接收到一個<key, value-list>,其中,key手機號,value-list為該手機號所對應的一些上網資訊的集合。這裡有一個問題,由於map和reduce輸入輸出都為key-value鍵值對形式,所以必須將手機的上網資訊(上行流量、下行流量)封裝成一個Bean類,將這個類作為value。

    由於資料需要在不同的節點間進行網路傳輸,所以Bean類必須實現序列化和反序列化,Hadoop提供了一套序列化機制(實現Writable介面)

FlowBean:

package cn.nuc.hadoop.mapreduce.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

	private String phoneNB;
	private long up_flow;
	private long down_flow;
	private long sum_flow;

	// 在反序列化時,反射機制需要呼叫空參建構函式,所以顯示定義了一個空參建構函式
	public FlowBean() {
	}

	// 為了物件資料的初始化方便,加入一個帶參的建構函式
	public FlowBean(String phoneNB, long up_flow, long down_flow) {
		this.phoneNB = phoneNB;
		this.up_flow = up_flow;
		this.down_flow = down_flow;
		this.sum_flow = up_flow + down_flow;
	}

	// 將物件的資料序列化到流中
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phoneNB);
		out.writeLong(up_flow);
		out.writeLong(down_flow);
		out.writeLong(sum_flow);
	}

	// 從流中反序列化出物件的資料
	// 從資料流中讀出物件欄位時,必須跟序列化時的順序保持一致
	@Override
	public void readFields(DataInput in) throws IOException {
		this.phoneNB = in.readUTF();
		this.up_flow = in.readLong();
		this.down_flow = in.readLong();
		this.sum_flow = in.readLong();
	}

	public String getPhoneNB() {
		return phoneNB;
	}

	public void setPhoneNB(String phoneNB) {
		this.phoneNB = phoneNB;
	}

	public long getUp_flow() {
		return up_flow;
	}

	public void setUp_flow(long up_flow) {
		this.up_flow = up_flow;
	}

	public long getDown_flow() {
		return down_flow;
	}

	public void setDown_flow(long down_flow) {
		this.down_flow = down_flow;
	}

	public long getSum_flow() {
		return sum_flow;
	}

	public void setSum_flow(long sum_flow) {
		this.sum_flow = sum_flow;
	}

	@Override
	public String toString() {
		return "" + up_flow + "\t" + down_flow + "\t" + sum_flow;
	}
}


FlowSumMapper:

package cn.nuc.hadoop.mapreduce.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
	
	@Override
	protected void map(LongWritable k1, Text v1,
			Mapper<LongWritable, Text, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		// 一行資料
		String line = v1.toString();
		// 切分資料
		String[] fields = StringUtils.split(line, "\t");
		// 得到想要的手機號、上行流量、下行流量
		String phoneNB = fields[1];
		long up_flow = Long.parseLong(fields[7]);
		long down_flow = Long.parseLong(fields[8]);
		// 封裝資料為kv並輸出
		context.write(new Text(phoneNB), new FlowBean(phoneNB, up_flow,
				down_flow));

	}
}


FlowSumReducer:

package cn.nuc.hadoop.mapreduce.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

	// 框架每傳遞一組資料<1387788654,{flowbean,flowbean,flowbean,flowbean.....}>呼叫一次我們的reduce方法
	// reduce中的業務邏輯就是遍歷values,然後進行累加求和再輸出
	@Override
	protected void reduce(Text k2, Iterable<FlowBean> v2s,
			Reducer<Text, FlowBean, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		long up_flow = 0;
		long down_flow = 0;

		for (FlowBean v2 : v2s) {
			up_flow += v2.getUp_flow();
			down_flow += v2.getDown_flow();
		}

		context.write(k2, new FlowBean(k2.toString(), up_flow, down_flow));
	}
}


FlowSumRunner:

Job描述和提交的規範寫法如下:

package cn.nuc.hadoop.mapreduce.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//這是job描述和提交類的規範寫法
public class FlowSumRunner extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		int res = ToolRunner
				.run(new Configuration(), new FlowSumRunner(), args);
		System.exit(res);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(FlowSumRunner.class);

		job.setMapperClass(FlowSumMapper.class);
		job.setReducerClass(FlowSumReducer.class);

		// job.setMapOutputKeyClass(Text.class);
		// job.setMapOutputValueClass(FlowBean.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 執行成功,返回0,否則返回1
		return job.waitForCompletion(true) ? 0 : 1;
	}
}


    打成jar包後執行:

[[email protected] ~]$ hadoop jar flowcount.jar cn.nuc.hadoop.mapreduce.flowsum.FlowSumRunner /user/exe_mapreduce/flowcount/input /user/exe_mapreduce/flowcount/output

    檢視結果:
[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/flowcount/output/part-r-00000
13480253104	180	200	380
13502468823	102	7335	7437
13560436666	954	200	1154
13560439658	5892	400	6292
13602846565	12	1938	1950
13660577991	9	6960	6969
13719199419	0	200	200
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	200	320
13826544101	0	200	200
13922314466	3008	3720	6728
13925057413	63	11058	11121
13926251106	0	200	200
13926435656	1512	200	1712
15013685858	27	3659	3686
15920133257	20	3156	3176
15989002119	3	1938	1941
18211575961	12	1527	1539
18320173382	18	9531	9549
84138413	4116	1432	5548

2.2  引入hadoop自定義排序

    從上面得到的結果可以看出來,hadoop預設將結果按照mapper的輸出按照key來進行排序,如果我們想要自定義排序結果(比如按照總流量從高到低排序),該如何做呢?瞭解shuffle的都知道,shuffle過程中,會將map的輸出結果按照key進行排序,所以只需要將FlowBean作為map輸出的key值,前提是FlowBean實現了Comparable介面。在hadoop中既實現Writable介面,又實現Comparable介面,可以簡寫為實現了WritableComparable介面。

FlowBean:

package cn.nuc.hadoop.mapreduce.flowsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

	private String phoneNB;
	private long up_flow;
	private long down_flow;
	private long sum_flow;

	// 在反序列化時,反射機制需要呼叫空參建構函式,所以顯示定義了一個空參建構函式
	public FlowBean() {
	}

	// 為了物件資料的初始化方便,加入一個帶參的建構函式
	public FlowBean(String phoneNB, long up_flow, long down_flow) {
		this.phoneNB = phoneNB;
		this.up_flow = up_flow;
		this.down_flow = down_flow;
		this.sum_flow = up_flow + down_flow;
	}

	// 將物件的資料序列化到流中
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phoneNB);
		out.writeLong(up_flow);
		out.writeLong(down_flow);
		out.writeLong(sum_flow);
	}

	// 從流中反序列化出物件的資料
	// 從資料流中讀出物件欄位時,必須跟序列化時的順序保持一致
	@Override
	public void readFields(DataInput in) throws IOException {
		this.phoneNB = in.readUTF();
		this.up_flow = in.readLong();
		this.down_flow = in.readLong();
		this.sum_flow = in.readLong();
	}

	public String getPhoneNB() {
		return phoneNB;
	}

	public void setPhoneNB(String phoneNB) {
		this.phoneNB = phoneNB;
	}

	public long getUp_flow() {
		return up_flow;
	}

	public void setUp_flow(long up_flow) {
		this.up_flow = up_flow;
	}

	public long getDown_flow() {
		return down_flow;
	}

	public void setDown_flow(long down_flow) {
		this.down_flow = down_flow;
	}

	public long getSum_flow() {
		return sum_flow;
	}

	public void setSum_flow(long sum_flow) {
		this.sum_flow = sum_flow;
	}

	@Override
	public String toString() {
		return "" + up_flow + "\t" + down_flow + "\t" + sum_flow;
	}

	// 實現Comparable介面,需要複寫compareTo方法
	@Override
	public int compareTo(FlowBean o) {
		return this.sum_flow > o.sum_flow ? -1 : 1;
	}
}


SortMapReduce:

package cn.nuc.hadoop.mapreduce.flowsort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortMapReduce {

	public static class SortMapper extends
			Mapper<LongWritable, Text, FlowBean, NullWritable> {
		@Override
		protected void map(
				LongWritable k1,
				Text v1,
				Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context)
				throws IOException, InterruptedException {
			
			String line = v1.toString();
			String[] fields = StringUtils.split(line, "\t");

			String phoneNB = fields[0];
			long up_flow = Long.parseLong(fields[1]);
			long down_flow = Long.parseLong(fields[2]);

			context.write(new FlowBean(phoneNB, up_flow, down_flow),
					NullWritable.get());
		}
	}

	public static class SortReducer extends
			Reducer<FlowBean, NullWritable, Text, FlowBean> {
		@Override
		protected void reduce(FlowBean k2, Iterable<NullWritable> v2s,
				Reducer<FlowBean, NullWritable, Text, FlowBean>.Context context)
				throws IOException, InterruptedException {
			String phoneNB = k2.getPhoneNB();
			context.write(new Text(phoneNB), k2);
		}
	}

	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(SortMapReduce.class);

		job.setMapperClass(SortMapper.class);
		job.setReducerClass(SortReducer.class);

		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}


    打成jar包,執行:

[[email protected] ~]$ hadoop jar flowcountsort.jar cn.nuc.hadoop.mapreduce.flowsort.SortMapReduce /user/exe_mapreduce/flowcount/output /user/exe_mapreduce/flowcount/sortout/

    檢視結果:
[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/flowcount/sortout/part-r-00000
13726238888	2481	24681	27162
13726230503	2481	24681	27162
13925057413	63	11058	11121
18320173382	18	9531	9549
13502468823	102	7335	7437
13660577991	9	6960	6969
13922314466	3008	3720	6728
13560439658	5892	400	6292
84138413	4116	1432	5548
15013685858	27	3659	3686
15920133257	20	3156	3176
13602846565	12	1938	1950
15989002119	3	1938	1941
13926435656	1512	200	1712
18211575961	12	1527	1539
13560436666	954	200	1154
13480253104	180	200	380
13760778710	120	200	320
13826544101	0	200	200
13926251106	0	200	200
13719199419	0	200	200

2.3 引入Hadoop分割槽功能

     如果資訊特別多,想要將最後的結果分別存放在不通過的檔案中,該怎麼辦呢?可以使用Hadoop提供的Partitioner函式,hadoop預設使用HashPartitioner。可以檢視下Hadoop原始碼:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

    HashPartitioner是處理Mapper任務輸出的,getPartition()方法有三個形參,key、value分別指的是Mapper任務的輸出,numReduceTasks指的是設定的Reducer任務數量,預設值是1。那麼任何整數與1相除的餘數肯定是0。也就是說getPartition(…)方法的返回值總是0。也就是Mapper任務的輸出總是送給一個Reducer任務,最終只能輸出到一個檔案中。據此分析,如果想要最終輸出到多個檔案中,在Mapper任務中對資料應該劃分到多個區中。

AreaPartitioner

package cn.nuc.hadoop.mapreduce.areapartition;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner extends Partitioner<Text, FlowBean> {

	private static HashMap<String, Integer> areaMap = new HashMap<>();

	static {
		areaMap.put("135", 0);
		areaMap.put("136", 1);
		areaMap.put("137", 2);
		areaMap.put("138", 3);
		areaMap.put("139", 4);
	}

	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {
		// 從key中拿到手機號,查詢手機歸屬地字典,不同的省份返回不同的組號
		Integer areCoder = areaMap.get(key.toString().substring(0, 3));
		if (areCoder == null) {
			areCoder = 5;
		}
		return areCoder;
	}

}


FlowBean

package cn.nuc.hadoop.mapreduce.areapartition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

	private String phoneNB;
	private long up_flow;
	private long down_flow;
	private long sum_flow;

	// 在反序列化時,反射機制需要呼叫空參建構函式,所以顯示定義了一個空參建構函式
	public FlowBean() {
	}

	// 為了物件資料的初始化方便,加入一個帶參的建構函式
	public FlowBean(String phoneNB, long up_flow, long down_flow) {
		this.phoneNB = phoneNB;
		this.up_flow = up_flow;
		this.down_flow = down_flow;
		this.sum_flow = up_flow + down_flow;
	}

	// 將物件的資料序列化到流中
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phoneNB);
		out.writeLong(up_flow);
		out.writeLong(down_flow);
		out.writeLong(sum_flow);
	}

	// 從流中反序列化出物件的資料
	// 從資料流中讀出物件欄位時,必須跟序列化時的順序保持一致
	@Override
	public void readFields(DataInput in) throws IOException {
		this.phoneNB = in.readUTF();
		this.up_flow = in.readLong();
		this.down_flow = in.readLong();
		this.sum_flow = in.readLong();
	}

	public String getPhoneNB() {
		return phoneNB;
	}

	public void setPhoneNB(String phoneNB) {
		this.phoneNB = phoneNB;
	}

	public long getUp_flow() {
		return up_flow;
	}

	public void setUp_flow(long up_flow) {
		this.up_flow = up_flow;
	}

	public long getDown_flow() {
		return down_flow;
	}

	public void setDown_flow(long down_flow) {
		this.down_flow = down_flow;
	}

	public long getSum_flow() {
		return sum_flow;
	}

	public void setSum_flow(long sum_flow) {
		this.sum_flow = sum_flow;
	}

	@Override
	public String toString() {
		return "" + up_flow + "\t" + down_flow + "\t" + sum_flow;
	}

	// 實現Comparable介面,需要複寫compareTo方法
	@Override
	public int compareTo(FlowBean o) {
		return this.sum_flow > o.sum_flow ? -1 : 1;
	}
}

FlowSumArea

package cn.nuc.hadoop.mapreduce.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 對流量原始日誌進行流量統計,將不同省份的使用者統計結果輸出到不同檔案 需要自定義改造兩個機制: 1、改造分割槽的邏輯,自定義一個partitioner
 * 2、自定義reduer task的併發任務數
 * 
 * @author [email protected]
 *
 */
public class FlowSumArea {

	public static class FlowSumAreaMapper extends
			Mapper<LongWritable, Text, Text, FlowBean> {

		@Override
		protected void map(LongWritable k1, Text v1,
				Mapper<LongWritable, Text, Text, FlowBean>.Context context)
				throws IOException, InterruptedException {
			String line = v1.toString();
			String[] fields = StringUtils.split(line, "\t");

			String phoneNB = fields[1];
			Long up_flow = Long.parseLong(fields[7]);
			Long down_flow = Long.parseLong(fields[8]);

			context.write(new Text(phoneNB), new FlowBean(phoneNB, up_flow,
					down_flow));
		}
	}

	public static class FlowSumAreaReducer extends
			Reducer<Text, FlowBean, Text, FlowBean> {

		@Override
		protected void reduce(Text k2, Iterable<FlowBean> v2s,
				Reducer<Text, FlowBean, Text, FlowBean>.Context context)
				throws IOException, InterruptedException {
			long up_flow = 0;
			long down_flow = 0;
			for (FlowBean v2 : v2s) {
				up_flow += v2.getUp_flow();
				down_flow += v2.getDown_flow();
			}
			context.write(new Text(k2), new FlowBean(k2.toString(), up_flow,
					down_flow));
		}
	}

	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(FlowSumArea.class);

		job.setMapperClass(FlowSumAreaMapper.class);
		job.setReducerClass(FlowSumAreaReducer.class);

		// 定義分組邏輯類
		job.setPartitionerClass(AreaPartitioner.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 設定reducer的任務併發數,應該跟分組的數量保持一致
		job.setNumReduceTasks(6);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

     打包執行:

[[email protected] ~]$ hadoop jar area.jar cn.nuc.hadoop.mapreduce.areapartition.FlowSumArea /user/exe_mapreduce/flowcount/input /user/exe_mapreduce/flowcount/areaout

     檢視結果:
[[email protected] ~]$ hadoop fs -ls /user/exe_mapreduce/flowcount/areaout/
Found 7 items
-rw-r--r--   3 hadoop supergroup          0 2016-02-07 19:28 /user/exe_mapreduce/flowcount/areaout/_SUCCESS
-rw-r--r--   3 hadoop supergroup         77 2016-02-07 19:28 /user/exe_mapreduce/flowcount/areaout/part-r-00000
-rw-r--r--   3 hadoop supergroup         49 2016-02-07 19:28 /user/exe_mapreduce/flowcount/areaout/part-r-00001
-rw-r--r--   3 hadoop supergroup        104 2016-02-07 19:28 /user/exe_mapreduce/flowcount/areaout/part-r-00002
-rw-r--r--   3 hadoop supergroup         22 2016-02-07 19:28 /user/exe_mapreduce/flowcount/areaout/part-r-00003
-rw-r--r--   3 hadoop supergroup        102 2016-02-07 19:28 /user/exe_mapreduce/flowcount/areaout/part-r-00004
-rw-r--r--   3 hadoop supergroup        172 2016-02-07 19:28 /user/exe_mapreduce/flowcount/areaout/part-r-00005

[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/flowcount/areaout/part-r-00000
13502468823	102	7335	7437
13560436666	954	200	1154
13560439658	5892	400	6292
[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/flowcount/areaout/part-r-00001
13602846565	12	1938	1950
13660577991	9	6960	6969
[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/flowcount/areaout/part-r-00002
13719199419	0	200	200
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	200	320
[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/flowcount/areaout/part-r-00003
^[[A13826544101	0	200	200
[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/flowcount/areaout/part-r-00004
^[[A13922314466	3008	3720	6728
13925057413	63	11058	11121
13926251106	0	200	200
13926435656	1512	200	1712
[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/flowcount/areaout/part-r-00005
13480253104	180	200	380
15013685858	27	3659	3686
15920133257	20	3156	3176
15989002119	3	1938	1941
18211575961	12	1527	1539
18320173382	18	9531	9549
84138413	4116	1432	5548


3. 資料去重

“ 資料去重”主要是為了掌握和利用並行化思想來對資料進行有意義的篩選。 統計大 資料集上的資料種類個數、 從網站日誌中計算訪問地等這些看似龐雜的任務都會涉及資料 去重。下面就進入這個例項的 MapReduce 程式設計。

3.1 例項描述

對資料檔案中的資料進行去重。資料檔案中的每行都是一個數據。

樣例輸入如下所示:

file1:

2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c

file2:

2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c

樣例輸出如下:

2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d


3.2 設計思路

資料去重的最終目標是讓原始資料中出現次數超過一次的資料在輸出檔案中只出現一次。我們自然而然會想到將同一個資料的所有記錄都交給一臺 reduce 機器,無論這個資料出現多少次,只要在最終結果中輸出一次就可以了。具體就是 reduce 的輸入應該以資料作為 key,而對 value-list 則沒有要求。當 reduce 接收到一個<key, value-list>時就直接將 key複製到輸出的 key 中,並將 value 設定成空值。

在 MapReduce 流程中, map 的輸出<key, value>經過 shuffle 過程聚整合<key, value-list>後會交給 reduce。所以從設計好的 reduce 輸入可以反推出 map 的輸出 key 應為資料, value任意。繼續反推, map 輸出資料的 key 為資料,而在這個例項中每個資料代表輸入檔案中的一行內容,所以 map 階段要完成的任務就是在採用 Hadoop 預設的作業輸入方式之後,將value 設定為 key,並直接輸出(輸出中的 value 任意)。 map 中的結果經過 shuffle 過程之後交給 reduce。 reduce 階段不會管每個 key 有多少個 value,它直接將輸入的 key 複製為輸出的 key,並輸出就可以了(輸出中的 value 被設定成空了)。

3.3 程式程式碼

DedupMapper:

package cn.nuc.hadoop.mapreduce.dedup;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

	private static Text field = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		field = value;
		context.write(field, NullWritable.get());

	}
}


DedupReducer:

package cn.nuc.hadoop.mapreduce.dedup;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupReducer extends
		Reducer<Text, NullWritable, Text, NullWritable> {
	@Override
	protected void reduce(Text key, Iterable<NullWritable> values,
			Context context) throws IOException, InterruptedException {

		context.write(key, NullWritable.get());

	}
}


DedupRunner:

package cn.nuc.hadoop.mapreduce.dedup;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DedupRunner {
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(DedupRunner.class);

		job.setMapperClass(DedupMapper.class);
		job.setReducerClass(DedupReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}
}


打成jar包後執行:

[[email protected] ~]$  hadoop jar dedup.jar cn.nuc.hadoop.mapreduce.dedup.DedupRunner /user/exe_mapreduce/dedup/input /user/exe_mapreduce/dedup/out


檢視結果:

[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/dedup/output/part-r-00000
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d


4.  資料排序

“ 資料排序”是許多實際任務執行時要完成的第一項工作,比如學生成績評比、資料建立索引等。這個例項和資料去重類似,都是先對原始資料進行初步處理,為進一步的資料 操作打好基礎。下面進入這個示例。

4.1 例項描述

對輸入檔案中資料進行排序。 輸入檔案中的每行內容均為一個數字, 即一個數據。要求在輸出中每行有兩個間隔的數字,其中, 第一個代表原始資料在原始資料集中的位次, 第 二個代表原始資料。

樣例輸入:

file1:

2
32
654
32
15
756
65223

file2:

5956
22
650
92

file3:

26
54
6

樣例輸出:

1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223

4.2  設計思路

這個例項僅僅要求對輸入資料進行排序,熟悉 MapReduce 過程的讀者會很快想到在 MapReduce 過程中就有排序,是否可以利用這個預設的排序,而不需要自己再實現具體的 排序呢?答案是肯定的。

但是在使用之前首先需要了解它的預設排序規則。它是按照 key 值進行排序的,如果 key 為封裝 int 的 IntWritable 型別,那麼 MapReduce 按照數字大小對 key 排序,如果 key 為封裝為 String 的 Text 型別,那麼 MapReduce 按照字典順序對字串排序。

瞭解了這個細節,我們就知道應該使用封裝 int 的 IntWritable 型資料結構了。也就是在 map 中將讀入的資料轉化成 IntWritable 型,然後作為 key 值輸出( value 任意)。 reduce 拿到 <key, value-list>之後,將輸入的 key 作為 value 輸出,並根據 value-list 中元素的個數決定 輸出的次數。輸出的 key(即程式碼中的 linenum)是一個全域性變數,它統計當前 key 的位次。 需要注意的是這個程式中沒有配置 Combiner,也就是在 MapReduce 過程中不使用 Combiner。 這主要是因為使用 map 和 reduce 就已經能夠完成任務了。

4.3 程式程式碼

SortMapper:

package cn.nuc.hadoop.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//map將輸入中的 value化成 IntWritable型別,作為輸出的 key
public class SortMapper extends
		Mapper<LongWritable, Text, IntWritable, IntWritable> {

	private static IntWritable data = new IntWritable();
	private static final IntWritable one = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		data.set(Integer.parseInt(line));
		context.write(data, one);
	}
}

SortReducer:

package cn.nuc.hadoop.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

//reduce 將輸入中的 key 複製到輸出資料的 key 上,
//然後根據輸入的 value‐list 中元素的個數決定 key 的輸出次數
//用全域性linenumber來代表key的位次
public class SortReducer extends
		Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

	private static IntWritable linenumber = new IntWritable(1);

	@Override
	protected void reduce(IntWritable key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		for (IntWritable value : values) {
			context.write(linenumber, key);
			linenumber.set(linenumber.get() + 1);
			// linenumber=new IntWritable(linenumber.get()+1);
		}

	}
}

SotrRunner:

package cn.nuc.hadoop.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortRunner {
	public static void main(String[] args) throws IllegalArgumentException,
			IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(SortRunner.class);

		job.setMapperClass(SortMapper.class);
		job.setReducerClass(SortReducer.class);

		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}
}


打成jar包執行:

[[email protected] ~]$ hadoop jar sort.jar cn.nuc.hadoop.mapreduce.sort.SortRunner /user/exe_mapreduce/sort/input /user/exe_mapreduce/sort/output

檢視結果:

[[email protected] input]$ hadoop fs -cat /user/exe_mapreduce/sort/output/part-r-00000
1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223

5 平均成績

“平均成績”主要目的還是在重溫經典“ WordCount”例子,可以說是在基礎上的微變 化版,該例項主要就是實現一個計算學生平均成績的例子。

5.1 例項描述

對輸入檔案中資料進行就算學生平均成績。輸入檔案中的每行內容均為一個學生的姓名 和他相應的成績,如果有多門學科,則每門學科為一個檔案。要求在輸出中每行有兩個間隔 的資料,其中, 第一個代表學生的姓名, 第二個代表其平均成績。

樣本輸入:

math:

張三 88
李四 99
王五 66
趙六 77

china:

張三 78
李四 89
王五 96
趙六 67

english:

張三 80
李四 82
王五 84
趙六 86

樣本輸出:

張三 82
李四 90
王五 82
趙六 76


5.2 設計思路

計算學生平均成績是一個仿“ WordCount”例子,用來重溫一下開發 MapReduce 程式的 流程。程式包括兩部分的內容: Map 部分和 Reduce 部分,分別實現了 map 和 reduce 的功能。

Map 處理的是一個純文字檔案,檔案中存放的資料時每一行表示一個學生的姓名和他 相應一科成績。 Mapper 處理的資料是由 InputFormat 分解過的資料集,其中 InputFormat 的 作用是將資料集切割成小資料集 InputSplit,每一個 InputSlit 將由一個 Mapper 負責處理。此 外,InputFormat 中還提供了一個 RecordReader 的實現,並將一個 InputSplit 解析成<key,value>對提供給了 map 函式。 InputFormat 的預設值是 TextInputFormat,它針對文字檔案,按行將 文字切割成 InputSlit,並用 LineRecordReader 將 InputSplit 解析成<key,value>對, key 是行在 文字中的位置, value 是檔案中的一行。

Map 的結果會通過 partion 分發到 Reducer, Reducer 做完 Reduce 操作後,將通過以格 式 OutputFormat 輸出。

Mapper 最終處理的結果對<key,value>,會送到 Reducer 中進行合併,合併的時候,有 相同 key 的鍵/值對則送到同一個 Reducer 上。 Reducer 是所有使用者定製 Reducer 類地基礎, 它的輸入是 key 和這個 key 對應的所有 value 的一個迭代器,同時還有 Reducer 的上下文。 Reduce 的結果由 Reducer.Context 的 write 方法輸出到檔案中。

5.3 程式程式碼

ScoreMapper:

package cn.nuc.hadoop.mapreduce.score;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	private static Text name = new Text();
	private static IntWritable score = new IntWritable();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		String[] fields = StringUtils.split(line, " ");
		String strName = fields[0];//學生姓名
		int strScore = Integer.parseInt(fields[1]);//學生單科成績

		name.set(strName);
		score.set(strScore);
		context.write(name, score);
	}
}

ScoreReducer:

package cn.nuc.hadoop.mapreduce.score;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	private static IntWritable avg_score = new IntWritable();

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		int sum_score = 0;//統計總成績
		int count=0;//統計總的科目數
		for (IntWritable score : values) {
			count++;
			sum_score += score.get();		
		}

		avg_score.set(sum_score / count);
		context.write(key, avg_score);
	}
}


ScoreRunner:

package cn.nuc.hadoop.mapreduce.score;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ScoreRunner {
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(ScoreRunner.class);

		job.setMapperClass(ScoreMapper.class);
		job.setReducerClass(ScoreReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}
}


打成jar包執行:

[[email protected] ~]$ hadoop jar score.jar cn.nuc.hadoop.mapreduce.score.ScoreRunner /user/exe_mapreduce/score/input /user/exe_mapreduce/score/output

檢視結果:

[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/score/output/part-r-00000
張三	82
李四	90
王五	82
趙六	76

6 倒排索引

“ 倒排索引”是文件檢索系統中最常用的資料結構,被廣泛地應用於全文搜尋引擎。 它主要是用來儲存某個單詞(或片語) 在一個文件或一組文件中的儲存位置的對映,即提 供了一種根據內容來查詢文件的方式。由於不是根據文件來確定文件所包含的內容,而是進 行相反的操作,因而稱為倒排索引( Inverted Index)。

6.1 例項描述

通常情況下,倒排索引由一個單詞(或片語)以及相關的文件列表組成,文件列表中的 文件或者是標識文件的 ID 號,或者是指文件所在位置的 URL,如圖 6.1-1 所示。


從圖 6.1-1 可以看出,單詞 1 出現在{文件 1,文件 4,文件 13, ……}中,單詞 2 出現 在{文件 3,文件 5,文件 15, ……}中,而單詞 3 出現在{文件 1,文件 8,文件 20, ……} 中。在實際應用中, 還需要給每個文件新增一個權值,用來指出每個文件與搜尋內容的相 關度,如圖 6.1-2 所示。


最常用的是使用詞頻作為權重,即記錄單詞在文件中出現的次數。以英文為例,如圖 6.1-3 所示,索引檔案中的“ MapReduce”一行表示:“ MapReduce”這個單詞在文字 T0 中 出現過 1 次,T1 中出現過 1 次,T2 中出現過 2 次。當搜尋條件為“ MapReduce”、“ is”、“ Simple” 時,對應的集合為: {T0, T1, T2}∩{T0, T1}∩{T0, T1}={T0, T1},即文件 T0 和 T1 包 含了所要索引的單詞,而且只有 T0 是連續的。


更復雜的權重還可能要記錄單詞在多少個文件中出現過,以實現 TF-IDF( Term Frequency-Inverse Document Frequency)演算法,或者考慮單詞在文件中的位置資訊(單詞是 否出現在標題中,反映了單詞在文件中的重要性)等。

樣例輸入如下所示。

file1:

MapReduce is simple

file2:

MapReduce is powerful is simple

file3:

Hello MapReduce bye MapReduce

樣例輸出如下所示:

MapReduce file1.txt:1;file2.txt:1;file3.txt:2;
is file1.txt:1;file2.txt:2;
simple file1.txt:1;file2.txt:1;
powerful file2.txt:1;
Hello file3.txt:1;
bye file3.txt:1;

6.2 設計思路

實現“ 倒排索引”只要關注的資訊為: 單詞、 文件 URL 及詞頻,如圖 3-11 所示。但是 在實現過程中,索引檔案的格式與圖 6.1-3 會略有所不同,以避免重寫 OutPutFormat 類。下 面根據 MapReduce 的處理過程給出倒排索引的設計思路。

1)Map過程

首先使用預設的 TextInputFormat 類對輸入檔案進行處理,得到文字中每行的偏移量及 其內容。顯然, Map 過程首先必須分析輸入的<key,value>對,得到倒排索引中需要的三個信 息:單詞、文件 URL 和詞頻,如圖 6.2-1 所示。


這裡存在兩個問題: 第一, <key,value>對只能有兩個值,在不使用 Hadoop 自定義資料 型別的情況下,需要根據情況將其中兩個值合併成一個值,作為 key 或 value 值; 第二,通 過一個 Reduce 過程無法同時完成詞頻統計和生成文件列表,所以必須增加一個 Combine 過程完成詞頻統計。

這裡講單詞和 URL 組成 key 值(如“ MapReduce: file1.txt”),將詞頻作為 value,這樣 做的好處是可以利用 MapReduce 框架自帶的 Map 端排序,將同一文件的相同單詞的詞頻組 成列表,傳遞給 Combine 過程,實現類似於 WordCount 的功能。

2)Combine過程

經過 map 方法處理後, Combine 過程將 key 值相同的 value 值累加,得到一個單詞在文 檔在文件中的詞頻,如圖 6.2-2 所示。 如果直接將圖 6.2-2 所示的輸出作為 Reduce 過程的輸 入,在 Shuffle 過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、 URL 和詞頻組 成) 應該交由同一個 Reducer 處理,但當前的 key 值無法保證這一點,所以必須修改 key 值 和 value 值。這次將單詞作為 key 值, URL 和詞頻組成 value 值(如“ file1.txt: 1”)。這樣 做的好處是可以利用 MapReduce 框架預設的 HashPartitioner 類完成 Shuffle 過程,將相同單 詞的所有記錄傳送給同一個 Reducer 進行處理。


3)Reduce過程

經過上述兩個過程後, Reduce 過程只需將相同 key 值的 value 值組合成倒排索引檔案所 需的格式即可,剩下的事情就可以直接交給 MapReduce 框架進行處理了。如圖 6.2-3 所示。 索引檔案的內容除分隔符外與圖 6.1-3 解釋相同。

4)需要解決的問題

本例項設計的倒排索引在檔案數目上沒有限制,但是單詞檔案不宜過大(具體值與默 認 HDFS 塊大小及相關配置有關),要保證每個檔案對應一個 split。否則,由於 Reduce 過 程沒有進一步統計詞頻,最終結果可能會出現詞頻未統計完全的單詞。可以通過重寫 InputFormat 類將每個檔案為一個 split,避免上述情況。或者執行兩次 MapReduce, 第一次 MapReduce 用於統計詞頻, 第二次 MapReduce 用於生成倒排索引。除此之外,還可以利用 複合鍵值對等實現包含更多資訊的倒排索引。


6.3 程式程式碼

InvertedIndexMapper:

package cn.nuc.hadoop.mapreduce.invertedindex;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

	private static Text keyInfo = new Text();// 儲存單詞和 URL 組合
	private static final Text valueInfo = new Text("1");// 儲存詞頻,初始化為1

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		String line = value.toString();
		String[] fields = StringUtils.split(line, " ");// 得到欄位陣列

		FileSplit fileSplit = (FileSplit) context.getInputSplit();// 得到這行資料所在的檔案切片
		String fileName = fileSplit.getPath().getName();// 根據檔案切片得到檔名

		for (String field : fields) {
			// key值由單詞和URL組成,如“MapReduce:file1”
			keyInfo.set(field + ":" + fileName);
			context.write(keyInfo, valueInfo);
		}
	}
}

InvertedIndexCombiner:

package cn.nuc.hadoop.mapreduce.invertedindex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

	private static Text info = new Text();

	// 輸入: <MapReduce:file3 {1,1,...}>
	// 輸出:<MapReduce file3:2>
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		int sum = 0;// 統計詞頻
		for (Text value : values) {
			sum += Integer.parseInt(value.toString());
		}

		int splitIndex = key.toString().indexOf(":");
		// 重新設定 value 值由 URL 和詞頻組成
		info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
		// 重新設定 key 值為單詞
		key.set(key.toString().substring(0, splitIndex));
		
		context.write(key, info);
	}
}

InvertedIndexReducer:

package cn.nuc.hadoop.mapreduce.invertedindex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

	private static Text result = new Text();

	// 輸入:<MapReduce file3:2>
	// 輸出:<MapReduce file1:1;file2:1;file3:2;>
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// 生成文件列表
		String fileList = new String();
		for (Text value : values) {
			fileList += value.toString() + ";";
		}

		result.set(fileList);
		context.write(key, result);
	}
}

InvertedIndexRunner:

package cn.nuc.hadoop.mapreduce.invertedindex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndexRunner {
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(InvertedIndexRunner.class);

		job.setMapperClass(InvertedIndexMapper.class);
		job.setCombinerClass(InvertedIndexCombiner.class);
		job.setReducerClass(InvertedIndexReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		// 檢查引數所指定的輸出路徑是否存在,若存在,先刪除
		Path output = new Path(args[1]);
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(output)) {
			fs.delete(output, true);
		}
		FileOutputFormat.setOutputPath(job, output);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

打成jar包並執行:

hadoop jar invertedindex.jar cn.nuc.hadoop.mapreduce.invertedindex.InvertedIndexRunner /user/exe_mapreduce/invertedindex/input /user/exe_mapreduce/invertedindex/output

檢視結果:

[[email protected] ~]$ hadoop fs -cat /user/exe_mapreduce/invertedindex/output/part-r-00000
Hello	file3:1;
MapReduce	file3:2;file1:1;file2:1;
bye	file3:1;
is	file1:1;file2:2;
powerful	file2:1;
simple	file2:1;file1:1;

相關推薦

HadoopMapReduce練習資料資料排序平均成績索引

1.  wordcount程式 先以簡單的wordcount為例。 Mapper: package cn.nuc.hadoop.mapreduce.wordcount; import java.io.IOException; import org.apache.com

MapReduce處理資料資料排序

一:MapReduce處理資料去重 Map的key具有資料去重的功能 /* * 去除資料中相同資料 * 資料去重問題 * 以整個資料作為key傳送出去, value為null */ public class DelsameMap extends Mapper<

使用HadoopMapReduce來實現資料

最近在系統學習大資料知識,學了沒有記錄過幾天又忘光了,所以把學習內容記錄下來,方便以後檢視  import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.

Hadoop MapReduce開發--資料

環境 hadoop-2.9.1 windows7 idea15 示例資料 file1.txt和file2.txt檔案儲存在路徑:C:\bigdata\example_data\mr_example\exp_02\ file1.txt 2012-3-1 a 2012

Hadoop閱讀筆記——利用MapReduce求平均數和

前言: 聖誕節來了,我怎麼能虛度光陰呢?!依稀記得,那一年,大家互贈賀卡,短短几行字,字字融化在心裡;那一年,大家在水果市場,尋找那些最能代表自己心意的蘋果香蕉梨,摸著冰冷的水果外皮,內心早已滾燙。這一年……我在部落格園-_-#,希望用dt的程式碼燃燒腦細胞,溫暖小心窩。 上篇 《Hadoop閱讀筆記(

資料_ShuffleMapReduce程式設計案例(資料多表查詢索引使用單元測試)

一、什麼是Shuffle(洗牌) ----> MapReduce核心 1、序列化 2、排序 3、分割槽 4、合併 二、MapReduce程式設計案例 ------> 掌握方法:如何開發一個程式 1、資料

BloomFilter資料+Redis持久化策略

之前在重構一套文章爬蟲系統時,其中有塊邏輯是根據文章標題去重,原先去重的方式是,插入文章之前檢查待插入文章的標題是否在ElasticSearch中存在,這無疑加重了ElasticSearch的負擔也勢必會影響程式的效能! BloomFilter演算法 簡介:布隆過濾器實際上

海量資料上億資料

   在資料開發中,我們不難遇到重複資料的問題,搞過這類資料開發的同志肯定覺得,重複資料是真的煩人,特別是當資料量十分大的時候,如果我們用空間複雜度去換時間複雜度,會十分耗內容,稍不注意,就會記憶體溢位,那麼針對如此龐大的資料量我們一般能怎麼解決呢?下面分享幾個方案: 方案一

表中重複資料只保留一份id較小的

查詢店員表w_other_empl中身份證號ss_id重複的數量 select t.ss_id,count(t.ss_id) from w_other_empl t group by ss_id having count(t.ss_id)>1 order by ss_id;

js實現常見的幾種演算法陣列字元統計二分查詢等

1、陣列去重:利用物件屬性進行篩選 function filter(arr){ let obj={}; let newArr=[]; for (let i=0;i<arr.

pythonpandas檔案合併資料

目錄下有如圖60個txt檔案,每個txt檔案裡的資料大概有7000萬行目的:把每個txt檔案裡的資料去重後合併60個檔案為一個總檔案,然後把總檔案裡的資料按第一列、第二列分組第三列求去重後出現的次數每個檔案的內容如下:程式碼如下:# -*- coding:utf-8 -*-

MapReduce案例3——求簡單資料

資料去重源資料: 2012-3-1 a 2012-3-2 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-7 c 2012-3-3 c 2012-3-1 b 2012-3-2 a 2012-3-3 b 2012-3-4

MapReduce例項】資料

一、例項描述 資料去重是利用並行化思想來對資料進行有意義的篩選。統計大資料集上的資料種類個數、從網站日誌中計算訪問等這些看似龐大的任務都會涉及資料去重。 比如,輸入檔案 file1.txt,其內容如下: 2017-12-9 a 2017-12-10 b

資料資料治理|Intellij IDEA提交遠端Hadoop MapReduce任務第八篇

1.新建IntelliJ下空的的maven專案 直接next即可。 2.配置依賴 編輯pom.xml檔案,新增apache源和hadoop依賴 基礎依賴hadoop-core和hadoop-common; 讀寫HDFS,需要依賴hadoop-hdfs和hadoop-client

Hadoop鏈式MapReduce多維排序索引自連線演算法二次排序Join效能優化處理員工資訊Join實戰URL流量分析TopN及其排序求平均值和最大最小值資料清洗ETL分析氣

Hadoop Mapreduce 演算法彙總  第52課:Hadoop鏈式MapReduce程式設計實戰...1 第51課:Hadoop MapReduce多維排序解析與實戰...2 第50課:HadoopMapReduce倒排索引解析與實戰...3 第49課:Hado

Elasticsearch資料百萬級別

如果你對去重結果的精準度沒有特殊要求,使用cardinality聚合函式         AggregationBuilders.cardinality("deviceCount").field("deviceID").precisionThreshold(自定義一個精度範

sql 資料並且保留一條在一定的時間範圍隨機獲取時間

-- 將lmt作為唯一標識確保唯一 update SWS_ST_SPB_P set lmt = t.lmt from SWS_ST_SPB_P s, ( SELECT stcd, mpcd, tm,

Hadoop Mapreduce 連線Join之一:分割槽連線Repartition join

4.1 連線(Join) 連線是關係運算,可以用於合併關係(relation)。對於資料庫中的表連線操作,你可能已經比較熟悉了。在MapReduce中,連線可以用於合併兩個或多個數據集。例如,使用者基本資訊和使用者活動詳情。使用者基本資訊來自於OLTP資料庫。使用者活動

資料data deduplication方案

資料去重(data deduplication)是大資料領域司空見慣的問題了。除了統計UV等傳統用法之外,去重的意義更在於消除不可靠資料來源產生的髒資料——即重複上報資料或重複投遞資料的影響,使計算產生的結果更加準確。 介紹下經常使用的去重方案: 一、布隆過濾器(BloomFilter

【C++】判斷元素是否在vector中,對vector,兩個vector求交集並集

bool iostream space col 求交集 uniq AI void print #include <iostream> #include <vector> #include <algorithm> //sort函數、交並補