1. 程式人生 > >基於Netty的RPC簡單框架實現(三):Kryo實現序列化

基於Netty的RPC簡單框架實現(三):Kryo實現序列化

1.序列化和反序列化

網路中都是以位元組序列的形式來傳輸資料的,因此在傳送訊息時需要先將物件序列化轉換為位元組序列,然後將獲得的位元組序列傳送出去,訊息接收方接收到位元組序列後將之反序列化獲得傳輸的物件,從收發雙方來看就如同直接傳送和接收了物件一樣。

2.第三方依賴

本例使用目前最新版的kryo-serializers 0.36用於序列化

<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.36</version>
</dependency>
使用maven直接在pom.xml中新增上面的依賴即可

3.序列化和反序列化的實現

序列化和反序列化的物件在本例中只有兩種:1.客戶端向服務端發出的呼叫請求RpcRequest 2.服務端向客戶端返回的呼叫結果RpcResponse

(1).RpcResponse

從RPC服務端傳回給客戶端的某次呼叫請求的結果

package com.maigo.rpc.context;

public class RpcResponse 
{	
	private int id;
	private Object result;
	private Throwable throwable;
	private boolean isInvokeSuccess;
	
	public int getId() 
	{
		return id;
	}
	
	public void setId(int id) 
	{
		this.id = id;
	}
	
	public Object getResult() 
	{
		return result;
	}
	
	public void setResult(Object result) 
	{
		this.result = result;
	}
	
	public Throwable getThrowable() 
	{
		return throwable;
	}
	
	public void setThrowable(Throwable throwable) 
	{
		this.throwable = throwable;
	}
	
	public boolean isInvokeSuccess() 
	{
		return isInvokeSuccess;
	}
	
	public void setInvokeSuccess(boolean isInvokeSuccess) 
	{
		this.isInvokeSuccess = isInvokeSuccess;
	}

}
RpcResponse中的id對應著該次請求的RpcRequest中的id,isInvokeSuccess表示呼叫中是否有異常丟擲,result和throwable分別表示呼叫結果和呼叫過程丟擲的異常。

(2).RpcRequest

已在上一節中給出

(3).KryoSerializer

實際負責序列化和反序列化

package com.maigo.rpc.serializer;

import java.io.IOException;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;

public class KryoSerializer
{
	private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
	
	public static void serialize(Object object, ByteBuf byteBuf) 
	{
		Kryo kryo = KryoHolder.get();
		int startIdx = byteBuf.writerIndex();
        ByteBufOutputStream byteOutputStream = new ByteBufOutputStream(byteBuf);
        try 
        {
			byteOutputStream.write(LENGTH_PLACEHOLDER);
			Output output = new Output(1024*4, -1);
	        output.setOutputStream(byteOutputStream);
	        kryo.writeClassAndObject(output, object);
	        
	        output.flush();
	        output.close();
	        
	        int endIdx = byteBuf.writerIndex();

	        byteBuf.setInt(startIdx, endIdx - startIdx - 4);
		} 
        catch (IOException e) 
        {
			e.printStackTrace();
		}
	}

	public static Object deserialize(ByteBuf byteBuf) 
	{
		if(byteBuf == null)
            return null;
		
        Input input = new Input(new ByteBufInputStream(byteBuf));
        Kryo kryo = KryoHolder.get();
        return kryo.readClassAndObject(input);
	}	
}
serialize()將一個物件通過kryo序列化並寫入ByteBuf中,注意到在頭部預留了4個位元組用於寫入長度資訊。deserialize()將ByteBuf中的內容反序列化還原出傳輸的物件。其中序列化和反序列化均用到了kryo物件,該物件是從KryoHolder中通過get()拿到的。

(4).KryoHolder

由於kryo物件是執行緒不安全的,當有多個netty的channel同時連線時,各channel是可能工作在不同的執行緒上的(netty中一個IO執行緒可以對應多個channel,而一個channel只能對應一個執行緒,詳細可以參考netty執行緒模型),若共用同一個kryo物件會出現併發問題,因此用ThreadLocal在每個執行緒保留一個各自的kryo物件,保證不會大量建立kryo物件的同時避免了併發問題

package com.maigo.rpc.serializer;

import com.esotericsoftware.kryo.Kryo;

public class KryoHolder 
{
	private static ThreadLocal<Kryo> threadLocalKryo = new ThreadLocal<Kryo>()
	{
		protected Kryo initialValue() 
		{
			Kryo kryo = new KryoReflectionFactory();
						
			return kryo;
		};
	};
	
	public static Kryo get()
	{
		return threadLocalKryo.get();
	}
}
可見,最終用於序列化和反序列化的kryo物件是通過new KryoReflectionFactory()建立的。

