《深入理解Spark》之通過自定義分割槽器解決資料傾斜問題
package com.lyzx.day37 import org.apache.spark.{Partitioner, SparkConf, SparkContext} class D1 { //partitionBy和自定義分割槽器解決資料傾斜的問題 def f1(sc:SparkContext): Unit ={ val r1 = sc.parallelize(1 to 100,4) val filterRdd = r1.filter(_ > 49) filterRdd.mapPartitionsWithIndex((idx,itr)=>{ while(itr.hasNext){ println("["+idx+"]"+itr.next()) } for(v <- itr) yield v }).collect() println(filterRdd.partitions.length) val reP = filterRdd .map(x=>(x,null)) .partitionBy(new P(r1.partitions.length)) .map(x=>x._1) reP.mapPartitionsWithIndex((idx,itr)=>{ while(itr.hasNext){ println("["+idx+"]"+itr.next()) } for(v <- itr) yield v }).collect() } //只對K,V鍵值對的鍵做操作 def f2(sc:SparkContext): Unit ={ val r1 = sc.parallelize(1 to 10).map(x=>(x,x)) r1.mapValues(_+1) .foreach(println) println("====================================") //做完mapValues之後再壓平 可以把一個值對映為一個集合 //在壓平的過程中把集合中的值掏出來,組合為(K1,V1),(K1,V2)的格式 r1.flatMapValues(x=>Seq(x+100,x)) .foreach(println) } } object D1{ def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("day36").setMaster("local") val sc = new SparkContext(conf) val t = new D1 // t.f1(sc) t.f2(sc) sc.stop() } } //自定義分割槽器,把資料均勻的分配到各個Partition上 class P(num:Int) extends Partitioner{ private var N:Int = 0 override def numPartitions: Int = num //這麼做就和key沒有關係,只是完全按照資料的個數把他們分配到各個Partition中 override def getPartition(key: Any): Int = { val currentIndex = (N % num) N += 1 N = N % num //這樣做是為了讓N保持在0 - num-1 之間防止N超出long的範圍 currentIndex } }
相關推薦
《深入理解Spark》之通過自定義分割槽器解決資料傾斜問題
package com.lyzx.day37 import org.apache.spark.{Partitioner, SparkConf, SparkContext} class D1 { //partitionBy和自定義分割槽器解決資料傾斜的問題 def
MR之partition自定義分割槽器
maptask執行的結果都會放到一個分割槽檔案中,這個分割槽檔案有自己的編號,這個編號是通過一個hash演算法來生成的,通過對context.write(k,v)中的k進行hash會產生一個值,相同的key產生的值是一樣的,所以這種辦法能將相同的key值放到一個分割槽中。分割槽中的值會發送給
《深入理解Spark》Spark自定義分割槽器
package com.lyzx.reviewDay27 import org.apache.spark.{Partitioner, SparkConf, SparkContext} class
《深入理解Spark》之通過sample運算元找出導致資料傾斜的key
最近在整理原來學過的內容,看到sample運算元就寫一篇在實際開發中sample運算元的具體應用 sample(withReplacement : scala.Boolean, fraction : scala.Double,seed scala.Long) sample
Android 深入理解Android中的自定義屬性
1、引言 對於自定義屬性,大家肯定都不陌生,遵循以下幾步,就可以實現: 自定義一個CustomView(extends View )類 編寫values/attrs.xml,在其中編寫styleable和item等標籤元素 在佈局檔案中CustomView使用自定義的屬性(
深入理解Spark之ListenerBus監聽器
ListenerBus對消費佇列的實現 上圖為LiveListenerBus類的申明 self => 這句相當於給this起了一個別名為self LiveListenerBus負責將SparkListenerEvents非同步傳送過已經註冊過的S
帶你深入理解Android中的自定義屬性!!!
att omv world 過程 參數 and pla 開發 dimen 引言 對於自定義屬性,大家肯定都不陌生,遵循以下幾步,就可以實現: 1.自定義一個CustomView(extends View )類 2.編寫values/attrs.xml,在其中編寫styl
MapReduce之自定義分割槽器Partitioner
@[toc] ## 問題引出 >要求將統計結果按照條件輸出到不同檔案中(分割槽)。 比如:將統計結果按照**手機歸屬地不同省份**輸出到不同檔案中(分割槽) ## 預設Partitioner分割槽 ```java public class HashPartitioner extends Partitio
MT5客戶端CTP接入 國內期貨之三 自定義交易品種 實時資料接收
實時接收資料大概分為2種方式 方式1:直接通過webrequest方式去請求, 例如 void OnStart() { string cookie=NULL,headers; char post[],result[]; string url="htt
MT5客戶端CTP接入 國內期貨之二 自定義交易品種 歷史資料匯入
方法1: 介面插入歷史資料 1. 選中產品與資料週期 2 選擇歷史資料檔案 將資料插入 3. 檢視匯入歷史資料情況 切換1小時週期 資料自然推算出來 good! 4. K線效果圖 手工匯入成功
(二)Django進階之路 自定義管理器
在 Test 模型中構造管理器子類, 並同步如下資料庫 from django.db import models class Test(model.Model): test_id =
Spring Boot學習之路——自定義攔截器
Spring Boot簡介 Spring Boot很大程度上簡化了基於Spring的應用開發,只需要呼叫“run”方法就可以建立一個獨立的,產品級別的Spring應用。Spring Boot能夠為所有Spring開發提供一個從根本上更快,且隨處可得的入門體驗;
Java之struts2自定義攔截器
struts2自定義攔截器 攔截器生命週期: 隨著程式的開始而建立,隨著程式的結束而銷燬 (不可能每個訪問都建立一個攔截器) 方式一 實現Interceptor介面 public class MyInterceptor1 im
安卓專案實戰之強大的網路請求框架okGo使用詳解(二):深入理解Callback之自定義JsonCallback
前言 JSON是一種取代XML的資料結構,和xml相比,它更小巧但描述能力卻不差,由於它的小巧所以網路傳輸資料將減少更多流量從而加快了傳輸速度,目前客戶端伺服器返回的資料大多都是基於這種格式的,相應的我們瞭解的關於json的解析工具主要有兩個:Gson(Google官方出的)和fas
Spark資料過濾、自定義分割槽、Shuffer調優 經典案例(詳解)
案例: 根據學科取得最受歡迎的老師的前兩名 這個是資料 http://bigdata.edu360.cn/zhangsan http://bigdata.edu360.cn/zhangsan http://bigdata.edu360.cn/lisi http:
我的Android進階之旅------>Android如何通過自定義SeekBar來實現視訊播放進度條
首先來看一下效果圖,如下所示:其中進度條如下:接下來說一說我的思路,上面的進度拖動條有自定義的Thumb,在Thumb正上方有一個PopupWindow視窗,窗口裡面顯示當前的播放時間。在SeekBar右邊有一個文字框顯示當前播放時間/總時間。step1、先來看一看Popup
《深入理解Spark》之 reduceByKey
XML Code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
《深入理解Spark》之運算元詳解
XML Code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
《深入理解Spark》之spark Streaming概念的再理解
1、spark Streaming是一個微批處理的框架 2、批處理時間間隔 batchInterval >> 表示在batchInterval時間內Spark 所接收的資料被當做一個批次做處理 3、批處理時間間隔(batchInterval)、視窗長
《深入理解Spark》之 結構化流(spark streaming+spark SQL 處理結構化資料)的一個demo
最近在做關於spark Streaming + spark sql 結合處理結構化的資料的業務,下面是一個小栗子,有需要的拿走! package com.unistack.tamboo.compute.process.impl; import com.alibaba.