1. 程式人生 > >大資料之八 hadoop MapReduce-WordCount

大資料之八 hadoop MapReduce-WordCount

前兩篇中,我們瞭解了MapReduce的執行流程及其架構實現,今天我們就在本地通過經典例項WordCount來了解一下MapReduce的程式設計實現

叢集配置

  stop-dfs.sh
  1. 配置mapred-site.xml檔案
  <!-- MapReduce的架構體系,這裡使用MapReduceV2,即YARN -->
  <property>
          <name>mapreduce.framework.name</name>
          <value>yarn</value>
  </property>
  1. 配置yarn-site.xml檔案
    <!-- NodeManager上執行的附屬服務。需配置成mapreduce_shuffle,才可執行MapReduce程式 -->
 	<property>
           <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
    </property>
    <property>
            <name>yarn.resourcemanager.ha.enabled</name>
            <value>true</value>			<!-- 啟用RM的高可用 -->
    </property>
    <property>
            <name>yarn.resourcemanager.cluster-id</name>
            <value>cluster1</value>				<!-- YARN對外提供的服務的id -->
    </property>
    <property>
            <name>yarn.resourcemanager.ha.rm-ids</name>
            <value>rm1,rm2</value>			<!-- 實現RM高可用的節點id -->
    </property>
    <property>
            <name>yarn.resourcemanager.hostname.rm1</name>
            <value>node01</value>				<!-- rm1對應的真實節點 -->
    </property>
    <property>
            <name>yarn.resourcemanager.hostname.rm2</name>
            <value>node02</value>			  <!-- rm2對應的真實節點 -->
    </property>

	<!--配置三臺zookeeper的位置資訊 -->
    <property>
           <name>yarn.resourcemanager.zk-address</name>
           <value>node02:2181,node03:2181,node04:2181</value>
    </property>
  1. 將配置完成的hadoop安裝包傳送到其他節點,覆蓋之前的安裝包,也可以先刪除之前的安裝包
  scp -r hadoop-2.6.5 [email protected]:/opt/zgl/
  scp -r hadoop-2.6.5 [email protected]:/opt/zgl
  scp -r hadoop-2.6.5 [email protected]:/opt/zgl/
  1. 在node01 或 node02 上啟動HDFS
  start-dfs.sh
  1. 在node01 或 node02 上啟動YARN,這裡我們在node01上啟動
  satrt-yarn.sh

因為 HDFS 和 YARN 都是hadoop的元件之一,這裡步驟6,7可以用 start-sll.sh

代替

  1. 啟動後使用jps檢視,成功的話node01節點上會啟動ResourceManager程序,node02,node03,node04上會啟動NodeManager程序。
[[email protected] ~]# jps
2390 NameNode
2726 DFSZKFailoverController
3111 Jps
2839 ResourceManager
2584 JournalNode

[[email protected] ~]# jps
2834 Jps
2706 NodeManager
2487 JournalNode
2393 DataNode
2571 DFSZKFailoverController
2268 QuorumPeerMain
2333 NameNode
  1. 這裡我們會發現Node02上並沒有啟動standby狀態的RM,這是因為系統預設就是不啟動的,這裡我們可以手動將其啟動
  yarn-daemon.sh start resourcemanager

 [[email protected] ~]# jps
 3008 ResourceManager
 2706 NodeManager
 2487 JournalNode
 2393 DataNode
 2571 DFSZKFailoverController
 2268 QuorumPeerMain
 2333 NameNode
 3055 Jps
  1. YARN也提供Web UI來顯示job相關資訊,瀏覽器輸入node01:8088。後面我們執行WordCount時,就可以在這裡檢視job狀態 job web UI

eclipse配置

開啟Windows->Show View中的Map/Reduce Locations,右鍵選擇Edit Hadoop location… edit 在下圖對MapReduce進行配置(確保node02上已啟動RM) MR Master 新建一個java專案,將hadoop安裝包中share\hadoop目錄下 common,hdfs,tools,yarn,mapreduce五個資料夾中的jar包及其下lib資料夾中的jar包整合匯入專案並build path

將mapred-site.xml,yarn-site.xml檔案拷貝到source資料夾中 source 接下來,我們就可以開始寫程式碼了

WordCount

我們先來回顧一下MapReduce處理大資料集的過程 MR處理流程 做一些準備工作 首先我們在本地建立兩個文字檔案file1.txt和file2.txt,使file1.txt內容為"Hello World\nBye World",file2.txt的內容為"Hello Hadoop\nBye Hadoop"。 在HDFS 上建立輸入資料夾 input,上傳本地檔案到叢集的input 目錄下

再來分析一下WordCount的執行過程

  1. 將檔案拆分成splits,由於測試用的檔案較小,所以我們寫兩個檔案,每個檔案為一個split,並將檔案按行分割形成<key,value>對。這一步由MapReduce框架自動完成,其中偏移量(即key值)包括了回車所佔的位元組數 split
  2. 將分割好的<key,value>對交給使用者定義的map方法進行處理,生成新的<k2,v2>對 map
  3. 得到map方法輸出的<key,value>對後,Mapper會將它們按照key值進行排序分組,並執行Combine,將key至相同value值累加,得到Mapper的最終輸出結果 shuff write
  4. Reducer先對從Mapper接收的資料進行排序,再交由使用者自定義的reduce方法進行處理,得到新的<k3,v3>對,並作為WordCount的輸出結果 reduce 分析完執行過程,我們來看一下程式碼實現。 要程式設計實現MapReduce計算,只需要繼承Mapper類實現其map()方法,繼承Reduce類實現其reduce()方法,並在main()函式中對job進行設定

