1. 程式人生 > >Hadoop 自定義InputFormat實現自定義Split

Hadoop 自定義InputFormat實現自定義Split

原文連結:http://blog.csdn.net/anbo724/article/details/6956286

上一篇文章中提到了如何進行RecordReader的重寫,本篇文章就是來實現如何實現自定義split的大小

要解決的需求:

(1)一個文字中每一行都記錄了一個檔案的路徑,

(2)要求處理路徑對應的檔案,但是因為檔案量比較大,所以想進行分散式處理

(3)所以就對輸入的文件進行預處理,讀取前N行做為一個splits,但是沒有實現,因為重寫FileSplit不是太容易實現,就偷懶直接定義一個split的大小是1000個位元組,這樣就可以將輸入的文件進行分片了。

直接貼程式碼:

InputFormat

  1. /** 
  2. * @file LineInputFormat.java 
  3. * @brief自定義InputFormat 實現split大小的控制 
  4. * @author anbo, [email protected] 
  5. * @version 1.0 
  6. * @date 2011-10-18 
  7. */
  8. /* Copyright(C) 
  9. * For free 
  10. * All right reserved 
  11. * 
  12. */
  13. package an.hadoop.test;  
  14. import java.io.IOException;  
  15. import
     java.util.ArrayList;  
  16. import java.util.List;  
  17. import org.apache.commons.logging.Log;   
  18. import org.apache.commons.logging.LogFactory;  
  19. import org.apache.hadoop.fs.BlockLocation;  
  20. import org.apache.hadoop.fs.FileStatus;  
  21. import org.apache.hadoop.fs.FileSystem;  
  22. import org.apache.hadoop.fs.Path;  
  23. import org.apache.hadoop.io.LongWritable;  
  24. import org.apache.hadoop.io.Text;  
  25. import org.apache.hadoop.io.compress.CompressionCodec;  
  26. import org.apache.hadoop.io.compress.CompressionCodecFactory;  
  27. import org.apache.hadoop.mapreduce.InputFormat;  
  28. import org.apache.hadoop.mapreduce.InputSplit;  
  29. import org.apache.hadoop.mapreduce.JobContext;  
  30. import org.apache.hadoop.mapreduce.RecordReader;  
  31. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  32. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  33. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  34. import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;  
  35. publicclass LineInputFormat extends FileInputFormat<LongWritable , Text> {  
  36.     publiclong mySplitSize = 1000;  
  37.      privatestaticfinal Log LOG = LogFactory.getLog(FileInputFormat.class);  
  38.       privatestaticfinaldouble SPLIT_SLOP = 1.1;   // 10% slop
  39.      @Override
  40.       public RecordReader<LongWritable, Text>   
  41.         createRecordReader(InputSplit split,  
  42.                            TaskAttemptContext context) {  
  43.         returnnew LineRecordReader(); //為什麼不行呢 
  44.       }  
  45.     @Override
  46.     protectedboolean isSplitable(JobContext context, Path file) {  
  47.         CompressionCodec codec =  
  48.         new CompressionCodecFactory(context.getConfiguration()).getCodec(file);  
  49.         //return codec == null;
  50.         returntrue;//要求分片
  51.     }  
  52.      /**  
  53.        * Generate the list of files and make them into FileSplits. 
  54.        */
  55.     @Override
  56.       public List<InputSplit> getSplits(JobContext job) throws IOException {  
  57.         long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  58.         long maxSize = getMaxSplitSize(job);  
  59.         // generate splits
  60.         List<InputSplit> splits = new ArrayList<InputSplit>(); //用以存放生成的split的  
  61.         for (FileStatus file: listStatus(job)) {//filestatues是檔案對應的資訊,具體看對應的類
  62.           Path path = file.getPath();  
  63.           FileSystem fs = path.getFileSystem(job.getConfiguration());  
  64.           long length = file.getLen(); //得到文字的長度
  65.           BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); //取得檔案所在塊的位置
  66.           if ((length != 0) && isSplitable(job, path)) { //如果檔案不為空,並且可以分片的話就進行下列操作,
  67.             long blockSize = file.getBlockSize();//
  68.             //long splitSize = computeSplitSize(blockSize, minSize, maxSize); //split的大小Math.max(minSize, Math.min(maxSize, blockSize));
  69.             //可以通過調整splitSize的大小來控制對應的檔案塊的大小,比如設定splitSize=100,那麼就可以控制成每個split的大小
  70.             //但是問題是,我是要求按行進行處理的,雖然這樣應該也可以按行進行切分了,不過卻不能保證每個split對應的行數都是相等的
  71.             //一般情況是如果檔案大於64M(32M)就會使用塊大小來作為split
  72.             long splitSize = mySplitSize;  
  73.             long bytesRemaining = length; //文字的長度
  74.             while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {//剩下的文字長度大於split大小的SPLIT_SLOP倍數
  75.               int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//找到對應block塊中對應的第0個字元開始,
  76.               splits.add(new FileSplit(path, length-bytesRemaining, splitSize,    
  77.                                        blkLocations[blkIndex].getHosts()));   
  78.             //這個是形成split的程式碼FileSplit(檔案路徑,0,split大小,host)
  79.               //原始函式為 FileSplit(Path file, long start, long length, String[] hosts) {
  80.               //但是應該可以通過重寫FileSplit來實現對應的要求
  81.               bytesRemaining -= splitSize;  
  82.             }  
  83.             if (bytesRemaining != 0) {  
  84.               splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,   
  85.                          blkLocations[blkLocations.length-1].getHosts()));  
  86.             }  
  87.           } elseif (length != 0) {  
  88.             splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));  
  89.           } else {   
  90.             //Create empty hosts array for zero length files
  91.             splits.add(new FileSplit(path, 0, length, new String[0]));  
  92.           }  
  93.         }  
  94.         LOG.debug("Total # of splits: " + splits.size());  
  95.         return splits;  
  96.       }  
  97. }  
