1. 程式人生 > >54:Spark中的Tungsten-sort Based Shuffle內幕

54:Spark中的Tungsten-sort Based Shuffle內幕

本期內容: 1. Tungsten-sort Based Shuffle原理 2. Tungsten-sort Based Shuffle原始碼 ShortShuffleManager MemoryManager ShuffleExternalSorter

Tungsten-sort Based Shuffle

今天在對鎢絲計劃思考的基礎上,講解下基於Tungsten的shuffle。

首先解釋下概念,Tungsten-sort是對普通sort的一種優化,排序的不是內容本身,而是內容序列化後位元組陣列的指標(元資料),把資料的排序轉變為了指標陣列的排序,實現了直接對序列化後的二進位制資料進行排序。由於直接基於二進位制資料進行操作,所以在這裡面沒有序列化和反序列化的過程。記憶體的消耗大大降低,相應的,會極大的減少的gc的開銷。

Page的管理

要做到這種,jvm的記憶體管理結構無法完成,所以提出了Page的概念。

Page是由block組成的,我們先看一下Block的結構,可以看到,除了記錄page編號外,Block內部組成是MemoryLocation。

在MemoryLocation中,重要的就是記錄了物件及初始位置的定位offset。實際執行可以onheap或者offheap(用NIO或者Tachyon管理)。

在shuffle角度,都是統一在SortShuffleManager中進行構造。可以看到,在如下位置構造了UnsafeShuffleWriter,但沒有UnsafeShuffleReader,從Tungsten角度講,reader使用的是HashShuffleReader。

從註釋中,可以看到資料一旦進來,就使用shuffle write進行序列化,在序列化的二進位制基礎上進行排序,這樣就可以減少記憶體的GC。這種優化需要我們的序列化器可以在不反序列化的情況下重新排序。

資料寫入

讓我們進入UnsafeShuffleWriter

會通過MyByteArrayOutputStream直接對記憶體操作

在write方法中,會迴圈記錄,寫入Sorter。

其中,serBuffle預設大小是1M,而且已經是序列化之後的資料了。

在插入前,首先會分配記憶體,之後會根據每條資料,採用遊標的方式進行遍歷,並計算找到recordAddress,完成插入操作。

在記憶體分配時,會有兩種分配方式UNSAFE和HEAP,內部各有一套自己的記憶體評估機制

此外,recordAddress是有一套自己的編解碼方式。

最終在插入時,僅僅是存放了一個RecordPointer,也就是資料指標。

小結

在具體插入操作的時候,以Page為核心單位,從Page角度講,插入記錄的時候,本身也有location和大小,需要找到page中指標的位置。在整個記憶體中有多個Page,每個Page有限定的大小,滿了之後會分配下一個Page。從JVM角度講,最底層的資料結構是位元組陣列,所以outputStream和序列化都是對位元組陣列來操作的。進行shuffle操作的時候,實際是對指標進行操作,這是沒有序列化和反序列化的關鍵。資料量也少,所以記憶體使用率低,大大減少了GC。

最後,說明下,即使配置了Tungsten shuffle,在一些情況也會自動變成sort-based shuffle,從資料結構角度講,限制蠻多,記錄不能太大,單條記錄不能超過128M,shuffle的時候中間過程不能產生太多的小檔案,不能超過160W,aggregation或者輸出後需要排序的操作也不可以。

相關推薦

54SparkTungsten-sort Based Shuffle內幕

本期內容: 1. Tungsten-sort Based Shuffle原理 2. Tungsten-sort Based Shuffle原始碼 ShortShuffleManager Memory

Spark Tungsten-sort Based Shuffle 分析

Tungsten-sort 算不得一個全新的shuffle 方案,它在特定場景下基於類似現有的Sort Based Shuffle處理流程,對記憶體/CPU/Cache使用做了非常大的優化。帶來高效的同時,也就限定了自己的使用場景。如果Tungsten-s

Spark-1.6.0Sort Based Shuffle原始碼解讀

  從Spark-1.2.0開始,Spark的Shuffle由Hash Based Shuffle升級成了Sort Based Shuffle。即Spark.shuffle.manager從Hash換成了Sort。不同形式的Shuffle邏輯主要是Shuffle

[spark] Shuffle Read解析 (Sort Based Shuffle)

本文將講解shuffle Reduce部分,shuffle的下游Stage的第一個rdd是ShuffleRDD,通過其compute方法來獲取上游Stage Shuffle Write溢寫到磁碟檔案資料的一個迭代器: override def com

Spark原始碼分析之Sort-Based Shuffle讀寫流程