(5).KryoReflectionFactory

package com.maigo.rpc.serializer;

import java.lang.reflect.InvocationHandler;
import java.net.URI;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.GregorianCalendar;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;

import com.esotericsoftware.kryo.Serializer;
import com.maigo.rpc.context.RpcRequest;
import com.maigo.rpc.context.RpcResponse;

import de.javakaffee.kryoserializers.ArraysAsListSerializer;
import de.javakaffee.kryoserializers.BitSetSerializer;
import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer;
import de.javakaffee.kryoserializers.CollectionsEmptyMapSerializer;
import de.javakaffee.kryoserializers.CollectionsEmptySetSerializer;
import de.javakaffee.kryoserializers.CollectionsSingletonListSerializer;
import de.javakaffee.kryoserializers.CollectionsSingletonMapSerializer;
import de.javakaffee.kryoserializers.CollectionsSingletonSetSerializer;
import de.javakaffee.kryoserializers.CopyForIterateCollectionSerializer;
import de.javakaffee.kryoserializers.CopyForIterateMapSerializer;
import de.javakaffee.kryoserializers.DateSerializer;
import de.javakaffee.kryoserializers.EnumMapSerializer;
import de.javakaffee.kryoserializers.EnumSetSerializer;
import de.javakaffee.kryoserializers.GregorianCalendarSerializer;
import de.javakaffee.kryoserializers.JdkProxySerializer;
import de.javakaffee.kryoserializers.KryoReflectionFactorySupport;
import de.javakaffee.kryoserializers.RegexSerializer;
import de.javakaffee.kryoserializers.SubListSerializers;
import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer;
import de.javakaffee.kryoserializers.URISerializer;
import de.javakaffee.kryoserializers.UUIDSerializer;
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;

public class KryoReflectionFactory extends KryoReflectionFactorySupport
{
	public KryoReflectionFactory()
	{
		setRegistrationRequired(false);
		setReferences(true); 
		register(RpcRequest.class, new RpcRequestSerializer());
		register(RpcResponse.class, new RpcResponseSerializer());
        register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
        register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer());
        register(Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer());
        register(Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer());
        register(Collections.singletonList("").getClass(), new CollectionsSingletonListSerializer());
        register(Collections.singleton("").getClass(), new CollectionsSingletonSetSerializer());
        register(Collections.singletonMap("", "").getClass(), new CollectionsSingletonMapSerializer());
        register(Pattern.class, new RegexSerializer());
        register(BitSet.class, new BitSetSerializer());
        register(URI.class, new URISerializer());
        register(UUID.class, new UUIDSerializer());
        register(GregorianCalendar.class, new GregorianCalendarSerializer());
        register(InvocationHandler.class, new JdkProxySerializer());
        UnmodifiableCollectionsSerializer.registerSerializers(this);
        SynchronizedCollectionsSerializer.registerSerializers(this);
	}
	
	@Override
	@SuppressWarnings({"rawtypes", "unchecked"})
	public Serializer<?> getDefaultSerializer(Class clazz) 
	{		
		if(EnumSet.class.isAssignableFrom(clazz)) 
			return new EnumSetSerializer();
        
        if(EnumMap.class.isAssignableFrom(clazz))
            return new EnumMapSerializer();
        
        if(Collection.class.isAssignableFrom(clazz))
            return new CopyForIterateCollectionSerializer();
        
        if(Map.class.isAssignableFrom(clazz)) 
            return new CopyForIterateMapSerializer();
        
        if(Date.class.isAssignableFrom(clazz))
            return new DateSerializer( clazz );
        
        if (SubListSerializers.ArrayListSubListSerializer.canSerialize(clazz) 
        		|| SubListSerializers.JavaUtilSubListSerializer.canSerialize(clazz))
			return SubListSerializers.createFor(clazz);		
        
        return super.getDefaultSerializer(clazz);
	}	
}
匯入的包非常多,主要完成的功能是給大量類型別註冊其對應的Serializer。setRegistrationRequired()設定是否只能序列化已註冊的類,此處必須設定為false,因為RPC請求和迴應中都可能包含使用者自定義的類,這些類顯然是不可能在kryo中註冊過的。setReferences()若設定成false在序列化Exception時似乎有問題,此處維持開啟(預設也是開啟)。注意到給RpcRequest.class和RpcResponse.class分別註冊了對應的Serializer為RpcRequestSerializer和RpcResponseSerializer。這是由於kryo對未註冊的類序列化後的格式是
x01 x00 <(string)className> <(byte)id> <(Object)objectFieldValue ordered by fieldName>
裡面包含類的全類名,導致序列化後的位元組序列很長,故應該實現一個自定義的Serializer用於已知型別的序列化和反序列化縮短序列化後的位元組序列。

