1. 程式人生 > >hadoop中使用MapReduce程式設計例項

hadoop中使用MapReduce程式設計例項

從網上搜到的一篇hadoop的程式設計例項,對於初學者真是幫助太大了,看過以後對MapReduce程式設計基本有了大概的瞭解。看了以後受益匪淺啊,趕緊儲存起來。

1、資料去重

   "資料去重"主要是為了掌握和利用並行化思想來對資料進行有意義篩選統計大資料集上的資料種類個數從網站日誌中計算訪問地等這些看似龐雜的任務都會涉及資料去重。下面就進入這個例項的MapReduce程式設計。

1.1 例項描述

  對資料檔案中的資料進行去重。資料檔案中的每行都是一個數據。

  樣例輸入如下所示:

     1)file1:

2012-3-1 a

2012-3-2 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-7 c

2012-3-3 c

     2)file2:

2012-3-1 b

2012-3-2 a

2012-3-3 b

2012-3-4 d

2012-3-5 a

2012-3-6 c

2012-3-7 d

2012-3-3 c

     樣例輸出如下所示:

2012-3-1 a

2012-3-1 b

2012-3-2 a

2012-3-2 b

2012-3-3 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-6 c

2012-3-7 c

2012-3-7 d

1.2 設計思路

  資料去重最終目標是讓原始資料出現次數超過一次資料輸出檔案出現一次我們自然而然會想到將同一個資料的所有記錄都交給一臺reduce機器,無論這個資料出現多少次,只要在最終結果中輸出一次就可以了。具體就是reduce的輸入應該以資料作為key,而對value-list則沒有要求。當reduce接收到一個<key,value-list>時就直接將key複製到輸出的key中,並將value設定成空值

  在MapReduce流程中,map的輸出<key,value>經過shuffle過程聚整合<key,value- list>後會交給reduce。所以從設計好的reduce輸入可以反推出map的輸出key應為資料,value任意。繼續反推,map輸出數 據的key為資料,而在這個例項中每個資料代表輸入檔案中的一行內容,所以map階段要完成的任務就是在採用Hadoop預設的作業輸入方式之後,將 value設定為key,並直接輸出(輸出中的value任意)。map中的結果經過shuffle過程之後交給reduce。reduce階段不會管每 個key有多少個value,它直接將輸入的key複製為輸出的key,並輸出就可以了(輸出中的value被設定成空了)。

1.3 程式程式碼

     程式程式碼如下所示:

package com.hebut.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Dedup {

    //map將輸入中的value複製到輸出資料的key上,並直接輸出

    public static class Map extends Mapper<Object,Text,Text,Text>{

        private static Text line=new Text();//每行資料

        //實現map函式

        public void map(Object key,Text value,Context context)

                throws IOException,InterruptedException{

            line=value;

            context.write(linenew Text(""));

        }

    }

    //reduce將輸入中的key複製到輸出資料的key上,並直接輸出

    public static class Reduce extends Reducer<Text,Text,Text,Text>{

        //實現reduce函式

        public void reduce(Text key,Iterable<Text> values,Context context)

                throws IOException,InterruptedException{

            context.write(key, new Text(""));

        }

    }

    public static void main(String[] args) throws Exception{

        Configuration conf = new Configuration();

        //這句話很關鍵

        conf.set("mapred.job.tracker""192.168.1.2:9001");

        String[] ioArgs=new String[]{"dedup_in","dedup_out"};

     String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

     if (otherArgs.length != 2) {

     System.err.println("Usage: Data Deduplication <in> <out>");

     System.exit(2);

     }

     Job job = new Job(conf, "Data Deduplication");

     job.setJarByClass(Dedup.class);

     //設定MapCombineReduce處理類

     job.setMapperClass(Map.class);

     job.setCombinerClass(Reduce.class);

     job.setReducerClass(Reduce.class);

     //設定輸出型別

     job.setOutputKeyClass(Text.class);

     job.setOutputValueClass(Text.class);

     //設定輸入和輸出目錄

     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

     System.exit(job.waitForCompletion(true) ? 0 : 1);

     }

}

