1. 程式人生 > >圓周率π的近似計算(三)-MapReduce分散式計算入門

圓周率π的近似計算(三)-MapReduce分散式計算入門

MapReduce 分散式計算入門

一個胖子

在學 hadoop 的我們最先接觸的分散式框架就是MapReduce框架,本意就是通過使用MapReduce 框架進行實現圓周率 π 的分散式計算的小demo;

MapReduce 的處理流程

  1. Mapper 階段執行流程

    • 第一階段 將輸入目錄下的檔案按照一定的標準進行邏輯切片,形成片規劃.預設的切片規則是按照 檔案塊 切片.每片都有一個 MapTask 進行處理.(getSplits)

    • 第二階段 是對切片中的資料按照一定的規則解析成 key,value 元組.預設規則是把每一行文字內容解析成鍵值對. key 是每一行的起始位置,value 是本行的文字內容.(TextInputFormat)

    • 第三階段 是呼叫Mapper 類中的Map 方法. 上階段每解析出來一個 k,v 呼叫一次 map 方法.輸出同樣是鍵值對,但是可以有多個輸出.

    • 第四階段 是按照一定的規則對第三階段輸出的鍵值對進行分割槽。 預設是隻有一個區。 分割槽的數量就是 Reducer 任務執行的數量。預設只有一個Reducer 任務。

    • 第五階段是對每個分割槽中的鍵值對進行排序。如果有第六階段,那麼進入第六階段;如果沒有,直接輸出到檔案中。

    • 第六階段是對資料進行區域性聚合處理,也就是 combiner 處理。 鍵相等的鍵值對會呼叫一次 reduce 方法。經過這一階段,資料量會減少。 本階段預設是沒有的。

  2. Reduce 階段執行流程

    • 第一階段是 Reducer 任務會主動從 Mapper 任務複製其輸出的鍵值對。Mapper 任務可能會有很多,因此 Reducer 會複製多個 Mapper 的輸出。

    • 第二階段是把複製到 Reducer 本地資料,全部進行合併,即把分散的資料合併成一個大的資料。再對合並後的資料排序。

    • 第三階段是對排序後的鍵值對呼叫 reduce 方法。 鍵相等的鍵值對呼叫一次reduce 方法,每次呼叫會產生零個或者多個鍵值對。最後把這些輸出的鍵值對寫入到 HDFS 檔案中。

進行圓周率 π 的分散式計算

從分 MapReduce 的執行流程看出,分散式框架已經幫我們實現了任務的分發,因此我們的關注點就基本可以不用考慮關於分散式方面的任務,我們主要需要關注的就是 map 方法的構建
,和 reduce 方法的構建.

數學模型

進行分散式計算的任務有個先決條件,那就是可以進行平行計算,就是各 map 之間相互獨立,無依賴關係.

因此,我們在建立數學模型時最好是一個重複計算無相互關聯的模型,而 蒙特卡洛 的模型恰好符合這種特點,因此我們依次構建模型.

在上篇文章中,我們採用的通過面積比的方式來近似求解 圓周率 π ,即正方形面積為 1 ,扇形面積為 π/4,則正方形和扇形的面積比為 π/4,求得這個面積比,我們就能得到最後的 π;

蒙特卡羅方法,就是將面積比轉化為概率問題求解,就是在正方形中取一隨機點進行重複試驗,統計出這個點落在扇形中的概率.用這個概率去描述扇形和正方形的面積比,求得概率即可近似求出圓周率 π 的大小;

隨機試驗的優點是,每次試驗相互獨立互不影響,缺點隨機性大,資料不穩定,一般只用做近似求解.

綜上,我們可以將 求解圓周率的任務 轉化為隨機試驗的統計工作.

