
MapReduce Case 9 — Sorting Data from Multiple Number Files and Adding Sequence Numbers (with a Parallelizable Approach)

Problem:
Sort the numbers and add a sequence number to each. Source data:
2
32
654
32
15
756
65223
5956
22
650
92
26
54
6


Final result:
1  2
2  6
3  15
4  22
5  26
6  32
7  32
8  54
9  92
10 650
11 654
12 756
13 5956
14 65223


Be sure to consider whether your approach will still work once the data volume grows large.

Solution: We have multiple unordered files to sort, with an index prepended to each number. The first idea is to lean on MapReduce's built-in sorting: emit each number as the map output key, and the shuffle phase will deliver the keys to the reducer already sorted. In the reducer, keep a global counter for the index and a global temporary variable holding the previous value; when the current value is greater than the temporary value, increment the counter and update the temporary value, otherwise leave both unchanged, then write the output. For example, on the sorted keys 2, 6, 15, 22, 26, 32, 32, 54, ... the counter reaches 6 at the first 32 and stays at 6 for the duplicate 32, so equal values share one index (a dense ranking).

The code is as follows:

/**
 * @author: lpj   
 * @date: 2018年3月16日 下午7:16:47
 * @Description:
 */
package lpj.reduceWork;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class BigNumFileSortMR {
	private static int countnum = 0; // global sequence counter (valid only with a single reducer)
	private static int temNum = 0;   // previous key seen, used to detect duplicates
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
//		conf.addResource("hdfs-site.xml"); // load an external configuration file
//		System.setProperty("HADOOP_USER_NAME", "hadoop"); // run against the cluster
		FileSystem fs = FileSystem.get(conf); // defaults to the local file system
		
		Job job = Job.getInstance(conf);
		job.setJarByClass(BigNumFileSortMR.class);
		job.setMapperClass(BigNumFileSortMR_Mapper.class);
		job.setReducerClass(BigNumFileSortMR_Reducer.class);
		
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		Path inputPath = new Path("d:/a/homework9/input/"); // multiple input files
		Path outputPath = new Path("d:/a/homework9/output/"); // single output file
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		boolean isdone = job.waitForCompletion(true);
		System.exit(isdone ? 0 : 1);
	}
	
	public static class BigNumFileSortMR_Mapper extends Mapper<LongWritable, Text, IntWritable, NullWritable>{
		IntWritable numout = new IntWritable();
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// Emit the number itself as the map output key; the shuffle then
			// delivers the keys to the single reducer in sorted order.
			numout.set(Integer.parseInt(value.toString()));
			context.write(numout, NullWritable.get());
		}
	}
	public static class BigNumFileSortMR_Reducer extends Reducer<IntWritable, NullWritable, Text, NullWritable>{
		Text kout = new Text();
		@Override
		protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
			for (NullWritable niv : values) {
				if (key.get() > temNum) {
					countnum++; // advance the global sequence number
					temNum = key.get(); // remember the current value
				}
				kout.set(countnum + "\t" + key.toString());
				context.write(kout, NullWritable.get());
			}
		}
	}

}
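One caveat worth making explicit: the static countnum/temNum counters produce a correct global sequence only because every key reaches the same reducer JVM. The job above relies on Hadoop's default of a single reduce task; as an optional safeguard (not in the original code), the driver could pin this down:

		job.setNumReduceTasks(1); // the global counters require exactly one reducer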

Test: the values given above were copied three times into the input files. Note in the output that duplicate values share one sequence number (every 32 gets index 6); the implementation produces a dense ranking rather than the strictly row-by-row numbering shown in the expected result. The result after running is:

1	2
1	2
1	2
2	6
2	6
2	6
3	15
3	15
3	15
4	22
4	22
4	22
5	26
5	26
5	26
6	32
6	32
6	32
6	32
6	32
6	32
7	54
7	54
7	54
8	92
8	92
8	92
9	650
9	650
9	650
10	654
10	654
10	654
11	756
11	756
11	756
12	5956
12	5956
12	5956
13	65223
13	65223
13	65223

When the input consists of multiple large files, pushing all the output through one reduce task creates too much pressure, so a multi-reducer approach is used instead: a custom partitioner routes each number into one of several ascending value ranges, so every reducer's output file is internally sorted and the files are ordered relative to one another. Three chained jobs then do the work: job1 partitions and sorts the numbers, job2 counts the distinct elements in each partition file and derives each file's starting index, and job3 adds the sequence numbers using that index file.
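The driver below references a custom partitioner, MyPatitionerBigFileSum, whose source is not included in the post. A minimal sketch, assuming it uses the same value ranges that BigNumFileSortMR2_Mapper checks (0–100, 100–500, 500–1000, and 1000 and up), might look like this:

package lpj.reduceWorkbean;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical reconstruction: route each number into one of four
// ascending value ranges so the reducers' output files cover disjoint,
// ordered ranges (part-r-00000 holds the smallest values, and so on).
public class MyPatitionerBigFileSum extends Partitioner<IntWritable, NullWritable> {
	@Override
	public int getPartition(IntWritable key, NullWritable value, int numPartitions) {
		int num = key.get();
		if (num >= 0 && num < 100) {
			return 0;
		} else if (num < 500) {
			return 1;
		} else if (num < 1000) {
			return 2;
		} else {
			return 3;
		}
	}
}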

The full three-job driver is as follows:

/**
 * @author: lpj   
 * @date: 2018年3月16日 下午7:16:47
 * @Description:
 */
package lpj.reduceWork;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import lpj.reduceWorkbean.MyPatitionerBigFileSum;

public class BigNumFileSortMR2 extends Configured implements Tool{
	public static void main(String[] args) throws Exception {
		int run = ToolRunner.run(new BigNumFileSortMR2(), args);
		System.exit(run);
	}
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		//--------------- Job 1: partition the numbers into sorted range files ---------------
		FileSystem fs = FileSystem.get(conf); // HDFS, per fs.defaultFS above
		Job job = Job.getInstance(conf);
		job.setJarByClass(BigNumFileSortMR2.class);
		job.setMapperClass(BigNumFileSortMR_Mapper.class);
		job.setReducerClass(BigNumFileSortMR_Reducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(NullWritable.class);
		job.setPartitionerClass(MyPatitionerBigFileSum.class);
		job.setNumReduceTasks(4);
		Path inputPath = new Path("/a/homework9/input"); // multiple input files
		Path outputPath = new Path("/a/homework9/output1"); // one output file per reducer
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		//--------------- Job 2: count the distinct elements per partition file, recording the file name ---------------
		FileSystem fs2 = FileSystem.get(conf);
		Job job2 = Job.getInstance(conf);
		job2.setJarByClass(BigNumFileSortMR2.class);
		job2.setMapperClass(BigNumFileSortMR2_Mapper.class);
		job2.setReducerClass(BigNumFileSortMR2_Reducer.class);
		job2.setMapOutputKeyClass(Text.class);
		job2.setMapOutputValueClass(IntWritable.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(Text.class);
		Path inputPath2 = new Path("/a/homework9/output1"); // job1's partition files
		Path outputPath2 = new Path("/a/homework9/output2"); // per-file counts and start indexes
		if (fs2.exists(outputPath2)) {
			fs2.delete(outputPath2, true);
		}
		FileInputFormat.setInputPaths(job2, inputPath2);
		FileOutputFormat.setOutputPath(job2, outputPath2);
		//--------------- Job 3: add the sequence numbers ---------------
		FileSystem fs3 = FileSystem.get(conf);
		Job job3 = Job.getInstance(conf);
		job3.setJarByClass(BigNumFileSortMR2.class);
		job3.setMapperClass(BigNumFileSortMR3_Mapper.class);
		job3.setNumReduceTasks(0);
		job3.setMapOutputKeyClass(Text.class);
		job3.setMapOutputValueClass(NullWritable.class);
		// Ship job2's index file (filename \t distinctCount \t startIndex) to every job3 mapper.
		URI uri = new URI("/a/homework9/output2/part-r-00000");
		job3.addCacheFile(uri);
		Path inputPath3 = new Path("/a/homework9/output1"); // number job1's sorted partition files
		Path outputPath3 = new Path("/a/homework9/output3"); // final indexed output
		if (fs3.exists(outputPath3)) {
			fs3.delete(outputPath3, true);
		}
		FileInputFormat.setInputPaths(job3, inputPath3);
		FileOutputFormat.setOutputPath(job3, outputPath3);
		
		//--------------- chain the three jobs with JobControl ---------------
		ControlledJob aJob = new ControlledJob(job.getConfiguration());
		ControlledJob bJob = new ControlledJob(job2.getConfiguration());
		ControlledJob cJob = new ControlledJob(job3.getConfiguration());
		aJob.setJob(job);
		bJob.setJob(job2);
		cJob.setJob(job3);
		JobControl jc = new JobControl("jc");
		jc.addJob(aJob);
		jc.addJob(bJob);
		jc.addJob(cJob);
		bJob.addDependingJob(aJob); // job2 consumes job1's output
		cJob.addDependingJob(bJob); // job3 consumes job1's output plus job2's index file
		Thread thread = new Thread(jc);
		thread.start();
		while (!jc.allFinished()) {
			Thread.sleep(1000);
		}
		jc.stop();
		return 0;
	}
	//--------------- Job 1: partition the numbers into sorted range files ---------------
	public static class BigNumFileSortMR_Mapper extends Mapper<LongWritable, Text, IntWritable, NullWritable>{
		IntWritable numout = new IntWritable();
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// Emit the number as the key; MyPatitionerBigFileSum routes it to a range partition.
			numout.set(Integer.parseInt(value.toString()));
			context.write(numout, NullWritable.get());
		}
	}
	public static class BigNumFileSortMR_Reducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable>{
		@Override
		protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
			// Write each key once per occurrence; within a partition the keys arrive sorted.
			for (NullWritable niv : values) {
				context.write(key, NullWritable.get());
			}
		}
	}
	//--------------- Job 2: count the distinct elements per file (one file per partition) ---------------
	public static class BigNumFileSortMR2_Mapper extends Mapper<LongWritable, Text, Text, IntWritable>{
		Set<Integer> numset = new HashSet<>(); // distinct values seen so far in this file
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			int num = Integer.parseInt(value.toString());
			InputSplit inputSplit = context.getInputSplit();
			FileSplit fSplit = (FileSplit) inputSplit;
			String filename = fSplit.getPath().getName(); // name of the current partition file
			numset.add(num);
			// Emit the running distinct count under the file name; the reducer keeps
			// the largest value, i.e. the file's total distinct count. (An empty
			// partition file produces no map calls and therefore never appears.)
			context.write(new Text(filename), new IntWritable(numset.size()));
		}
	}
	public static class BigNumFileSortMR2_Reducer extends Reducer<Text, IntWritable, Text, Text>{
		int index = 1; // running start index; file names (keys) arrive in sorted order
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			// The values are running distinct counts, so the maximum is the
			// file's total distinct count (their order is not guaranteed).
			int count = 0;
			for (IntWritable niv : values) {
				count = Math.max(count, niv.get());
			}
			// Output line: filename \t distinctCount \t startIndex
			context.write(key, new Text(count + "\t" + index));
			index += count;
		}
	}
	//--------------- Job 3: load the per-file index info into memory, then add the sequence numbers ---------------
	public static class BigNumFileSortMR3_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
		Text kout = new Text();
		Map<String, Integer> filecount = new HashMap<>(); // file name -> start index
		int firstnum = 0; // previous value, to detect duplicates
		int index = 0;    // current sequence number
		int count = 0;    // records seen so far in this file
		@SuppressWarnings("deprecation")
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			// Read job2's index file from the distributed cache:
			// each line is "filename \t distinctCount \t startIndex".
			Path[] paths = context.getLocalCacheFiles();
			String str = paths[0].toUri().toString();
			BufferedReader bf = new BufferedReader(new FileReader(new File(str)));
			String readline = null;
			while ((readline = bf.readLine()) != null) {
				String[] split = readline.split("\t");
				filecount.put(split[0], Integer.parseInt(split[2]));
			}
			IOUtils.closeStream(bf);
			// For the sample data this yields, e.g.:
			// part-r-00000 -> 1, part-r-00002 -> 9, part-r-00003 -> 12
			// Look up the start index of the file this mapper is processing.
			InputSplit inputSplit = context.getInputSplit();
			FileSplit fileSplit = (FileSplit) inputSplit;
			String name = fileSplit.getPath().getName();
			if (fileSplit.getLength() > 0) {
				index = filecount.get(name);
			}
		}

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			int num = Integer.parseInt(value.toString());
			count++;
			// The input file is sorted, so the sequence number advances
			// whenever the value changes; duplicates keep the same number.
			if (count > 1 && num != firstnum) {
				index++;
			}
			firstnum = num;
			kout.set(index + "\t" + num);
			context.write(kout, NullWritable.get());
		}
	}
	// Job3 is map-only (setNumReduceTasks(0)), so no reducer is needed.
}
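To make the pipeline concrete, here is the intermediate index file (output2) that job2 would produce for one copy of the sample data, derived from the partition ranges above and consistent with the example values noted in setup(). Each line is file name, distinct count, starting index:

part-r-00000	8	1
part-r-00002	3	9
part-r-00003	2	12

No sample value falls in the 100–500 range, so part-r-00001 is empty; an empty split never invokes map(), so that file simply does not appear. Job3 then numbers part-r-00000 starting at 1 (ending at 8 for 92), part-r-00002 starting at 9, and part-r-00003 starting at 12, reproducing the single-reducer result with duplicates again sharing an index.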

Summary: problems like this do not have to be solved in a single MapReduce job; once decomposed, they become simple and clear. For global sorting with a small amount of data in small files, the first method is enough.

For large-scale data processing, the second method is the more general one.

The hard part of parallelizing is recording each sorted partition file's distinct-element count and starting index, and then stitching the per-file numbering together.