1. 程式人生 > >自定義MapReduce業務邏輯

自定義MapReduce業務邏輯

1.我們剛一開始的時候,在HDFS上面處理檔案時候,我們並沒有自己寫MapReduce,而是用的是映象架包下面的/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar,同樣的也將執行出來結果(hadoop jar hadoop-mapreduce-examples-2.6.4.jar wordcount 檔案的在Linux上的源路徑  檔案處理後結果存放在HDFS上的路徑)

例如:hadoop jar hadoop-mapreduce-examples-2.6.4.jar wordcount /root/wordcount.txt

/wordcount/input

這其實是hadoop架包裡面自帶的一個Mapreduce的demo例項,紅色是我們要執行的主方法名,通常我們在例項中需要寫絕對路徑;黃色是我們將要處理的檔案在Linux上的絕對路徑;綠色是我們處理後文件,得到的結果儲存的位置(指的是儲存在HDFS上的絕對路徑,並且這個路徑在HDFS上不能存在,當執行命令的時候回自動建立,如果存在程式會包錯誤)

2.往往在實際開發中我們需要自己寫MapReduce的業務邏輯,hadoop架包裡的已經不能滿足我們的多樣化需求了,下面就介紹如何去customMapReduce

我們在建立專案時候可以選擇maven也可以選擇javaProject,maven比較簡單直接從aliyun倉庫線上下載架包即可,Javaproject則需要我們機子手動搭建所需架包,

  1. 首先將你的cenos-6.5-hadoop-2.6.4架包解壓,
  2. 進入\cenos-6.5-hadoop-2.6.4\hadoop-2.6.4\share\hadoop;將common下面lib下的+sources下的+hadoop-common-2.6.4+hdfs下面的lib下面的+sources下面的+hadoop-hdfs-2.6.4匯入buildpath下面
  3. \cenos-6.5-hadoop-2.6.4\hadoop-2.6.4\share\hadoop\mapreduce\下面的全部架包匯入buildpath下面
  4. 最後apply一下就可以

3.例項:統計每一個使用者的使用總流量

此案例我們以物件的形式在網路間傳輸,所以要想在網路間傳輸,我們就要對物件進行序列化,正好hadoop內部封裝有序列化介面,我們只需要實現這個介面即可WritableComparable

public class TelBean implements WritableComparable<TelBean> {

	private String tel;使用者電話號碼
	private Long upPayLoad;上行流量
	private Long downPayLoad;下行流量
	private Long totalPayLoad;總流量=上行+下行

	public String getTel() {
		return tel;
	}

	public void setTel(String tel) {
		this.tel = tel;
	}

	public Long getUpPayLoad() {
		return upPayLoad;
	}

	public void setUpPayLoad(Long upPayLoad) {
		this.upPayLoad = upPayLoad;
	}

	public Long getDownPayLoad() {
		return downPayLoad;
	}

	public void setDownPayLoad(Long downPayLoad) {
		this.downPayLoad = downPayLoad;
	}

	public Long getTotalPayLoad() {
		return totalPayLoad;
	}

	public void setTotalPayLoad(Long totalPayLoad) {
		this.totalPayLoad = totalPayLoad;
	}


//序列化
	@Override
	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		this.tel = in.readUTF();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
		this.totalPayLoad = in.readLong();
	}
//反序列化
	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeUTF(tel);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
		out.writeLong(totalPayLoad);
	}
//物件之間的比較,排序
	@Override
	public int compareTo(TelBean bean) {
		// TODO Auto-generated method stub
		if (this.getTotalPayLoad() > bean.getTotalPayLoad()) {
			return -1;
		} else if (this.getTotalPayLoad() < bean.getTotalPayLoad()) {
			return 1;
		} else {
			if (this.getDownPayLoad() > bean.getDownPayLoad()) {
				return -1;
			} else {
				return 1;
			}
		}
	}

	@Override
	public String toString() {
		return tel + "\t" + upPayLoad + "\t" + downPayLoad + "\t" + totalPayLoad;
	}

	public TelBean(String tel, Long upPayLoad, Long downPayLoad, Long totalPayLoad) {
		super();
		this.tel = tel;
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
		this.totalPayLoad = totalPayLoad;
	}

	public TelBean() {
		super();
		// TODO Auto-generated constructor stub
	}

	public TelBean(Long upPayLoad, Long downPayLoad, Long totalPayLoad) {
		super();
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
		this.totalPayLoad = totalPayLoad;
	}

}

