
Processing Mobile Internet Access Logs with a Custom Hadoop Type

Job Submission Source Code Analysis

How does code written in Eclipse get submitted as a job to the JobTracker?
(1) Calling job.waitForCompletion(true) in Eclipse actually executes the following:
  connect();
  info = jobClient.submitJobInternal(conf);
(2) Inside connect(), a JobClient object is created.
  While that object's constructor runs, it obtains JobSubmissionProtocol, a client-side proxy for the JobTracker.
  The implementation class of JobSubmissionProtocol is JobTracker.
(3) jobClient.submitJobInternal(conf) then calls
  JobSubmissionProtocol.submitJob(…),
  which means JobTracker.submitJob(…) is what ultimately runs.
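
To tie these three steps to the driver code that appears later in this post, here is a minimal, hypothetical sketch (old Hadoop 1.x API, matching the rest of the article; the class name, job name, and paths are placeholders) with the submission chain annotated at the call site:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "submit-demo");  // placeholder job name
        FileInputFormat.setInputPaths(job, new Path(args[0]));  // placeholder input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // placeholder output path

        // The entire submission chain described above hangs off this one call:
        //   waitForCompletion(true)
        //     -> connect()                             // creates a JobClient holding a
        //                                              //   JobSubmissionProtocol proxy (impl: JobTracker)
        //     -> jobClient.submitJobInternal(conf)
        //     -> JobSubmissionProtocol.submitJob(...)  // i.e. JobTracker.submitJob(...)
        job.waitForCompletion(true);
    }
}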

Hadoop Data Types

1. Hadoop data types must implement the Writable interface.
2. Common Java types and their Hadoop counterparts:
    Long       LongWritable
    Integer    IntWritable
    Boolean    BooleanWritable
    String     Text

How do you convert a Java type into a Hadoop type?
    Call the Hadoop type's constructor, or call its set() method, e.g.
      new LongWritable(123L);

How do you convert a Hadoop type back into a Java type?
    For Text, call toString(); for the other types, call get().
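
A minimal sketch showing both directions of the conversion, using only the standard constructors, set(), get(), and toString() (the class name is just for illustration):

import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class TypeConversionDemo {
    public static void main(String[] args) {
        // Java -> Hadoop: constructor or set()
        LongWritable lw = new LongWritable(123L);
        IntWritable iw = new IntWritable();
        iw.set(456);
        BooleanWritable bw = new BooleanWritable(true);
        Text text = new Text("hello hadoop");

        // Hadoop -> Java: get() for numeric/boolean types, toString() for Text
        long l = lw.get();          // 123
        int i = iw.get();           // 456
        boolean b = bw.get();       // true
        String s = text.toString(); // "hello hadoop"

        System.out.println(l + " " + i + " " + b + " " + s);
    }
}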

Processing the Mobile Internet Access Log with a Custom Hadoop Type

1. First, copy the mobile internet access log file HTTP_20130313143750.dat to the /usr/local directory with WinSCP.

2. Upload the log file to the hdfs://chaoren:9000/wlan directory (a programmatic way to do this is sketched below).
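
The post does not show the exact upload command; one way to perform the same upload programmatically is a short sketch with Hadoop's FileSystem API (the class name here is illustrative and not part of the original workflow):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadLog {
    public static void main(String[] args) throws Exception {
        // Connect to the HDFS instance used throughout this post
        FileSystem fs = FileSystem.get(URI.create("hdfs://chaoren:9000/"), new Configuration());

        // Make sure the target directory exists, then copy the local log file into it
        Path target = new Path("/wlan");
        fs.mkdirs(target);
        fs.copyFromLocalFile(new Path("/usr/local/HTTP_20130313143750.dat"), target);

        fs.close();
    }
}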


Log file contents: (figure)

Meaning of each field in the log file: (figure; in the code below, field 1 is the mobile phone number and fields 6-9 are the upstream/downstream packet counts and payload sizes)

3. Write Java code to aggregate the desired statistics from the log file.

package mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class KpiApp {
    static final String INPUT_PATH = "hdfs://chaoren:9000/wlan"; // wlan is a directory; the log file lives under /wlan
    static final String OUT_PATH = "hdfs://chaoren:9000/out";

    public static void main(String[] args) throws Exception {
        final Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());

        // 1.1 Specify the input path
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        // Specify the class used to parse the input files
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 Specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        // Specify the <k2, v2> output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);

        // 1.3 Specify the partitioner class
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // 1.4 TODO sorting and partitioning

        // 1.5 TODO (optional) combiner

        // 2.2 Specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        // Specify the <k3, v3> output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);

        // 2.3 Specify where the output goes
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        // Set the output format class
        job.setOutputFormatClass(TextOutputFormat.class);

        // Submit the job to the JobTracker and wait for it to finish
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            final String msisdn = splited[1];               // field 1: the mobile phone number
            final Text k2 = new Text(msisdn);
            final KpiWritable v2 = new KpiWritable(splited[6], splited[7],
                    splited[8], splited[9]);                // fields 6-9: traffic counters
            context.write(k2, v2);
        }
    }

    static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
        /**
         * @param k2  a distinct phone number found in the input
         * @param v2s the traffic records of that phone number across different time periods
         */
        @Override
        protected void reduce(Text k2, Iterable<KpiWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;

            for (KpiWritable kpiWritable : v2s) {
                upPackNum += kpiWritable.upPackNum;
                downPackNum += kpiWritable.downPackNum;
                upPayLoad += kpiWritable.upPayLoad;
                downPayLoad += kpiWritable.downPayLoad;
            }

            final KpiWritable v3 = new KpiWritable(upPackNum + "", downPackNum + "",
                    upPayLoad + "", downPayLoad + "");
            context.write(k2, v3);
        }
    }
}

class KpiWritable implements Writable {
    long upPackNum;
    long downPackNum;
    long upPayLoad;
    long downPayLoad;

    public KpiWritable() {
    }

    public KpiWritable(String upPackNum, String downPackNum, String upPayLoad,
            String downPayLoad) {
        this.upPackNum = Long.parseLong(upPackNum);
        this.downPackNum = Long.parseLong(downPackNum);
        this.upPayLoad = Long.parseLong(upPayLoad);
        this.downPayLoad = Long.parseLong(downPayLoad);
    }

    public void readFields(DataInput in) throws IOException {
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    @Override
    public String toString() {
        return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
    }
}

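To see why KpiWritable implements Writable, here is a small, self-contained sketch (not from the original post; the class name and sample values are arbitrary) that round-trips a KpiWritable through write()/readFields(), which is essentially what Hadoop does when it serializes map output values during the shuffle:

package mapreduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class KpiWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        // Serialize with write(), using arbitrary sample counter values
        KpiWritable original = new KpiWritable("3", "5", "1116", "954");
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize on the "other side" via the no-arg constructor plus readFields()
        KpiWritable copy = new KpiWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Prints the four counters tab-separated, as defined by toString()
        System.out.println(copy);
    }
}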

4. Execution results: (figure; each output line is the phone number followed by the four aggregated counters, tab-separated, as produced by KpiWritable.toString())