1. 程式人生 > >MapReduce-XML處理-自定義InputFormat及自定義RecordReader

MapReduce-XML處理-自定義InputFormat及自定義RecordReader

這一篇說明如何自定義InputFormat以及RecordReader這兩個元件,通過使用mapreduce處理xml檔案格式的檔案來說明其用法,這一個例子來自《hadoop硬實戰》一書的技術點12講解的用法,如果有說明得不清楚的可以自行進行查閱
下面就來說說這個例項要達到的目的以下是輸入資料:
<configuration>
  <property>
    <name>hadoop.kms.authentication.type</name>
    <value>simple</value>
    <description>

      Authentication type for the KMS. Can be either &quot;simple&quot;
      or &quot;kerberos&quot;.
    </description>
  </property>
  <property>
    <name>hadoop.kms.authentication.kerberos.keytab</name>
    <value>${user.home}/kms.keytab</value>
    <description>

      Path to the keytab with credentials for the configured Kerberos principal.
    </description>
  </property>
  <property>
    <name>hadoop.kms.authentication.kerberos.principal</name>
    <value>HTTP/localhost</value>
    <description>
      The Kerberos principal to use for the HTTP endpoint.

      The principal must start with 'HTTP/' as per the Kerberos HTTP SPNEGO specification.
    </description>
  </property>
  <property>
    <name>hadoop.kms.authentication.kerberos.name.rules</name>
    <value>DEFAULT</value>
    <description>
      Rules used to resolve Kerberos principal names.
    </description>
  </property>
</configuration>
實現的結果:<name>標籤中的資料提取出來做為key,把<value>標籤中的提取出來做為值進行鍵值對的輸出
hadoop.kms.authentication.kerberos.keytab       ${user.home}/kms.keytab
hadoop.kms.authentication.kerberos.name.rules   DEFAULT
hadoop.kms.authentication.kerberos.principal    HTTP/localhost
hadoop.kms.authentication.type  simple

實現步驟:
1.定製InputFormat實現方法:實現InputFormat介面,或者繼承InputFormat的子類,主要實現以下兩個方法:
List<InputSplit> getSplits(), 獲取由輸入檔案計算出輸入分片(InputSplit),解決資料或檔案分割成片問題。
RecordReader<K,V> createRecordReader(),建立RecordReader,從InputSplit中讀取資料,解決讀取分片中資料問題

