1. 程式人生 > >spark部分運算元的彙總大全(包含Transformations類運算元,action類運算元,持久化運算元等 )【文字說明+Scala程式碼+程式碼連結】

spark部分運算元的彙總大全(包含Transformations類運算元,action類運算元,持久化運算元等 )【文字說明+Scala程式碼+程式碼連結】

一.Spark中的運算元總結(原理) 

Spark運算元
        1).Transformations ,轉換運算元,懶執行,需要Action類運算元觸發。
            map/mapToPair,flatMap,filter,reduceByKey,sample,sortBy/sortByKey,groupByKey,join,leftOutJoin,rightOuterJoin,fullOuterJoin,distinct,union,intersection,subtract,repartition,coalesce,zip,zipWithIndex,mapPartitions,
            mapPartitionWithIndex,cogroup,mapValues,aggreagateByKey,combineByKey
        2).Action,行動運算元,觸發Action類運算元執行。Spark應用程式中(Spark Application)有一個Action運算元就有了一個job。
            take,frist,foreach,count,collect,reduce,foreachPartition,countByKey,countByValue
        3).持久化運算元。
            a).cache
                預設將資料持久化到記憶體,cache()=persist()=persist(StorageLevel.MEMORY_ONLY)
            b).persist
                可以手動指定資料持久化級別。
                MEMORY_ONLY
                MEMORY_ONLY_SER
                MEMORY_AND_DISK
                MEMORY_AND_DISK_SER
                "_2"代表有副本數,儘量避免使用"DISK_ONLY"級別。
            c).checkpoint
                將資料可以持久化到磁碟,指定的checkpoint目錄中,切斷checkpointRDD之前的依賴關係,使之後的RDD依賴於checkpoint目錄中的資料。需要設定checkpoint路徑。
                RDD lineage 非常長,每一個RDD之間邏輯複雜,計算耗時。對一個RDD進行checkpoint之前最好先cache下。
                    
            注意:
            a).cache和persist注意事項:
                i).cache和persist是懶執行,需要Action運算元觸發。
                ii).對一個RDD進行cache/persist之後,可以賦值給一個變數,下次直接使用這個變數就是使用的持久化的資料。
                iii).cache/persist之後不能緊跟Action類運算元。
            b).checkpoint執行流程:
                i).Spark任務執行完成之後,會從後往前回溯,找到CcheckpointRDD做標記。
                ii).回溯完成之後,重新計算標記RDD的資料,將資料放入checkpoint目錄中。
                iii).切斷RDD之間的依賴關係。

————————————————————————————————————————————————————————

二.運算元例項 

package com.bjsxt

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer

