1. 程式人生 > >利用Kryo序列化庫是你提升Spark效能要做的第一件事

利用Kryo序列化庫是你提升Spark效能要做的第一件事

本文基於Spark2.1.0版本
套用官文Tuning Spark中的一句話作為文章的標題:

*Often, choose a serialization type will be the first thing you should tune to optimize a Spark application. *

在Spark的架構中,在網路中傳遞的或者快取在記憶體、硬碟中的物件需要進行序列化操作,序列化的作用主要是利用時間換空間:

  • 分發給Executor上的Task
  • 需要快取的RDD(前提是使用序列化方式快取)
  • 廣播變數
  • Shuffle過程中的資料快取
  • 使用receiver方式接收的流資料快取
  • 運算元函式中使用的外部變數

上面的六種資料,通過Java序列化(預設的序列化方式)形成一個二進位制位元組陣列,大大減少了資料在記憶體、硬碟中佔用的空間,減少了網路資料傳輸的開銷,並且可以精確的推測記憶體使用情況,降低GC頻率。

其好處很多,但是缺陷也很明顯:

  • 把資料序列化為位元組陣列、把位元組陣列反序列化為物件的操作,是會消耗CPU、延長作業時間的,從而降低了Spark的效能。

至少預設的Java序列化方式在這方面是不盡如人意的。Java序列化很靈活但效能較差,同時序列化後佔用的位元組數也較多。

所以官方也推薦儘量使用Kryo的序列化庫(版本2)

。官文介紹,Kryo序列化機制比Java序列化機制效能提高10倍左右,Spark之所以沒有預設使用Kryo作為序列化類庫,是因為它不支援所有物件的序列化,同時Kryo需要使用者在使用前註冊需要序列化的型別,不夠方便。

由於 Spark2.1.0預設對Task使用Java序列化(該序列化方式不允許修改,原始碼如下),

/**
   * Helper method to create a SparkEnv for a driver or an executor.
   */
  private def create(
      conf: SparkConf,
      executorId: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      isLocal: Boolean,
      numUsableCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      listenerBus: LiveListenerBus = null,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

    val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
    ...
    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

    val closureSerializer = new JavaSerializer(conf)  --Task閉包函式使用Java序列化庫

所以本文主要針對下面這五種資料型別:

  • 需要快取的RDD(前提是使用序列化方式快取)
  • 廣播變數
  • Shuffle過程中的資料快取
  • 使用receiver方式接收的流資料快取
  • 運算元函式中使用的外部變數

其實從Spark 2.0.0版本開始,簡單型別、簡單型別陣列、字串型別的Shuffling RDDs 已經預設使用Kryo序列化方式了。

下面,我給出具體的流程,來切換到Kryo序列化庫。

先介紹幾個相關的配置:

Property Name Default Meaning
spark.serializer org.apache.spark.serializer.JavaSerializer Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using org.apache.spark.serializer.KryoSerializer and configuring Kryo serialization when speed is necessary.
spark.kryoserializer.buffer 64k Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker. This buffer will grow up to spark.kryoserializer.buffer.max if needed.
spark.kryoserializer.buffer.max 64m Maximum allowable size of Kryo serialization buffer. This must be larger than any object you attempt to serialize. Increase this if you get a "buffer limit exceeded" exception inside Kryo.
spark.kryo.classesToRegister (none) If you use Kryo serialization, give a comma-separated list of custom class names to register with Kryo. See the tuning guide for more details.
spark.kryo.referenceTracking true Whether to track references to the same object when serializing data with Kryo, which is necessary if your object graphs have loops and useful for efficiency if they contain multiple copies of the same object. Can be disabled to improve performance if you know this is not the case.
spark.kryo.registrationRequired false Whether to require registration with Kryo. If set to 'true', Kryo will throw an exception if an unregistered class is serialized. If set to false (the default), Kryo will write unregistered class names along with each object. Writing class names can cause significant performance overhead, so enabling this option can enforce strictly that a user has not omitted classes from registration.
spark.kryo.registrator (none) If you use Kryo serialization, give a comma-separated list of classes that register your custom classes with Kryo. This property is useful if you need to register your classes in a custom way, e.g. to specify a custom field serializer. Otherwise spark.kryo.classesToRegister is simpler. It should be set to classes that extend KryoRegistrator. See the tuning guide for more details.
spark.kryo.unsafe false Whether to use unsafe based Kryo serializer. Can be substantially faster by using Unsafe Based IO.

配置說明:(當使用Kryo序列化庫時)

spark.kryo.classesToRegister:向Kryo註冊自定義的的型別,類名間用逗號分隔

spark.kryo.referenceTracking:跟蹤對同一個物件的引用情況,這對發現有迴圈引用或同一物件有多個副本的情況是很有用的。設定為false可以提高效能

spark.kryo.registrationRequired:是否需要在Kryo登記註冊?如果為true,則序列化一個未註冊的類時會丟擲異常

spark.kryo.registrator:為Kryo設定這個類去註冊你自定義的類。最後,如果你不註冊需要序列化的自定義型別,Kryo也能工作,不過每一個物件例項的序列化結果都會包含一份完整的類名,這有點浪費空間

spark.kryo.unsafe:如果想更加提升效能,可以使用Kryo unsafe方式

spark.kryoserializer.buffer:每個Executor中的每個core對應著一個序列化buffer。如果你的物件很大,可能需要增大該配置項。其值不能超過spark.kryoserializer.buffer.max

spark.kryoserializer.buffer.max:允許使用序列化buffer的最大值

spark.serializer:序列化時用的類,需要申明為org.apache.spark.serializer.KryoSerializer。這個設定不僅控制各個worker節點之間的混洗資料序列化格式,同時還控制RDD存到磁碟上的序列化格式及廣播變數的序列化格式。 

更多的Kryo配置及使用細節,參考文末的連結

主要的使用過程就三步:

  1. 設定序列化使用的庫
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  //使用Kryo序列化庫
  1. 在該庫中註冊使用者定義的型別
conf.set("spark.kryo.registrator", toKryoRegistrator.class.getName());       //在Kryo序列化庫中註冊自定義的類集合
  1. 在自定義類中實現KryoRegistrator介面的registerClasses方法
public static class toKryoRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(tmp1.class, new FieldSerializer(kryo, tmp1.class));  //在Kryo序列化庫中註冊自定義的類
        kryo.register(tmp2.class, new FieldSerializer(kryo, tmp2.class));  //在Kryo序列化庫中註冊自定義的類
    }
}

