1. 程式人生 > >第一個MapReduce案例叢集模式&本地模式

第一個MapReduce案例叢集模式&本地模式

                             第一個MapReduce案例叢集模式&Linux本地模式

記錄一下自己在開發MapReduce程式的過程。

  思考點是:如何下手,怎樣開發。

1.對於平臺的要求:環境已搭建完畢,且測試通過。

   我自己平臺是:Centos6.4 +jdk1.7+hadoop2.5.1  都是64位的

我這裡寫的是非常簡單的一種:

建立一個java工程,匯入jar包,寫三個類。分別是:Mapper類、Reducer類、Runner類

Mapper類、Reducer類就是繼承框架的虛類中的Mapper、Reducer ,實現其方法:map()、reduce()。

Runner類是描述job作業(使用那個Mapper、哪個Reducer),然後將作業提交給叢集。

關於MapReduce原理這裡就不說了。

先寫Mapper類:

Mapper類程式碼:

import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
map方法是讀取檔案將檔案中字元分割出來,進行最初劃分
/**
我們自定義的Mapper類繼承自Mapper(extends)實現其map()方法。
Mapper類含有四個引數分別代表著輸入和輸出,可以結合hadoop的內建封裝的資料型別來理解。

map()的功能:
接收鍵值對,輸出中間鍵值對,是將整個任務分解為多個小任務。其後由MapReduce框架將鍵值相同的值傳給reduce方法。
map()方法中的引數說明:
共有三個引數:LongWritable key     ---鍵值對中key
              Text         value   ---鍵值對中的value
			  Context      context ---記錄輸入的key/value,記錄key/value的運算狀態
這裡擴充套件的內容是hadoop內建的資料型別:
    BooleanWriteable :標準布林型數值
    ByteWritable :單位元組數值
	DoubleWritable: 雙位元組數值
	FloatWritable :浮點數
	IntWritable : 整型數
	LongWritable: 長整型數
	Text        :使用UTF-8格式儲存的文字
	NullWritable:當<key,value>中的key或value為空時使用
	
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
	
	/**
	  map()方法實現思路:
	  1.獲取檔案的每行內容
	  2.將這行內容切分,呼叫StringUtils的方法是split方法,分割符為“”,切分後的字元放到字串陣列內
	  3.遍歷輸出<word,1>
	*/
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {

		//獲取到一行檔案的內容
		String line = value.toString();
		//切分這一行的內容為一個單詞陣列
		String[] words = StringUtils.split(line, " ");
		//遍歷  輸出  <word,1>
		for(String word:words){
			
			context.write(new Text(word), new LongWritable(1));
			
		}
	}
}

Reducer類程式碼:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
reduce方法得到從map方法提交的鍵值對,對具有相同鍵的進行合併,輸出。

1.定義一個Reducer類繼承自Reducer,Reducer包含有四個引數,<Text,LongWritable,Text,LongWritable>,
Reducer抽象類的四個形式引數型別指定了reduce函式的輸入和輸出型別。
在本例子中,輸入鍵是單詞,輸入值是單詞出現的次數,經過reduce()方法處理將單詞出現的次數進行疊加,
輸出單詞和單詞總數。
2.實現虛類Reducer的reduce()方法
reduce()方法的說明:
引數三個:
          Text                    key     -----單詞
	  Iterable<LongWritable>  values  -----單詞出現的次數
          Context                 context -----任務的上下文,包含整個任務的全部資訊

reduce()方法的功能是:
彙總並輸出單詞出現的總次數。
由上面我們可以得知reduce()方法接收的引數:key值為單詞,value值是迭代器,該迭代記憶體儲的是單詞出現的次數,
context負責將生成的k/v輸出。通過遍歷values,呼叫values的get()方法獲取Long值即為出現的次數,
累加後context物件呼叫其write()方法將結果輸出。
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
	// key: hello ,  values : {1,1,1,1,1.....}
	@Override
	protected void reduce(Text key, Iterable<LongWritable> values,Context context)
			throws IOException, InterruptedException {
		
		//定義一個累加計數器 定義為Long型別
		long count = 0;
		for(LongWritable value:values){
			//呼叫value的get()方法將long值取出來
			count += value.get();
		}
		//輸出<單詞:count>鍵值對
		context.write(key, new LongWritable(count));
	}
}
Runner類程式碼:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 用來描述一個作業job(使用哪個mapper類,哪個reducer類,輸入檔案在哪,輸出結果放哪。。。。)
 * 然後提交這個job給hadoop叢集
 * @author [email protected]
 *兩個jar包,兩個型別,兩個類,兩個路徑
 */
//cn.itheima.bigdata.hadoop.mr.wordcount.WordCountRunner
public class WordCountRunner {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		//建立job物件需要conf物件,conf物件包含的資訊是:所用的jar包,
		Job wcjob = Job.getInstance(conf);
		//設定job所使用的jar包,使用Configuration物件呼叫set()方法,設定mapreduce.job.jar wcount.jar
		conf.set("mapreduce.job.jar", "wcount.jar");
		
		//設定wcjob中的資源所在的jar包  
		//呼叫job物件的setJarByClass()方法,引數是WordCountRunner.class,設定job作業中的資源所在的jar包
		wcjob.setJarByClass(WordCountRunner.class);
		
		
		//wcjob要使用哪個mapper類,job物件呼叫setMapperClass()方法,引數是WordCountMapper.class
		wcjob.setMapperClass(WordCountMapper.class);
		//wcjob要使用哪個reducer類,job物件呼叫setReducerClass()方法,引數為WordCountReducer.class
		wcjob.setReducerClass(WordCountReducer.class);
		