(6).RpcRequestSerializer

package com.maigo.rpc.serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.maigo.rpc.context.RpcRequest;

public class RpcRequestSerializer extends Serializer<RpcRequest>
{
	@Override
	public void write(Kryo kryo, Output output, RpcRequest object) 
	{
		output.writeInt(object.getId());
		output.writeByte(object.getMethodName().length());
		output.write(object.getMethodName().getBytes());
		kryo.writeClassAndObject(output, object.getArgs());
	}

	@Override
	public RpcRequest read(Kryo kryo, Input input, Class<RpcRequest> type) 
	{
		RpcRequest rpcRequest = null;
		int id = input.readInt();
		byte methodLength = input.readByte();
		byte[] methodBytes = input.readBytes(methodLength);
		String methodName = new String(methodBytes);
		Object[] args = (Object[])kryo.readClassAndObject(input);
		
		rpcRequest = new RpcRequest(id, methodName, args);
		
		return rpcRequest;
	}
}
write()中按順序往output中寫入id,呼叫方法名的長度和呼叫方法名的位元組陣列,最後是呼叫方法的引數列表,由於不知道引數的確切型別,此處呼叫傳進的kryo物件的writeClassAndObject()方法對引數進行序列化。

read()中按照相同的順序讀出值並根據這些值構建出一個RpcRequest物件並返回。

(7).RpcResponseSerializer

package com.maigo.rpc.serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.maigo.rpc.context.RpcResponse;

public class RpcResponseSerializer extends Serializer<RpcResponse> 
{
	@Override
	public void write(Kryo kryo, Output output, RpcResponse object) 
	{
		output.writeInt(object.getId());
		output.writeBoolean(object.isInvokeSuccess());
		if(object.isInvokeSuccess())
			kryo.writeClassAndObject(output, object.getResult());
		else
			kryo.writeClassAndObject(output, object.getThrowable());
	}

	@Override
	public RpcResponse read(Kryo kryo, Input input, Class<RpcResponse> type) 
	{
		RpcResponse rpcResponse = new RpcResponse();
		rpcResponse.setId(input.readInt());
		rpcResponse.setInvokeSuccess(input.readBoolean());
		if(rpcResponse.isInvokeSuccess())
			rpcResponse.setResult(kryo.readClassAndObject(input));
		else
			rpcResponse.setThrowable((Throwable)kryo.readClassAndObject(input));
		
		return rpcResponse;
	}
}
類似RpcRequestSerializer,不再贅述4.測試

測試內容與下一節的Netty網路傳輸一同測試

相關推薦

基於Netty的RPC簡單框架實現:Kryo實現序列

1.序列化和反序列化 網路中都是以位元組序列的形式來傳輸資料的,因此在傳送訊息時需要先將物件序列化轉換為位元組序列,然後將獲得的位元組序列傳送出去,訊息接收方接收到位元組序列後將之反序列化獲得傳輸的物件,從收發雙方來看就如同直接傳送和接收了物件一樣。 2.第三方依賴 本例

01分散式基礎-分散式通訊-序列

分散式通訊-序列化 java序列化機制Serialize介面 java本身的序列化機制存在的問題 序列化和反序列化的概念 怎麼去實現一個序列化操作 擴充套件 serialVersionUID的作用 靜態

java物件的序列與static、final關鍵字

Java序列化是指把Java物件轉換為位元組序列的過程;而Java反序列化是指把位元組序列恢復為Java物件的過程。java中存有Cloneable介面,實現此介面的類都具有被拷貝能力,比new一個物件

基於java的微信小程式的實現登入,註冊,注小程式端的實現

1.微信小程式專案結構認識 js檔案用來寫相關的邏輯操作,主要是用來操作資料 json檔案用來寫一些相關的配置 wxss相當於css用來寫頁面樣式 wxml相當於html用來寫頁面的元素的 pages資料夾中可以存放多個資料夾,每個資料夾裡面都是一套是js,json

Java容器框架--LinkedList實現原理

1. 簡介 如果對Java容器家族成員不太熟悉,可以先閱讀Java容器框架(一)--概述篇這邊文章,LinkedList類在List家族中具有重要的位置,基本上可以和ArrayList平起平坐,在功能上甚至比ArrayList還要強大。下面我們先來看看LinkedList繼

簡單的通訊----使用Socket實現TCP協議

功能 客戶端向伺服器端傳送一張檔案(這裡以圖片為例),伺服器發反饋訊息給客戶端。 程式碼 package com.demo; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; im

