1. 程式人生 > >Pig使用者自定義函式(UDF)

Pig使用者自定義函式(UDF)

我們以氣溫統計和詞頻統計為例,講解以下三種使用者自定義函式。

使用者自定義函式

什麼時候需要使用者自定義函式呢?和其它語言一樣,當你希望簡化程式結構或者需要重用程式程式碼時,函式就是你不二選擇。

Pig的使用者自定義函式可以用Java編寫,但是也可以用Python或Javascript編寫。我們接下來以Java為例。

自定義過濾函式

我們仍然以先前的程式碼為例:

records = load 'hdfs://localhost:9000/input/temperature1.txt'as (year: chararray,temperature: int);

valid_records = filter records by temperature!=999;

第二個語句的作用就是篩選合法的資料。如果我們採用使用者自定義函式,則第二個語句可以寫成:

valid_records = filter records by isValid(temperature);

這種寫法更容易理解,也更容易在多個地方重用。接下來的問題就是如何定義這個isValid函式。程式碼如下:

packagecom.oserp.pigudf;

importjava.io.IOException;

importorg.apache.pig.FilterFunc;

importorg.apache.pig.data.Tuple;

public class IsValidTemperature extends

FilterFunc {

         @Override

         public Boolean exec(Tuple tuple) throws IOException {            

                   Object object = tuple.get(0);

                   int temperature = (Integer)object;            

                   return temperature != 999;

         }

}

接下來,我們需要:

1)  編譯程式碼並打包成jar檔案,比如pigudf.jar。

2)  通過register命令將這個jar檔案註冊到pig環境:

register/home/user/hadoop_jar/pigudf.jar //引數為jar檔案的本地路徑

此時,我們就可以用以下語句呼叫這個函式:

valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);

dump valid_records;

看起來這個函式名太長,不便輸入。我們可以用定義別名的方式代替:

define isValid com.oserp.pigudf.IsValidTemperature();

valid_records = filter records by isValid(temperature);

dump valid_records;

回到程式碼,我們可發現:

1)  需要定義一個繼承自FilterFunc的類。

2)  重寫這個類的exec方法。這個方法的引數只有一個tuple,但是呼叫時可以傳遞多個引數,你可以通過索引號獲得對應的引數值,比如tuple.get(1)表示取第二個引數。

3)  呼叫時,需要使用類的全名。(當然你可以自定義別名)

4)  更多的驗證需要讀者自行在函式中新增,比如判斷是否為null等等。

備註:用Eclipse編寫Pig自定義函式時,你可能需要引用到一些Hadoop的庫檔案。比較容易的方式是在新建專案時指定專案型別為MapReduce專案,這樣Eclipse就會自動設定庫引用的相關資訊。

自定義運算函式(Eval function)

仍然以前面的資料檔案為例:

1990 21

1990 18

1991 21

1992 30

1992 999

1990 23

假設我們希望通過溫度值獲得一個溫度的分類資訊,比如我們把溫度大於劃分為以下型別:

溫度                            分類

x>=30                          hot

x>=10 and x<30        moderate

x<10                                      cool

則我們可以定義以下函式,程式碼如下:

packagecom.oserp.pigudf;

importjava.io.IOException;

importorg.apache.pig.EvalFunc;

importorg.apache.pig.data.Tuple;

public class GetClassification extends EvalFunc<String> {

         @Override

         public String exec(Tuple tuple) throws IOException {               

                   Object object = tuple.get(0);

                   int temperature = (Integer)object;

                   if (temperature >= 30){

                            return "Hot";

                   }

                   else if(temperature >=10){

                            return "Moderate";

                   }

                   else {

                            return "Cool";

                   }                

         }

}

依次輸入以下Pig語句:

records = load'hdfs://localhost:9000/input/temperature1.txt' as (year: chararray,temperature:int);

register /home/user/hadoop_jar/pigudf.jar;

valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);

result = foreach valid_records generateyear,com.oserp.pigudf.GetClassification(temperature);

dump result;

輸出結果如下:

(1990,Moderate)

(1990,Moderate)

(1991,Moderate)

(1992,Hot)    

(1990,Moderate)

程式碼比較簡單,該類繼承自EvalFunc類,且我們要明確定義返回值型別。