1.4 程式碼結果

     1)準備測試資料

     通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入檔案"dedup_in"資料夾(備註:"dedup_out"不需要建立。)如圖1.4-1所示,已經成功建立。

         

圖1.4-1 建立"dedup_in"                                   圖1.4.2 上傳"file*.txt"

     然後在本地建立兩個txt檔案,通過Eclipse上傳到"/user/hadoop/dedup_in"資料夾中,兩個txt檔案的內容如"例項描述"那兩個檔案一樣。如圖1.4-2所示,成功上傳之後。

     從SecureCRT遠處檢視"Master.Hadoop"的也能證實我們上傳的兩個檔案。

    檢視兩個檔案的內容如圖1.4-3所示:

圖1.4-3 檔案"file*.txt"內容

2)檢視執行結果

     這時我們右擊Eclipse 的"DFS Locations"中"/user/hadoop"資料夾進行重新整理,這時會發現多出一個"dedup_out"資料夾,且裡面有3個檔案,然後開啟雙 其"part-r-00000"檔案,會在Eclipse中間把內容顯示出來。如圖1.4-4所示。

圖1.4-4 執行結果

    此時,你可以對比一下和我們之前預期的結果是否一致。

2、資料排序

  "資料排序"是許多實際任務執行時要完成的第一項工作,比如學生成績評比資料建立索引等。這個例項和資料去重類似,都是原始資料進行初步處理,為進一步的資料操作打好基礎。下面進入這個示例。

2.1 例項描述

    對輸入檔案中資料進行排序。輸入檔案中的每行內容均為一個數字即一個數據。要求在輸出中每行有兩個間隔的數字,其中,第一個代表原始資料在原始資料集中的位次第二個代表原始資料

    樣例輸入

    1)file1:

2

32

654

32

15

756

65223

    2)file2:

5956

22

650

92

    3)file3:

26

54

6

    樣例輸出

1    2

2    6

3    15

4    22

5    26

6    32

7    32

8    54

9    92

10    650

11    654

12    756

13    5956

14    65223

2.2 設計思路

  這個例項僅僅要求對輸入資料進行排序,熟悉MapReduce過程的讀者會很快想到在MapReduce過程中就有排序,是否可以利用這個預設的排序,而不需要自己再實現具體的排序呢?答案是肯定的。

  但是在使用之前首先需要瞭解它的預設排序規則。它是按照key值進行排序的,如果key為封裝int的IntWritable型別,那麼MapReduce按照數字大小對key排序,如果key為封裝為String的Text型別,那麼MapReduce按照字典順序對字串排序。

  瞭解了這個細節,我們就知道應該使用封裝int的IntWritable型資料結構了。也就是在map中將讀入的資料轉化成 IntWritable型,然後作為key值輸出(value任意)。reduce拿到<key,value-list>之後,將輸入的 key作為value輸出,並根據value-list元素個數決定輸出的次數。輸出的key(即程式碼中的linenum)是一個全域性變數,它統計當前key的位次。需要注意的是這個程式中沒有配置Combiner,也就是在MapReduce過程中不使用Combiner。這主要是因為使用map和reduce就已經能夠完成任務了。

2.3 程式程式碼

    程式程式碼如下所示:

