1. 程式人生 > >spark 幾種transformation 的計算邏輯和測試

spark 幾種transformation 的計算邏輯和測試

到網上看了一些資料,簡單的做個筆記。備忘。

測試例子使用的資料:

test01:

a a
b b
c c
d d
e e
f f
g g

test02:
1 1
2 2
3 3
4 4
5 5
6 6
a a
b b
c c
d d
e e
f f


1、union(otherRDD)

      union() 將兩個rdd簡單結合在一起,與mysql中的union 操作類似只不過它是操作的rdd,它不會改變partition中的資料

spark sql 測試:

./spark-shell
sc
val t01 = sc.textFile("hdfs://user/data_spark/test01")
val t02 = sc.textFile("hdfs://user/data_spark/test02")
t01.union(t01) foreach println

結果:
a a
e e
b b
a a
f f
b b
c c
c c
g g
d d
d d
e e
f f
g g


多次測試union,結果順序都是隨機的,所以,union只是簡單的將兩個rdd的資料拼接到一起

2、groupByKey(numPartitions)

      普通的RDD 類是沒有這個方法的,org.apache.spark.rdd.PairRDDFunctions 這個pairRdd提供這個方法;

       顧名思義,這個方法是將相同的key的records聚合在一起,類似mysql中的groupby操作,通過ShuffledRDD將每個partition中fetch過來,shuffle機制預設用的是hashShuffle,spark1.1版本引入sorted shuffle,速度更快。shuffle操作後面接著mapPartition()操作,生成MapPartitionRDD。這就是groupbykey的結果了。

      同一個key的值聚合以後,將所有的value放到一個arraylist,新的arraylist 作為value

val wc = t01.union(t01).flatMap(l=>l.split(" ")).map(w=>(w,1))
wc foreach println
結果:
(e,1)
(e,1)
(e,1)
(e,1)
(f,1)
(f,1)
(f,1)
(f,1)
(g,1)
(g,1)
(g,1)
(g,1)
(a,1)
(a,1)
(a,1)
(a,1)
(b,1)
(b,1)
(b,1)
(b,1)
(c,1)
(c,1)
(c,1)
(c,1)
(d,1)
(d,1)
(d,1)
(d,1)

wc.groupByKey foreach println
結果:
(d,CompactBuffer(1, 1, 1, 1))
(g,CompactBuffer(1, 1, 1, 1))
(c,CompactBuffer(1, 1, 1, 1))
(b,CompactBuffer(1, 1, 1, 1))
(f,CompactBuffer(1, 1, 1, 1))
(e,CompactBuffer(1, 1, 1, 1))
(a,CompactBuffer(1, 1, 1, 1))

ok,groupByKey 之後,將同一個key的值都放到一個列表中

3、reduceByKey(func,numPartition)

      這個操作的作用類似mapreduce中的reduce操作,對相同的key的值加上func的操作,比如要做wordcount的操作:

             map(x=>(x,1)).reduceByKey(_+_, 5)
      reduceByKey預設開啟map端的combine,上面的groupByKey預設沒有開啟map端的combine操作,可以人工設定一下。

接上面的測試

wc.reduceByKey(_+_) foreach println
結果:
(d,4)
(b,4)
(f,4)
(g,4)
(c,4)
(e,4)
(a,4)

4、distinct(numPartitions)

      將 parent rdd 的資料去重,放到新的numPartitions,還是要通過shuffle操作,如果是kv pair 的資料<k,v>  則直接進行shuffle 操作,如果只有key,那麼spark先將資料轉換成<k, null>再進行shuffle。其實後面呼叫的是reduceByKey()

wc.distinct(1) foreach println
結果:
(g,1)
(b,1)
(f,1)
(d,1)
(a,1)
(e,1)
(c,1)

5、cogroup(otherRDD,numPartitions)

     與groupByKey不同的地方,cogroup 是將多個rdd的資料聚合到一起,過程跟groupByKey 類似.

     但是結果是一個包含多個arraylist 的arraylist,每一個rdd 的value放到一個arraylist,然後,將這些arraylist放到一個元素的arraylist的arraylist。

val wc01 = t01.flatMap(l=>l.split(" ")).map(w=>(w,1))
val wc02 = t02.flatMap(l=>l.split(" ")).map(w=>(w,1))

