
Big Data (Hadoop MapReduce Case Walkthrough)

package com.vip;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class MapReduceCaseAvg extends Configured implements Tool{

	public static class AvgMapper extends Mapper<Object, Text, Text, IntWritable>{
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			//get the line content
			String content = value.toString() ;
			//split the line into tokens
			StringTokenizer st = new StringTokenizer(content) ;
			while(st.hasMoreTokens()){
				String strName = st.nextToken() ;	//student name
				String strScore = st.nextToken() ;	//student score
				//emit <name, score>
				context.write(new Text(strName),  new IntWritable(Integer.parseInt(strScore)));
			}
		}
	}
	
	public static class AvgReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
		//reducer input example: <ZhangSan, {98, 89, 79}>
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			//average = sum of all scores divided by the number of subjects
			int sum = 0 ;	//total score
			int num = 0 ;	//number of subjects
			for (IntWritable score : values) {
				sum += score.get() ;	//accumulate each subject's score
				num ++ ;
			}
			context.write(key,  new IntWritable(sum / num));
		}
	}
	
	
	@Override
	public int run(String[] args) throws Exception {
		//create the job and pass in the configuration
		Job job = Job.getInstance(getConf(), "avg mr") ;
		job.setJarByClass(MapReduceCaseAvg.class);
		
		/*set the mapper and reducer classes*/
		job.setMapperClass(AvgMapper.class);
		job.setReducerClass(AvgReducer.class);
		
		/*set the output key and value types*/
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		/*set the input and output paths*/
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		/*submit the job to the cluster and wait for it to finish*/
		boolean isSuccess = job.waitForCompletion(true);
		
		return isSuccess ? 0 : 1 ;
	}
	
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new MapReduceCaseAvg(), args) ;
		System.exit(res);
	}
}
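
A quick way to sanity-check the averaging job: the mapper reads whitespace-separated name/score pairs and the reducer writes the integer average per name. The file names, paths and jar name below are only illustrative, not part of the original code:

// hypothetical input (scores.txt, assumed format: name score)
// zhangsan 98
// zhangsan 89
// zhangsan 79
// lisi 85
// lisi 95
//
// expected output (integer average per name, key and value tab-separated)
// lisi	90
// zhangsan	88
//
// run it, assuming the classes are packaged as mr-cases.jar:
// hadoop jar mr-cases.jar com.vip.MapReduceCaseAvg /input/scores.txt /output/avg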

 

package com.vip;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceCaseFilte extends Configured implements Tool {
	
	public static class FilterMapper extends Mapper<Object, Text, NullWritable, Text>{
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			//split the record on spaces
			String[] strSrc = value.toString().split(" ");
			//keep fields 0, 1, 2 and 6, joined with spaces
			String strDst = strSrc[0] + " " + strSrc[1] + " " + strSrc[2] + " " + strSrc[6] ;
			context.write(NullWritable.get(), new Text(strDst));
		}
	}
	
	
	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf(), "mrfilter") ;
		job.setJarByClass(MapReduceCaseFilte.class);
		
		/*set the mapper class (map-only job, no reducer)*/
		job.setMapperClass(FilterMapper.class);
		
		/*set the output key and value types*/
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		
		/*set the input and output paths*/
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		/*submit the job to the cluster and wait for it to finish*/
		boolean isSuccess = job.waitForCompletion(true);
		
		return isSuccess ? 0 : 1 ;
	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new MapReduceCaseFilte(), args) ;
		System.exit(res);
	}
}
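
Since no reducer class is set, Hadoop's default identity Reducer simply passes the mapper output through. The original does not show the input layout, so the record below is only an assumed example of a space-separated log line; the mapper keeps fields 0, 1, 2 and 6:

// hypothetical input record (field layout assumed for illustration):
// 192.168.1.10 user1 2021-05-01 GET /index.html 200 1024
//
// output line (fields 0, 1, 2 and 6 only):
// 192.168.1.10 user1 2021-05-01 1024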

// Anagram grouping (used by the next example): sorting the letters of a word
// gives its key, e.g. cat -> act, tar -> art.
// The reducer then receives groups such as <act, {cat, tac, cta}>.





 

package com.vip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class MapReduceCaseWords extends Configured implements Tool{

	
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf() ;
		//delete the output directory if it already exists
		Path mypath = new Path(args[1]) ;
		FileSystem hdfs = mypath.getFileSystem(conf);
		if(hdfs.exists(mypath)){
			hdfs.delete(mypath, true) ;
		}
		
		//set up the job
		Job job = Job.getInstance(conf, "words mr") ;
		job.setJarByClass(MapReduceCaseWords.class);
		
		/*set the mapper and reducer classes*/
		job.setMapperClass(WordsMapper.class);
		
		job.setReducerClass(WordsReducer.class);
		
		/*set the output key and value types*/
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		/*set the input and output paths*/
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		/*submit the job to the cluster and wait for it to finish*/
		boolean isSuccess = job.waitForCompletion(true);
		
		return isSuccess ? 0 : 1 ;
	}
	
	public static void main(String[] args) throws Exception {
		String[] args0 = {"hdfs://192.168.153.111:9000/input5",
				"hdfs://192.168.153.111:9000/output12"} ;
		int res = ToolRunner.run(new MapReduceCaseWords(), args0) ;
		System.exit(res);
	}

}
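
Unlike the first two drivers, this one hardcodes the HDFS input and output paths in main. If you would rather pass them on the command line like the other cases, a minimal alternative main could look like this (a sketch; the jar name in the comment is made up):

	public static void main(String[] args) throws Exception {
		//e.g. hadoop jar mr-cases.jar com.vip.MapReduceCaseWords /input5 /output12
		int res = ToolRunner.run(new MapReduceCaseWords(), args) ;
		System.exit(res);
	}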

 

package com.vip;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordsMapper extends Mapper<Object, Text, Text, Text>{
	private Text keyText = new Text() ;
	private Text valueText = new Text() ;
	
	@Override
	protected void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {
		String word = value.toString() ;
		char[] wordChars = word.toCharArray();	//convert the word to a char array
		Arrays.sort(wordChars); 				//sort the characters
		String sword = new String(wordChars) ;	//turn the sorted chars back into a string
		keyText.set(sword);              		//set the output key (the sorted letters)
		valueText.set(word);  					//set the output value (the original word)
		context.write(keyText, valueText);		//emit <sorted letters, word>
	}
}
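
To make the sorting step concrete, this is what the mapper emits for a few made-up input words:

// "cat" -> sorted chars "act" -> emits <act, cat>
// "tac" -> sorted chars "act" -> emits <act, tac>
// "dog" -> sorted chars "dgo" -> emits <dgo, dog>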

 

package com.vip;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordsReducer extends Reducer<Text, Text, Text, Text>{
	private Text outputKey = new Text() ;	//output key
	private Text outputValue = new Text() ;	//output value
	
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		String output = "" ;
		//join all words made of the same letters, using ~ as a separator
		for (Text word : values) {
			if(!output.equals("")){
				output = output + "~" ;
			}
			output = output + word.toString() ;
		}
		//only output groups that contain two or more words
		StringTokenizer outputTokenize = new StringTokenizer(output, "~") ;
		if(outputTokenize.countTokens() >= 2){
			output = output.replaceAll("~", ",") ;
			outputKey.set(key.toString()); 			//set the key
			outputValue.set(output);				//set the value
			context.write(outputKey, outputValue);	//emit the result
		}
	}
	
	
}
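
Putting the mapper and reducer together, each group of anagrams ends up on one output line, while single-word groups are dropped. For the example words above (the order of values inside a group is not guaranteed by MapReduce):

// reducer input <act, {cat, tac, cta}>  ->  output line: act	cat,tac,cta
// reducer input <dgo, {dog}>            ->  no output (only one word in the group)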