package com.hebut.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {

    //map將輸入中的value化成IntWritable型別,作為輸出的key

    public static class Map extends

        Mapper<Object,Text,IntWritable,IntWritable>{

        private static IntWritable data=new IntWritable();

        //實現map函式

        public void map(Object key,Text value,Context context)

                throws IOException,InterruptedException{

            String line=value.toString();

            data.set(Integer.parseInt(line));

            context.write(datanew IntWritable(1));

        }

    }

    //reduce將輸入中的key複製到輸出資料的key上,

    //然後根據輸入的value-list中元素的個數決定key的輸出次數

    //用全域性linenum來代表key的位次

    public static class Reduce extends

            Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{

        private static IntWritable linenum = new IntWritable(1);

        //實現reduce函式

        public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)

                throws IOException,InterruptedException{

            for(IntWritable val:values){

                context.write(linenum, key);

                linenum = new IntWritable(linenum.get()+1);

            }

        }

    }

    public static void main(String[] args) throws Exception{

        Configuration conf = new Configuration();

        //這句話很關鍵

        conf.set("mapred.job.tracker""192.168.1.2:9001");

        String[] ioArgs=new String[]{"sort_in","sort_out"};

     String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

     if (otherArgs.length != 2) {

     System.err.println("Usage: Data Sort <in> <out>");

         System.exit(2);

     }

     Job job = new Job(conf, "Data Sort");

     job.setJarByClass(Sort.class);

     //設定MapReduce處理類

     job.setMapperClass(Map.class);

     job.setReducerClass(Reduce.class);

     //設定輸出型別

     job.setOutputKeyClass(IntWritable.class);

     job.setOutputValueClass(IntWritable.class);

     //設定輸入和輸出目錄

     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

     System.exit(job.waitForCompletion(true) ? 0 : 1);

     }

}

2.4 程式碼結果

1)準備測試資料

    通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入檔案"sort_in"資料夾(備註:"sort_out"不需要建立。)如圖2.4-1所示,已經成功建立。

              

圖2.4-1 建立"sort_in"                                                  圖2.4.2 上傳"file*.txt"

    然後在本地建立三個txt檔案,通過Eclipse上傳到"/user/hadoop/sort_in"資料夾中,三個txt檔案的內容如"例項描述"那三個檔案一樣。如圖2.4-2所示,成功上傳之後。

    從SecureCRT遠處檢視"Master.Hadoop"的也能證實我們上傳的三個檔案。

檢視兩個檔案的內容如圖2.4-3所示:

相關推薦

hadoopmapreduce程式設計例項(系統日誌初步清洗過濾處理)

剛剛開始接觸hadoop的時候,總覺得必須要先安裝hadoop叢集才能開始學習MR程式設計,其實並不用這樣,當然如果你有條件有機器那最好是自己安裝配置一個hadoop叢集,這樣你會更容易理解其工作原理。我們今天就是要給大家演示如何不用安裝hadoop直接除錯程式設計MapR

hadoop使用MapReduce程式設計例項

從網上搜到的一篇hadoop的程式設計例項,對於初學者真是幫助太大了,看過以後對MapReduce程式設計基本有了大概的瞭解。看了以後受益匪淺啊,趕緊儲存起來。 1、資料去重    "資料去重"主要是為了掌握和利用並行化思想來對資料進行有意義的篩選

大資料之Hadoop學習——動手實戰學習MapReduce程式設計例項

文章目錄 一、MapReduce程式設計例項 1.自定義物件序列化 需求分析 報錯:Exception in thread "main" java.lang.IllegalArgumentExcept

HadoopMapReduce多種join實現例項分析

感謝分享:http://database.51cto.com/art/201410/454277.htm 1、在Reudce端進行連線。 在Reudce端進行連線是MapReduce框架進行表之間join操作最為常見的模式,其具體的實現原理如下: Map端的主要工作:為來自

hadoop MapReduce因為檔案開啟檔案數目超過linux限制報錯

haoop中mapreduce報錯 java.io.IOException: All datanodes xxx.xxx.xxx.xxx:xxx are bad. Aborting… at org.apache.hadoop.dfs.DFSClient$DFSOutputSt

MapReduce程式設計例項(六)

package com.t.hadoop; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import

Hadoop MapReduceInputSplit的分析

