1. 程式人生 > >大數據學習——MapReduce配置及java代碼實現wordcount算法

大數據學習——MapReduce配置及java代碼實現wordcount算法

鍵值 example nds clas spl key lru 這樣的 java_home

---恢復內容開始---

配置MapReduce需要在之前配置的基礎上配置兩個xml文件一個是yarn-site.xml一個是mapred-site.xml,在之前配置的hadoop的etc目錄下可以找到

技術分享圖片

下邊進行配置過程首先

1、配置yarn-site.xml

<configuration>

<!-- Site specific YARN configuration properties -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>192.168.98.141</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

</configuration>

這裏需要解釋一下,yarn的基本思想是將JobTracker的兩個主要功能(資源管理和作業調度/監控)分離,主要方法是創建一個全局的ResourceManager(RM)和若幹個針對應用程序的ApplicationMaster(AM)。這裏的應用程序是指傳統的MapReduce作業或作業的DAG,其實yarn類似理解成tomcat, 在web項目上有tomcat這個平臺。yarn也是這樣的,YARN 分層結構的本質是 ResourceManager。這個實體控制整個集群並管理應用程序向基礎計算資源的分配。ResourceMannager將這些資源分配給NodeManager(yarn代理結點)。

上圖第一個value配置屬性是對應自己系統配置ip的主機號

配置mapred-site.xml

<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>

這樣就是配置完成了

打開虛擬機,開啟yarn服務,輸入jps查看是否具有ResourceManager NodeManager兩部分。有則配置成功。

技術分享圖片

虛擬機下運行wordcount算法

進入hadoop——>share——>hadoop——mapreduce——>執行 hadoop-mapreduce-examples-2.7.3.jar中的wordcount算法

技術分享圖片

技術分享圖片

這裏需要註意,wordcount後的目錄 第一個為統計字符文件的目錄,第二個為輸出目錄,輸出的目錄必須是之前不存在的否則會報錯

介紹下MapReduce的工作流程,粗略劃分可以分為以下幾個步驟

1、代碼編寫

2、作業配置

3、提交作業

4、初始化作業

5、分配任務

6、執行任務

7、更新任務和狀態

MapReduce在處理數據時都是通過鍵值對的形式進行處理數據

1、MapReduce框架是通過Map讀取文件內容,解析成key、value對文件的每一行,解析成key、value對<key,value>每一個鍵值對調用一次map函數,寫自己的邏輯,對輸入的key、value處理轉換成新的key、value輸出,將輸出的中間鍵值對傳給Reduce;

2、在Reduce之前,有一個shuffle的過程對多個map任務的輸出進行合並、排序

3、寫Reduce函數自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出

4、把Reduce的輸出保存到文件中

上述內容是我學完之後對MapReduce的工作流程的一個理解,下邊通過java代碼實現wordcount算法。

首先建立一個maven項目,在pom.xml中引入以下依賴

</dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>

建立Map類

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    /*
     * 其中KEYIN(字節偏移量) VLAUE (獲取的數據類型)KEYOUT(輸出數據類型)VALUE(輸出數據值類型)
     */
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        String line=value.toString();//按行獲取文件內容
        String[] words=line.split(" ");//通過空格將每行內容進行分片
        for (String word : words) {
            context.write(new Text(word.trim()), new IntWritable(1));//將map函數的輸出溢寫到內存中的環形緩沖區
        }
    }
}

建立Reduce類

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    /*
     * Key 為map輸出Key的類型 ,叠代器類型對應map傳來的value值
     * 叠代器的使用是為了實現map中每個值都進行一次處理
     * 
     */
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        int sum=0;
        //數據處理
        for (IntWritable intWritable : values) {
            sum+=intWritable.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

建立Job類

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool{
    
    public static void main(String[] args) throws Exception {
        MyJob myJob=new MyJob();
        ToolRunner.run(myJob, null);
    }
    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf=new Configuration();//創建配置對象
        conf.set("fs.defaultFS", "hdfs://192.168.80.142:9000");
        //分配任務
        Job job=Job.getInstance(conf);
        job.setJarByClass(MyJob.class);
        job.setMapperClass(MyMap.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //建立文件輸入輸出流
        FileInputFormat.addInputPath(job, new Path("/hadoop/hadoop.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/hadoop/out"));
        job.waitForCompletion(true);
        return 0;
    }

}

---恢復內容結束---

大數據學習——MapReduce配置及java代碼實現wordcount算法