1. 程式人生 > >Spark效能優化:高效能序列化類庫

Spark效能優化:高效能序列化類庫

一、資料序列化概述:

在任何分散式系統中,序列化都是扮演著一個重要的角色的。如果使用的序列化技術,在執行序列化操作的時候很慢,或者是序列化後的資料還是很大,那麼會讓分散式應用程式的效能下降很多。所以,進行Spark效能優化的第一步,就是進行序列化的效能優化。

Spark自身預設就會在一些地方對資料進行序列化,比如Shuffle。還有就是,如果我們的運算元函式使用到了外部的資料(比如Java內建型別,或者自定義型別),那麼也需要讓其可序列化。

而Spark自身對於序列化的便捷性和效能進行了一個取捨和權衡。預設,Spark傾向於序列化的便捷性,使用了Java自身提供的序列化機制——基於ObjectInputStream

ObjectOutputStream的序列化機制。因為這種方式是Java原生提供的,很方便使用。

但是問題是,Java序列化機制的效能並不高。序列化的速度相對較慢,而且序列化以後的資料,還是相對來說比較大,還是比較佔用記憶體空間。因此,如果你的Spark應用程式對記憶體很敏感,那麼,實際上預設的Java序列化機制並不是最好的選擇。

二、 Spark提供的兩種序列化機制

Spark實際上提供了兩種序列化機制,它只是預設使用了第一種:

1、Java序列化機制:

預設情況下,Spark使用Java自身的ObjectInputStreamObjectOutputStream機制進行物件的序列化。只要你的類實現了Serializable介面,那麼都是可以序列化的。而且Java序列化機制是提供了自定義序列化支援的,只要你實現Externalizable

介面即可實現自己的更高效能的序列化演算法。Java序列化機制的速度比較慢,而且序列化後的資料佔用的記憶體空間比較大。

2、Kryo序列化機制:

Spark也支援使用Kryo類庫來進行序列化。Kryo序列化機制比Java序列化機制更快,而且序列化後的資料佔用的空間更小,通常比Java序列化的資料佔用的空間要小10倍。Kryo序列化機制之所以不是預設序列化機制的原因是,有些型別雖然實現了Seriralizable介面,但是它也不一定能夠進行序列化;此外,如果你要得到最佳的效能,Kryo還要求你在Spark應用程式中,對所有你需要序列化的型別都進行註冊。

三、 如何使用Kryo序列化機制(一)

如果要使用Kryo序列化機制,首先要用SparkConf設定一個引數,使用new SparkConf().set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)即可,即將Spark的序列化器設定為

KryoSerializer。這樣,Spark在內部的一些操作,比如Shuffle,進行序列化時,就會使用Kryo類庫進行高效能、快速、更低記憶體佔用量的序列化了。

使用Kryo時,它要求是需要序列化的類,是要預先進行註冊的,以獲得最佳效能——如果不註冊的話,那麼Kryo必須時刻儲存型別的全限定名,反而佔用不少記憶體。Spark預設是對Scala中常用的型別自動註冊了Kryo的,都在AllScalaRegistry類中。

但是,比如自己的運算元中,使用了外部的自定義型別的物件,那麼還是需要將其進行註冊。

(實際上,下面的寫法是錯誤的,因為counter不是共享的,所以累加的功能是無法實現的)

val counter = new Counter();
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
numbers.foreach(num => counter.add(num));

四、 如何使用Kryo序列化機制(二)

如果要註冊自定義的型別,那麼就使用如下的程式碼,即可:

Scala版本:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[Counter] ))
val sc = new SparkContext(conf)

Java版本:
SparkConf conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Counter.class)
JavaSparkContext sc = new JavaSparkContext(conf)

五、 優化Kryo類庫的使用

1、優化快取大小

如果註冊的要序列化的自定義的型別,本身特別大,比如包含了超過100個field。那麼就會導致要序列化的物件過大。此時就需要對Kryo本身進行優化。因為Kryo內部的快取可能不夠存放那麼大的class物件。此時就需要呼叫SparkConf.set()方法設定spark.kryoserializer.buffer.mb引數的值,將其調大。

預設情況下它的值是2,就是說最大能快取2M的物件,然後進行序列化。可以在必要時將其調大。比如設定為10。

2、預先註冊自定義型別

雖然不註冊自定義型別,Kryo類庫也能正常工作,但是那樣的話,對於它要序列化的每個物件,都會儲存一份它的全限定類名。此時反而會耗費大量記憶體。因此通常都建議預先註冊號要序列化的自定義的類。

六、 在什麼場景下使用Kryo序列化類庫?

首先,這裡討論的都是Spark的一些普通的場景,一些特殊的場景,比如RDD的持久化。

那麼,這裡針對的Kryo序列化類庫的使用場景,就是運算元函式使用到了外部的大資料的情況。比如說吧,我們在外部定義了一個封裝了應用所有配置的物件,比如自定義了一個MyConfiguration物件,裡面包含了100m的資料。然後,在運算元函式裡面,使用到了這個外部的大物件。

此時呢,如果預設情況下,讓Spark用java序列化機制來序列化這種外部的大物件,那麼就會導致,序列化速度緩慢,並且序列化以後的資料還是比較大,比較佔用記憶體空間。

因此,在這種情況下,比較適合,切換到Kryo序列化類庫,來對外部的大物件進行序列化操作。一是,序列化速度會變快;二是,會減少序列化後的資料佔用的記憶體空間