前言 MapReduce的原始碼分析是基於Hadoop1.2.1基礎上進行的程式碼分析。   什麼是InputSplit        InputSplit是指分片,在MapReduce當中作業中,作為map ta

MapReduce程式設計例項(一)

前提準備: MapReduce程式設計例項: 開發示例:WordCount 本文例詳細的介紹如何在整合環境中執行第一個MapReduce程式 WordCount,以及WordCount程式碼分析 新建MapReduce專案:   Finish生成

MapReduce程式設計例項(三)

前提準備: MapReduce程式設計例項: 輸入: 2013-11-01 aa 2013-11-02 bb 2013-11-03 cc 2013-11-04 aa 2013-11-05 dd 2013-11-06 dd 2013-11-07

MapReduce程式設計例項之WordCount

1.MapReduce計算框架 2.例項WordCount package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokeni

HadoopMapReduce程式設計Demo新舊

1.wordcount。 import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.conf.Con

hadoopMapReduce的sort(部分排序,完全排序,二次排序)

1.部分排序 MapReduce預設就是在每個分割槽裡進行排序 2.完全排序 在所有的分割槽中,整體有序                 1)使用一個reduce             2)自定義分割槽函式 不同的key進入的到不同的分割槽之中,在每個分割槽中自動

HadoopMapReduce程式設計模型

一、MapReduce程式設計模型         MapReduce將作業的整個執行過程分為兩個階段:Map階段和Reduce階段         Map階段由一定數量的Map Task組成             輸入資料格式解析:InputFormat        

大資料技術學習筆記之Hadoop框架基礎2-MapReduce程式設計及執行流程

一、回顧     -》hadoop的功能?         -》海量資料儲存和海量計算問題         -》分散式檔案儲存框架hdfs和

HadoopMapReduce例項講解—Python寫的WordCount Demo

MapReduce是hadoop這隻大象的核心,Hadoop 中,資料處理核心就是 MapReduce 程式設計模型。一個Map/Reduce 作業(job) 通常會把輸入的資料集切分為若干獨立的資料塊,由 map任務(task)以完全並行的方式處理它們

hbase資料匯入hdfs之(使用MapReduce程式設計統計hbase庫的mingxing表男女數量)

資料 zhangfenglun,M,20,13522334455,[email protected],23521472 chenfei,M,20,13684634455,[email protected],84545472 liyuchen,M,20,1352233425

Hadoop3 在eclipse訪問hadoop並執行WordCount例項

前言:       畢業兩年了,之前的工作一直沒有接觸過大資料的東西,對hadoop等比較陌生,所以最近開始學習了。對於我這樣第一次學的人,過程還是充滿了很多疑惑和不解的,不過我採取的策略是還是先讓環境跑起來,然後在能用的基礎上在多想想為什麼。       通過這三個禮拜(基本上就是週六週日,其他時間都在

如何在HadoopMapReduce程式處理JSON檔案

簡介: 最近在寫MapReduce程式處理日誌時,需要解析JSON配置檔案,簡化Java程式和處理邏輯。但是Hadoop本身似乎沒有內建對JSON檔案的解析功能,我們不得不求助於第三方JSON工具包。這裡選擇json-simple實現我們的功能。 在Hadoop上執行Jav

CentOS虛擬機器Java環境MapReduce Hadoop的WordCount(詞頻運算)程式連線資料入門

目錄 1. Hadoop 簡介 2. Hadoop 的架構 3. MapReduce 簡介 4. Hadoop HDFS 簡介 5. HDFS架構 6. MapReduce開發流程概念(重點) 7. maperuce 運算開發示例(重點) 8. hdfs 的資料型

hadoop map、reduce數量對mapreduce執行速度的影響

增加task的數量,一方面增加了系統的開銷,另一方面增加了負載平衡和減小了任務失敗的代價;map task的數量即mapred.map.tasks的引數值,使用者不能直接設定這個引數。Input Split的大小,決定了一個Job擁有多少個map。預設input spli