1. 程式人生 > >大數據學習之MapReduce編程案例二流量日誌統計 10

大數據學習之MapReduce編程案例二流量日誌統計 10

本地 編寫代碼 效果 system 持久 window highlight 手機 img

每一個用戶的統計總流量

一:編寫代碼之前。先了解一下hadoop中的序列化

JAVA 類型 HADOOP 類型

int     IntWritable

float     FloatWritable

long     LongWritable

double    DoubleWritable

string     Text

boolean   BooleanWritable

byte     ByteWritable

map     MapWritable

array ArrayWritable

1:為什麽要序列化?

存儲“活的對象”

2:什麽是序列化?

序列化就是把內存當中的對象,轉換成字節序列以便於存儲和網絡傳輸

反序列化就是將受到的字節序列或者硬盤的持久化數據,轉換成內存中的對象

java的序列化->Serializable

為什麽不使用java提供的序列化接口?

java的序列化是一個重量級的序列化框架,一個對象被序列化後會附帶很多額外的信息(效驗信息,header,繼承體系等)。

不便於在網絡中高效傳輸,所以hadoop開發了一套序列化機制(Writable),精簡/高效.

為什麽序列化在hadoop中很重要?

hadoop通信是通過遠程調用(rpc)實現的,需要進行序列化

3:特點:

1)緊湊

2)快速

3)可拓展

4)互操作

二 代碼編寫:

1:準備好數據,如下

技術分享圖片

技術分享圖片

2 :先寫一個序列化類

package it.dawn.YARNPra.flow流量匯總序列化;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * @author Dawn
 * @date 2019年5月2日14:44:27
 * @version 1.0
 * 
 * 封裝類 數據的傳輸
 */
public class FlowBean implements Writable{
	
	//定義屬性
	private long upFlow;//上行流量
	private long dfFlow;//下行流量
	private long flowSum;//總流量
	
	public FlowBean() {}

	//流量累加
	public FlowBean(long upFlow,long dfFlow) {
		this.upFlow=upFlow;
		this.dfFlow=dfFlow;
		this.flowSum=upFlow+dfFlow;
	}
	
	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDfFlow() {
		return dfFlow;
	}

	public void setDfFlow(long dfFlow) {
		this.dfFlow = dfFlow;
	}

	public long getFlowSum() {
		return flowSum;
	}

	public void setFlowSum(long flowSum) {
		this.flowSum = flowSum;
	}

	//反序列化
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow=in.readLong();
		dfFlow=in.readLong();
		flowSum=in.readLong();
	}
	//序列化
	@Override
	public void write(DataOutput out) throws IOException {

		out.writeLong(upFlow);
		out.writeLong(dfFlow);
		out.writeLong(flowSum);
	}

	@Override
	public String toString() {
		return upFlow + "\t" + dfFlow + "\t" + flowSum;
	}
	
	
	

}

  

3:Map階段

package it.dawn.YARNPra.flow流量匯總序列化;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author Dawn
 * @date 2019年5月2日14:53:17
 * @version 1.0
 * 
 * 數據源:
 * 3631279850362	13726130503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	www.itstaredu.com	教育網站	24	27	299	681	200
 * 
 * map輸出結果:
 * 13726130503  299	681 980
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		//1.獲取數據
		String line=value.toString();
		
		//2.切割
		String[] fields=line.split("\t");
		
		//3.封裝對象 拿到關鍵字段 數據清洗
		String phoneNum=fields[1];
		long upFlow=Long.parseLong(fields[fields.length - 3]);
		long dfFlow=Long.parseLong(fields[fields.length - 2]);
		
		//4.輸出到reducer端13726130503  299	681  980
		context.write(new Text(phoneNum), new FlowBean(upFlow, dfFlow));
	}
	

}

  

4:Reduce階段

package it.dawn.YARNPra.flow流量匯總序列化;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author Dawn
 * @date 2019年5月2日14:59:15
 * @version 1.0
 * 13726130503  299	681 980
 */
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

	@Override
	protected void reduce(Text k3, Iterable<FlowBean> v3, Context context)
			throws IOException, InterruptedException {
		//1.相同手機號 的流量使用再次匯總
		long upFlow_sum=0;
		long dfFlow_sum=0;
		
		//2.累加
		for(FlowBean v:v3) {
			upFlow_sum+=v.getUpFlow();
			dfFlow_sum+=v.getDfFlow();
		}
		
		FlowBean rs=new FlowBean(upFlow_sum, dfFlow_sum);
		
		//3.輸出
		context.write(k3, rs);
	}
	
}

  

5:編寫Driver(本地Windows上運行的。懶得上傳到集群上了,和本地運行的效果都一樣)

package it.dawn.YARNPra.flow流量匯總序列化;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Dawn
 * @date 2019年5月2日15:03:59
 * @version 1.0
 * 
 * 就在本地跑算了。方便點,其實和在虛擬機中效果一樣
 */
public class FlowCountDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);
		
		job.setJarByClass(FlowCountDriver.class);
		
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		FileInputFormat.setInputPaths(job, new Path("f:/temp/流量日誌.dat"));
		FileOutputFormat.setOutputPath(job, new Path("/f:/temp/流量統計結果"));
		
		boolean rs=job.waitForCompletion(true);
		System.out.println(rs?"成功":"失敗");
		
	}

}

  

大功告成:可以看下運行結果

暈怎麽差不了圖片了呀!!!!算了。直接上結果數據吧

13480253104 120 1320 1440
13502468823 735 11349 12084
13510439658 1116 954 2070
13560436326 1136 94 1230
13560436666 1136 94 1230
13560439658 918 4938 5856
13602846565 198 910 1108
13660577991 660 690 1350
13719199419 240 0 240
13726130503 299 681 980
13726238888 2481 24681 27162
13760778710 120 120 240
13822544101 264 0 264
13884138413 4116 1432 5548
13922314466 3008 3720 6728
13925057413 11058 4243 15301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 369 338 707
15889002119 938 380 1318
15920133257 316 296 612
18212575961 1527 2106 3633
18320173382 9531 212 9743

技術分享圖片

大數據學習之MapReduce編程案例二流量日誌統計 10