1. 程式人生 > >spark記錄(3)spark算子之Transformation

spark記錄(3)spark算子之Transformation

ace 使用 ble pan 寫入 1.2 插入 get .text

一、map、flatMap、mapParations、mapPartitionsWithIndex

1.1 map

map十分容易理解,他是將源JavaRDD的一個一個元素的傳入call方法,並經過算法後一個一個的返回從而生成一個新的JavaRDD。

(1) 使用Java進行編寫

public static void map() {
        List<String> list = Arrays.asList("李光洙","劉在石","哈哈","宋智孝");
        JavaRDD<String> rdd = jsc.parallelize(list);
        
        JavaRDD
<String> map = rdd.map(new Function<String, String>() { private static final long serialVersionUID = 1L; @Override public String call(String name) throws Exception { return "hello,"+name; } }); map.foreach(
new VoidFunction<String>() { /** * */ private static final long serialVersionUID = 1L; @Override public void call(String msg) throws Exception { System.out.println(msg); } }); }

(2) 使用scala進行編寫

  def map(): Unit = {
    val list = List("李光洙","劉在石","哈哈","宋智孝");
    val rdd = sc.parallelize(list)
    val map = rdd.map(s => "hello," + s).foreach(println) 
  }

(3)運行結果

技術分享圖片

(4) 總結

可以看出,對於map算子,源JavaRDD的每個元素都會進行計算,由於是依次進行傳參,所以他是有序的,新RDD的元素順序與源RDD是相同的。而由有序又引出接下來的flatMap。

1.2 flatMap

flatMap與map一樣,是將RDD中的元素依次的傳入call方法,他比map多的功能是能在任何一個傳入call方法的元素後面添加任意多元素,而能達到這一點,正是因為其進行傳參是依次進行的。

(1) 使用Java進行編寫

    public static void flatmap() {
        List<String> list = Arrays.asList("李光洙 劉在石","哈哈 宋智孝");
        JavaRDD<String> rdd = jsc.parallelize(list);
        
        JavaRDD<String> map = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        }).map(new Function<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(String s) throws Exception {
                return "你好," + s;
            }
        });
        
        map.foreach(new VoidFunction<String>() {
            
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
        
    }

(2) 使用scala進行編寫

  def flatmap(): Unit = {
    val list = List("李光洙 劉在石","哈哈 宋智孝");
    val rdd = sc.parallelize(list)
    rdd.flatMap(_.split(" ")).map("你好,"+_).foreach(println)
  }

(3) 運行結果

技術分享圖片

(4) 總結

flatMap的特性決定了這個算子在對需要隨時增加元素的時候十分好用,比如在對源RDD查漏補缺時。

map和flatMap都是依次進行參數傳遞的,但有時候需要RDD中的兩個元素進行相應操作時(例如:算存款所得時,下一個月所得的利息是要原本金加上上一個月所得的本金的),這兩個算子便無法達到目的了,這是便需要mapPartitions算子,他傳參的方式是將整個RDD傳入,然後將一個叠代器傳出生成一個新的RDD,由於整個RDD都傳入了,所以便能完成前面說的業務。

map是對RDD中元素逐一進行函數操作映射為另外一個RDD,而flatMap操作是將函數應用於RDD之中的每一個元素,將返回的叠代器的所有內容構成新的RDD。而flatMap操作是將函數應用於RDD中每一個元素,將返回的叠代器的所有內容構成RDD。

flatMap與map區別在於map為“映射”,而flatMap“先映射,後扁平化”,map對每一次(func)都產生一個元素,返回一個對象,而flatMap多一步就是將所有對象合並為一個對象。

1.3 mapPartitions

與map方法類似,map是對rdd中的每一個元素進行操作,而mapPartitions(foreachPartition)則是對rdd中的每個分區的叠代器進行操作。如果在map過程中需要頻繁創建額外的對象(例如將rdd中的數據通過jdbc寫入數據庫,map需要為每個元素創建一個鏈接而mapPartition為每個partition創建一個鏈接),則mapPartitions效率比map高的多。

(1) 使用Java進行編寫

    public static void mapPartitions() {
        JavaRDD<String> textFile = jsc.textFile("words",3);
        textFile.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
            
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(Iterator<String> is) throws Exception {
                System.out.println("創建數據庫連接。。。。");
                List<String> list = new ArrayList<String>();
                while(is.hasNext()) {
                    list.add(is.next());
                    System.out.println("模擬向數據庫插入批量數據。。。");
                }
                System.out.println("關閉數據庫連接。。。");
                return list;
            }
        }).collect();
        
    }

(2) 使用scala進行編寫

  def mapPartitions: Unit = {
    val rdd1 = sc.textFile("words")
    val mapResult = rdd1.mapPartitions(iter =>{
        println("打開數據庫。。。")
        val list = List()
        while(iter.hasNext){
          list.addString(new StringBuilder(iter.next()))
          println("插入數據庫。。。")
        }
        println("關閉數據庫。。。")
        list.iterator
      }, false)
    mapResult.foreach(println)
  }

(3) 運行結果

技術分享圖片

(4)總結

mapPartitions比較適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支。

spark記錄(3)spark算子之Transformation