override def read(): Iterator[Product2[K, C]] = {   // 構造ShuffleBlockFetcherIterator,一個迭代器,它獲取多個塊,對於本地塊,從本地讀取   // 對於遠端塊,通過遠端方法讀取val blockFetcherItr = new

[spark] Shuffle Write解析 (Sort Based Shuffle)

本文基於 Spark 2.1 進行解析 前言 從 Spark 2.0 開始移除了Hash Based Shuffle,想要了解可參考Shuffle 過程,本文將講解 Sort Based Shuffle。 ShuffleMapTask的結果(S

56SparkTungsten記憶體和CPU的優化使用

本期內容: 1. Tungsten記憶體分配優化使用 2. Tungsten的CPU的優化使用 今天聚焦於記憶體和CPU的優化使用,這是Spark2.0提供的關於執行時的非常大的優化部分。 對過去

Spark部分Spark取交集(intersection )和取差集(subtract )【Java版純程式碼】

package com.bjsxt.spark; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.Spar

Spark篇】---SparkShuffle文件的尋址

sta lock exe 數據 小文件 默認 節點 刪除 提高 一、前述 Spark中Shuffle文件的尋址是一個文件底層的管理機制,所以還是有必要了解一下的。 二、架構圖 三、基本概念: 1) MapOutputTracker MapOutputTracker是Spa

工作采坑劄記3. Sparkes-hadoop插件異常解決

-h adp elastic sed thread ould dex flush 文檔 1. Es-Hadoop異常: org.elasticsearch.hadoop.EsHadoopException: Could not write all entries [61

SparkSpark Shuffle詳解(多看幾遍)

Shuffle簡介 Shuffle描述著資料從map task輸出到reduce task輸入的這段過程。shuffle是連線Map和Reduce之間的橋樑,Map的輸出要用到Reduce中必須經過shuffle這個環節,shuffle的效能高低直接影響了整個程式的效能和吞吐量。因為在分散式情況

Spark Join——Broadcast Join、Shuffle Hash Join、Sort Merge Join

1. Broadcast Join 在資料庫的常見模型中(比如星型模型或者雪花模型),表一般分為兩種:事實表和維度表。維度表一般指固定的、變動較少的表,例如聯絡人、物品種類等,一般資料有限。而事實表一般記錄流水,比如銷售清單等,通常隨著時間的增長不斷膨脹。 因為

40Spark StreamingKafkaReceiver內幕實現徹底解密

本期內容: 1. KafkaInputDStream原始碼解密 2. KafkaReceiver原始碼解密 Direct方式,是No Receiver方式,和普通Receiver方式,最大的

sparkshuffle的過程------不看你後悔

Spark大會上,所有的演講嘉賓都認為shuffle是最影響效能的地方,但是又無可奈何。之前去百度面試hadoop的時候,也被問到了這個問題,直接回答了不知道。 這篇文章主要是沿著下面幾個問題來開展: 1、shuffle過程的劃分? 2、shuffle的中間結果如何

分享知識-快樂自己Java的經典算法之冒泡排序(Bubble Sort)

ble 減少 實現 ima main public 只需要 system 16px 原理:比較兩個相鄰的元素,將值大的元素交換至右端。 思路:依次比較相鄰的兩個數,將小數放在前面,大數放在後面。即在第一趟:首先比較第1個和第2個數,將小數放前,大數放後。然後比較第2個數和

STLalgorithm排序函式sort(升序排列函式)和reverse(反轉排列函式)的簡單用法

#include "stdafx.h" #include <iostream> #include <algorithm> using namespace std; int

Spark 入門之十二再看Spark的排程策略(Standlone)

資源排程是Spark中比較重要的內容,對排程的相關原理以及策略的瞭解對叢集的執行以及優化都會有很大的幫助,資源排程的方式有多種,Local,Standlone,Yarn,Mesos等,本文只針對Standlone的方式做簡介 幾個重要的概念 開始文章之前

大資料IMF傳奇行動絕密課程第54Spark效能優化第十季之Spark統一記憶體管理

Spark效能優化第十季之Spark統一記憶體管理 1、傳統的Spark記憶體管理的問題 2、Spark統一記憶體管理 3、展望 Spark記憶體分為三部分:Execution、Sotrage、Other; Shuffle,當記憶體不夠的時候下,磁碟I

Go_16GoLangflag標簽使用

span clas 格式化 print mes 初始 div fun sta   直接上代碼,在代碼中會做詳細的解釋,當前運行環境為:Go-1.8.1 package main import ( "flag" "log" "os" ) func

GO_05_2Golang panic、recover、defer 的用法

log logs lan finall 可能 錯誤處理 錯誤 異常處理 auto 函數 defer   1. 它的執行方式類似其他語言中的折構函數,在函數體執行結束後按照調用順序的 相反順序 逐個執行   2. 即使函數發生 嚴重錯誤 也會被執行,類似於 java 中 t