具體的原始碼如下(關鍵點見原始碼中的註釋):

import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.serializer.KryoRegistrator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import org.apache.spark.storage.StorageLevel;
import java.util.regex.Pattern;
import java.io.IOException;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.spark.broadcast.Broadcast;

public final class javakryoserializer {
   private static final Pattern SPACE = Pattern.compile(" ");
   // This is our custom class we will configure Kyro to serialize
   static class tmp1 implements java.io.Serializable {
       public int total_;
       public int num_;
   }

   static class tmp2 implements java.io.Serializable {
       public tmp2 (String ss)
       {
           s = ss;
       }
       public String s;
   }

   public static class toKryoRegistrator implements KryoRegistrator {
       public void registerClasses(Kryo kryo) {
           kryo.register(tmp1.class, new FieldSerializer(kryo, tmp1.class));  //在Kryo序列化庫中註冊自定義的類
           kryo.register(tmp2.class, new FieldSerializer(kryo, tmp2.class));  //在Kryo序列化庫中註冊自定義的類
       }
   }

   public static void readToBuffer(StringBuffer buffer, String filePath) throws IOException {
       InputStream is = new FileInputStream(filePath);
       String line; // 用來儲存每行讀取的內容
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));
       line = reader.readLine(); // 讀取第一行
       while (line != null) { // 如果 line 為空說明讀完了
           buffer.append(line); // 將讀到的內容新增到 buffer 中
           buffer.append("\n"); // 新增換行符
           line = reader.readLine(); // 讀取下一行
       }
       reader.close();
       is.close();
   }

   public static void main(String[] args) throws Exception {
       SparkConf conf = new SparkConf().setMaster("local").setAppName("basicavgwithkyro");
       conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  //使用Kryo序列化庫,如果要使用Java序列化庫,需要把該行遮蔽掉
       conf.set("spark.kryo.registrator", toKryoRegistrator.class.getName());       //在Kryo序列化庫中註冊自定義的類集合,如果要使用Java序列化庫,需要把該行遮蔽掉
       JavaSparkContext sc = new JavaSparkContext(conf);
       StringBuffer sb = new StringBuffer();
       javakryoserializer.readToBuffer(sb, args[0]);
       final Broadcast<tmp2> stringBV = sc.broadcast(new tmp2(sb.toString()));

       JavaRDD<String> rdd1 = sc.textFile(args[1]);
       JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
           @Override
           public Iterator<String> call(String s) {
               return Arrays.asList(SPACE.split(s)).iterator();
           }
       });


       JavaRDD<Integer> rdd3 = rdd2.map(new Function<String, Integer>() {
           @Override
           public Integer call(String s) {
               String length = stringBV.value().s;  //只是為了使用廣播變數stringBV,沒有實際的意義
               String tmp = length;                 //只是為了使用廣播變數stringBV,沒有實際的意義
               return s.length();
           }
       });

       JavaRDD<tmp1> rdd4 = rdd3.map(new Function<Integer, tmp1>() {
           @Override
           public tmp1 call(Integer x) {
               tmp1 a = new tmp1();  //只是為了將rdd4中的元素型別轉換為tmp1型別的物件,沒有實際的意義
               a.total_ += x;
               a.num_ += 1;
               return a;
           }
       });

       rdd4.persist(StorageLevel.MEMORY_ONLY_SER());  //將rdd4以序列化的形式快取在記憶體中,因為其元素是tmp1物件,所以使用Kryo的序列化方式快取
       System.out.println("the count is " + rdd4.count());

       while (true) {}  //除錯命令,只是用來將程式掛住,方便在Driver 4040的WEB UI中觀察rdd的storage情況
       //sc.stop();
   }
}