2.定製RecordReader實現方法:實現RecordReader介面(舊版API)繼承RecordReader類(新版API),下面以新版API為例實現以下方法:

  public abstract void initialize(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException;
  public abstract boolean nextKeyValue() throws IOException, InterruptedException;
  public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
  public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
  public abstract float getProgress() throws IOException, InterruptedException;
  public abstract void close() throws IOException;
其中nextKeyValue(),getCurrentKey(),getCurrentValue()方法會在Mapper換執行過程中反覆呼叫直到該MAP任務所分到的分片被完全的處理,hadoop1.2.1的原始碼如下:
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
//map通過這裡反覆呼叫RecordReader的方法
while (context.nextKeyValue()) {
//context.getCurrentKey()在MapContext的方法中呼叫相關RecordReader的方法
/**
* @Override
*public KEYIN getCurrentKey() throws IOException, InterruptedException {
*return reader.getCurrentKey();
*}
*/
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }

最核心的就是處理好迭代多行文字的內容的邏輯,每次迭代通過呼叫nextKeyValue()方法來判斷是否還有可讀的文字行,直接設定當前的Key和Value,分別在方法getCurrentKey()和getCurrentValue()中返回對應的值。在實現的程式碼中會有相應註釋說明。

定製InputFormat:

  1. import java.io.IOException;  
  2. import org.apache.hadoop.fs.Path;  
  3. import org.apache.hadoop.io.LongWritable;  
  4. import org.apache.hadoop.io.Text;  
  5. import org.apache.hadoop.mapreduce.InputSplit;  
  6. import org.apache.hadoop.mapreduce.JobContext;  
  7. import org.apache.hadoop.mapreduce.RecordReader;  
  8. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  9. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  10. import org.slf4j.Logger;  
  11. import org.slf4j.LoggerFactory;  
  12. publicclass XMLInputFormat extends TextInputFormat {  
  13.     privatestaticfinal Logger log = LoggerFactory.getLogger(XMLInputFormat.class);  
  14.     @Override
  15.     public RecordReader<LongWritable, Text> createRecordReader(  
  16.             InputSplit inputSplit, TaskAttemptContext context) {  
  17.         try {  
  18.             returnnew XMLRecordReader(inputSplit, context.getConfiguration());  
  19.         } catch (IOException e) {  
  20.             log.warn("Error while creating XmlRecordReader", e);  
  21.             returnnull;  
  22.         }  
  23.     }  
  24.     @Override
  25.     protectedboolean isSplitable(JobContext context, Path file) {  
  26.         // TODO Auto-generated method stub
  27.         returnsuper.isSplitable(context, file);  
  28.     }  
  29. }  
定義RecordReader:(這是xml檔案處理的關鍵)
  1. import java.io.IOException;  
  2. import org.apache.hadoop.conf.Configuration;  
  3. import org.apache.hadoop.fs.FSDataInputStream;  
  4. import org.apache.hadoop.fs.FileSystem;  
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.io.DataOutputBuffer;  
  7. import org.apache.hadoop.io.LongWritable;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapreduce.InputSplit;  
  10. import org.apache.hadoop.mapreduce.RecordReader;  
  11. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  13. publicclass XMLRecordReader extends RecordReader<LongWritable, Text> {  
  14.     privatelong start;  
  15.     privatelong end;  
  16.     private FSDataInputStream fsin;  
  17.     private DataOutputBuffer buffer = new DataOutputBuffer();  
  18.     privatebyte[] startTag;  
  19.     privatebyte[] endTag;  
  20.     private LongWritable currentKey;  
  21.     private Text currentValue;  
  22.     publicstaticfinal String START_TAG_KEY = "xmlinput.start";  
  23.     publicstaticfinal String END_TAG_KEY = "xmlinput.end";  
  24.     public XMLRecordReader() {  
  25.     }  
  26.     /** 
  27.      * 初始化讀取資源以及相關的引數也可以放到initialize()方法中去執行 
  28.      * @param inputSplit 
  29.      * @param context 
  30.      * @throws IOException 
  31.      */
  32.     public XMLRecordReader(InputSplit inputSplit, Configuration context) throws IOException {  
  33.         /** 
  34.          * 獲取開傳入的開始和結束標籤 
  35.          */
  36.         startTag = context.get(START_TAG_KEY).getBytes("UTF-8");  
  37.         endTag = context.get(END_TAG_KEY).getBytes("UTF-8");  
  38.         FileSplit fileSplit = (FileSplit) inputSplit;  
  39.         /** 
  40.          * 獲取分片的開始位置和結束的位置 
  41.          */
  42.         start = fileSplit.getStart();  
  43.         end = start + fileSplit.getLength();  
  44.         Path file = fileSplit.getPath();  
  45.         FileSystem fs = file.getFileSystem(context);  
  46.         /** 
  47.          * 根據分片開啟一個HDFS的檔案輸入流 
  48.          */
  49.         fsin = fs.open(fileSplit.getPath());  
  50.         /** 
  51.          * 定位到分片開始的位置 
  52.          */
  53. 相關推薦

    MapReduce-XML處理-定義InputFormat定義RecordReader

    這一篇說明如何自定義InputFormat以及RecordReader這兩個元件,通過使用mapreduce處理xml檔案格式的檔案來說明其用法,這一個例子來自《hadoop硬實戰》一書的技術點12講解的用法,如果有說明得不清楚的可以自行進行查閱下面就來說說這個例項要達到的目

    hive中定義函式定義json字串解析函式

    hive中如何定義自己的函式 寫一個Java 程式,實現想要的函式功能 1.匯入hive安裝目錄的lib目錄的包 2新建一個類繼承 UDF類 3.過載父類中evaluate方法; 4.寫下自己的邏輯 package test; import

    C# Winform中定義篩選帶統計行的Datagridview控制元件

    網上分享有很多種自制DGV控制元件,都有不小的缺陷。 沒辦法,按需求自己定製了一個。 一、過濾方面類似於Excel的篩選功能。支援右鍵選單篩選,同時也支援在文字框輸入文字按焦點列進行篩選;  二、統計行我採用的是雙Datagridview方案。在構建控制元件時加入一個Dock為Bottom的子Datagr

    Hadoop 定義InputFormat實現定義Split

    原文連結:http://blog.csdn.net/anbo724/article/details/6956286 上一篇文章中提到了如何進行RecordReader的重寫,本篇文章就是來實現如何實現自定義split的大小 要解決的需求: (1)一個

    Laravel之加密解密/日誌/異常處理定義錯誤

    文件中 例如 tom 處理器 crypt return cat 情況 而不是 一.加密解密 1.加密Crypt::encrypt($request->secret) 2.解密try {   $decrypted = Crypt::decrypt($encryptedV

    Spring Boot-錯誤處理定義全域性異常處理機制

    正常的Web應用開發時,需要考慮到應用執行發生異常時或出現錯誤時如何來被處理,例如捕獲必要的異常資訊,記錄日誌方便日後排錯,友好的使用者響應輸出等等。 當然應用發生錯誤,有可能是應用自身的問題,也有可能是客戶端操作的問題。 Spring Boot預設提供了一種錯誤處理機制。 預設錯誤處理機制

    MapReduce常見演算法 與定義排序Hadoop序列化

    MapReduce常見演算法 •單詞計數 •資料去重 •排序 •Top K •選擇  以求最值為例,從100萬資料中選出一行最小值 •投影  以求處理手機上網日誌為例,從其11個欄位選出了五個欄位(列)來顯示我們的手機上網流量 •分組  相當於分割槽,以求處理手機上網日誌為例,喊手機號和非手

    定義異常異常全域性處理

    一、Java異常分類 java中異常均繼承自Throwable,其有兩個重要的直接子類error與exception Error: 大部分是由虛擬機器報出來的錯誤,是程式無法處理的錯誤,如 OutOfMemoryError,當JVM需要更多記憶體空間而得不

    php定義函式內部函式----字串處理函式

    一、統計字串 (1)strlen() 統計字串長度 函式返回值為整形,表示字串長度。若返回值為0,表示該字串為空。該函式對待一個UTF8的中文字元,處理為3個位元組長度。 echo strlen("xiaolin!");// 輸出:8 echo strlen("小林!"); // 輸出:7

    C# 訊息處理機制定義過濾方式

    一、訊息概述 Windows 下應用程式的執行是通過訊息驅動的。訊息是整個應用程式的工作引擎,我們需要理解掌握我們使用的程式語言是如何封裝訊息的原理。1. 什麼是訊息(Message) 訊息就是通知和命令。在.NET框架類庫中的System.Windows.Forms名稱

    (七)MapReduce定義型別分割槽演算法

    需求 有以下資料:電話 | 地區 | 姓名 | 使用流量 三個reduce生成三個檔案,按照地區來分割槽,得到每個人使用流量的彙總結果。 13877779999 bj zs 2145 13766668888 sh ls 1028 13766668888 sh ls 9987 1387

    springboot 異常處理-定義異常攔截異常

    一般解決異常的思路如下      這裡我就主要講講自定義異常的攔截   為什麼要拋自定義異常? 1.service方法在執行過程出現異常在哪捕獲?在service中需要都加try/catch,如果在controller也需要新增

    c#基礎之異常處理定義異常

    一、什麼是c#中的異常?         異常是程式執行中發生的錯誤,異常處理是程式的一部分。c#中的異常類主要是直接或者間接的派生於 System.Exception類 ,也就是說System.Exception類是所有預定義的系統異常的基類。錯誤的出現並不總是編寫

    VB.NET學習筆記:事件處理定義事件

    測試環境:windows 7和Microsoft Visual Studio 2015 程式設計必然要知道事件,比如單擊按鈕事件,然後編寫當事件發生時的處理程式碼,VB.NET是怎樣進行事件處理的呢?可否自定義事件呢?因擴充套件控制元件,使其列頭增加全選全不選複選框並具備分頁功能需要用到

    spring boot 全域性異常處理定義異常類

    全域性異常處理:定義一個處理類,使用@ControllerAdvice註解。@ControllerAdvice註解:控制器增強,一個被@Component註冊的元件。配合@ExceptionHandler來增強所有的@requestMapping方法。例如:@Exceptio

    Spring Boot學習筆記-錯誤處理定義

    正常的Web應用開發時,需要考慮到應用執行發生異常時或出現錯誤時如何來被處理,例如捕獲必要的異常資訊,記錄日誌方便日後排錯,友好的使用者響應輸出等等。 當然應用發生錯誤,有可能是應用自身的問題,也有可能是客戶端操作的問題。 Spring Boot預設提供了一種錯誤處理機制。 預設錯誤處理機制 預設情況下,S

    Delphi 中定義異常異常處理的一般方法

       delphi中異常定義如下: TCustomException   =   class(Exception)     private     public         constructor   Create(const   Msg:   string );

    MapReduce資料型別定義MapReduce資料型別

    MapReduce資料型別 資料型別都要實現Writable介面,以便用這些型別定義的資料可以被序列化進行網路傳輸和檔案儲存。自定義key資料型別的時候,因為需要對key進行排序,需要繼承java中的比較器,所以可以直接繼承WritableComparable

    Spring Boot 定義異常統一異常處理

    自定義異常 自定義異常要繼承RuntimeException public class ShowtimeException extends RuntimeException { private Integer code; public

    MapReduce中的combiner類詳解定義combiner類(轉)

    一、Combiner的出現背景 1.1 回顧Map階段五大步驟   在第四篇博文《初識MapReduce》中,我們認識了MapReduce的八大步湊,其中在Map階段總共五個步驟,如下圖所示:   其中,step1.5是一個可選步驟,它就是我們今天需要了解的 M