object Suanzi {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("test").setMaster("local");
    val sc=new SparkContext(conf)
            val rdd2=sc.parallelize(Array(
        "love1","love2","love3","love4",
                "love5","love6","love7","love8",
                "love9","love10","love11","love12"         
                ),3)
                rdd2.foreach(println)
                println("_________________________________--")
                /**
                 * repartition重新分割槽
                 * 增加分割槽
                 * 是一個shuffle類的運算元
                 */
                val repartition1=rdd2.repartition(5)
                repartition1.foreach(println)
                println("+++++++++++++++++++++++++++++++++++++")
                /**
                 * repartition重新分割槽
                 * 減少分割槽
                 * 是一個shuffle類的運算元
                 */
                val repartition2=rdd2.repartition(1)
                repartition2.foreach(println)
                /**
                 * Coalesce
                 * 可以增多,也可以減少分割槽
                 * 底層有shuffle,但是需要設定,預設是false關閉的
                 * 
                 */
                println("##########################################")
                val coalesce1=rdd2.coalesce(6, false)
                coalesce1.foreach(println)
                            /**
                 * Coalesce
                 * 可以增多,也可以減少分割槽
                 * 底層有shuffle,但是需要設定,預設是false關閉的
                 * 
                 */
                println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%")
                val coalesce2=rdd2.coalesce(6, true)
                coalesce2.foreach(println)
                            /**
                 * Coalesce
                 * 可以增多,也可以減少分割槽
                 * 底層有shuffle,但是需要設定,預設是false關閉的
                 * 
                 */
                println("PPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP")
                val coalesce3=rdd2.coalesce(2, true)
                coalesce3.foreach(println)    
                println("*****************************************************")
                /**
                 * mapPartitionsWithIndex
                 * 根據索引進行分割槽
                 */
                    val    mapPartitionsWithIndex = rdd2.mapPartitionsWithIndex((index,iter)=>{
                  val list=ListBuffer[String]()
                  while(iter.hasNext){
                    val next=iter.next()
                        println("parallelize index=["+index+"],value=["+next+"]")
                        list.+=(next)
                  }
                  list.iterator              
                },false).collect()
                println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
                /**
                 * reduce,將value的值進行累計求和
                 */
    val rdd1=sc.parallelize(Array(("a",200),("b",200),("a",300),("d",400),("a",200),("b",200),("a",300),("d",400)))
    val result=rdd1.reduce((t1,t2)=>{
      val key=t1._1+"~"+t2._1
      val value=t1._2+t2._2
      (key,value)
    })
    println(result)
    println("?????????????????????????????????????????????????")
    /**
     * countbyvalue
     * 根據value值不同進行求和計算
     */
    val result1=rdd1.countByValue()
    result1.foreach(println)
    /**
     * countbykey
     * 根據value值不同進行求和計算
     */ 
    println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
    val result2=rdd1.countByKey()
    result2.foreach(println)
    /**
     * groupByKey
     * 按照key的值不同進行分組
     * 將value的值放在一起
     */
    println("""""""""""""""""""""""""""""""""""""""""""""""""")
       rdd1.groupByKey().foreach(println)
       /**
        * zip的意思就是合併成(k,v)格式
        * 即第一個rdd的值為key,第二個RDD的值為value
        */
       println("zip$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
        val rdd3=sc.parallelize(Array(
        "love1","love2","love3","love4",
                "love5","love6","love7","love8",
                "love9","love10","love11","love12"         
                ))
                val rdd4=sc.parallelize(Array(
        "love1","love2","love3","love4",
                "love5","love6","love7","love8",
                "love9","love10","love11","love12"         
                ))
      val zip=rdd3.zip(rdd4)
      zip.foreach(println)
    /**
     * zipWithIndex
     */
      println("parallelize##########@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
    val parallelize=sc.parallelize(Array(
        "love1","love2","love3","love4",
                "love5","love6","love7","love8",
                "love9","love10","love11","love12"         
                ),3)
                val zipWithIndex= parallelize.zipWithIndex()
                zipWithIndex.foreach(println)
      /**
       * mapPartitionsWithIndex和coalesce的混合使用
       */
      println("mapPartitionsWithIndex和coalesce的混合使用")
        val    mapPartitionsWithIndex1 = rdd3.mapPartitionsWithIndex((index,iter)=>{
                  val list=ListBuffer[String]()
                  while(iter.hasNext){
                    val next=iter.next()
                        println("parallelize index=["+index+"],value=["+next+"]")
                        list.+=(next)
                  }
                  list.iterator              
                },false)
                val coalesce=mapPartitionsWithIndex1.coalesce(7,true)
                val result0=coalesce.mapPartitionsWithIndex((index,iter)=>{
                  val list=ListBuffer[String]()
                  while(iter.hasNext){
                    val next=iter.next()
                        println("parallelize index=["+index+"],value=["+next+"]")
                        list.+=(next)
                  }
                  list.iterator    
                  
                },true).collect()
                result0.foreach(println)
  }
}

用心做事,只為方便你我,鼓勵一下我唄!

————————————————————————————————————————————————————————

Repartitions運算元:

重新分割槽

重新分割槽,可以增多分割槽,可以減少分割槽

是一個有shuffle類的運算元

重新分割槽,可以增多分割槽,可以減少分割槽

是一個有shuffle類的運算元

Coalesce也是重新分割槽,可以增多,也可以減少分割槽,還可以有一個過載的方法

可以產生shuffle,也可以不產生shuffle,預設是不產生shuffle的

Repartition底層封裝的是coalesce

Coalesce叢小的分割槽到大的分割槽,同時不讓產生shuffle,這樣是不起作用的。

增多分割槽經常repartition

減少分割槽經常用coalesce

用coalesce,同時不設定true產生shuffle,這樣子是不產生分割槽的。

Repartition重新分割槽,可多可少,預設有shuffle

coalesce重新分割槽,可多可少,可以設定分割槽是否產生shuffle,coalesce(numpartition,shuffle=true/false),預設是false

由少的分割槽到多的分割槽,不讓產生shuffle,是不起作用的

Mappartition with index

Groupbykey:

將tuple型別的list轉換成用(K,V)格式的RDD用parallelizepares

依據key的值,將所有的value的值放在一起。

zipwithIndex:轉換成(k,v)格式的RDD

Zip就是將兩個RDD壓縮成一個RDD

Tuple型別的RDD也可以壓縮成k,v

Groupbykey

Reduce:累加求總數

Countbykey:數一數相同的key有幾個,按照pairRDD,統計相同的key有幾個

Countbyvalue,可以作用到(k,v)格式的RDD上,也可以作用在非(K,V)格式的RDD上

相關推薦

spark部分運算元彙總大全包含Transformations運算元action運算元持久化運算元 文字說明+Scala程式碼+程式碼連結

一.Spark中的運算元總結(原理)  Spark運算元         1).Transformations ,轉換運算元,懶執行,需要Action類運算元觸發。             map/mapToPair,flatMap,filter,reduceByKey,s