4.重寫Mapper,同樣只需要繼承hadoop內部封裝好的Mapper類即可,要注意的要在網路間傳輸,一切都要序列化,string對應的序列化後是text,int序列化後是IntWritable,long序列化後LongWritable等等

LongWritable指的是獲取一行內容的起始偏移量
Text指的是一行文字內容
Text指的是我們將要輸出的tel
TelBean指的是以物件的形式進行輸出
public class TCMapper extends Mapper<LongWritable, Text, Text, TelBean>{

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TelBean>.Context context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
獲取一行資訊
		String line = value.toString();
		將單詞以\t進行分割
		String[] fields = line.split("\t");
		例項化物件
		TelBean bean = new TelBean(fields[1],Long.valueOf(fields[8]),Long.valueOf(fields[9]),new Long(0));
		設定將要傳送給reduce的資料形式
		context.write(new Text(fields[1]), bean);
	}
}

5.對於Reduce同樣hadoop內部封裝有Reducer

注意的是map的輸出就是reduce的輸入
Text指的是map傳送過來的keyout,在這裡是keyin

我們最後想要的結果肯定是手機號對應的上行,下行,總流量
所以text數字的是tel
其他的封裝在telBean裡面

public class TCReducer extends Reducer<Text, TelBean, Text, TelBean>{

	@Override
	protected void reduce(Text key, Iterable<TelBean> value, Context context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
//		做統計
		long sumUp = 0;
		long sumDown = 0;
		
		for (TelBean bean : value) {
			sumUp += bean.getUpPayLoad();
			sumDown += bean.getDownPayLoad();
		}
		
//		telBean的屬性應該與log一一對應
//		當前的bean應該是一個新的bean
		TelBean bean = 
				new TelBean(sumUp, sumDown, sumUp+sumDown);
		
		context.write(new Text(key), bean);
	}
}

6.如果我們想將所有手機號前三位為135或者136的手機號令存放一個檔案,代表的是同一個歸屬地的手機號,那麼我們要用到partitioner分割器

map -- suffer -- reduce
map的輸出是suffer的輸入

public class TCPartitioner extends Partitioner<Text, TelBean>{

	@Override
	public int getPartition(Text key, TelBean bean, int arg2) {
		// TODO Auto-generated method stub

//生成的檔案part-r-00000 part的編號的結尾就是這個int型別的返回值
//根據不同的電話號碼 ,劃分到不同的區裡面

		String tel = bean.getTel();
		
		String subTel = tel.substring(0, 3);
		
		if ("135".equals(subTel)||"136".equals(subTel)) {
			return 1;//part-r-00001裡面
		}
		return 0;
	}

}

7.建立job

public class TCAPP {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//		獲取job
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
//		指定job使用的類
		job.setJarByClass(TCAPP.class);
		
//		將partitioner新增到job裡面
		job.setPartitionerClass(TCPartitioner.class);
//		設定reduceTasks的數量 有幾個分割槽設定幾個任務
		job.setNumReduceTasks(2);
		
//		設定mapper的類以及屬性
		job.setMapperClass(TCMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TelBean.class);
		
//		設定reduce的類以及屬性
		job.setReducerClass(TCReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TelBean.class);
		
//		設定輸入檔案 在呼叫的時候動態的傳遞引數
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
//		設定輸出目錄
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
//		提交任務
		job.waitForCompletion(true);
	}
}

8.將你當前的專案打包

9.啟動hadoop叢集

10.上傳要處理的檔案和打好的包到Linux下

11.將要處理的檔案上傳到HDFS根目錄下

hadoop fs -put /root/要處理的檔名+字尾 /   (/代表的是hdfs根目錄)

12.將hadoop的前端頁面開啟,觀察是否上傳成功

13.執行架包

hadoop jar 架包名   我們住方法絕對路徑    我們上傳在HDFS裡面要處理的檔案絕對路徑       處理結果存放在HDFS裡面的絕對路徑

實際開發中要確保處理檔案的時間大於HDFS啟動的時間,要記住hadoop不適合處理小檔案

14.前端頁面檢視是否成功

_SUCCESS只是一個標識,代表執行成功

part-r-000001則是我們要求的手機號碼為135或者136開頭的存放在一個檔案中

注意:hadoop不適合處理小檔案,實際開發中要避免多個小檔案的產生,在源頭進行處理,將小檔案合併,或者是在map階段將小的分割槽內容進行適當合併,減少reduce階段的處理壓力。