		//wcjob的mapper類輸出的kv資料型別
		//job物件呼叫setMapperOutputKeyClass();設定Mapper類輸出的key值的型別--Text
		//job物件呼叫setMapperOutputValueClass();設定Mapper類輸出value值的型別--LongWritable
		wcjob.setMapOutputKeyClass(Text.class);
		wcjob.setMapOutputValueClass(LongWritable.class);
		
		//wcjob的reducer類輸出的kv資料型別
		//job物件呼叫setOutputKey
		wcjob.setOutputKeyClass(Text.class);
		wcjob.setOutputValueClass(LongWritable.class);
		
		//指定要處理的原始資料所存放的路徑
		//呼叫FileInputFormat物件的setInputPath()方法,引數的檔案路徑,是設定的源資料路徑,當此處為叢集的路徑是就是跑在叢集上的程式,
		//如果設定在當前機器的路徑,就是本地模式
		FileInputFormat.setInputPaths(wcjob, "hdfs://hadoop01:9000/wc/srcdata");
	
		//指定處理之後的結果輸出到哪個路徑,注意此時應當在路徑應當是差不多的
		FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hadoop:9000/wc/output"));
		//呼叫job物件的waitForCompletion()方法,提交作業。			
		boolean res = wcjob.waitForCompletion(true);
		System.exit(res?0:1);
	}
}

上面的程式碼是執行在叢集上的,如果執行在本地是將檔案輸入路徑和輸出路徑寫為本地的路徑即可。對於其他的不需要改變。

在建立完成這三個類後,開啟Linux終端:

1.選中建立完成的工程,點選右鍵,選擇Export,以jar包的格式匯出該檔案。

2.輸入命令:jps  檢視此時程序確定有沒有啟動hadoop、yarn,若未啟動hadoop則輸入命令:start-dfs.sh、start-yarn.sh啟動即可

3.輸入命令:hadoop jar wcount.jar Runner類的全類名

這樣就可以跑起來了。在跑完程式之後

4.輸入命令:hadoop fs -ls /wc/output  ----程式執行結束後輸出檔案的位置,確定檔案正常輸出

5.輸入命令:hadoop fs -cat /wc/output/xxxx  ----檢視該檔案

本地執行模式就是直接執行Runner的main方法產生一個RunJar的客戶端,會持有本地提交器,持有一個localsumbitter,向本地MR模擬器提交任務。在Linux本地實現的。


遇到的異常:

java.lang.Exception: java.io.IOException: Mkdirs failed to create file:/home/my/_temporary/0/_temporary/attempt_local2063472742_0001_r_000000_0
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.io.IOException: Mkdirs failed to create file:/home/my/_temporary/0/_temporary/attempt_local2063472742_0001_r_000000_0
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:132)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.<init>(ReduceTask.java:540)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:614)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

解決辦法:

輸出的結果檔案應當放到一起同一目錄,即如我的程式碼中,這個異常就是我的輸出的目錄與我的原始檔的目錄不一致造成的。

對於如何將本地模式的基礎上修改為叢集模式的方法是新增四個xml檔案:

      core-site.xml   hdfs-site.xml   mapred-site.xml   yarn-site.xml   

總結 MR Job的幾種執行模式  :

  1.在伺服器上執行yarn叢集模式:

      在Eclipse中開發好mr程式(在Windows或Linux中都可),然後打包成jar包,上傳到伺服器。

        執行命令 hadoop jar jar包名   Runner類的全類名

  2.在Linux中Eclipse開發平臺中直接啟動Runner類執行main方法,這種方式既可以為yarn叢集模式,也可以為本地模式:

關鍵點是:

       取決於一個配置引數:mapreduce.framework.name=local(yarn) 

       當為local時則執行為本地模式;當為yarn時則為執行yarn叢集模式。

 ----yarn模式:

      a.將mr工程匯出為jar包,放在工程目錄下。

      b.在main方法中,加入一個配置引數,conf.set("mapreduce.job.jar","mr匯出的jar包名")

----本地模式:

     在Eclipse中將mr工程匯出為jar包,拷貝到當前目錄下,直接執行main方法。

 3.在Windows的Eclipse中執行本地模式的步驟:

     a.在Windows中存放hadoop的安裝包,並配置其環境變數。

     b.根據Windows的版本,對應的替換掉hadoop安裝包中的本地庫(bin、lib)         

     c.mr工程中不需要設定mapreduce.framework.name。

 4.在Windows的Eclipse中執行yarn叢集模式的步驟:

   此時只新增該單一的引數,設定conf.set("mapreduce.job.jar","mr匯出的jar包名")是不行的,因為在Windows中還需要其他的系統變數如:JAVA_HOME 等。因為在將job提交到伺服器Linux中會有不相容的問題出現,比如環境變數%與Linux中的$的區別。此時我們可以修改yarn-Runner方法的原始碼。此處修改較多。不建議使用。

************在後期時遇到的一些異常***********************

1.connection refused ---沒寫埠號

2.在第一次提交作業後一旦任務執行成功,必須將輸出的檔案刪除,這是使用Hadoop shell命令:

   hadoop fs -rm -r /output

3.對於將工程匯出jar包,jar包的位置,並沒有特殊的設定,就是jar包在那兒,那麼執行hadoop jar  命令時必須在該目錄下。

4.對於jar包實在安裝包中找,按住ctr鍵檢視的原始碼是需要再下載的,並新增路徑