Mybatis原始碼---重寫一個最簡單的Mybatis架構實現

   前兩篇文章裡,我們實現了一個簡單的Mybatis。只要願意,如果完善了後續的資料庫操作,我們完全可以用它來替換本來的Mybatis。在本篇文章裡,我們要做的是完成我們自定義Mybatis與Spring或SpringBoot整合時的自動配置。首先,我們在來熟悉一下在XML

基於Unity3D的相機功能的實現——第一人稱相機FPS

在遊戲開發中,角色視野跟隨滑鼠位置即第一人稱相機(FPS)是一個很常見的需求,我們今天來實現該功能。 掛載到相機上即可,程式碼如下: using UnityEngine; using System.

java網路程式設計:13、基於UDP的socket程式設計實現相互發送接收訊息

宣告:本教程不收取任何費用,歡迎轉載,尊重作者勞動成果,不得用於商業用途,侵權必究!!! 文章目錄 一、前言 二、基於UDP伺服器端程式的編寫 三、基於UDP客戶端程式的編寫 四、測試列印 五、系列文章(java網路程式設計) 通過上兩篇文章:1、瞭解了基於UDP

基於android的網路音樂播放器-回撥實現音樂播放及音樂收藏的實現

作為android初學者,最近把瘋狂android講義和瘋狂Java講義看了一遍,看到書中介紹的知識點非常多,很難全部記住,為了更好的掌握基礎知識點,我將開發一個網路音樂播放器-EasyMusic來鞏固下,也當作是練練手。感興趣的朋友可以看看,有設計不足的地方也

.NET Core API框架實戰 使用Swagger文件實現上傳檔案

介紹 在Swagger中利用 IOperationFilter 操作來實現檔案上傳 檔案上傳 1、介紹  寫過介面的朋友都知道,除錯、維護介面是一件非常重要的一件事;swagger是一款非常不錯的介面文件工具,那麼在本期中,我們就用swagge

基於以太坊的DPOS實現創世塊

原始碼 目錄 創世檔案 我們將這個DPOS共識命名為alien,所以大家在文中或程式碼中看到

Android 基於Zxing掃碼實現、從相簿選取二維碼

前言 本文的程式碼基於YZxing庫,如需查閱程式碼可前往GitHub上面檢視。專案地址如下: YZxing 內容 從相簿獲取二維碼,主要涉及到幾大步驟。 第一,進入相簿獲取照片。 第二,對照片進行壓縮。 第三,對照片上的二維碼進行de

基於PCA的人臉識別系統JAVA版 系統實現

系統主要由上圖幾部分組成。其中EigenFaceCore為特徵臉類,faceMain為主程式,ImageViewer為顯示圖片的工具類。如果根據第一篇博文環境都已經配置好的話則程式可以完美執行。J

基於stm32 Systick 的簡單定時器裸機-- 陣列實現

前言   在嵌入式的開發中,經常需要執行定時的操作。 聰明的同學肯定會想到, 我可以配置硬體定時器, 然後利用定時器中斷來執行需要定時執行的程式碼。然而硬體定時器的數量總是有限,不一定可以滿足我們定時的需求。因此我們常常需要用到軟體定時的方法。   事實上,

八大排序算法的python實現冒泡排序

解釋 實現 兩個 blog python odin int 通過 順序 代碼: #coding:utf-8 #author:徐蔔靈 #交換排序.冒泡排序 L = [1, 3, 2, 32, 5, 4] def Bubble_sort(L): for i in ra

selenium + python自動化測試unittest框架學習webdriver對頁面其他控件操作

文件的 文件路徑 內容 option selenium script web 對話 對話框 1.對話框,下拉框 (1)對話框的有兩種,一種是iframe格式的,需要switch_to_iframe()進行定位,現在大部分的對話框是div格式的,這種格式的可以通過層級定位來定

selenium + python自動化測試unittest框架學習webdriver元素定位

倒數 節點 大於 文本框 webdriver 而且 單標簽 unit 遍歷 1.Webdriver原理 webdirver是一款web自動化操作工具,為瀏覽器提供統一的webdriver接口,由client也就是我們的測試腳本提交請求,remote server瀏覽器進行響

Linux下PCI設備驅動程序開發 --- PCI驅動程序實現

傳輸 char rep roc register case 負責 ava b- 三、PCI驅動程序實現 1. 關鍵數據結構 PCI設備上有三種地址空間:PCI的I/O空間、PCI的存儲空間和PCI的配置空間。CPU可以訪問PCI設備上的所有地址空間,其中I/O空間和存儲空間

Redux和React-Redux的實現:中間件的原理和applyMiddleware、Thunk的實現

調用 map 介紹 typeof 觀察者 ets 返回 async 基本原理 現在我們的Redux和React-Redux已經基本實現了,在Redux中,觸發一個action,reducer立即就能算出相應的state,如果我要過一會才讓reducer計算state呢怎麽辦