大數據學習——MapReduce配置及java代碼實現wordcount算法
---恢復內容開始---
配置MapReduce需要在之前配置的基礎上配置兩個xml文件一個是yarn-site.xml一個是mapred-site.xml,在之前配置的hadoop的etc目錄下可以找到
下邊進行配置過程首先
1、配置yarn-site.xml
<configuration> <!-- Site specific YARN configuration properties --> <property> <name>yarn.resourcemanager.hostname</name> <value>192.168.98.141</value> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
這裏需要解釋一下,yarn的基本思想是將JobTracker的兩個主要功能(資源管理和作業調度/監控)分離,主要方法是創建一個全局的ResourceManager(RM)和若幹個針對應用程序的ApplicationMaster(AM)。這裏的應用程序是指傳統的MapReduce作業或作業的DAG,其實yarn類似理解成tomcat, 在web項目上有tomcat這個平臺。yarn也是這樣的,YARN 分層結構的本質是 ResourceManager。這個實體控制整個集群並管理應用程序向基礎計算資源的分配。ResourceMannager將這些資源分配給NodeManager(yarn代理結點)。
上圖第一個value配置屬性是對應自己系統配置ip的主機號
配置mapred-site.xml
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
這樣就是配置完成了
打開虛擬機,開啟yarn服務,輸入jps查看是否具有ResourceManager NodeManager兩部分。有則配置成功。
虛擬機下運行wordcount算法
進入hadoop——>share——>hadoop——mapreduce——>執行 hadoop-mapreduce-examples-2.7.3.jar中的wordcount算法
這裏需要註意,wordcount後的目錄 第一個為統計字符文件的目錄,第二個為輸出目錄,輸出的目錄必須是之前不存在的否則會報錯
介紹下MapReduce的工作流程,粗略劃分可以分為以下幾個步驟
1、代碼編寫
2、作業配置
3、提交作業
4、初始化作業
5、分配任務
6、執行任務
7、更新任務和狀態
MapReduce在處理數據時都是通過鍵值對的形式進行處理數據
1、MapReduce框架是通過Map讀取文件內容,解析成key、value對文件的每一行,解析成key、value對<key,value>每一個鍵值對調用一次map函數,寫自己的邏輯,對輸入的key、value處理轉換成新的key、value輸出,將輸出的中間鍵值對傳給Reduce;
2、在Reduce之前,有一個shuffle的過程對多個map任務的輸出進行合並、排序
3、寫Reduce函數自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出
4、把Reduce的輸出保存到文件中
上述內容是我學完之後對MapReduce的工作流程的一個理解,下邊通過java代碼實現wordcount算法。
首先建立一個maven項目,在pom.xml中引入以下依賴
</dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> </dependency>
建立Map類
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMap extends Mapper<LongWritable, Text, Text, IntWritable> { @Override /* * 其中KEYIN(字節偏移量) VLAUE (獲取的數據類型)KEYOUT(輸出數據類型)VALUE(輸出數據值類型) */ protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String line=value.toString();//按行獲取文件內容 String[] words=line.split(" ");//通過空格將每行內容進行分片 for (String word : words) { context.write(new Text(word.trim()), new IntWritable(1));//將map函數的輸出溢寫到內存中的環形緩沖區 } } }
建立Reduce類
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> { @Override /* * Key 為map輸出Key的類型 ,叠代器類型對應map傳來的value值 * 叠代器的使用是為了實現map中每個值都進行一次處理 * */ protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub int sum=0; //數據處理 for (IntWritable intWritable : values) { sum+=intWritable.get(); } context.write(key, new IntWritable(sum)); } }
建立Job類
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MyJob extends Configured implements Tool{ public static void main(String[] args) throws Exception { MyJob myJob=new MyJob(); ToolRunner.run(myJob, null); } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf=new Configuration();//創建配置對象 conf.set("fs.defaultFS", "hdfs://192.168.80.142:9000"); //分配任務 Job job=Job.getInstance(conf); job.setJarByClass(MyJob.class); job.setMapperClass(MyMap.class); job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //建立文件輸入輸出流 FileInputFormat.addInputPath(job, new Path("/hadoop/hadoop.txt")); FileOutputFormat.setOutputPath(job, new Path("/hadoop/out")); job.waitForCompletion(true); return 0; } }
---恢復內容結束---
大數據學習——MapReduce配置及java代碼實現wordcount算法