1. 程式人生 > >Pig系統分析(8)-Pig可擴展性

Pig系統分析(8)-Pig可擴展性

abstract nike 自己 this script dex java fcm cep

本文是Pig系統分析系列中的最後一篇了,主要討論怎樣擴展Pig功能。不僅介紹Pig本身提供的UDFs擴展機制,還從架構上探討Pig擴展可能性。

補充說明:前些天同事發現twitter推動的Pig On Spark項目:Spork,準備研究下。

UDFs

通過UDFs(用戶自己定義函數),能夠自己定義數據處理方法,擴展Pig功能。實際上,UDFS除了使用之前須要register/define外。和內置函數沒什麽不同。

主要的EvalFunc

以內置的ABS函數為例:

public class ABS extends EvalFunc<Double>{
    /**
     * java level API
     * @param input expectsa single numeric value
     * @return output returns a single numeric value, absolute value of the argument
     */
    public Double exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
 
        Double d;
        try{
            d = DataType.toDouble(input.get(0));
        } catch (NumberFormatException nfe){
            System.err.println("Failed to process input; error -" + nfe.getMessage());
            return null;
        } catch (Exception e){
            throw new IOException("Caught exception processing input row", e);
        }
        return Math.abs(d);
    }
    ……
    public Schema outputSchema(Schema input) ;
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException;
 
}
  1. 函數都繼承EvalFunc接口,泛型參數Double代表返回類型。

  2. exec方法:輸入參數類型為元組,代表一行記錄。
  3. outputSchema方法:用於處理輸入和輸出Schema
  4. getArgToFuncMapping:用於支持各種數據類型重載。

聚合函數

EvalFuc方法也能實現聚合函數,這是由於group操作對每一個分組都返回一條記錄,每組中包括一個Bag,所以exec方法中叠代處理Bag中記錄就可以。

以Count函數為例:

public Long exec(Tuple input) throws IOException {
    try {
        DataBag bag = (DataBag)input.get(0);
        if(bag==null)
            return null;
        Iterator it = bag.iterator();
        long cnt = 0;
        while (it.hasNext()){
            Tuple t = (Tuple)it.next();
            if (t != null && t.size() > 0 && t.get(0) != null )
                cnt++;
        }
        return cnt;
    } catch (ExecException ee) {
        throw ee;
    } catch (Exception e) {
        int errCode = 2106;               
        String msg = "Error while computing count in " + this.getClass().getSimpleName();
        throw new ExecException(msg, errCode, PigException.BUG, e);
    }
}

Algebraic 和Accumulator 接口

如前所述,具備algebraic性質的聚合函數在Map-Reduce過程中能被Combiner優化。直觀來理解,具備algebraic性質的函數處理過程能被分為三部分:initial(初始化,處理部分輸入數據)、intermediate(中間過程,處理初始化過程的結果)和final(收尾,處理中間過程的結果)。

比方COUNT函數,初始化過程為count計數操作。中間過程和收尾為sum求和操作。更進一步。假設函數在這三個階段中都能進行同樣的操作,那麽函數具備distributive性質。比方SUM函數。

Pig提供了Algebraic 接口:

public interface Algebraic{
    /**
     * Get the initial function.
     * @return A function name of f_init. f_init shouldbe an eval func.
     * The return type off_init.exec() has to be Tuple
     */
    public String getInitial();
 
    /**
     * Get the intermediatefunction.
     * @return A function name of f_intermed. f_intermedshould be an eval func.
     * The return type off_intermed.exec() has to be Tuple
     */
    public String getIntermed();
 
    /**
     * Get the final function.
     * @return A function name of f_final. f_final shouldbe an eval func parametrized by
     * the same datum as the evalfunc implementing this interface.
     */
    public String getFinal();
}

當中每一個方法都返回EvalFunc實現類的名稱。

繼續以COUNT函數為例,COUNT實現了Algebraic接口。針對下面語句:

input= load ‘data‘ as (x, y);
grpd= group input by x;
cnt= foreach grpd generate group, COUNT(input);
storecnt into ‘result‘;
Pig會重寫MR運行計劃:

Map
load,foreach(group,COUNT.Initial)
Combine
foreach(group,COUNT.Intermediate)
Reduce
foreach(group,COUNT.Final),store
Algebraic 接口通過Combiner優化降低傳輸數據量,而Accumulator接口則關註的是內存使用量。UDF實現Accumulator接口後,Pig保證全部key相同的數據(通過Shuffle)以增量的形式傳遞給UDF(默認pig.accumulative.batchsize=20000)。相同。COUNT也實現了Accumulator接口。

/* Accumulator interface implementation */
    private long intermediateCount = 0L;
    @Override
    public void accumulate(Tuple b) throws IOException {
       try {
           DataBag bag = (DataBag)b.get(0);
           Iterator it = bag.iterator();
           while (it.hasNext()){
                Tuple t = (Tuple)it.next();
                if (t != null && t.size() > 0 && t.get(0) != null) {
                    intermediateCount += 1;
                }
            }
       } catch (ExecException ee) {
           throw ee;
       } catch (Exception e) {
           int errCode = 2106;
           String msg = "Error while computing min in " + this.getClass().getSimpleName();
           throw new ExecException(msg, errCode, PigException.BUG, e);          
       }
    }
 
    @Override
    public void cleanup() {
       intermediateCount = 0L;
    }
    @Override
    /*
    *當前key都被處理完之後被調用
    */
    public Long getValue() {
       return intermediateCount;
    }

前後端數據傳遞

通過UDFs構造函數傳遞數據是最簡單的方法。然後通過define語句定義UDF實例時指定構造方法參數。但有些情況下。比方數據在執行期才產生,或者數據不能用String格式表達,這時候就得使用UDFContext了。

UDF通過getUDFContext方法獲取保存在ThreadLoacl中的UDFContext實例。

UDFContext包括下面信息:

  1. jconf:Hadoop Configuration。

  2. clientSysProps:系統屬性。
  3. HashMap<UDFContextKey,Properties> udfConfs:用戶自己保存的屬性,當中UDFContextKey由UDF類名生成。

UDFs運行流程

技術分享

Pig架構可擴展性

Pig哲學之三——Pigs Live Anywhere。

理論上。Pig並不被限定執行在Hadoop框架上,有幾個能夠參考的實現和提議。

  1. Pigen。Pig on Tez。https://github.com/achalsoni81/pigeon,架構圖例如以下:技術分享
  2. Pig的後端抽象層:https://wiki.apache.org/pig/PigAbstractionLayer。

    眼下已經實現了PigLatin執行在Galago上。

    http://www.galagosearch.org/

參考資料

Pig官網:http://pig.apache.org/

Pig paper at SIGMOD 2008:Building a High Level DataflowSystem on top of MapReduce:The Pig Experience

Programming.Pig:Dataflow.Scripting.with.Hadoop(2011.9).Alan.Gates

Pig系統分析(8)-Pig可擴展性