序列化 因為HDFS涉及到序列化的問題,Hadoop的基本資料型別都實現了一個Writable介面,而實現了這個介面的型別都支援序列化。在Mapper類和Reducer類中都使用Hadoop自帶的基本資料型別 序列化

map()

/**
     * @param KEYIN
     *            →k1 表示每一行的起始位置(偏移量offset)	LongWritable
     * @param VALUEIN
     *            →v1 表示每一行的文字內容		Text
     * @param KEYOUT
     *            →k2 表示每一行中的每個單詞		Text
     * @param VALUEOUT
     *            →v2 表示每一行中的每個單詞的出現次數,固定值為1		IntWritable
     */
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	Text myKey = new Text();	//設定Text型別的物件,用來封裝KEYOUT,即k2
	IntWritable myValue = new IntWritable(1);		//設定IntWritable型別的物件並固定其值為1,即v2
	
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		System.out.println(key+"=========="+value);		//輸出<k1,v1>值,這裡的key為offset
		
		//使用hadoop自帶的工具類StringUtil進行分詞(按空格進行切割),相當於value.toString().split(" ")
		String[] words = StringUtils.split(value.toString(), ' ');
		
		//遍歷分完的詞,即k2,產生<k2,v2>,使用context進行輸出。context是個上下文物件
		for (String word : words) {
			myKey.set(word);
			context.write(myKey,myValue);
		}
	}
}

reduce()

 /**
 * KEYIN     即K2     表示行中出現的單詞 	Text
 * VALUEIN     即V2     表示出現的單詞的次數 	IntWritable
 * KEYOUT     即K3     表示行中出現的不同單詞	Text
 * VALUEOUT 即V3     表示行中出現的不同單詞的總次數		IntWritable
*/
public class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
		//因為reduce處理的是“相同”key的一組資料,這裡的key是真的相同,所以只要遍歷v2累加計數即可
		 int sum = 0;
		 for (IntWritable value : values) {
			sum += value.get();
		}
		//輸出<k3,v3>
		context.write(key, new IntWritable(sum));
	}
}

main()

public class WC {
	public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {
	
		//讀取MapReduce配置資訊,包括HDFS。設定為true則從本地專案source資料夾中讀取配置檔案
		Configuration conf = new Configuration(true);
		
		//構建一個job並指定job名稱		
		Job job = Job.getInstance(conf,"WordCount");
		
		//設定當前main函式所在類
		job.setJarByClass(WC.class);
		
		//設定本地jar包位置 (第三種執行模式使用,另兩種不用配置。執行模式在下面介紹)
		job.setJar("d:/mapreduce/wc.jar");
		
		//設定輸入路徑	args[0]表示引數手動輸入
		FileInputFormat.setInputPaths(job, args[0]);
		
		//設定輸出路徑	這裡路徑寫死,先判斷路徑下是否有檔案,有則刪除
		Path outputPath = new Path("/output/wordcount");
		FileSystem fs = outputPath.getFileSystem(conf);	
		if(fs.exists(outputPath)){
			fs.delete(outputPath,true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);
		
		//設定Map class
		job.setMapperClass(WCMapper.class);
		
		//設定map輸出key、value的型別
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		//設定reduce class
		job.setReducerClass(WCReduce.class);

		//設定reduce輸出key、value的型別
		 job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
		
		//設定reduce task的個數 
		job.setNumReduceTasks(2);
		
		//提交作業
		job.waitForCompletion(true);
	}
}

main函式中主要做了如下幾件事: 一是構建job,指定main函式所在類,指定輸入、輸出目錄; 二是指定自定義的Mapper類和Reducer類及其對應輸入輸出key,value的型別; 三是提交作業

接下來我們就可以執行程式碼,執行前我們先來看幾種執行模式 執行模式 1、local(在本地的eclipse上啟動多個執行緒來模擬map task,reduce task執行,並未啟動叢集,所以YARN的job web UI頁面不會有任務顯示) ,用於測試環境 修改mapred-site.xml 中的mapreduce.framework.name,設定為local

	<property>
   			<name>mapreduce.framework.name</name>
            <value>local</value>
    </property>

2、提交到叢集中執行,用於生產環境 在本地將程式碼打成jar包,提交到叢集。在叢集上執行hadoop jar + jar包路徑 + main()所在類的全類名 + 引數執行

[[email protected] mapreduce]# ls
wc.jar
[[email protected] mapreduce]# hadoop jar  ./wc.jar  com.hpe.mr.wc.WC  /input/*

3、在本機上的eclipse中直接提交任務到叢集中執行,這裡我們使用這種方式 (1) 修改配置檔案 mapred-site.xml

	<property>
      		<name>mapreduce.framework.name</name>
            <value>yarn</value>
     </property>
     <property>
	   		<name>mapreduce.app-submission.cross-platform</name>
	   		<value>true</value>			<!-- 跨平臺提交開啟 -->
	</property>

(2) 將本地Application打成jar包,放在window下某一個位置,這裡放在D盤的mapreduce資料夾中 export jar file finish ok

(3)右擊專案,Run as --> Run Configurations Run configurations

(4)點選Arguments,在Program arguments中填寫引數,即我們在main()中寫的args[0]的值,點選執行 arduments

(5)我們在console視窗可以看到我們在map中定義的<k1,v1>的輸出

 0==========hello hadoop
 13==========bye hadoop
 0==========hello world
 12==========bye world`

(6)在node01:8088頁面可以看到任務狀態 TASK (7)在HDFS上檢視output/wordcount HDFS 因為在main()中我們設定啟動兩個reduce task,所以最後輸出兩個檔案part-r-00000和part-r-00001。 這裡 bye world , world後面我不小心多加了個 ` ,所以最後結果是這樣