自定義MapReduce業務邏輯
1.我們剛一開始的時候,在HDFS上面處理檔案時候,我們並沒有自己寫MapReduce,而是用的是映象架包下面的/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar,同樣的也將執行出來結果(hadoop jar hadoop-mapreduce-examples-2.6.4.jar wordcount 檔案的在Linux上的源路徑 檔案處理後結果存放在HDFS上的路徑)
例如:hadoop jar hadoop-mapreduce-examples-2.6.4.jar wordcount /root/wordcount.txt
這其實是hadoop架包裡面自帶的一個Mapreduce的demo例項,紅色是我們要執行的主方法名,通常我們在例項中需要寫絕對路徑;黃色是我們將要處理的檔案在Linux上的絕對路徑;綠色是我們處理後文件,得到的結果儲存的位置(指的是儲存在HDFS上的絕對路徑,並且這個路徑在HDFS上不能存在,當執行命令的時候回自動建立,如果存在程式會包錯誤)
2.往往在實際開發中我們需要自己寫MapReduce的業務邏輯,hadoop架包裡的已經不能滿足我們的多樣化需求了,下面就介紹如何去customMapReduce
我們在建立專案時候可以選擇maven也可以選擇javaProject,maven比較簡單直接從aliyun倉庫線上下載架包即可,Javaproject則需要我們機子手動搭建所需架包,
- 首先將你的cenos-6.5-hadoop-2.6.4架包解壓,
- 進入\cenos-6.5-hadoop-2.6.4\hadoop-2.6.4\share\hadoop;將common下面lib下的+sources下的+hadoop-common-2.6.4+hdfs下面的lib下面的+sources下面的+hadoop-hdfs-2.6.4匯入buildpath下面
- \cenos-6.5-hadoop-2.6.4\hadoop-2.6.4\share\hadoop\mapreduce\下面的全部架包匯入buildpath下面
- 最後apply一下就可以
3.例項:統計每一個使用者的使用總流量
此案例我們以物件的形式在網路間傳輸,所以要想在網路間傳輸,我們就要對物件進行序列化,正好hadoop內部封裝有序列化介面,我們只需要實現這個介面即可WritableComparable
public class TelBean implements WritableComparable<TelBean> {
private String tel;使用者電話號碼
private Long upPayLoad;上行流量
private Long downPayLoad;下行流量
private Long totalPayLoad;總流量=上行+下行
public String getTel() {
return tel;
}
public void setTel(String tel) {
this.tel = tel;
}
public Long getUpPayLoad() {
return upPayLoad;
}
public void setUpPayLoad(Long upPayLoad) {
this.upPayLoad = upPayLoad;
}
public Long getDownPayLoad() {
return downPayLoad;
}
public void setDownPayLoad(Long downPayLoad) {
this.downPayLoad = downPayLoad;
}
public Long getTotalPayLoad() {
return totalPayLoad;
}
public void setTotalPayLoad(Long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}
//序列化
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.tel = in.readUTF();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
this.totalPayLoad = in.readLong();
}
//反序列化
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(tel);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
out.writeLong(totalPayLoad);
}
//物件之間的比較,排序
@Override
public int compareTo(TelBean bean) {
// TODO Auto-generated method stub
if (this.getTotalPayLoad() > bean.getTotalPayLoad()) {
return -1;
} else if (this.getTotalPayLoad() < bean.getTotalPayLoad()) {
return 1;
} else {
if (this.getDownPayLoad() > bean.getDownPayLoad()) {
return -1;
} else {
return 1;
}
}
}
@Override
public String toString() {
return tel + "\t" + upPayLoad + "\t" + downPayLoad + "\t" + totalPayLoad;
}
public TelBean(String tel, Long upPayLoad, Long downPayLoad, Long totalPayLoad) {
super();
this.tel = tel;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad = totalPayLoad;
}
public TelBean() {
super();
// TODO Auto-generated constructor stub
}
public TelBean(Long upPayLoad, Long downPayLoad, Long totalPayLoad) {
super();
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad = totalPayLoad;
}
}
4.重寫Mapper,同樣只需要繼承hadoop內部封裝好的Mapper類即可,要注意的要在網路間傳輸,一切都要序列化,string對應的序列化後是text,int序列化後是IntWritable,long序列化後LongWritable等等
LongWritable指的是獲取一行內容的起始偏移量
Text指的是一行文字內容
Text指的是我們將要輸出的tel
TelBean指的是以物件的形式進行輸出
public class TCMapper extends Mapper<LongWritable, Text, Text, TelBean>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TelBean>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
獲取一行資訊
String line = value.toString();
將單詞以\t進行分割
String[] fields = line.split("\t");
例項化物件
TelBean bean = new TelBean(fields[1],Long.valueOf(fields[8]),Long.valueOf(fields[9]),new Long(0));
設定將要傳送給reduce的資料形式
context.write(new Text(fields[1]), bean);
}
}
5.對於Reduce同樣hadoop內部封裝有Reducer
注意的是map的輸出就是reduce的輸入
Text指的是map傳送過來的keyout,在這裡是keyin
我們最後想要的結果肯定是手機號對應的上行,下行,總流量
所以text數字的是tel
其他的封裝在telBean裡面
public class TCReducer extends Reducer<Text, TelBean, Text, TelBean>{
@Override
protected void reduce(Text key, Iterable<TelBean> value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// 做統計
long sumUp = 0;
long sumDown = 0;
for (TelBean bean : value) {
sumUp += bean.getUpPayLoad();
sumDown += bean.getDownPayLoad();
}
// telBean的屬性應該與log一一對應
// 當前的bean應該是一個新的bean
TelBean bean =
new TelBean(sumUp, sumDown, sumUp+sumDown);
context.write(new Text(key), bean);
}
}
6.如果我們想將所有手機號前三位為135或者136的手機號令存放一個檔案,代表的是同一個歸屬地的手機號,那麼我們要用到partitioner分割器
map -- suffer -- reduce
map的輸出是suffer的輸入
public class TCPartitioner extends Partitioner<Text, TelBean>{
@Override
public int getPartition(Text key, TelBean bean, int arg2) {
// TODO Auto-generated method stub
//生成的檔案part-r-00000 part的編號的結尾就是這個int型別的返回值
//根據不同的電話號碼 ,劃分到不同的區裡面
String tel = bean.getTel();
String subTel = tel.substring(0, 3);
if ("135".equals(subTel)||"136".equals(subTel)) {
return 1;//part-r-00001裡面
}
return 0;
}
}
7.建立job
public class TCAPP {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 獲取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 指定job使用的類
job.setJarByClass(TCAPP.class);
// 將partitioner新增到job裡面
job.setPartitionerClass(TCPartitioner.class);
// 設定reduceTasks的數量 有幾個分割槽設定幾個任務
job.setNumReduceTasks(2);
// 設定mapper的類以及屬性
job.setMapperClass(TCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TelBean.class);
// 設定reduce的類以及屬性
job.setReducerClass(TCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TelBean.class);
// 設定輸入檔案 在呼叫的時候動態的傳遞引數
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 設定輸出目錄
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交任務
job.waitForCompletion(true);
}
}
8.將你當前的專案打包
9.啟動hadoop叢集
10.上傳要處理的檔案和打好的包到Linux下
11.將要處理的檔案上傳到HDFS根目錄下
hadoop fs -put /root/要處理的檔名+字尾 / (/代表的是hdfs根目錄)
12.將hadoop的前端頁面開啟,觀察是否上傳成功
13.執行架包
hadoop jar 架包名 我們住方法絕對路徑 我們上傳在HDFS裡面要處理的檔案絕對路徑 處理結果存放在HDFS裡面的絕對路徑
實際開發中要確保處理檔案的時間大於HDFS啟動的時間,要記住hadoop不適合處理小檔案
14.前端頁面檢視是否成功
_SUCCESS只是一個標識,代表執行成功
part-r-000001則是我們要求的手機號碼為135或者136開頭的存放在一個檔案中
注意:hadoop不適合處理小檔案,實際開發中要避免多個小檔案的產生,在源頭進行處理,將小檔案合併,或者是在map階段將小的分割槽內容進行適當合併,減少reduce階段的處理壓力。