wc01.cogroup(wc02,1) foreach println
結果:
(d,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(e,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(4,(CompactBuffer(),CompactBuffer(1, 1)))
(5,(CompactBuffer(),CompactBuffer(1, 1)))
(a,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(6,(CompactBuffer(),CompactBuffer(1, 1)))
(b,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(2,(CompactBuffer(),CompactBuffer(1, 1)))
(3,(CompactBuffer(),CompactBuffer(1, 1)))
(f,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(1,(CompactBuffer(),CompactBuffer(1, 1)))
(g,(CompactBuffer(1, 1),CompactBuffer()))
(c,(CompactBuffer(1, 1),CompactBuffer(1, 1)))


6、intersection(otherRDD)

     這個操作值保留兩個rdd中都包含的資料,首先將rdd的資料轉化成<k, ->,後面呼叫cogroup()操作。

     然後,  對cogroup結果進行過濾,由前面cogroup 的結果格式介紹可知,會生成包含兩個arraylist元素的arraylist,只保留結果中兩個arraylist都不為空的,最後取出key,便是最終的結果。

wc01.intersection(wc02) foreach println
結果:
(d,1)
(e,1)
(b,1)
(f,1)
(a,1)
(c,1)

只有兩個rdd共同的部分 kv 對

7、join(otherRDD, numPartitions)

     將兩個RDD[ K, V ] 安裝sql中的join方式聚合。類似intersection,先進行cogroup操作,得到<k, (Iterable[v1], Iterable[v2])> 的MappedValuesRDD。

     將 Iterable[v1] 和 Iterable[v2] 做笛卡爾集,並將集合flat()化,生成FlatMappedValuesRDD。

wc01.join(wc02,1) foreach println
結果:
(d,(1,1))
(d,(1,1))
(d,(1,1))
(d,(1,1))
(e,(1,1))
(e,(1,1))
(e,(1,1))
(e,(1,1))
(a,(1,1))
(a,(1,1))
(a,(1,1))
(a,(1,1))
(b,(1,1))
(b,(1,1))
(b,(1,1))
(b,(1,1))
(f,(1,1))
(f,(1,1))
(f,(1,1))
(f,(1,1))
(c,(1,1))
(c,(1,1))
(c,(1,1))
(c,(1,1))
這個jion應該對應於mysql的inner join,只包含雙方共有的資料

8、sortByKey(ascending,numPartitions)

     將RDD [ k, v ] 按照key進行排序,如果ascending=true表示升序,false表示降序。

     先通過shuffle將資料聚合到一起,然後將聚合的資料按照key排序

wc01.sortByKey(true,1) foreach println
結果:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)

9、cartesian(otherRDD)

       求兩個rdd的笛卡爾集,生成的CartesianRDD中的partition個數為兩個rdd的partition的個數乘積。

       邏輯類似join

笛卡爾乘積,這個很簡單,不過資料量大的話就不要這麼做了

10、coalesce(numPartitions, shuffle = false)

        合併,對一個rdd,兩種方式,一種需要shuffle,一種直接將多個partitions的內容合併到一起,不需要shuffle。

        這個方法的主要作用就是調整 parentRDD 的partition數量。合併因素除了考慮partition的個數外,還應該考慮locality 和 balance的問題

這個操作的邏輯比較難理解:

 wc01.coalesce(1) foreach println
結果:
14/10/27 17:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:0+14
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
14/10/27 17:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:14+14
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)

 wc01.coalesce(2) foreach println
結果:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(e,1)
(d,1)
(e,1)
(d,1)
(f,1)
(f,1)
(g,1)
(g,1)



11、repartition(numPartitions)       

        等價於coalesce(numPartitions, shuffle = true)

wc01.repartition(1) foreach println
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)


雖然mapreduce 相當於 spark 的 map +  reduceByKey, 但是 mapreduce中的reduce可以靈活的操作,加入一些自己的邏輯,所以,各有所長。

但是,spark 確實很方便       

相關推薦

spark transformation計算邏輯測試

到網上看了一些資料,簡單的做個筆記。備忘。 測試例子使用的資料: test01: a a b b c c d d e e f f g g test02: 1 1 2 2 3 3 4 4 5 5 6 6 a a b b c c d d e e f f1、union(other

分頁的方式(邏輯分頁物理分頁)

/** *//** * TestPageResultSetDAO.java * * Copyright 2008. All Rights Reserved. */package com.cosmow.pageresultset.dao;import java.sql.Connection;import jav

spark學習(基礎篇)--(第三節)Spark運行模式

一些記錄 image ica runner 1.3 函數 ive 啟動 driver h2 { color: #fff; background-color: #7CCD7C; padding: 3px; margin: 10px 0px } h3 { color: #fff

C#中實現並發的方法的性能測試

返回 也不會 thead syn image 9.png 結果 次數 存在 原文地址:https://www.cnblogs.com/durow/p/4837746.html 0x00 起因 去年寫的一個程序因為需要在局域網發送消息支持一些命令和簡單數據的傳輸,所以寫了

js數組遍歷的常用的方法以及差異性能優化

object length 回調 value 鏈式操作 item IT rip 需要 <script type="text/javascript"> /*對比: 1、map速度比foreach快

spark讀檔案的方式

spark.read.textFile和sc.textFile的區別 val rdd1 = spark.read.textFile("hdfs://han02:9000/words.txt")   //讀取到的是一個RDD物件 val rdd2 = sc.textFile("hdfs://han02:90

[Xcode10 實際操作]一、博主領進門-(13)在控制檯的列印輸出語句po命令

本文將演幾種在控制檯輸出日誌的方式。 在專案導航區,開啟檢視控制器的程式碼檔案【ViewController.swift】 1 import UIKit 2 3 class ViewController: UIViewController { 4 5 override

Java定時任務的方法(Thread Timer,執行緒池)

/**   * 普通thread   * 這是最常見的,建立一個thread,然後讓它在while迴圈裡一直執行著,   * 通過sleep方法來達到定時任務的效果。這樣可以快速簡單的實現,程式碼如

MySQL資料型別精度標度的情況

1、整型 int預設是int(11),建立欄位int(5),當儲存的資料長度大於5時,可以正常儲存,儲存的資料完整顯示; 2、浮點型 float(M,D)和double(M,D) 如果不寫精度和標度,則會按照實際精度值顯示,如果有精度和標度,則會自動將四捨五入的結果插入,不

java垃圾收集方法垃圾收集器

  標記清除法:   分為兩個階段,標記----清除    C標記階段將所有需要回收的物件做標記,然後在清除階段將所有的標記物件回收   但是這種回收方法有很大的缺點,那就是這兩個過程的的效率並不高,兩個過程都是效率很低的過程   另外一個缺點就是標記清除之後,因為之前並沒有移動

Spring容器中定義Bean初始化方法銷燬方法

Spring 容器中的 Bean 是有生命週期的,Spring 允許 Bean 在初始化完成後以及銷燬前執行特定的操作。下面是常用的三種指定特定操作的方法: 通過實現InitializingBean/DisposableBean 介面來定製初始化之後/銷燬之前

Spring容器中的Bean初始化方法銷燬方法的先後順序

Spring 容器中的 Bean 是有生命週期的,spring 允許 Bean 在初始化完成後以及銷燬前執行特定的操作。下面是常用的三種指定特定操作的方法: 通過實現InitializingBean/DisposableBean 介面來定製初始化之後/銷燬之前的操作方法;

禁用頁面快取的方法(靜態動態)

 1、在Asp頁面首部<head>加入    以下是引用片段:   Response.Buffer   =   True       Response.ExpiresAbsolute   =   Now()   -   1       Response.Exp

常用序列化反序列化方法

摘要 序列化和反序列化幾乎是工程師們每天都要面對的事情,但是要精確掌握這兩個概念並不容易:一方面,它們往往作為框架的一部分出現而湮沒在框架之中;另一方面,它們會以其他更容易理解的概念出現,例如加密、持久化。然而,序列化和反序列化的選型卻是系統設計或重構一個重要的環節,在

java多執行緒解決同步問題的方式、原理程式碼

生產者類: publicclassProducerextendsThread{// 每次生產的產品數量privateint num;// 所在放置的倉庫privateStorage storage;// 建構函式,設定倉庫publicProducer(Storage storage){this.stora

談談分散式Session的實現方式,SessionCookie的區別聯絡以及Session的實現原理

一。分散式Session的幾種實現方式 1.基於資料庫的Session共享 2.基於NFS共享檔案系統3.基於memcached 的session,如何保證 memcached 本身的高可用性?4. 基於resin/tomcat web容器本身的session複製機制5.

典型的軟體自動化測試框架

一個自動化測試框架就是一個由假設、概念以及為自動化測試提供支援的實踐的集合。以下描述五種基本的自動測試框架:模組化測試指令碼框架,測試庫構架框架,關鍵字驅動/表驅動測試框架,資料驅動測試框架,以及混合測試框架。可以根據實際需要去考慮採用其中的一種測試框架而不是僅僅依賴於一個簡

oracle update 方法容易理解使用的更新命令

      習慣了SQL server的update寫法,感覺如此優雅和簡便,近期要用oracle,是如此的不方便。經過努力發現三種寫法還是很不錯的,真不愧是大佬。     例子:兩個表,結構相同,都有編號和名稱。           create table tb1(  

ios 時間計算的使用

所屬框架層: NSDate 屬於Foundation  CFAbsoluteTimeGetCurrent() 屬於 CoreFoundation CACurrentMediaTime() 屬於 QuartzCore mach_absolute_time 系統底層A

獲取View寬高的方式及ViewViewGroup測量的簡單實現

自己指定測量規則 //這裡自己指定寬wrap_content 高100, view = (Button) findViewById(R.id.button1); int widthMeasureSpec = MeasureSpec.makeMeasureS