有些時候其它類庫可能包含有功能相近的Java函式,我們是否可以直接將這些庫函式拿過來使用呢?可以。以下語句呼叫了trim函式,用於去掉name欄位前後的空格:

DEFINE trim InvokeForString('org.apache.commons.lang.StringUtils.trim','String');

B = FOREACH A GENERATE trim(name);

其中的InvokeForString是一個Invoker(不知道該如何翻譯啊),其通過反射機制呼叫,返回值是String型別。其它類似的還有InvokeForInt,InvokeForLong, InvokeForDouble,InvokeForFloat等等。

自定義載入函式

我們以詞頻統計為例,講解如何自定義載入函式。(統計各個單詞出現的頻率,由高到低排序)

一般情況下,load語句載入資料時,一行會被生成一個tuple。而統計詞頻時,我們希望每個單詞生成一個tuple。我們的測試資料檔案只有兩行資料,如下:

Thisis a map a reduce program

mapreduce partition combiner

我們希望load後能得到如下形式的資料,每個單詞一個tuple:

(This)

(is)

(a)

(map)

(a)

(reduce)

先看程式碼:

package com.oserp.pigudf;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.InputFormat;

import org.apache.hadoop.mapreduce.Job;

importorg.apache.hadoop.mapreduce.RecordReader;

importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;

importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.pig.LoadFunc;

importorg.apache.pig.backend.executionengine.ExecException;

importorg.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

import org.apache.pig.data.BagFactory;

import org.apache.pig.data.DataBag;

import org.apache.pig.data.Tuple;

import org.apache.pig.data.TupleFactory;

public class WordCountLoadFunc extends LoadFunc {

         private RecordReader reader;

         TupleFactorytupleFactory = TupleFactory.getInstance();

         BagFactorybagFactory = BagFactory.getInstance();

         @Override        

         public InputFormatgetInputFormat() throws IOException {

                   return new TextInputFormat();

         }      

         @Override

         public Tuple getNext() throws IOException {

                   try {

                            // 當讀取到分割槽資料塊的末尾時,返回null表示資料已讀取完

                            if (!reader.nextKeyValue()){

                                     return null;

                                     }

                            Textvalue = (Text)reader.getCurrentValue();

                            Stringline = value.toString();

                            String[]words =  line.split("\\s+"); // 斷詞

                            // 因為getNext函式只能返回一個tuple

                            // 而我們希望每個單詞一個單獨的tuple

                            // 所以我們將多個tuple放到一個bag裡面,

                            // 然後返回一個包含一個bagtuple

                            // 注:這只是一個用於演示用法的示例,實際中這樣使用不一定合理。

                            List<Tuple>tuples = new ArrayList<Tuple>();                    

                            Tupletuple = null;

                            for (String word : words) {

                                     tuple= tupleFactory.newTuple();

                                     tuple.append(word);

                                     tuples.add(tuple);

                                     }

                            DataBagbag = bagFactory.newDefaultBag(tuples);

                            Tupleresult = tupleFactory.newTuple(bag);

                            return result;

                   }

                   catch (InterruptedException e) {

                            throw new ExecException(e);

                   }

         }

         @Override

         public void prepareToRead(RecordReader reader,PigSplit arg1)

                            throws IOException {

                   this.reader = reader;

         }

         @Override

         public void setLocation(String location, Job job) throws IOException {

                   FileInputFormat.setInputPaths(job,location);          

         }

}

依次執行以下命令:

1)       records= load 'hdfs://localhost:9000/input/sample_small.txt' usingcom.oserp.pigudf.WordCountLoadFunc() as (words:bag{word:(w:chararray)});

2)       flatten_records= foreach records generate flatten($0);

3)       grouped_records= group flatten_records by words::w;

4)       result= foreach grouped_records generate group,COUNT(flatten_records);

5)       final_result= order result by $1 desc,$0;

6)       dumpfinal_result;

顯示結果如下:

(a,2)                  

(map,2)            

(reduce,2)        

(This,1)             

(combiner,1)   

(is,1)                           

(partition,1)    

(program,1)     

注意schema的定義格式:(words:bag{word:(w:chararray)})