基於開源專案OpenCV的人臉識別Demo版整理不僅可以識別人臉還可以識別眼睛鼻子嘴模式識別中的翹楚

最近對人臉識別的程式非常感興趣,但是苦於沒有選修多媒體方向,看了幾篇關於人臉識別的論文,大概也沒看懂多少,什麼灰度處理啊,切割識別啊,雲裡霧裡,傻傻看不明白啊。各種苦惱。     於是就在網上找找,看有木有神馬開原始碼啊,要是有個現成的原始碼就更好了,百度it ,那些原始碼都憂傷的躲在CSDN中,老衲還

Android View的繼承體系大全包含125個view的所有子by 星空武哥

       安卓的view是一個龐大是繼承體系,今天花了一個晚上我總結一下view的繼承體系。view的直接繼承子類有12個,間接繼承子類有113個。今天我就總結一下所有的view的子類,希

Java版運算元彙總包括filtercollecttakefirst,sampleJava純程式碼

package com.bjsxt; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache

ROS知識16----如何編譯時自動鏈接同一個工作空間的其他包的頭文件包含messagesrvaction自動生成的頭文件

logs package fin 空間 依賴庫 osc div build 知識 catkin_make編譯時,往往需要自動鏈接同一個工作空間的其他包的頭文件。否則會出現類似如下的錯誤: /home/xx/xx_ws/srcA_package/src/db.hpp:13:

Java基礎------生成一個六位數的驗證碼包含大寫字母、小寫字母、數字並且不允許重復

參考 數組 rand 定義 ole ava length log post 問題描述:生成一個六位數的驗證碼(包含大寫字母、小寫字母、數字,並且不允許重復)? 參考代碼如下: import java.util.Arrays;import java.util.Random

Gradle打可執行Jar包包含依賴第三方庫中的

