Flink序列化框架分析
1.Flink的TypeInformation類
TypeInformation是flink中所有型別的基類,其作為生產序列化器和比較的一個工具。它包括了型別的一些基本屬性,並可以通過它來生產序列化器(serializer),特殊情況下還可以生成型別比較器。(Flink中的比較器不僅僅是定義大小順序,更是處理keys的基本輔助工具)
- 基本型別:所有Java基本資料型別和對應的裝箱型別,加上void,String,Date,BigDecimal和BigInteger
- 基本陣列和物件陣列
- 複合型別:
- 輔助型別 (Option, Either, Lists, Maps, …)
- 泛型: Flink自身不會序列化泛型,而是藉助Kryo進行序列化.
POJO類非常有意思,因為POJO類可以支援複雜型別的建立,並且在定義keys時可以使用成員的名字:dataSet.join(another).where("name").equalTo("personName")。同時,POJO類對於執行時(runtime)是透明的,這使得Flink可以非常高效地處理它們。
1.1 POJO型別的規則
在滿足如下條件時,Flink會將這種資料型別識別成POJO型別(並允許以成員名引用欄位):
- 該類是public的並且是獨立的(即沒有非靜態的內部類)
- 該類有一個public的無參構造方法
- 該類(及該類的父類)的所有成員要麼是public的,要麼是擁有按照標準java bean命名規則命名的public getter和 public setter方法。
1.2 建立一個TypeInformation物件或序列化器###
建立一個TypeInformation物件時如下:
在Scala中,Flink使用在編譯時執行的巨集,在巨集可供呼叫時去捕獲所有泛型資訊。
// 重要: 為了能夠訪問'createTypeInformation' 的巨集方法,這個import是必須的 import org.apache.flink.streaming.api.scala._ val stringInfo: TypeInformation[String] = createTypeInformation[String] val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
你也可以在Java使用相同的方法作為備選。
為了建立一個序列化器(TypeSerializer),只需要在TypeInformation 物件上呼叫typeInfo.createSerializer(config)方法。
config引數的型別是ExecutionConfig,它保留了程式的註冊的自定義序列化器的相關資訊。在可能用到TypeSerializer的地方,儘量傳入程式的ExecutionConfig,你可以呼叫DataStream 或 DataSet的 getExecutionConfig()方法獲取ExecutionConfig。一些內部方法(如:MapFunction)中,你可以通過將該方法變成一個Rich Function,然後呼叫getRuntimeContext().getExecutionConfig()獲取ExecutionConfig.
2 基本型別實現示例
以String為例:
//BasicTypeInfo.java public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<>(String.class, new Class<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);
StringSerializer如下
//StringSerializer.java public final class StringSerializer extends TypeSerializerSingleton<String> { private static final long serialVersionUID = 1L; public static final StringSerializer INSTANCE = new StringSerializer(); private static final String EMPTY = ""; @Override public boolean isImmutableType() { return true; } @Override public String createInstance() { return EMPTY; } @Override public String copy(String from) { return from; } @Override public String copy(String from, String reuse) { return from; } @Override public int getLength() { return -1; } @Override public void serialize(String record, DataOutputView target) throws IOException { StringValue.writeString(record, target); } @Override public String deserialize(DataInputView source) throws IOException { return StringValue.readString(source); } @Override public String deserialize(String record, DataInputView source) throws IOException { return deserialize(source); } @Override public void copy(DataInputView source, DataOutputView target) throws IOException { StringValue.copyString(source, target); } @Override public boolean canEqual(Object obj) { return obj instanceof StringSerializer; } @Override protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { return super.isCompatibleSerializationFormatIdentifier(identifier) || identifier.equals(StringValue.class.getCanonicalName()); } }
上面程式碼中出現的StringValue是真正進行input以及output序列化過程操作,基本型別都有相應的方法,後面會單獨說明下多欄位Record序列化形式。
StringComparator如下
public final class StringComparator extends BasicTypeComparator<String> { private static final long serialVersionUID = 1L; private static final int HIGH_BIT = 0x1 << 7; private static final int HIGH_BIT2 = 0x1 << 13; private static final int HIGH_BIT2_MASK = 0x3 << 6; public StringComparator(boolean ascending) { super(ascending); } @Override public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { String s1 = StringValue.readString(firstSource); String s2 = StringValue.readString(secondSource); int comp = s1.compareTo(s2); return ascendingComparison ? comp : -comp; } @Override public boolean supportsNormalizedKey() { return true; } @Override public boolean supportsSerializationWithKeyNormalization() { return false; } @Override public int getNormalizeKeyLen() { return Integer.MAX_VALUE; } @Override public boolean isNormalizedKeyPrefixOnly(int keyBytes) { return true; } @Override public void putNormalizedKey(String record, MemorySegment target, int offset, int len) {; final int limit = offset + len; final int end = record.length(); int pos = 0; while (pos < end && offset < limit) { char c = record.charAt(pos++); if (c < HIGH_BIT) { target.put(offset++, (byte) c); } else if (c < HIGH_BIT2) { target.put(offset++, (byte) ((c >>> 7) | HIGH_BIT)); if (offset < limit) { target.put(offset++, (byte) c); } } else { target.put(offset++, (byte) ((c >>> 10) | HIGH_BIT2_MASK)); if (offset < limit) { target.put(offset++, (byte) (c >>> 2)); } if (offset < limit) { target.put(offset++, (byte) c); } } } while (offset < limit) { target.put(offset++, (byte) 0); } } @Override public StringComparator duplicate() { return new StringComparator(ascendingComparison); } }
3 多欄位Record示例
在開始這部分原理分析之前可以先看個示例程式碼
//RecordTest.java public void testAddField() { try { // Add a value to an empty record Record record = new Record(); assertTrue(record.getNumFields() == 0); record.addField(this.origVal1); assertTrue(record.getNumFields() == 1); assertTrue(origVal1.getValue().equals(record.getField(0, StringValue.class).getValue())); // Add 100 random integers to the record record = new Record(); for (int i = 0; i < 100; i++) { IntValue orig = new IntValue(this.rand.nextInt()); record.addField(orig); IntValue rec = record.getField(i, IntValue.class); assertTrue(record.getNumFields() == i + 1); assertTrue(orig.getValue() == rec.getValue()); } // Add 3 values of different type to the record record = new Record(this.origVal1, this.origVal2); record.addField(this.origVal3); assertTrue(record.getNumFields() == 3); StringValue recVal1 = record.getField(0, StringValue.class); DoubleValue recVal2 = record.getField(1, DoubleValue.class); IntValue recVal3 = record.getField(2, IntValue.class); assertTrue("The value of the first field has changed", recVal1.equals(this.origVal1)); assertTrue("The value of the second field changed", recVal2.equals(this.origVal2)); assertTrue("The value of the third field has changed", recVal3.equals(this.origVal3)); } catch (Throwable t) { Assert.fail("Test failed due to an exception: " + t.getMessage()); } }
Record代表多個數值的記錄,其可以包含多個欄位(可空並不體現在該記錄中),內部有一個bitmap標記欄位是否被賦值。為了資料交換方便,Record中的資料都以bytes方式儲存,欄位在訪問時才被進行反序列化。當欄位被修改時首先是放在cache中,並在下次序列化時合入或者顯式呼叫updateBinaryRepresenation()方法。
Notes:
- 該record必須是一個可變的物件,這樣才可以被多個自定義方法使用來提升效能(後面單獨分析)。該record是一個比較中的物件,為了減少對每個欄位的序列化、反序列化操作,其儲存了比較大的狀態,需要有多個指標以及陣列,從而要佔用相對比較大的記憶體空間,在64位的JVM中要佔用超過200bytes。
- 該類是非執行緒安全的
4 存放Record的資料結構
針對上面提出的存放資料結構的疑問,這裡繼續深入分析下。
-
將record放在一個迭代器中,當前存在一個叫BlockResettableMutableObjectIterator,其包含如下一些方法,讀寫都是在這個迭代器中進行。
Record迭代器.png
其中以無引數next()方法為示例走讀儲存或者讀取流程,程式碼如下:
public T next() throws IOException { // check for the left over element if (this.readPhase) { return getNextRecord(); } else { // writing phase. check for leftover first T result = null; if (this.leftOverReturned) { // get next record if ((result = this.input.next()) != null) { if (writeNextRecord(result)) { return result; } else { // did not fit into memory, keep as leftover this.leftOverRecord = this.serializer.copy(result); this.leftOverReturned = false; this.fullWriteBuffer = true; return null; } } else { this.noMoreBlocks = true; return null; } } else if (this.fullWriteBuffer) { return null; } else { this.leftOverReturned = true; return this.leftOverRecord; } } }
通過原始碼可以看出,在方法執行時根據標記判斷是讀取還是寫入流程,同時方法對應getNextRecord和writeNextRecord兩個方法,都在抽象類AbstractBlockResettableIterator中,兩個方法原始碼如下:
protected T getNextRecord() throws IOException { if (this.numRecordsReturned < this.numRecordsInBuffer) { this.numRecordsReturned++; return this.serializer.deserialize(this.readView); } else { return null; } }
protected boolean writeNextRecord(T record) throws IOException { try { this.serializer.serialize(record, this.collectingView); this.numRecordsInBuffer++; return true; } catch (EOFException eofex) { return false; } }
其中存放資料是基於Flink記憶體管理部分進行申請以及維護大小等,相關初始化原始碼如下:
memoryManager.allocatePages(ownerTask, emptySegments, numPages); this.collectingView = new SimpleCollectingOutputView(this.fullSegments, new ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize()); this.readView = new RandomAccessInputView(this.fullSegments, memoryManager.getPageSize());
5 Flink 如何直接操作二進位制資料
Flink 提供瞭如 group、sort、join 等操作,這些操作都需要訪問海量資料。這裡,我們以sort為例,這是一個在 Flink 中使用非常頻繁的操作。
首先,Flink 會從 MemoryManager 中申請一批 MemorySegment,我們把這批 MemorySegment 稱作 sort buffer,用來存放排序的資料。

sort示例.png
我們會把 sort buffer 分成兩塊區域。一個區域是用來存放所有物件完整的二進位制資料。另一個區域用來存放指向完整二進位制資料的指標以及定長的序列化後的key(key+pointer)。如果需要序列化的key是個變長型別,如String,則會取其字首序列化。如上圖所示,當一個物件要加到 sort buffer 中時,它的二進位制資料會被加到第一個區域,指標(可能還有key)會被加到第二個區域。
將實際的資料和指標加定長key分開存放有兩個目的。第一,交換定長塊(key+pointer)更高效,不用交換真實的資料也不用移動其他key和pointer。第二,這樣做是快取友好的,因為key都是連續儲存在記憶體中的,可以大大減少 cache miss(後面會詳細解釋)。
排序的關鍵是比大小和交換。Flink 中,會先用 key 比大小,這樣就可以直接用二進位制的key比較而不需要反序列化出整個物件。因為key是定長的,所以如果key相同(或者沒有提供二進位制key),那就必須將真實的二進位制資料反序列化出來,然後再做比較。之後,只需要交換key+pointer就可以達到排序的效果,真實的資料不用移動。

sort指標.png
最後,訪問排序後的資料,可以沿著排好序的key+pointer區域順序訪問,通過pointer找到對應的真實資料,並寫到記憶體或外部(更多細節可以看這篇文章 Joins in Flink)。
5.1 快取友好的資料結構和演算法
隨著磁碟IO和網路IO越來越快,CPU逐漸成為了大資料領域的瓶頸。從 L1/L2/L3 快取讀取資料的速度比從主記憶體讀取資料的速度快好幾個量級。通過效能分析可以發現,CPU時間中的很大一部分都是浪費在等待資料從主記憶體過來上。如果這些資料可以從 L1/L2/L3 快取過來,那麼這些等待時間可以極大地降低,並且所有的演算法會因此而受益。
在上面討論中我們談到的,Flink 通過定製的序列化框架將演算法中需要操作的資料(如sort中的key)連續儲存,而完整資料儲存在其他地方。因為對於完整的資料來說,key+pointer更容易裝進快取,這大大提高了快取命中率,從而提高了基礎演算法的效率。這對於上層應用是完全透明的,可以充分享受快取友好帶來的效能提升。
References
- 1. ofollow,noindex">Data Types & Serialization