1. 程式人生 > >《深入理解Spark》之通過自定義分割槽器解決資料傾斜問題

《深入理解Spark》之通過自定義分割槽器解決資料傾斜問題

package com.lyzx.day37

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

class D1 {

  //partitionBy和自定義分割槽器解決資料傾斜的問題
  def f1(sc:SparkContext): Unit ={
     val  r1 = sc.parallelize(1 to 100,4)
     val filterRdd = r1.filter(_ > 49)
      filterRdd.mapPartitionsWithIndex((idx,itr)=>{
        while(itr.hasNext){
          println("["+idx+"]"+itr.next())
        }
        for(v <- itr) yield  v
      }).collect()
    println(filterRdd.partitions.length)
    val reP = filterRdd
                .map(x=>(x,null))
                .partitionBy(new P(r1.partitions.length))
                  .map(x=>x._1)

    reP.mapPartitionsWithIndex((idx,itr)=>{
      while(itr.hasNext){
        println("["+idx+"]"+itr.next())
      }
      for(v <- itr) yield  v
    }).collect()
  }


  //只對K,V鍵值對的鍵做操作
  def f2(sc:SparkContext): Unit ={
    val  r1 = sc.parallelize(1 to 10).map(x=>(x,x))
    r1.mapValues(_+1)
      .foreach(println)
    println("====================================")

    //做完mapValues之後再壓平 可以把一個值對映為一個集合
    //在壓平的過程中把集合中的值掏出來,組合為(K1,V1),(K1,V2)的格式
    r1.flatMapValues(x=>Seq(x+100,x))
        .foreach(println)
  }


}

object D1{

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("day36").setMaster("local")
    val sc = new SparkContext(conf)
    val t = new D1

//    t.f1(sc)
    t.f2(sc)
    sc.stop()
  }
}

//自定義分割槽器,把資料均勻的分配到各個Partition上
class P(num:Int) extends  Partitioner{
  private var N:Int = 0
  override def numPartitions: Int = num

  //這麼做就和key沒有關係,只是完全按照資料的個數把他們分配到各個Partition中
  override def getPartition(key: Any): Int = {
    val currentIndex = (N % num)
    N += 1
    N = N % num  //這樣做是為了讓N保持在0 - num-1 之間防止N超出long的範圍
    currentIndex
  }
}

相關推薦

深入理解Spark通過定義分割槽解決資料傾斜問題