main類
  1. publicclass Test_multi {  
  2. 相關推薦

    Hadoop 定義InputFormat實現定義Split

    原文連結:http://blog.csdn.net/anbo724/article/details/6956286 上一篇文章中提到了如何進行RecordReader的重寫,本篇文章就是來實現如何實現自定義split的大小 要解決的需求: (1)一個

    Hadoop完全分散式下實現定義排序、分割槽和分組

        經過前面一段時間的學習,簡單的單詞統計已經不能實現更多的需求,就連自帶的一些函式方法等也是跟不上節奏了;加上前面一篇MapReduce的底層執行步驟的瞭解,今天學習自定義的排序、分組、分割槽相對也特別容易。 自定義排序 自定義的排序有許多許多,根據不同的業務需

    定義UITableView實現定義左滑刪除按鈕及多按鈕,拖拽cell和表頭進行排序

    本文介紹了能拖拽cell和表頭進行排序的自定義UITableView,並且能自定義左滑顯示的UIButton樣式。 先看左滑自定義按鈕效果圖  :     override func tableView(_ tableView: UITableView, editA

    MapReduce-XML處理-定義InputFormat定義RecordReader

    這一篇說明如何自定義InputFormat以及RecordReader這兩個元件,通過使用mapreduce處理xml檔案格式的檔案來說明其用法,這一個例子來自《hadoop硬實戰》一書的技術點12講解的用法,如果有說明得不清楚的可以自行進行查閱下面就來說說這個例項要達到的目

    MyBatis Generator系列(八)----MyBatis Generator定義外掛實現定義Mapper

    一、建立基類 有時候會通過建立實體的基類,用來複用一些程式碼,然後讓其他的實體類整合這個類: package com.fendo.bean; /** * @Title: BaseModel.java * @Package com.fendo.bean *

    Hadoop完全分散式用MapReduce實現定義排序、分割槽和分組

        經過前面一段時間的學習,簡單的單詞統計已經不能實現更多的需求,就連自帶的一些函式方法等也是跟不上節奏了;加上前面一篇MapReduce的底層執行步驟的瞭解,今天學習自定義的排序、分組、分割槽相對也特別容易。 認為不好理解,先參考一下前面的一篇:https://bl

    如何實現定義同步組件

    nds 允許 oid try unlock all 同步 while name package com.chen;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQ

    JS簡單實現定義右鍵菜單

    ans idt 右鍵 動畫 忘記 span spa round 部分 RT,一個簡單的例子,僅僅講述原理 <div id="menu" style="width: 0;height: 0;background: cadetblue;position: absolu

    Notification的基本用法以及使用RemoteView實現定義布局

    解決 edi ngs 取消 ets lsp 過程 net tde Notification的作用 Notification是一種全局效果的通知,在系統的通知欄中顯示。既然作為通知,其基本作用有: 顯示接收到短消息、即時信息等 顯示客戶端的推送(廣告、優惠、新聞等)

    Android -- 定義view實現keep歡迎頁倒計時效果

    super onfinish -m use new getc awt ttr alt 1,最近打開keep的app的時候,發現它的歡迎頁面的倒計時效果還不錯,所以打算自己來寫寫,然後就有了這篇文章。 2,還是老規矩,先看一下我們今天實現的效果   相較於我們常見的倒計時

    Android定義View——實現水波紋效果類似剩余流量球

    string 三個點 pre ber block span 初始化 move 理解 最近突然手癢就想搞個貝塞爾曲線做個水波紋效果玩玩,終於功夫不負有心人最後實現了想要的效果,一起來看下吧: 效果圖鎮樓 一:先一步一步來分解一下實現的過程 需要繪制一個正弦曲線(sin

    匿名類型與Select方法實現定義對象插入局部表結構中

    aso 直接 菜單 ember new order ber 構建 als 在提取局部表結構數據時,通過Select選取需要的字段,如下句,此時其實產生了一個不用於_menuMan的原新數據類型new { c.SYS_COMMANDS_ID,c.TXT_COMMANDTITL

    java中實現Comparable接口實現定義排序

    static -1 return rabl generated args logs ava sca 1 class Student implements Comparable{ 2 String name; 3 int gpa; 4 @Ov

    freemarker實現定義指令和定義函數

    數據 dir variables macro 內置 引擎 eem fig turn 自定義指令: 1.指令在前臺實現   <#macro name param1,param2,param3...paramN>   </#macro> 2.指令在後臺實

    實現定義查詢的數據庫設計及實現(一)

    bre 名稱 審批流程 work 數據庫名 需要 自定義查詢 perm 枚舉 需求 先說一下需求:實現用戶自定義的查詢,用戶可以自定義要查詢的列、自定義條件條件、自定義排序。除了查詢使用外,還可以使用於各個需要根據條件進行約束的業務,如權限; 本設計和實現,很大部分是通過數

    實現定義查詢的數據庫設計及實現(二)

    表名 table abr bigint sts 處理 update 關聯表 creat 上部分大概講了一下表設計,這部分講一下處理。 處理的結構 處理結構的內容比較多,分為幾個部分分別講解一下。首先講解一下尋找關系表。 尋找關系表 尋找關系表根據“表間關系登記表”進行處

    spring mvc實現定義註解

    poi org param 運行時 onf dha ogg logs exec 實現方式:使用@Aspect實現: 1. 新建註解接口:CheckSign package com.soeasy.web.utils; import org.springframework.

    ApiController實現定義身份認證

    del api color span () log list() etc serialize 1 /// <summary> 2 /// 身份認證 3 /// </summary> 4 public class Au

    利用echarts highcharts 實現定義地圖 關系圖效果 側邊3D柱形圖餅圖散點圖

    技術 ges 散點圖 chart blog 餅圖 git 分享 charts github 地址: https://https://github.com/Gengshaoxuan/medataMap github 地址: https://https://github.c

    Android定義processor實現bindView功能

    lis dds 定義 java代碼 cli 註冊 文章 type() mage 一、簡介 在現階段的Android開發中,註解越來越流行起來,比如ButterKnife,Retrofit,Dragger,EventBus等等都選擇使用註解來配置。按照處理時期,註解又分為兩