(Pitfall Notes) Hadoop 3.0 (Part 3): MapReduce flow, serialization, and offset values (MapReduce)
Pitfall note:
In short, MapReduce is a way of splitting a large data set into small chunks, computing on each chunk, and then aggregating the results.
Basic idea
One diagram is enough to sketch the basic structure of MapReduce:
map(): roughly, the decomposition of the task into a set of sub-tasks
reduce(): roughly, the aggregation of the sub-tasks' computed results
Both functions take their parameters in K/V (key/value) form.
The Mapper task workflow
(each Mapper task runs as its own Java process)
When MapReduce runs, the Mapper tasks read the data files from HDFS, invoke their own map method to process the data, and emit their output. Reducer tasks then take the Mapper output as their own input, invoke their own reduce method, and finally write the result to a file in HDFS.
There are six stages in total:
1. Stage one slices the input files into input splits (InputSplit) by a fixed rule; every split has a fixed size, and by default an InputSplit is the same size as an HDFS block (Block). Suppose the block size is 64MB (the old Hadoop 1.x default; Hadoop 2 and later default to 128MB) and there are two input files, one of 32MB and one of 72MB: the small file forms one split, and the large file is cut into a 64MB block and an 8MB block, giving three splits in all. Each InputSplit is processed by one Mapper process, so these three splits get three Mapper processes.
2. Stage two parses the records inside each split into key/value pairs by a fixed rule. The default rule turns each line of text into one pair: the key is the starting position of the line (in bytes), and the value is the line's text.
3. Stage three calls the map method of the Mapper class, once for every key/value pair parsed in stage two; 1000 pairs mean 1000 calls to map, and each call may output zero or more key/value pairs.
4. Stage four partitions the pairs emitted in stage three, based on the key. If the key denotes a province (Beijing, Shanghai, Shandong, ...), say, the pairs can be partitioned by province so that all pairs for one province land in one partition. By default there is a single partition. The number of partitions equals the number of Reducer tasks that run, and by default there is one Reducer task. (The default partitioner is sketched right after this list.)
5. Stage five sorts the key/value pairs within each partition: first by key, and, with a secondary sort configured, by value for equal keys. Take three pairs <2,2>, <1,3>, <2,1> whose keys and values are integers: sorted, they become <1,3>, <2,1>, <2,2>. If there is a stage six, the data proceeds there; otherwise it is written straight to the job's output files.
6. Stage six is the reduce step. The reduce method is called once per group of pairs with equal keys, so the data volume usually shrinks in this stage. The reduced data is written to the job's output files (HDFS in our case). This stage does not exist by default; you must supply the code for it yourself.
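For stage 4: if you never set a Partitioner yourself, Hadoop falls back to HashPartitioner. Its logic is essentially the following sketch (mirroring org.apache.hadoop.mapreduce.lib.partition.HashPartitioner):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    // equal keys hash identically, so they always land in the same
    // partition and therefore reach the same Reducer
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}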
Put plainly, the whole MapReduce process splits the work into chunks (blocks) as the job requires, runs the computation on each chunk separately, and then merges the results back together.
Let's pull up a typical example: analyzing mobile browsing traffic.
Source data (3 phone numbers and their upstream/downstream traffic on particular sites):
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視訊網站 15 12 1527 2106 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 資訊保安 20 20 3156 2936 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
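To make the offset key from stage 2 (and from this post's title) concrete, here is an illustration I've added, with the offsets left symbolic. Under the default TextInputFormat, each map() call receives one line: the value is the line's text and the key is the byte offset where the line starts in the file:

key = 0                                  value = the whole first line ("1363157993044 18211575961 ...")
key = byte length of line 1, + 1 ('\n')  value = the second line
key = bytes of lines 1 and 2, + 2        value = the third line

(The exact numbers depend on the file's byte layout, e.g. multi-byte UTF-8 characters, so they are not spelled out here.)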
Walkthrough:
Before that, a first pass at serialization: serializing turns structured data into a byte stream, which is convenient both for transmission and for durable storage.
Hadoop has its own serialization read/write interface, Writable, with just two methods, write() and readFields(), used for communication between cluster nodes.
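For reference, the interface itself is tiny (this matches org.apache.hadoop.io.Writable):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface Writable {
    // serialize this object's fields into the binary stream
    void write(DataOutput out) throws IOException;
    // repopulate this object's fields from the binary stream
    void readFields(DataInput in) throws IOException;
}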
As its source shows, the Text type used here stores its contents UTF-8 encoded, and it also implements the Comparable contract (via WritableComparable), so it is sortable; you can treat it as a special UTF-8-encoded String.
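A quick hand-written sketch (not from the original post) of Text behaving like a UTF-8 string:

import org.apache.hadoop.io.Text;

public class TextDemo {
    public static void main(String[] args) {
        Text a = new Text("13502468823");
        Text b = new Text("15920133257");
        System.out.println(a.toString());        // back to a plain Java String
        System.out.println(a.getLength());       // 11: byte length of the UTF-8 encoding
        System.out.println(a.compareTo(b) < 0);  // true: Text is comparable, hence sortable
    }
}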
KpiWritable.java (much like a POJO, except that it implements Hadoop's serialization interface Writable)
package hadoop.Writable.Serialization;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * @ClassName: KpiWritable
 * @Description: encapsulates the KpiWritable type
 * @author linge
 * @date 2017-12-27 17:12:18
 */
public class KpiWritable implements Writable {
    private long upPackNum;   // number of upstream packets
    private long downPackNum; // number of downstream packets
    private long upPayLoad;   // total upstream traffic, in bytes
    private long downPayLoad; // total downstream traffic, in bytes

    // no-arg constructor, needed when the framework deserializes an instance
    public KpiWritable() {
        super();
    }

    public KpiWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {
        super();
        this.upPackNum = upPackNum;
        this.downPackNum = downPackNum;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
    }

    // this constructor makes it easy to plug in the string fields
    // while scattering and processing the data
    public KpiWritable(String upPack, String downPack, String upPay, String downPay) {
        upPackNum = Long.parseLong(upPack);
        downPackNum = Long.parseLong(downPack);
        upPayLoad = Long.parseLong(upPay);
        downPayLoad = Long.parseLong(downPay);
    }

    @Override
    public String toString() { // used when the record is written out as text
        return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // must read the fields in exactly the order write() emitted them
        upPackNum = in.readLong();
        downPackNum = in.readLong();
        upPayLoad = in.readLong();
        downPayLoad = in.readLong();
    }

    public long getUpPackNum() { return upPackNum; }
    public void setUpPackNum(long upPackNum) { this.upPackNum = upPackNum; }
    public long getDownPackNum() { return downPackNum; }
    public void setDownPackNum(long downPackNum) { this.downPackNum = downPackNum; }
    public long getUpPayLoad() { return upPayLoad; }
    public void setUpPayLoad(long upPayLoad) { this.upPayLoad = upPayLoad; }
    public long getDownPayLoad() { return downPayLoad; }
    public void setDownPayLoad(long downPayLoad) { this.downPayLoad = downPayLoad; }
}
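To watch write()/readFields() do their work outside a job, here is a minimal round-trip sketch I've added (plain java.io, numbers taken from the first sample record):

// same package as KpiWritable above
import java.io.*;

public class KpiWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        KpiWritable original = new KpiWritable(15L, 12L, 1527L, 2106L);

        // serialize: four longs at 8 bytes each = 32 bytes
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // deserialize into a fresh instance created via the no-arg constructor
        KpiWritable copy = new KpiWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // 15	12	1527	2106
    }
}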
MyMapper.java performs the scattered processing, i.e. the per-split record parsing described above (stages 2-3):
package hadoop.Writable.Serialization;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// The input key is a LongWritable holding the line's byte offset. A custom key
// type would also work: the POJO just has to implement WritableComparable, the
// same interface LongWritable implements (it in turn extends Writable).
public class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, KpiWritable>.Context context)
            throws IOException, InterruptedException {
        // split the line itself (the value), not the offset key
        String[] spilted = value.toString().split("\t");
        String string = spilted[1];  // the phone number
        Text k2 = new Text(string);  // the phone number becomes the key
        KpiWritable v2 = new KpiWritable(spilted[6], spilted[7], spilted[8], spilted[9]);
        context.write(k2, v2);
    }
}
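For the first sample record, for instance, this mapper emits k2 = 18211575961 and v2 = (15, 12, 1527, 2106), which KpiWritable.toString() renders tab-separated. Note one pitfall fixed above: the fields must be split out of the value (the line's text), not out of the key (the byte offset).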
MyReducer.java performs the reduction (aggregation) over the data:
package hadoop.Writable.Serialization;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
    @Override
    protected void reduce(Text k2, Iterable<KpiWritable> arg1,
            Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)
            throws IOException, InterruptedException {
        // accumulators for the four counters
        long upPackNum = 0L;
        long downPackNum = 0L;
        long upPayLoad = 0L;
        long downPayLoad = 0L;
        for (KpiWritable kpiWritable : arg1) {
            upPackNum += kpiWritable.getUpPackNum();
            downPackNum += kpiWritable.getDownPackNum();
            upPayLoad += kpiWritable.getUpPayLoad();
            downPayLoad += kpiWritable.getDownPayLoad();
        }
        // k2 is the key the mapper emitted, i.e. the phone number
        KpiWritable v3 = new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
        context.write(k2, v3);
    }
}
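In our three-line sample every phone number appears exactly once, so each reduce group holds a single KpiWritable and the "sums" simply equal the original record (e.g. key 13502468823 yields 57 102 7335 110349); with several records per phone, the four accumulators would genuinely add up across them.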
Tying the code together: implement the Tool interface and launch the application through ToolRunner.
@Override
public int run(String[] args) throws Exception {
    // first delete any files already generated in the output directory
    FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());
    Path outPath = new Path(OUTPUT_PATH);
    if (fs.exists(outPath)) {
        fs.delete(outPath, true);
    }
    // define a job
    Job job = Job.getInstance(getConf(), "lingeJob");
    // set the input directory
    FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
    // set the custom Mapper class
    job.setMapperClass(MyMapper.class);
    // declare the <k2,v2> types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(KpiWritable.class);
    // set the custom Reducer class
    job.setReducerClass(MyReducer.class);
    // declare the <k3,v3> types (the value type needs setOutputValueClass,
    // not a second setOutputKeyClass call)
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(KpiWritable.class);
    // set the output directory
    FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
    // submit the job
    boolean res = job.waitForCompletion(true);
    if (res) {
        System.out.println("Process success!");
        System.exit(0);
    } else {
        System.out.println("Process failed!");
        System.exit(1);
    }
    return 0;
}
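One line worth adding before waitForCompletion() when you submit to an actual cluster rather than running locally (it is not in the listing above): point the job at the jar that contains your classes, otherwise the worker nodes may fail to load the Mapper/Reducer:

// not in the original listing: lets Hadoop locate the jar to ship to the cluster
job.setJarByClass(MyKpiJob.class);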
The main method registers and launches the job:
public static void main(String[] args) {
    Configuration conf = new Configuration();
    try {
        int res = ToolRunner.run(conf, new MyKpiJob(), args);
        System.exit(res);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
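A side benefit of going through ToolRunner: it runs args through GenericOptionsParser first, so standard Hadoop flags such as -D mapreduce.job.reduces=2 can be passed on the command line without changing any code.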
Complete code
package hadoop.Writable.Deserialization;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyKpiJob extends Configured implements Tool {
    /*
     * Custom data type KpiWritable
     */
    public static class KpiWritable implements WritableComparable<KpiWritable> {
        long upPackNum;   // number of upstream packets
        long downPackNum; // number of downstream packets
        long upPayLoad;   // total upstream traffic, in bytes
        long downPayLoad; // total downstream traffic, in bytes

        public KpiWritable() {
        }

        public KpiWritable(String upPack, String downPack, String upPay,
                String downPay) {
            upPackNum = Long.parseLong(upPack);
            downPackNum = Long.parseLong(downPack);
            upPayLoad = Long.parseLong(upPay);
            downPayLoad = Long.parseLong(downPay);
        }

        @Override
        public String toString() {
            String result = upPackNum + "\t" + downPackNum + "\t" + upPayLoad
                    + "\t" + downPayLoad;
            return result;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upPackNum);
            out.writeLong(downPackNum);
            out.writeLong(upPayLoad);
            out.writeLong(downPayLoad);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            upPackNum = in.readLong();
            downPackNum = in.readLong();
            upPayLoad = in.readLong();
            downPayLoad = in.readLong();
        }

        @Override
        public int compareTo(KpiWritable o) {
            // sort by downstream packet count, descending; Long.compare avoids
            // the overflow risk of casting a difference of two longs to int
            return Long.compare(o.downPackNum, this.downPackNum);
        }
    }
    /*
     * Custom Mapper class overriding the map method
     */
    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, KpiWritable> {
        protected void map(
                LongWritable k1,
                Text v1,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, KpiWritable>.Context context)
                throws IOException, InterruptedException {
            String[] spilted = v1.toString().split("\\s+");
            System.out.println(spilted.length);
            if (spilted.length > 1) {
                String msisdn = spilted[1];  // grab the phone number
                Text k2 = new Text(msisdn);  // wrap it in a Hadoop type to use as k2
                KpiWritable v2 = new KpiWritable(spilted[6], spilted[7],
                        spilted[8], spilted[9]);
                context.write(k2, v2);
            }
        }
    }
    /*
     * Custom Reducer class overriding the reduce method
     */
    public static class MyReducer extends
            Reducer<Text, KpiWritable, Text, KpiWritable> {
        protected void reduce(
                Text k2,
                java.lang.Iterable<KpiWritable> v2s,
                org.apache.hadoop.mapreduce.Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)
                throws IOException, InterruptedException {
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            for (KpiWritable kpiWritable : v2s) {
                upPackNum += kpiWritable.upPackNum;
                downPackNum += kpiWritable.downPackNum;
                upPayLoad += kpiWritable.upPayLoad;
                downPayLoad += kpiWritable.downPayLoad;
            }
            KpiWritable v3 = new KpiWritable(upPackNum + "", downPackNum + "",
                    upPayLoad + "", downPayLoad + "");
            System.out.println(k2 + "\nupPackNum:" + upPackNum + "\ndownPackNum:" + downPackNum
                    + "\nupPayLoad:" + upPayLoad + "\ndownPayLoad:" + downPayLoad);
            context.write(k2, v3);
        }
    }
    // input file path
    public static final String INPUT_PATH = "hdfs://192.168.88.129:9000/ha.txt";
    // output directory
    public static final String OUTPUT_PATH = "hdfs://192.168.88.129:9000/out";
    @Override
    public int run(String[] args) throws Exception {
        // first delete any files already generated in the output directory
        FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());
        Path outPath = new Path(OUTPUT_PATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        // define a job
        Job job = Job.getInstance(getConf(), "lingeJob");
        // set the input directory
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        // set the custom Mapper class
        job.setMapperClass(MyMapper.class);
        // declare the <k2,v2> types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);
        // set the custom Reducer class
        job.setReducerClass(MyReducer.class);
        // declare the <k3,v3> types (setOutputValueClass for the value type)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);
        // set the output directory
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        // submit the job
        boolean res = job.waitForCompletion(true);
        if (res) {
            System.out.println("Process success!");
            System.exit(0);
        } else {
            System.out.println("Process failed!");
            System.exit(1);
        }
        return 0;
    }
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int res = ToolRunner.run(conf, new MyKpiJob(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
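To try the whole thing end to end, something like the following should work (the jar name kpi-job.jar is a placeholder of mine; the HDFS paths match INPUT_PATH/OUTPUT_PATH above, and part-r-00000 is the standard name of the first reducer's output file):

hdfs dfs -put ha.txt hdfs://192.168.88.129:9000/ha.txt
hadoop jar kpi-job.jar hadoop.Writable.Deserialization.MyKpiJob
hdfs dfs -cat hdfs://192.168.88.129:9000/out/part-r-00000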
Across the whole exercise, most of the work concentrates in the Mapper and Reducer classes, and it is mainly these parts that implement our business requirements.