實現方法

  1. 首先我們給出Mapper階段方法

    public class SolvingPiMapper extends Mapper<LongWritable, Text, Text, Text> {       
        /**
         * key 輸入 讀取檔案的起始位置
         * value 輸入 檔案中一行的內容
         * context 輸出 <k,v>形式
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            //將從檔案中讀取的隨機試驗次數解析出來
            String num = value.toString();
            Integer totle = new Integer(num);
            //宣告隨機試驗中落點在扇形中數量
            int sum = 0;
            //進行隨機試驗並統計
            for(int i=0;i<totle;i++){
                double x = Math.random();
                double y = Math.random();
                if((x*x+y*y)<1){
                    sum++;
                }
            }
            //將最後結果輸出
            context.write(new Text("PI"), new Text(totle+"--"+sum));
        }
    }
    
  2. 給出 reduce 方法

    public class SolvingPiReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        /**
         * name 輸入的 "PI"
         * message 輸入的"totle--num"
         * context 輸出的<k,v>
         * 所有鍵位"PI"的輸入都用這個方法進行處理
         */
        @Override
        protected void reduce(Text name, Iterable<Text> message, Reducer<Text, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            //宣告試驗進行的總數
            long sumTotle =0;
            //宣告落點在扇形區域中的總數
            long sumOrder =0;
            //解析輸入的message資訊,從這提取上述兩個值
            for (Text text : message) {
                String[] nums = text.toString().split("--");
                sumTotle+= new Integer(nums[0]);
                sumOrder+= new Integer(nums[1]);
            }
            //System.out.println("π的近似值為"+sumOrder*4.0/sumTotle);
            //輸出最後結果
            context.write(name,new DoubleWritable(sumOrder*4.0/sumTotle));
        }
    }
    

    解析 map 方法返回的資訊,進行彙總並輸出最後結果.

  3. 定義一個主類,用來描述job並提交job

    public class SolvingPiRunner {
        //把業務邏輯相關的資訊(哪個是 mapper,哪個是 reducer,要處理的資料在哪裡,輸出的結果放在哪裡……)描述成一個 job 物件
        //把這個描述好的 job 提交給叢集去執行
        public static void main(String[] args) throws Exception {
            //使用者自定義輸入
            System.out.println("請輸入你想分的片數:");
            Scanner sc = new Scanner(System.in);
            int pice=new Integer(sc.nextLine());
            System.out.println("請輸入你每片執行多少次:");
            String line=sc.nextLine();
            //按照分片生成檔案(在實際環境中需要在hdfc中建立檔案)
            for(int i=0;i<pice;i++){
                BufferedWriter bw = new BufferedWriter(new FileWriter(new File("D:\\hadoop\\input\\"+(i+1)+".txt")));
                bw.write(line);
                bw.close();
            }
    
            //把業務邏輯相關的資訊(哪個是 mapper,哪個是 reducer,要處理的資料在哪裡,輸出的結果放在哪裡……)描述成一個 job 物件
            //把這個描述好的 job 提交給叢集去執行
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            //知道這個job所在jar包
            job.setJarByClass(SolvingPiRunner.class);
    
            job.setMapperClass(SolvingPiMapper.class);
            job.setReducerClass(SolvingPiReducer.class);
            //設定我們的業務邏輯Mapper類的輸出key 和  value 的資料
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //設定我們的業務邏輯Reducer 類的輸出Key和value 的資料型別
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            //指定要處理的資料所在的位置
            FileInputFormat.setInputPaths(job, "D:\\hadoop\\input\\*.txt");
            //指定處理完成後,結果所儲存的位置
            FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output\\result"));
            //向yarn叢集提交這個job
    
            boolean res = job.waitForCompletion(true);
            System.exit(res?0:1);
        }
    }
    

在windows環境下模擬叢集環境執行測試;

遇到的問題

  • 啟動問題報錯

    Exception in thread "main" java.io.IOException: (null) entry in command string: null chmod 0700 D:\tmp\hadoop-lxc\mapred\staging\lxc1332581434\.staging
            at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
            at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
            at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
            at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
            at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:491)
            at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:532)
            at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:509)
            at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:312)
            at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:133)
            at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:144)
            at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
            at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:422)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
            at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
            at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
            at test.demo.SolvingPiRunner.main(SolvingPiRunner.java:54)
    

解決方法

報錯二

Exception in thread "main" java.lang.RuntimeException: Error while running command to get file permissions : java.io.IOException: (null) entry in command string: null ls -F D:\hadoop\input\1.txt
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
    at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:659)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:634)
    at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:49)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1733)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1713)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:305)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
    at test.demo.SolvingPiRunner.main(SolvingPiRunner.java:54)  

原因及解決辦法

  • 在windows環境下讀取檔案不能直接寫檔案所在路徑,需要寫到檔案,如果需要讀多個檔案可以用萬用字元 * 代之多個;

測試結果

我啟動程式進行運算
    10X10000000 次
結果 
    PI  3.141599752