上述原始碼,涉及了閉包中使用的廣播變數stringBV(是tmp2類的物件),以及對rdd4(元素是tmp1類的物件)的持久化,由於RDD的持久化佔用的記憶體看起來比較直觀,所以主要對比rdd4使用兩種序列化庫的區別。
使用預設的Java序列化庫的情況:快取後的 rdd4佔用記憶體空間137.7MB

應用程式執行時的資訊

4040埠 Driver WEB UI

使用Kryo序列化庫的情況:快取後的 rdd4佔用記憶體空間38.5MB

應用程式執行時的資訊

 

4040埠 Driver WEB UI

可以看出,使用了Kryo序列化庫後,rdd4在記憶體中佔用的空間從137.7MB降低到38.5MB,比使用Java序列化庫節省了4倍左右的空間(如果使用其他更適合壓縮的物件型別,應該能達到官方的所說的提升10倍的壓縮比)

當然,如果想進一步的節省記憶體、硬碟的空間,減少網路傳輸的資料量,可以配合的使用Spark支援的壓縮方式(目前預設是lz4),廣播變數、shuffle過程中的資料都預設使用壓縮功能。(注意,RDD預設是不壓縮的)

Property Name Default Meaning
spark.io.compression.codec lz4 The codec used to compress internal data such as RDD partitions, broadcast variables and shuffle outputs. By default, Spark provides three codecs: lz4, lzf, and snappy.
spark.broadcast.compress true Whether to compress broadcast variables before sending them. Generally a good idea.
spark.shuffle.compress true Whether to compress map output files. Generally a good idea.
spark.shuffle.spill.compress true Whether to compress data spilled during shuffles.
spark.rdd.compress false Whether to compress serialized RDD partitions (e.g. for StorageLevel.MEMORY_ONLY_SER in Java and Scala or StorageLevel.MEMORY_ONLY in Python). Can save substantial space at the cost of some extra CPU time.

RDD持久化操作時使用壓縮機制(注意,只有序列化後的RDD才能使用壓縮機制)

SparkConf 增加下面的配置
conf.set("spark.rdd.compress", "true");

效果很顯著吧!rdd4持久化後在記憶體中佔用的空間降低到1MB左右!

應用程式執行的資訊

4040埠 Driver WEB UI

 

使用壓縮機制,也會增加額外的開銷,也會影響到效能,這點需要注意。

相關連結
[Tuning Spark] (http://spark.apache.org/docs/latest/tuning.html)
[Spark Configuration] (http://spark.apache.org/docs/latest/configuration.html#compression-and-serialization)
[Kryo] (https://github.com/EsotericSoftware/kryo)