1. 程式人生 > >在Spark中使用Kryo序列化

在Spark中使用Kryo序列化

spark序列化
 對於優化<網路效能>極為重要,將RDD以序列化格式來儲存減少記憶體佔用. spark.serializer=org.apache.spark.serializer.JavaSerialization


Spark預設 使用Java自帶的ObjectOutputStream 框架來序列化物件,這樣任何實現了 java.io.Serializable 介面的物件,都能被序列化。同時,還可以通過擴充套件 java.io.Externalizable 來控制序列化效能。Java序列化很靈活但效能差速度很慢,同時序列化後佔用的位元組數也較多。


spark.serializer=org.apache.spark.serializer.KryoSerialization


KryoSerialization速度快,可以配置為任何org.apache.spark.serializer的子類。但Kryo也不支援所有實現了 java.io.Serializable 介面的型別,它需要你在程式中 register 需要序列化的型別,以得到最佳效能。


LZO的支援要求先安裝 Hadoop-lzo包(每個節點), 並放到 Spark本地庫中。如果是Debian包安裝,在呼叫spark-submit時加上 --driver-library-path /usr/lib/hadoop/lib/native/ --driver-class-path /usr/lib/hadoop/lib/ 就可以。 下載lzo http://cn.jarfire.org/hadoop.lzo.html


在 SparkConf 初始化的時候呼叫 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) 使用 Kryo。這個設定不僅控制各個worker節點之間的混洗資料序列化格式,同時還控制RDD存到磁碟上的序列化格式。需要在使用時註冊需要序列化的型別,建議在對網路敏感的應用場景下使用Kryo。 如果你的自定義型別需要使用Kryo序列化,可以用 registerKryoClasses 方法先註冊:


val conf = new SparkConf.setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

最後,如果你不註冊需要序列化的自定義型別,Kryo也能工作,不過每一個物件例項的序列化結果都會包含一份完整的類名,這有點浪費空間。
在Scala中使用New API (Twitter Elephant Bird 包) lzo JsonInputFormat讀取 LZO 演算法壓縮的 JSON 檔案:
val input = sc.newAPIHadoopFile(inputFile, classOf[lzoJsonInputFormat], classOf[LongWritable], classOf[MapWritable], conf)
inputFile: 輸入路徑
接收第一個類:“格式”類,輸入格式
接收第二個類:“鍵”
接收第二個類:“值”
conf:設定一些額外的壓縮選項
在Scala中使用老API直接讀取 KeyValueTextInputFormat()最簡單的Hadoop輸入格式 :
val input = sc.HadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{ case (x, y) => (x.toString, y.toString) }

注:如果讀取單個壓縮過的輸入,做好不要考慮使用Spark的封裝(textFile/SequenceFile..),而是使用 newAPIHadoopFile 或者 HadoopFile,並指定正確的壓縮解碼器。 有些輸入格式(如SequenceFile)允許我們只壓縮鍵值對資料中的值,這在查詢時很有用。其它一些輸入格式也有自己的壓縮控制,如:Twitter Elephant Bird 包中的許多格式都可以使用LZO演算法壓縮資料。