
Writing a MapReduce Program (a Simple Called-Number Analysis Program)

Since Hadoop 2.2.0 does not yet have a usable Eclipse plugin, the workflow here is to write the code in Eclipse and then run it in the Hadoop environment.

Preparation:

1. Set up the Hadoop environment, create a project, and add all of Hadoop's jar files to the project's Build Path;

2. Construct the data set: each line consists of two numbers, a calling number and a called number. Generate random test data and put the resulting file into HDFS (upload commands follow the generator code below);

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Random;

public class GenerateTestData {
	public static void writeToFile(String fileName) throws Exception {
		OutputStream out = new FileOutputStream(new File(fileName));
		BufferedOutputStream bo = new BufferedOutputStream(out);
		Random rd1 = new Random();

		try {
			for (int i = 0; i < 10000; i++) {
				StringBuilder sb = new StringBuilder();
				// Calling number: a random 9-digit number starting with 1.
				sb.append(1);
				for (int j = 1; j < 9; j++) {
					sb.append(rd1.nextInt(10));
				}
				sb.append(" ");
				// Called number: usually one of a few well-known service
				// numbers; in the remaining cases (0 and 9), another random
				// 9-digit number starting with 1.
				switch (rd1.nextInt(10)) {
				case 1:
					sb.append("10086");
					break;
				case 2:
					sb.append("110");
					break;
				case 3:
					sb.append("120");
					break;
				case 4:
					sb.append("119");
					break;
				case 5:
					sb.append("114");
					break;
				case 6:
					sb.append("17951");
					break;
				case 7:
					sb.append("10010");
					break;
				case 8:
					sb.append("13323567897");
					break;
				default:
					sb.append(1);
					for (int j = 1; j < 9; j++) {
						sb.append(rd1.nextInt(10));
					}
					break;
				}
				sb.append("\r\n");
				bo.write(sb.toString().getBytes());
			}
		} finally {
			// Close (and thereby flush) the buffered stream; without this,
			// the tail of the buffered data never reaches the file.
			bo.close();
		}
	}

	public static void main(String[] args) {
		try {
			writeToFile("d://helloa.txt");
			System.out.println("finish!");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
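After generating the file, copy it into HDFS so the job can read it. A minimal sketch, assuming the file has been moved to the cluster machine and /input is the target directory (both names are just this example's choices):

hadoop fs -mkdir /input
hadoop fs -put helloa.txt /input/helloa.txt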
The MapReduce program is as follows; it was written with reference to Hadoop: The Definitive Guide and still uses the older version of the API:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FirstTest extends Configured implements Tool{

	// Counts input lines that could not be parsed and were skipped.
	enum Counter{
		LINESKIP,
	}
	
	// Mapper: each input line is "caller callee"; emit the called number
	// as the key and the calling number as the value.
	public static class Map extends Mapper<LongWritable, Text, Text, Text> {
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();

			try {
				String[] arr = line.split(" ");
				context.write(new Text(arr[1]), new Text(arr[0]));
			} catch (Exception e) {
				// Malformed line (e.g. no separator): count it and skip.
				context.getCounter(Counter.LINESKIP).increment(1);
			}
		}
	}
	
	
	// Reducer: for each called number, concatenate all of its calling
	// numbers, separated by "|".
	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		@Override
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			StringBuilder out = new StringBuilder();
			for (Text t : values) {
				out.append(t.toString()).append("|");
			}
			context.write(key, new Text(out.toString()));
		}
	}
	
	@Override
	public int run(String[] args) throws Exception {
		
		Configuration conf = getConf();
		
		Job job = new Job(conf, "First Map-Reduce Program");
	    job.setJarByClass(getClass());

	    FileInputFormat.addInputPath(job, new Path(args[0]));
	    FileOutputFormat.setOutputPath(job, new Path(args[1]));
	    
	    job.setMapperClass(Map.class);
	    job.setReducerClass(Reduce.class);
	    
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(Text.class);
	    
	    job.waitForCompletion(true);
		return job.isSuccessful()?0:1;
	}
	
	public static void main(String[] args) throws Exception {
	    int exitCode = ToolRunner.run(new Configuration(),new FirstTest(), args);
	    System.exit(exitCode);
		
	  }

}
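To make the data flow concrete, here is an illustration with made-up numbers. Given input lines such as

123456789 10086
187654321 10086
123456789 110

the map step emits (called number, calling number) pairs, and the reduce step groups them, producing one output line per called number, for example:

110	123456789|
10086	123456789|187654321|

(key and value are tab-separated by the default TextOutputFormat; the order of callers within a line is not guaranteed).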

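A side note on the API: the Job constructor used in run() above is deprecated in Hadoop 2.x; the equivalent non-deprecated call is Job.getInstance:

Job job = Job.getInstance(conf, "First Map-Reduce Program");

The rest of the job setup is unchanged.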

After compiling the source and building the jar file under Linux, run it in the Hadoop environment:
hadoop jar FirstTest.jar /input/helloa.txt /output
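For reference, a minimal compile-and-package sequence for the simple case where FirstTest.java is in the default package (the hadoop classpath command prints the jars needed for compilation):

javac -cp "$(hadoop classpath)" FirstTest.java
jar cvfe FirstTest.jar FirstTest FirstTest*.class

The e flag writes the entry point into the jar's manifest, which is why the hadoop jar command above works without naming the main class.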
Problems encountered:

1. The program was written in Eclipse with a package declaration, but on Linux the jar was built directly with a command like jar cvfm abc.jar .., so Hadoop kept complaining that it could not find the main class when running the jar (see the packaging sketch at the end of this post);

2、在linux下編譯時,FirstTest.java檔案是放在了HADOOP_CLASSPATH下編譯,在此目錄執行hadoop jar FirstTest.jar /input/helloa.txt /output時提示FirstTest&Map類找不著,將生成的FirstTest.jar放入其他目錄後執行正常。