1. 程式人生 > >Hadoop系列-MapReduce自定義資料型別(序列化、反序列化機制)(十二)

Hadoop系列-MapReduce自定義資料型別(序列化、反序列化機制)(十二)

Github程式碼下載地址:

大家都知道,Hadoop中為Key的資料型別必須實現WritableComparable介面,而Value的資料型別只需要實現Writable介面即可;能做Key的一定可以做Value,能做Value的未必能做Key。但是具體應該怎麼應用呢?本篇文章將結合手機上網流量業務進行分析。


先介紹一下業務場景:統計每個使用者的上行流量和,下行流量和,以及總流量和。


本次描述所用資料,日誌格式描述:
手機號碼,上行流量,下行流量

測試的具體資料如下:


接下來貼出詳細程式碼,程式碼中含有詳細註釋,從程式碼中可以看出,用到了hadoop自定義的資料型別MyData,因為MyData只做value,所以在程式碼中只需要實現Writable介面。


package com.hadoop.minbo.mapreduce.custom2;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * 方式一
 */
public class FlowCount {

	static class WordCountMapper extends Mapper<Object, Text, Text, MyData> {

		@Override
		protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			// 拿到日誌中的一行資料
			String line = value.toString();
			// 切分各個欄位
			String[] splited = line.split(" ");
			// 獲取我們所需要的欄位:手機號、上行流量、下行流量
			String msisdn = splited[0];
			long upPayLoad = Long.parseLong(splited[1]);
			long downPayLoad = Long.parseLong(splited[2]);
			// 將資料進行輸出
			context.write(new Text(msisdn), new MyData(upPayLoad, downPayLoad));
		}
	}

	static class WordCountReducer extends Reducer<Text, MyData, Text, MyData> {
		@Override
		protected void reduce(Text key, Iterable<MyData> values, Reducer<Text, MyData, Text, MyData>.Context context)
				throws IOException, InterruptedException {
			long payLoadSum = 0L; // 計算每個使用者的上行流量和
			long downLoadSum = 0L; // 統計每個使用者的下行流量和
			// 資料傳遞過來的時候:<手機號,{MyData1, MyData2, MyData3……}>
			for (MyData md : values) {
				payLoadSum += md.upPayLoad;
				downLoadSum += md.downPayLoad;
			}
			// 在此需要重寫toString()方法
			context.write(key, new MyData(payLoadSum, downLoadSum)); 
		}
	}

	public static String path1 = "input1";
	public static String path2 = "output1";

	public static void main(String[] args) throws Exception {
		// Window下執行設定
		System.setProperty("hadoop.home.dir", "F:\\hadoop\\hadoop-2.7.3"); // 設定hadoop安裝路徑
		System.setProperty("HADOOP_USER_NAME", "hadoop"); // 使用者名稱

		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(conf);
		if (fileSystem.exists(new Path(path2))) {
			fileSystem.delete(new Path(path2), true);
		}

		Job job = Job.getInstance(conf);
		job.setJarByClass(FlowCount.class);

		FileInputFormat.setInputPaths(job, new Path(path1));
		job.setInputFormatClass(TextInputFormat.class);

		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		//設定reduce執行任務數,預設1個
		job.setNumReduceTasks(1);
		//設定分割槽方式,預設hash
		job.setPartitionerClass(HashPartitioner.class);

		// 指定maptask的輸出型別
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(MyData.class);

		// 指定reducetask的輸出型別
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(MyData.class);

		job.setOutputFormatClass(TextOutputFormat.class);
		FileOutputFormat.setOutputPath(job, new Path(path2));

		job.waitForCompletion(true);

		// 檢視執行結果:
		FSDataInputStream fr = fileSystem.open(new Path("output1/part-r-00000"));
		IOUtils.copyBytes(fr, System.out, 2048, true);
	}
}

package com.hadoop.minbo.mapreduce.custom2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * 自定義資料型別需要在hadoop中傳輸需要實現Writable介面
 * @author MINBO
 */
public class MyData implements Writable {

	public long upPayLoad; // 上行流量
	public long downPayLoad; // 下行流量
	public long loadSum; // 總流量

	// 為了能夠反序列化必須要定義一個無引數的建構函式
	public MyData() {

	}

	public MyData(long upPayLoad, long downPayLoad) {
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
		this.loadSum = upPayLoad + downPayLoad;// 利用建構函式的技巧,建立建構函式時,總流量被自動求出
	}

