1. 程式人生 > >java-spark中各種常用運算元的寫法示例

java-spark中各種常用運算元的寫法示例

Spark的運算元的分類

從大方向來說,Spark 運算元大致可以分為以下兩類:

1)Transformation 變換/轉換運算元:這種變換並不觸發提交作業,完成作業中間過程處理。

Transformation 操作是延遲計算的,也就是說從一個RDD 轉換生成另一個 RDD 的轉換操作不是馬上執行,需要等到有 Action 操作的時候才會真正觸發運算。

2)Action 行動運算元:這類運算元會觸發 SparkContext 提交 Job 作業。

Action 運算元會觸發 Spark 提交作業(Job),並將資料輸出 Spark系統。

從小方向來說,Spark 運算元大致可以分為以下三類:

1)Value資料型別的Transformation運算元,這種變換並不觸發提交作業,針對處理的資料項是Value型的資料。

2)Key-Value資料型別的Transfromation運算元,這種變換並不觸發提交作業,針對處理的資料項是Key-Value型的資料對。

3)Action運算元,這類運算元會觸發SparkContext提交Job作業。

引言

通常寫spark的程式用scala比較方便,畢竟spark的原始碼就是用scala寫的。然而,目前java開發者特別多,尤其進行資料對接、上線服務的時候,這時候,就需要掌握一些spark在java中的使用方法了

一、map

map在進行資料處理、轉換的時候,不能更常用了

在使用map之前 首先要定義一個轉換的函式 格式如下:

Function<String, LabeledPoint> transForm = new Function<String, LabeledPoint>() {//String是某一行的輸入型別 LabeledPoint是轉換後的輸出型別
 @Override
 public LabeledPoint call(String row) throws Exception {//重寫call方法
 String[] rowArr = row.split(",");
 int rowSize = rowArr.length;
  
 double[] doubleArr = new double[rowSize-1];
  
 //除了第一位的lable外 其餘的部分解析成double 然後放到陣列中
 for (int i = 1; i < rowSize; i++) {
  String each = rowArr[i];
  doubleArr[i] = Double.parseDouble(each);
 }
  
 //用剛才得到的資料 轉成向量
 Vector feature = Vectors.dense(doubleArr);
 double label = Double.parseDouble(rowArr[0]);
 //構造用於分類訓練的資料格式 LabelPoint
 LabeledPoint point = new LabeledPoint(label, feature);
 return point;
 }
 };

需要特別注意的是:

1、call方法的輸入應該是轉換之前的資料行的型別  返回值應是處理之後的資料行型別

2、如果轉換方法中呼叫了自定義的類,注意該類名必須實現序列化 比如

public class TreeEnsemble implements Serializable {
}

3、轉換函式中如果呼叫了某些類的物件,比如該方法需要呼叫外部的一個引數,或者數值處理模型(標準化,歸一化等),則該物件需要宣告是final

然後就是在合適的時候呼叫該轉換函數了

JavaRDD<LabeledPoint> rdd = oriData.toJavaRDD().map(transForm);

這種方式是需要將普通的rdd轉成javaRDD才能使用的,轉成javaRDD的這一步操作不耗時,不用擔心

二、filter

在避免資料出現空值、0等場景中也非常常用,可以滿足sql中where的功能

這裡首先也是要定義一個函式,該函式給定資料行 返回布林值 實際效果是將返回為true的資料保留

Function<String, Boolean> boolFilter = new Function<String, Boolean>() {//String是某一行的輸入型別 Boolean是對應的輸出型別 用於判斷資料是否保留
 @Override
 public Boolean call(String row) throws Exception {//重寫call方法
 boolean flag = row!=null;
 return flag;
 }
 };

通常該函式實際使用中需要修改的僅僅是row的型別 也就是資料行的輸入型別,和上面的轉換函式不同,此call方法的返回值應是固定為Boolean

然後是呼叫方式

JavaRDD<LabeledPoint> rdd = oriData.toJavaRDD().filter(boolFilter);

三、mapToPair

該方法和map方法有一些類似,也是對資料進行一些轉換。不過此函式輸入一行 輸出的是一個元組,最常用的方法是用來做交叉驗證 或者統計錯誤率 召回率 計算AUC等等

同樣,需要先定義一個轉換函式

Function<String, Boolean> transformer = new PairFunction<LabeledPoint, Object, Object>() {//LabeledPoint是輸入型別 後面的兩個Object不要改動
 @Override
 public Tuple2 call(LabeledPoint row) throws Exception {//重寫call方法 通常只改動輸入引數 輸出不要改動
 double predicton = thismodel.predict(row.features());
 double label = row.label();
 return new Tuple2(predicton, label);
 }
 });

關於呼叫的類、類的物件,要求和之前的一致,類需要實現序列化,類的物件需要宣告成final型別

相應的呼叫如下:

JavaPairRDD<Object, Object> predictionsAndLabels = oriData.mapToPair(transformer);

然後對該predictionsAndLabels的使用,計算準確率、召回率、精準率、AUC