使用Gradle來打Jar包,在引入Gradle的java外掛後,直接就能實現 在build.gradle檔案中引入java外掛 plugins { id 'java' } 然後配置maifest主類 jar { manifest { attributes "M

C語言~巨集操作大全巨集定義、內建巨集、__FILE__、__LINE__、##用法

當然巨集定義非常重要的,它可以幫助我們防止出錯,提高程式碼的可移植性和可讀性等。 下面列舉一些成熟軟體中常用得巨集定義 1,防止一個頭檔案被重複包含 #ifndef COMDEF_H #define COMDEF_H //標頭檔案內容 … #endif

SparkStreaming部分的學習包括:sparkStreaming與storm的區別 Sparkstreaming處理資料的過程業務邏輯圖及文字說明

sparkStreaming與storm的區別:  Sparkstreaming處理資料的過程: sparkstreaming:資料是一段時間處理的,是一個微批處理,這個時間是由自己人為設定的。sparkstreaming的吞吐量高。 Storm:是純實時處理資料的,

ios中常用的小技巧大全總有你不知道的和你會用到的

/*1*/ tempString = [tempString stringByReplacingOccurrencesOfString:@" " withString:@""]; /*2 */tempString = [tempString stringByReplacingOccurrencesOfStri

Spark的RDD連續轉換操作有時需要注意強行觸發action執行操作否則Tansformation的惰性lazy機制會導致結果錯誤

最近通過spark做一些資料處理,遇到一些詭異的現象 我開發了一個隨機生成海量資料點的程式,因為要保證這些點具有自增序號,不適合直接map分散式做(幾十億的資料,map計算需要分割槽(不主動分割槽估計也會自動分割槽,spark自帶的資料累加邏輯只能對單個partitio

eclipse中使用spring boot 入門開發包含:與jsp頁面和資料庫互動cmd打包執行war包

突然想到自己有一段時間沒使用spring boot了,熟悉了一下之後決定記錄一下這次使用的注意點 一:使用springBoot搭出來一個架子(從前端到資料庫) 1.eclipse已經整合了maven,所以新建一個maven專案,然後針對專案修改下jdk相關點 2.目錄如

Android------視訊播放器包含全屏播放快退快進騰訊新聞的列表播放

前段時間做了一個新聞APP,涉及到了列表視訊播放,和騰訊新聞APP差不多,總結了一下程式碼,寫了一個Demo來分享給大家。用了  TabLayout+RecylerView+自定義視訊控制元件  完成的 列表中支援全屏播放來看看效果圖:  列表類程式碼:public clas

19ASP.NET Core EF建立模型包含屬性和排除屬性、主鍵、生成的值

1.什麼是Fluent API? EF中內嵌的約定將POCO類對映到表。但是,有時您無法或不想遵守這些約定,需要將實體對映到約定指示外的其他物件,所以Fluent API和註解都是一種方法,這兩種方法是用來配置EF在對映屬性時繞開約定。Code first fluent API最常訪問通過重寫OnModel

Cmake新手使用日記1C++11下的初體驗

pen 如何 其他 err ++ targe 使用 可執行文件 使用教程   第一次使用Cmake,搜索了很多使用教程,包括《Cmake實踐》、《Cmake手冊》等,但是在針對最新的C++11條件下編程還是會存在一點點問題,需要實驗很多次錯誤並搜索大量文章才能解決問題。這裏

純CSS畫的基本圖形矩形、圓形、三角形、多邊形、愛心、八卦

技術分享 部分 fin display 三角形 spl back transform 純css 今天在css-tricks上看到一篇文章,那篇文章讓我不禁心頭一震,強大的CSS啊,居然能畫出這麽多基本的圖形。圖形包括基本的矩形、圓形、橢圓、三角形、多邊形,也包括稍微復雜一點

ArcGIS API for JavaScript3.x 學習筆記[4] 加載底圖Open Street Map開放街道地圖

asc 裏的 指定 訪問 utf-8 gis sca utf 同方 Open Street Map OpenStreetMap(簡稱OSM,中文是開放街道地圖)是一個網上地圖協作計劃,目標是創造一個內容自由且能讓所有人編輯的世界地圖。 OSM是一款由網絡大眾共同打造的免費開

語音識別學習筆記基於向量量化的識別技術

語音識別學習筆記(二)【基於向量量化的識別技術】   概述  量化分為標量量化和向量量化(Vector Quantization,VQ)。標量量化是將取樣後的訊號值逐個進行量化,而適量量化是將若干個取樣訊號分成一組,即構成一個向量,然後對此向量一次進行量化。向量量化

04 -pandas索引的堆行列操作交換行列、聚合操作求和、最大值、最小值、平均值

引入模組 import pandas as pd from pandas import Series,DataFrame import matplotlib.pyplot as plt 建立示例DataFrame # 用作案例 不要刪 !!! data=np.random.ra

PAT 1123—— Is It a Complete AVL Tree平衡二叉樹左旋右旋各種旋

#include <cstdio> #include <algorithm> #include <vector> #include <iostream> #include <queue> using namespace std;