	// 只要資料在網路中進行傳輸,就需要序列化與反序列化
	// 先序列化,將物件(欄位)寫到位元組輸出流當中
	@Override
	public void write(DataOutput fw) throws IOException {
		fw.writeLong(upPayLoad);
		fw.writeLong(downPayLoad);
	}

	// 反序列化,將物件從位元組輸入流當中讀取出來,並且序列化與反序列化的欄位順序要相同
	@Override
	public void readFields(DataInput fr) throws IOException {
		this.upPayLoad = fr.readLong();// 將上行流量給反序列化出來
		this.downPayLoad = fr.readLong(); // 將下行流量給反序列化出來
	}

	@Override
	public String toString() {
		return "" + this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.loadSum;
	}
}

檢視執行結果:


從上面的例項可以看出,Hadoop中的自定義資料型別其實是很簡單的,但是Hadoop為什麼需要自己定義一套資料型別呢?原因在於:Java中的資料型別在序列化與反序列化的過程中太麻煩了。Java中的資料型別在序列化與反序列化的過程中必須要保證這些類與類之間的關係,從這個角度講,意味著程式碼量就很大,資料在網路中傳輸就很佔網寬,而hadoop認為這樣太麻煩了,所以有自定義的資料型別,簡化了序列化與反序列化的過程,保證了程式碼量的簡潔。

其實如果對hadoop中的自定義資料型別不是很瞭解的話,我們也可以用現有的hadoop資料型別,比如收LongWritable,Text等來解決業務問題,比如對於上面給的手機上網流量統計業務,我們的程式碼也可以這麼設計:

package com.hadoop.minbo.mapreduce.custom2;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * 方式二
 */
public class FlowCount2 {

	static class WordCountMapper extends Mapper<LongWritable, Text, Text, Text> {

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			// 拿到日誌中的一行資料
			String line = value.toString();
			// 切分各個欄位
			String[] splited = line.split(" ");
			// 獲取我們所需要的欄位:手機號、上行流量、下行流量
			String num = splited[0];
			String upPayLoad = splited[1];
			String downPayLoad = splited[2];
			String str = "" + upPayLoad + " " + downPayLoad;// 這樣改變即可
			// 將資料進行輸出
			context.write(new Text(num), new Text(str));
		}
	}

	static class WordCountReducer extends Reducer<Text, Text, Text, Text> {

		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			long payLoadSum = 0L; // 計算每個使用者的上行流量和
			long downLoadSum = 0L; // 統計每個使用者的下行流量和
			long sum = 0L;
			for (Text v : values) {
				String[] splited = v.toString().split(" ");
				payLoadSum += Long.parseLong(splited[0]);
				downLoadSum += Long.parseLong(splited[1]);
			}

			sum = payLoadSum + downLoadSum;
			String result = "" + payLoadSum + " " + downLoadSum + " " + sum;
			context.write(key, new Text(result));
		}

	}

	public static String path1 = "input1";
	public static String path2 = "output1";

	public static void main(String[] args) throws Exception {
		// Window下執行設定
		System.setProperty("hadoop.home.dir", "F:\\hadoop\\hadoop-2.7.3"); // 設定hadoop安裝路徑
		System.setProperty("HADOOP_USER_NAME", "hadoop"); // 使用者名稱

		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(conf);
		if (fileSystem.exists(new Path(path2))) {
			fileSystem.delete(new Path(path2), true);
		}

		Job job = Job.getInstance(conf);
		job.setJarByClass(FlowCount2.class);

		FileInputFormat.setInputPaths(job, new Path(path1));
		job.setInputFormatClass(TextInputFormat.class);

		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		// 設定reduce執行任務數,預設1個
		job.setNumReduceTasks(1);
		// 設定分割槽方式,預設hash
		job.setPartitionerClass(HashPartitioner.class);

		// 指定maptask的輸出型別
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		// 指定reducetask的輸出型別
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setOutputFormatClass(TextOutputFormat.class);
		FileOutputFormat.setOutputPath(job, new Path(path2));

		job.waitForCompletion(true);

		// 檢視執行結果:
		FSDataInputStream fr = fileSystem.open(new Path("output1/part-r-00000"));
		IOUtils.copyBytes(fr, System.out, 2048, true);
	}
}

執行結果:


從上面也說明了一個道理,對於知識的運用,是需要靈活的掌握。
如果問題,歡迎指正!

參考資料: