1. 程式人生 > >在eclipse中實現MapReduce

在eclipse中實現MapReduce

1.準備環境

  • Windows下的Hadoop的mapred-site.xml 和 yarn-site.xml配置檔案更新為和虛擬機器中的一樣。
  • 將mapred-site.xml和yarn-site.xml配置檔案拷貝到工程下。
  • 新增依賴包。

2.執行模式

  • 本地執行(在本地的eclipse中啟動多個執行緒來模擬map task,和reduce task執行)。主要用於測試環境。  需要修改mapred-site.xml配置檔案中的 mapreduce.framework.name,修改為local。
  • 提交到叢集中執行。主要用於生產環境。  需要先將工程打成一個jar包,拷貝到虛擬機器中。使用hadoop jar命令執行。
  • 在本機上的eclipse中直接提交任務到叢集中執行。  需要先將工程達成一個jar包,放在本地一個地方。比如d盤下。然後在程式中設定job.setJar(“jar包的路徑”)。最後還要修改mapred-site.xml配置檔案為
			<property>
			     <name>mapreduce.framework.name</name>
			     <value>yarn</value>
			</property>
			<property>
			     <name>mapreduce.app-submission.cross-platform</name>
			     <value>true</value>
			 </property>

3.一個簡單的wordcount例項,用於統計一篇文章中某個單詞出現的次數

  • 主函式
public class WC {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		//設定配置物件
		Configuration conf = new Configuration(true);
		
		//設定一個job物件
		Job job = Job.getInstance(conf);
		
		//設定當前main函式所在類
		job.setJarByClass(WC.class);
		
		//需要使用到第三種執行模式時要設定本地jar包的位置
		job.setJar("d:/wc.jar");

		//設定輸入路徑
		FileInputFormat.setInputPaths(job, "/input/wc");

		//設定輸出路徑
		Path outputPath = new Path("/output/");
		FileSystem fs = outputPath.getFileSystem(conf);
		//判斷這個輸出路徑是否存在,存在就把它刪除
		if(fs.exists(outputPath)){
			fs.delete(outputPath,true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);

		//設定Map class
		job.setMapperClass(WCMapper.class);
		
		//設定map輸出key、value的型別 key是單詞,value是1
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		//設定reduce class
		job.setReducerClass(WCReduce.class);

		//設定reduce task的個數
		job.setNumReduceTasks(2);

		//列印資訊
		job.waitForCompletion(true);
	}
}
  • Map class
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	Text myKey = new Text();
	IntWritable myValue = new IntWritable(1);
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		//根據空格將單詞切割出來
		String[] words = StringUtils.split(value.toString(), ' ');
		for (String word : words) {
			myKey.set(word);
			context.write(myKey,myValue);
		}
	}
}
  • Reduce class
public class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
		 int sum = 0;
		 for (IntWritable value : values) {
			sum += value.get();
		}
		 context.write(key, new IntWritable(sum));
	}
}