java-spark中各種常用運算元的寫法示例
Spark的運算元的分類
從大方向來說,Spark 運算元大致可以分為以下兩類:
1)Transformation 變換/轉換運算元:這種變換並不觸發提交作業,完成作業中間過程處理。
Transformation 操作是延遲計算的,也就是說從一個RDD 轉換生成另一個 RDD 的轉換操作不是馬上執行,需要等到有 Action 操作的時候才會真正觸發運算。
2)Action 行動運算元:這類運算元會觸發 SparkContext 提交 Job 作業。
Action 運算元會觸發 Spark 提交作業(Job),並將資料輸出 Spark系統。
從小方向來說,Spark 運算元大致可以分為以下三類:
1)Value資料型別的Transformation運算元,這種變換並不觸發提交作業,針對處理的資料項是Value型的資料。
2)Key-Value資料型別的Transfromation運算元,這種變換並不觸發提交作業,針對處理的資料項是Key-Value型的資料對。
3)Action運算元,這類運算元會觸發SparkContext提交Job作業。
引言
通常寫spark的程式用scala比較方便,畢竟spark的原始碼就是用scala寫的。然而,目前java開發者特別多,尤其進行資料對接、上線服務的時候,這時候,就需要掌握一些spark在java中的使用方法了
一、map
map在進行資料處理、轉換的時候,不能更常用了
在使用map之前 首先要定義一個轉換的函式 格式如下:
Function<String, LabeledPoint> transForm = new Function<String, LabeledPoint>() {//String是某一行的輸入型別 LabeledPoint是轉換後的輸出型別 @Override public LabeledPoint call(String row) throws Exception {//重寫call方法 String[] rowArr = row.split(","); int rowSize = rowArr.length; double[] doubleArr = new double[rowSize-1]; //除了第一位的lable外 其餘的部分解析成double 然後放到陣列中 for (int i = 1; i < rowSize; i++) { String each = rowArr[i]; doubleArr[i] = Double.parseDouble(each); } //用剛才得到的資料 轉成向量 Vector feature = Vectors.dense(doubleArr); double label = Double.parseDouble(rowArr[0]); //構造用於分類訓練的資料格式 LabelPoint LabeledPoint point = new LabeledPoint(label, feature); return point; } };
需要特別注意的是:
1、call方法的輸入應該是轉換之前的資料行的型別 返回值應是處理之後的資料行型別
2、如果轉換方法中呼叫了自定義的類,注意該類名必須實現序列化 比如
public class TreeEnsemble implements Serializable {
}
3、轉換函式中如果呼叫了某些類的物件,比如該方法需要呼叫外部的一個引數,或者數值處理模型(標準化,歸一化等),則該物件需要宣告是final
然後就是在合適的時候呼叫該轉換函數了
JavaRDD<LabeledPoint> rdd = oriData.toJavaRDD().map(transForm);
這種方式是需要將普通的rdd轉成javaRDD才能使用的,轉成javaRDD的這一步操作不耗時,不用擔心
二、filter
在避免資料出現空值、0等場景中也非常常用,可以滿足sql中where的功能
這裡首先也是要定義一個函式,該函式給定資料行 返回布林值 實際效果是將返回為true的資料保留
Function<String, Boolean> boolFilter = new Function<String, Boolean>() {//String是某一行的輸入型別 Boolean是對應的輸出型別 用於判斷資料是否保留
@Override
public Boolean call(String row) throws Exception {//重寫call方法
boolean flag = row!=null;
return flag;
}
};
通常該函式實際使用中需要修改的僅僅是row的型別 也就是資料行的輸入型別,和上面的轉換函式不同,此call方法的返回值應是固定為Boolean
然後是呼叫方式
JavaRDD<LabeledPoint> rdd = oriData.toJavaRDD().filter(boolFilter);
三、mapToPair
該方法和map方法有一些類似,也是對資料進行一些轉換。不過此函式輸入一行 輸出的是一個元組,最常用的方法是用來做交叉驗證 或者統計錯誤率 召回率 計算AUC等等
同樣,需要先定義一個轉換函式
Function<String, Boolean> transformer = new PairFunction<LabeledPoint, Object, Object>() {//LabeledPoint是輸入型別 後面的兩個Object不要改動
@Override
public Tuple2 call(LabeledPoint row) throws Exception {//重寫call方法 通常只改動輸入引數 輸出不要改動
double predicton = thismodel.predict(row.features());
double label = row.label();
return new Tuple2(predicton, label);
}
});
關於呼叫的類、類的物件,要求和之前的一致,類需要實現序列化,類的物件需要宣告成final型別
相應的呼叫如下:
JavaPairRDD<Object, Object> predictionsAndLabels = oriData.mapToPair(transformer);
然後對該predictionsAndLabels的使用,計算準確率、召回率、精準率、AUC