基於Netty的RPC簡單框架實現(三):Kryo實現序列化
1.序列化和反序列化
網路中都是以位元組序列的形式來傳輸資料的,因此在傳送訊息時需要先將物件序列化轉換為位元組序列,然後將獲得的位元組序列傳送出去,訊息接收方接收到位元組序列後將之反序列化獲得傳輸的物件,從收發雙方來看就如同直接傳送和接收了物件一樣。
2.第三方依賴
本例使用目前最新版的kryo-serializers 0.36用於序列化
使用maven直接在pom.xml中新增上面的依賴即可<dependency> <groupId>de.javakaffee</groupId> <artifactId>kryo-serializers</artifactId> <version>0.36</version> </dependency>
3.序列化和反序列化的實現
序列化和反序列化的物件在本例中只有兩種:1.客戶端向服務端發出的呼叫請求RpcRequest 2.服務端向客戶端返回的呼叫結果RpcResponse
(1).RpcResponse
從RPC服務端傳回給客戶端的某次呼叫請求的結果
RpcResponse中的id對應著該次請求的RpcRequest中的id,isInvokeSuccess表示呼叫中是否有異常丟擲,result和throwable分別表示呼叫結果和呼叫過程丟擲的異常。package com.maigo.rpc.context; public class RpcResponse { private int id; private Object result; private Throwable throwable; private boolean isInvokeSuccess; public int getId() { return id; } public void setId(int id) { this.id = id; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public Throwable getThrowable() { return throwable; } public void setThrowable(Throwable throwable) { this.throwable = throwable; } public boolean isInvokeSuccess() { return isInvokeSuccess; } public void setInvokeSuccess(boolean isInvokeSuccess) { this.isInvokeSuccess = isInvokeSuccess; } }
(2).RpcRequest
已在上一節中給出
(3).KryoSerializer
實際負責序列化和反序列化
serialize()將一個物件通過kryo序列化並寫入ByteBuf中,注意到在頭部預留了4個位元組用於寫入長度資訊。deserialize()將ByteBuf中的內容反序列化還原出傳輸的物件。其中序列化和反序列化均用到了kryo物件,該物件是從KryoHolder中通過get()拿到的。package com.maigo.rpc.serializer; import java.io.IOException; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; public class KryoSerializer { private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; public static void serialize(Object object, ByteBuf byteBuf) { Kryo kryo = KryoHolder.get(); int startIdx = byteBuf.writerIndex(); ByteBufOutputStream byteOutputStream = new ByteBufOutputStream(byteBuf); try { byteOutputStream.write(LENGTH_PLACEHOLDER); Output output = new Output(1024*4, -1); output.setOutputStream(byteOutputStream); kryo.writeClassAndObject(output, object); output.flush(); output.close(); int endIdx = byteBuf.writerIndex(); byteBuf.setInt(startIdx, endIdx - startIdx - 4); } catch (IOException e) { e.printStackTrace(); } } public static Object deserialize(ByteBuf byteBuf) { if(byteBuf == null) return null; Input input = new Input(new ByteBufInputStream(byteBuf)); Kryo kryo = KryoHolder.get(); return kryo.readClassAndObject(input); } }
(4).KryoHolder
由於kryo物件是執行緒不安全的,當有多個netty的channel同時連線時,各channel是可能工作在不同的執行緒上的(netty中一個IO執行緒可以對應多個channel,而一個channel只能對應一個執行緒,詳細可以參考netty執行緒模型),若共用同一個kryo物件會出現併發問題,因此用ThreadLocal在每個執行緒保留一個各自的kryo物件,保證不會大量建立kryo物件的同時避免了併發問題
package com.maigo.rpc.serializer;
import com.esotericsoftware.kryo.Kryo;
public class KryoHolder
{
private static ThreadLocal<Kryo> threadLocalKryo = new ThreadLocal<Kryo>()
{
protected Kryo initialValue()
{
Kryo kryo = new KryoReflectionFactory();
return kryo;
};
};
public static Kryo get()
{
return threadLocalKryo.get();
}
}
可見,最終用於序列化和反序列化的kryo物件是通過new KryoReflectionFactory()建立的。
(5).KryoReflectionFactory
package com.maigo.rpc.serializer;
import java.lang.reflect.InvocationHandler;
import java.net.URI;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.GregorianCalendar;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;
import com.esotericsoftware.kryo.Serializer;
import com.maigo.rpc.context.RpcRequest;
import com.maigo.rpc.context.RpcResponse;
import de.javakaffee.kryoserializers.ArraysAsListSerializer;
import de.javakaffee.kryoserializers.BitSetSerializer;
import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer;
import de.javakaffee.kryoserializers.CollectionsEmptyMapSerializer;
import de.javakaffee.kryoserializers.CollectionsEmptySetSerializer;
import de.javakaffee.kryoserializers.CollectionsSingletonListSerializer;
import de.javakaffee.kryoserializers.CollectionsSingletonMapSerializer;
import de.javakaffee.kryoserializers.CollectionsSingletonSetSerializer;
import de.javakaffee.kryoserializers.CopyForIterateCollectionSerializer;
import de.javakaffee.kryoserializers.CopyForIterateMapSerializer;
import de.javakaffee.kryoserializers.DateSerializer;
import de.javakaffee.kryoserializers.EnumMapSerializer;
import de.javakaffee.kryoserializers.EnumSetSerializer;
import de.javakaffee.kryoserializers.GregorianCalendarSerializer;
import de.javakaffee.kryoserializers.JdkProxySerializer;
import de.javakaffee.kryoserializers.KryoReflectionFactorySupport;
import de.javakaffee.kryoserializers.RegexSerializer;
import de.javakaffee.kryoserializers.SubListSerializers;
import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer;
import de.javakaffee.kryoserializers.URISerializer;
import de.javakaffee.kryoserializers.UUIDSerializer;
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
public class KryoReflectionFactory extends KryoReflectionFactorySupport
{
public KryoReflectionFactory()
{
setRegistrationRequired(false);
setReferences(true);
register(RpcRequest.class, new RpcRequestSerializer());
register(RpcResponse.class, new RpcResponseSerializer());
register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer());
register(Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer());
register(Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer());
register(Collections.singletonList("").getClass(), new CollectionsSingletonListSerializer());
register(Collections.singleton("").getClass(), new CollectionsSingletonSetSerializer());
register(Collections.singletonMap("", "").getClass(), new CollectionsSingletonMapSerializer());
register(Pattern.class, new RegexSerializer());
register(BitSet.class, new BitSetSerializer());
register(URI.class, new URISerializer());
register(UUID.class, new UUIDSerializer());
register(GregorianCalendar.class, new GregorianCalendarSerializer());
register(InvocationHandler.class, new JdkProxySerializer());
UnmodifiableCollectionsSerializer.registerSerializers(this);
SynchronizedCollectionsSerializer.registerSerializers(this);
}
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public Serializer<?> getDefaultSerializer(Class clazz)
{
if(EnumSet.class.isAssignableFrom(clazz))
return new EnumSetSerializer();
if(EnumMap.class.isAssignableFrom(clazz))
return new EnumMapSerializer();
if(Collection.class.isAssignableFrom(clazz))
return new CopyForIterateCollectionSerializer();
if(Map.class.isAssignableFrom(clazz))
return new CopyForIterateMapSerializer();
if(Date.class.isAssignableFrom(clazz))
return new DateSerializer( clazz );
if (SubListSerializers.ArrayListSubListSerializer.canSerialize(clazz)
|| SubListSerializers.JavaUtilSubListSerializer.canSerialize(clazz))
return SubListSerializers.createFor(clazz);
return super.getDefaultSerializer(clazz);
}
}
匯入的包非常多,主要完成的功能是給大量類型別註冊其對應的Serializer。setRegistrationRequired()設定是否只能序列化已註冊的類,此處必須設定為false,因為RPC請求和迴應中都可能包含使用者自定義的類,這些類顯然是不可能在kryo中註冊過的。setReferences()若設定成false在序列化Exception時似乎有問題,此處維持開啟(預設也是開啟)。注意到給RpcRequest.class和RpcResponse.class分別註冊了對應的Serializer為RpcRequestSerializer和RpcResponseSerializer。這是由於kryo對未註冊的類序列化後的格式是
x01 x00 <(string)className> <(byte)id> <(Object)objectFieldValue ordered by fieldName>
裡面包含類的全類名,導致序列化後的位元組序列很長,故應該實現一個自定義的Serializer用於已知型別的序列化和反序列化縮短序列化後的位元組序列。
(6).RpcRequestSerializer
package com.maigo.rpc.serializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.maigo.rpc.context.RpcRequest;
public class RpcRequestSerializer extends Serializer<RpcRequest>
{
@Override
public void write(Kryo kryo, Output output, RpcRequest object)
{
output.writeInt(object.getId());
output.writeByte(object.getMethodName().length());
output.write(object.getMethodName().getBytes());
kryo.writeClassAndObject(output, object.getArgs());
}
@Override
public RpcRequest read(Kryo kryo, Input input, Class<RpcRequest> type)
{
RpcRequest rpcRequest = null;
int id = input.readInt();
byte methodLength = input.readByte();
byte[] methodBytes = input.readBytes(methodLength);
String methodName = new String(methodBytes);
Object[] args = (Object[])kryo.readClassAndObject(input);
rpcRequest = new RpcRequest(id, methodName, args);
return rpcRequest;
}
}
write()中按順序往output中寫入id,呼叫方法名的長度和呼叫方法名的位元組陣列,最後是呼叫方法的引數列表,由於不知道引數的確切型別,此處呼叫傳進的kryo物件的writeClassAndObject()方法對引數進行序列化。
read()中按照相同的順序讀出值並根據這些值構建出一個RpcRequest物件並返回。
(7).RpcResponseSerializer
package com.maigo.rpc.serializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.maigo.rpc.context.RpcResponse;
public class RpcResponseSerializer extends Serializer<RpcResponse>
{
@Override
public void write(Kryo kryo, Output output, RpcResponse object)
{
output.writeInt(object.getId());
output.writeBoolean(object.isInvokeSuccess());
if(object.isInvokeSuccess())
kryo.writeClassAndObject(output, object.getResult());
else
kryo.writeClassAndObject(output, object.getThrowable());
}
@Override
public RpcResponse read(Kryo kryo, Input input, Class<RpcResponse> type)
{
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setId(input.readInt());
rpcResponse.setInvokeSuccess(input.readBoolean());
if(rpcResponse.isInvokeSuccess())
rpcResponse.setResult(kryo.readClassAndObject(input));
else
rpcResponse.setThrowable((Throwable)kryo.readClassAndObject(input));
return rpcResponse;
}
}
類似RpcRequestSerializer,不再贅述4.測試測試內容與下一節的Netty網路傳輸一同測試
相關推薦
基於Netty的RPC簡單框架實現(三):Kryo實現序列化
1.序列化和反序列化 網路中都是以位元組序列的形式來傳輸資料的,因此在傳送訊息時需要先將物件序列化轉換為位元組序列,然後將獲得的位元組序列傳送出去,訊息接收方接收到位元組序列後將之反序列化獲得傳輸的物件,從收發雙方來看就如同直接傳送和接收了物件一樣。 2.第三方依賴 本例
01分散式基礎(三)-分散式通訊-序列化
分散式通訊-序列化 java序列化機制Serialize介面 java本身的序列化機制存在的問題 序列化和反序列化的概念 怎麼去實現一個序列化操作 擴充套件 serialVersionUID的作用 靜態
java(三)物件的序列化與static、final關鍵字
Java序列化是指把Java物件轉換為位元組序列的過程;而Java反序列化是指把位元組序列恢復為Java物件的過程。java中存有Cloneable介面,實現此介面的類都具有被拷貝能力,比new一個物件
基於java的微信小程式的實現(三)登入,註冊,注小程式端的實現
1.微信小程式專案結構認識 js檔案用來寫相關的邏輯操作,主要是用來操作資料 json檔案用來寫一些相關的配置 wxss相當於css用來寫頁面樣式 wxml相當於html用來寫頁面的元素的 pages資料夾中可以存放多個資料夾,每個資料夾裡面都是一套是js,json
Java容器框架(三)--LinkedList實現原理
1. 簡介 如果對Java容器家族成員不太熟悉,可以先閱讀Java容器框架(一)--概述篇這邊文章,LinkedList類在List家族中具有重要的位置,基本上可以和ArrayList平起平坐,在功能上甚至比ArrayList還要強大。下面我們先來看看LinkedList繼
簡單的通訊(三)----使用Socket實現TCP協議
功能 客戶端向伺服器端傳送一張檔案(這裡以圖片為例),伺服器發反饋訊息給客戶端。 程式碼 package com.demo; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; im
Mybatis原始碼---重寫一個最簡單的Mybatis架構實現(三)
前兩篇文章裡,我們實現了一個簡單的Mybatis。只要願意,如果完善了後續的資料庫操作,我們完全可以用它來替換本來的Mybatis。在本篇文章裡,我們要做的是完成我們自定義Mybatis與Spring或SpringBoot整合時的自動配置。首先,我們在來熟悉一下在XML
基於Unity3D的相機功能的實現(三)——第一人稱相機(FPS)
在遊戲開發中,角色視野跟隨滑鼠位置即第一人稱相機(FPS)是一個很常見的需求,我們今天來實現該功能。 掛載到相機上即可,程式碼如下: using UnityEngine; using System.
java網路程式設計:13、基於UDP的socket程式設計(三)實現相互發送接收訊息
宣告:本教程不收取任何費用,歡迎轉載,尊重作者勞動成果,不得用於商業用途,侵權必究!!! 文章目錄 一、前言 二、基於UDP伺服器端程式的編寫 三、基於UDP客戶端程式的編寫 四、測試列印 五、系列文章(java網路程式設計) 通過上兩篇文章:1、瞭解了基於UDP
基於android的網路音樂播放器-回撥實現音樂播放及音樂收藏的實現(三)
作為android初學者,最近把瘋狂android講義和瘋狂Java講義看了一遍,看到書中介紹的知識點非常多,很難全部記住,為了更好的掌握基礎知識點,我將開發一個網路音樂播放器-EasyMusic來鞏固下,也當作是練練手。感興趣的朋友可以看看,有設計不足的地方也
.NET Core API框架實戰(三) 使用Swagger文件實現上傳檔案
介紹 在Swagger中利用 IOperationFilter 操作來實現檔案上傳 檔案上傳 1、介紹 寫過介面的朋友都知道,除錯、維護介面是一件非常重要的一件事;swagger是一款非常不錯的介面文件工具,那麼在本期中,我們就用swagge
基於以太坊的DPOS實現(三)創世塊
原始碼 目錄 創世檔案 我們將這個DPOS共識命名為alien,所以大家在文中或程式碼中看到
Android 基於Zxing掃碼實現(三)、從相簿選取二維碼
前言 本文的程式碼基於YZxing庫,如需查閱程式碼可前往GitHub上面檢視。專案地址如下: YZxing 內容 從相簿獲取二維碼,主要涉及到幾大步驟。 第一,進入相簿獲取照片。 第二,對照片進行壓縮。 第三,對照片上的二維碼進行de
基於PCA的人臉識別系統(JAVA版)(三) 系統實現
系統主要由上圖幾部分組成。其中EigenFaceCore為特徵臉類,faceMain為主程式,ImageViewer為顯示圖片的工具類。如果根據第一篇博文環境都已經配置好的話則程式可以完美執行。J
基於stm32 Systick 的簡單定時器(裸機)-- 陣列實現
前言 在嵌入式的開發中,經常需要執行定時的操作。 聰明的同學肯定會想到, 我可以配置硬體定時器, 然後利用定時器中斷來執行需要定時執行的程式碼。然而硬體定時器的數量總是有限,不一定可以滿足我們定時的需求。因此我們常常需要用到軟體定時的方法。 事實上,
八大排序算法的python實現(三)冒泡排序
解釋 實現 兩個 blog python odin int 通過 順序 代碼: #coding:utf-8 #author:徐蔔靈 #交換排序.冒泡排序 L = [1, 3, 2, 32, 5, 4] def Bubble_sort(L): for i in ra
selenium + python自動化測試unittest框架學習(三)webdriver對頁面其他控件操作(三)
文件的 文件路徑 內容 option selenium script web 對話 對話框 1.對話框,下拉框 (1)對話框的有兩種,一種是iframe格式的,需要switch_to_iframe()進行定位,現在大部分的對話框是div格式的,這種格式的可以通過層級定位來定
selenium + python自動化測試unittest框架學習(三)webdriver元素定位(一)
倒數 節點 大於 文本框 webdriver 而且 單標簽 unit 遍歷 1.Webdriver原理 webdirver是一款web自動化操作工具,為瀏覽器提供統一的webdriver接口,由client也就是我們的測試腳本提交請求,remote server瀏覽器進行響
Linux下PCI設備驅動程序開發 --- PCI驅動程序實現(三)
傳輸 char rep roc register case 負責 ava b- 三、PCI驅動程序實現 1. 關鍵數據結構 PCI設備上有三種地址空間:PCI的I/O空間、PCI的存儲空間和PCI的配置空間。CPU可以訪問PCI設備上的所有地址空間,其中I/O空間和存儲空間
Redux和React-Redux的實現(三):中間件的原理和applyMiddleware、Thunk的實現
調用 map 介紹 typeof 觀察者 ets 返回 async 基本原理 現在我們的Redux和React-Redux已經基本實現了,在Redux中,觸發一個action,reducer立即就能算出相應的state,如果我要過一會才讓reducer計算state呢怎麽辦