package com.lyzx.day37 import org.apache.spark.{Partitioner, SparkConf, SparkContext} class D1 { //partitionBy和自定義分割槽器解決資料傾斜的問題 def

MRpartition定義分割槽

maptask執行的結果都會放到一個分割槽檔案中,這個分割槽檔案有自己的編號,這個編號是通過一個hash演算法來生成的,通過對context.write(k,v)中的k進行hash會產生一個值,相同的key產生的值是一樣的,所以這種辦法能將相同的key值放到一個分割槽中。分割槽中的值會發送給

深入理解SparkSpark定義分割槽

package com.lyzx.reviewDay27 import org.apache.spark.{Partitioner, SparkConf, SparkContext} class

深入理解Spark通過sample運算元找出導致資料傾斜的key

最近在整理原來學過的內容,看到sample運算元就寫一篇在實際開發中sample運算元的具體應用 sample(withReplacement : scala.Boolean, fraction : scala.Double,seed scala.Long) sample

Android 深入理解Android中的定義屬性

1、引言 對於自定義屬性,大家肯定都不陌生,遵循以下幾步,就可以實現: 自定義一個CustomView(extends View )類 編寫values/attrs.xml,在其中編寫styleable和item等標籤元素 在佈局檔案中CustomView使用自定義的屬性(

深入理解SparkListenerBus監聽器

ListenerBus對消費佇列的實現 上圖為LiveListenerBus類的申明 self => 這句相當於給this起了一個別名為self LiveListenerBus負責將SparkListenerEvents非同步傳送過已經註冊過的S

帶你深入理解Android中的定義屬性!!!

att omv world 過程 參數 and pla 開發 dimen 引言 對於自定義屬性,大家肯定都不陌生,遵循以下幾步,就可以實現: 1.自定義一個CustomView(extends View )類 2.編寫values/attrs.xml,在其中編寫styl

MapReduce定義分割槽Partitioner

@[toc] ## 問題引出 >要求將統計結果按照條件輸出到不同檔案中(分割槽)。 比如:將統計結果按照**手機歸屬地不同省份**輸出到不同檔案中(分割槽) ## 預設Partitioner分割槽 ```java public class HashPartitioner extends Partitio

MT5客戶端CTP接入 國內期貨定義交易品種 實時資料接收

實時接收資料大概分為2種方式 方式1:直接通過webrequest方式去請求, 例如 void OnStart() { string cookie=NULL,headers; char post[],result[]; string url="htt

MT5客戶端CTP接入 國內期貨定義交易品種 歷史資料匯入

方法1: 介面插入歷史資料 1.  選中產品與資料週期 2 選擇歷史資料檔案 將資料插入 3. 檢視匯入歷史資料情況 切換1小時週期 資料自然推算出來 good! 4.  K線效果圖   手工匯入成功

(二)Django進階定義管理

在 Test 模型中構造管理器子類, 並同步如下資料庫 from django.db import models class Test(model.Model): test_id =

Spring Boot學習路——定義攔截

Spring Boot簡介 Spring Boot很大程度上簡化了基於Spring的應用開發,只需要呼叫“run”方法就可以建立一個獨立的,產品級別的Spring應用。Spring Boot能夠為所有Spring開發提供一個從根本上更快,且隨處可得的入門體驗;

Javastruts2定義攔截

struts2自定義攔截器 攔截器生命週期: 隨著程式的開始而建立,隨著程式的結束而銷燬 (不可能每個訪問都建立一個攔截器) 方式一 實現Interceptor介面 public class MyInterceptor1 im

安卓專案實戰強大的網路請求框架okGo使用詳解(二):深入理解Callback定義JsonCallback

前言 JSON是一種取代XML的資料結構,和xml相比,它更小巧但描述能力卻不差,由於它的小巧所以網路傳輸資料將減少更多流量從而加快了傳輸速度,目前客戶端伺服器返回的資料大多都是基於這種格式的,相應的我們瞭解的關於json的解析工具主要有兩個:Gson(Google官方出的)和fas

Spark資料過濾、定義分割槽、Shuffer調優 經典案例(詳解)

案例: 根據學科取得最受歡迎的老師的前兩名 這個是資料 http://bigdata.edu360.cn/zhangsan http://bigdata.edu360.cn/zhangsan http://bigdata.edu360.cn/lisi http:

我的Android進階旅------>Android如何通過定義SeekBar來實現視訊播放進度條

首先來看一下效果圖,如下所示:其中進度條如下:接下來說一說我的思路,上面的進度拖動條有自定義的Thumb,在Thumb正上方有一個PopupWindow視窗,窗口裡面顯示當前的播放時間。在SeekBar右邊有一個文字框顯示當前播放時間/總時間。step1、先來看一看Popup

深入理解Spark reduceByKey

 XML Code  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

深入理解Spark運算元詳解

 XML Code  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

深入理解Sparkspark Streaming概念的再理解

1、spark Streaming是一個微批處理的框架 2、批處理時間間隔 batchInterval       >> 表示在batchInterval時間內Spark 所接收的資料被當做一個批次做處理 3、批處理時間間隔(batchInterval)、視窗長

深入理解Spark 結構化流(spark streaming+spark SQL 處理結構化資料)的一個demo

最近在做關於spark Streaming + spark sql 結合處理結構化的資料的業務,下面是一個小栗子,有需要的拿走! ​ package com.unistack.tamboo.compute.process.impl; import com.alibaba.