fast協議解讀
背景
股票行情一般傳輸的資料型別為: int / long / float /double / string 來表示行情價格成交量之類的資料。
正常傳輸過程中,都是使用tag=value的方式。 如1=date(標號1代表日期) 2=openPrice(2表示開盤價格) 等等, 在解析每個欄位之前需要先解析這個欄位標號,然後通過這個標號能夠從提前約定的欄位(一般編碼端和解碼端都有一個xml模板類似的約定配置檔案)對應型別來解析這個欄位。
前提約定:
tag : 1->日期 2->時間 3->開盤價 4->最高價 5->最低價 6->當前價。 其中tag為short型別,即2個位元組
日期、時間為int; 開高低收為float ,保留3位小數
樣例資料:
1=20190310 , 2=142900 , 3=13.4 , 4=15.0 ,5=13.0 ,6=13.5
不使用fast協議來傳輸需要的位元組數: tag佔用位元組(6*2) + value(4 + 4 + 4 +4 + 4 + 4)=36位元組
同樣的資料,如果使用fast協議傳輸需要位元組數: tag(6*1) +value(4 + 3 + 3+ 3+ 3)=29位元組
fast協議特徵
基本特徵:
- 每個欄位中所有的byte的最高位用0表示當前位元組屬於該欄位,用1表示這是該欄位的最後一個位元組(停止位特徵),byte流和unicode字串流資料部分不使用停止位特徵
- fast協議傳輸過程中不會傳輸float/double型別的資料,而是將其根據小數位【具體每個欄位小數位數在模板配置檔案中約定】擴充套件成數字型別。
- 數字型別在傳輸過程中,可以為1,2,3,4,5,6,7,8,9,10個位元組,具體要根據是否為有符號、無符號、以及數字的範圍來具體確定佔用幾個位元組
- 在傳輸數字時,如果涉及到有符號數的時候,第一個位元組的第2位用來表示符號,0表示正數,1表示負數。
- 在傳輸ascii編碼型別的時候,佔用1個位元組。ascii編碼本身第一位為0,,所以一個位元組是符合fast協議規定的。
- 在傳輸unicode編碼型別的時候,使用(count,真正資料)來傳輸,count代表資料真正佔用的位元組數。
- 在傳輸byte流的時候,和unicode編碼一樣,也使用(count,真正資料)來傳輸。
- 在傳輸unicode 和byte流的時候不使用停止位特徵,即每個位元組的最高位為真實資料。資料長度欄位仍然使用停止位特徵
下面程式碼出自:openfast-1.1.1
fast協議解讀
停止位
org.openfast.template.type.codec
/** * 資料編碼成功後,將最後一個位元組的首位設定成1,即為停止位 * * */ public byte[] encode(ScalarValue value) { byte[] encoding = encodeValue(value); encoding[encoding.length - 1] |= 0x80; return encoding; }
有符號數編碼類
類:org.openfast.template.type.codec.SignedInteger
/** * 編碼方法 * */ public byte[] encodeValue(ScalarValue value) { long longValue = ((NumericValue) value).toLong(); int size = getSignedIntegerSize(longValue); byte[] encoding = new byte[size]; //組裝資料,即每個位元組第一位不表示資料;組裝完成後仍然是大端序列(低位元組位為值得高有效位) for (int factor = 0; factor < size; factor++) { //0x3f = 0011 1111 //0x7f = 0111 1111 int bitMask = (factor == (size - 1)) ? 0x3f : 0x7f; encoding[size - factor - 1] = (byte) ((longValue >> (factor * 7)) & bitMask); } // Get the sign bit from the long value and set it on the first byte // 01000000 00000000 ... 00000000 // ^----SIGN BIT //將一個位元組的第二位設定為符號位, 0表示正數;1表示負數 encoding[0] |= (0x40 & (longValue >> 57)); return encoding; } /** * 解碼方法 * */ public ScalarValue decode(InputStream in) { long value = 0; try { // IO read方法如果返回小於-1的時候,表示結束;正常範圍0-255 int byt = in.read(); if (byt < 0) { Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached."); return null; // short circuit if global error handler does not throw exception } //通過首位元組的第二位與運算,確認該資料的符號 if ((byt & 0x40) > 0) { value = -1; } //到此,value的符號已經確定, //value=0 則該數為負數, value= -1該數為正數 // int value = -116進製為 0xFF FF FF FF // int value = 016進製為 0x00 00 00 00 //下面的只是通過位操作來複原真實的資料 value = (value << 7) | (byt & 0x7f);//(value << 7)確保最後7位為0;(byt & 0x7f) 還是byt while ((byt & 0x80) == 0) {//根據第一位來判斷當前byte是否屬於這個欄位 byt = in.read(); if (byt < 0) { Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached."); return null; // short circuit if global error handler does not throw exception } value = (value << 7) | (byt & 0x7f); //先把有效位往左移7位,然後再處理當前的七位 } } catch (IOException e) { Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e); return null; // short circuit if global error handler does not throw exception } return createValue(value); } /** * 判斷無符號數所要佔用的位元組數 * */ public static int getUnsignedIntegerSize(long value) { if (value < 128) { return 1; // 2 ^ 7 } if (value <= 16384) { return 2; // 2 ^ 14 } if (value <= 2097152) { return 3; // 2 ^ 21 } if (value <= 268435456) { return 4; // 2 ^ 28 } if (value <= 34359738368L) { return 5; // 2 ^ 35 } if (value <= 4398046511104L) { return 6; // 2 ^ 42 } if (value <= 562949953421312L) { return 7; // 2 ^ 49 } if (value <= 72057594037927936L) { return 8; // 2 ^ 56 } return 9; } /** * 判斷有符號數需要佔用的位元組 * */ public static int getSignedIntegerSize(long value) { if ((value >= -64) && (value <= 63)) { return 1; // - 2 ^ 6 ... 2 ^ 6 -1 } if ((value >= -8192) && (value <= 8191)) { return 2; // - 2 ^ 13 ... 2 ^ 13 -1 } if ((value >= -1048576) && (value <= 1048575)) { return 3; // - 2 ^ 20 ... 2 ^ 20 -1 } if ((value >= -134217728) && (value <= 134217727)) { return 4; // - 2 ^ 27 ... 2 ^ 27 -1 } if ((value >= -17179869184L) && (value <= 17179869183L)) { return 5; // - 2 ^ 34 ... 2 ^ 34 -1 } if ((value >= -2199023255552L) && (value <= 2199023255551L)) { return 6; // - 2 ^ 41 ... 2 ^ 41 -1 } if ((value >= -281474976710656L) && (value <= 281474976710655L)) { return 7; // - 2 ^ 48 ... 2 ^ 48 -1 } if ((value >= -36028797018963968L) && (value <= 36028797018963967L)) { return 8; // - 2 ^ 55 ... 2 ^ 55 -1 } if ((value >= -4611686018427387904L && value <= 4611686018427387903L)) { return 9; } return 10; }
無符號數編碼類
org.openfast.template.type.codec.UnsignedInteger
/** * 編碼方法 * */ public byte[] encodeValue(ScalarValue scalarValue) { long value = scalarValue.toLong(); int size = getUnsignedIntegerSize(value); byte[] encoded = new byte[size]; for (int factor = 0; factor < size; factor++) { encoded[size - factor - 1] = (byte) ((value >> (factor * 7)) & 0x7f); } return encoded; } /** * * 解碼方法 * */ public ScalarValue decode(InputStream in) { long value = 0; int byt; try { do { byt = in.read(); if (byt < 0) { Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached."); return null; // short circuit if global error handler does not throw exception } value = (value << 7) | (byt & 0x7f); } while ((byt & 0x80) == 0); } catch (IOException e) { Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e); return null; // short circuit if global error handler does not throw exception } return createValue(value); }
AsciiString編碼類
org.openfast.template.type.codec.AsciiString
public byte[] encodeValue(ScalarValue value) { if ((value == null) || value.isNull()) { throw new IllegalStateException("Only nullable strings can represent null values."); } String string = value.toString(); if ((string != null) && (string.length() == 0)) { return TypeCodec.NULL_VALUE_ENCODING; } if (string.startsWith(ZERO_TERMINATOR)) { return ZERO_PREAMBLE; } return string.getBytes(); } public ScalarValue decode(InputStream in) { int byt; ByteArrayOutputStream buffer = Global.getBuffer(); try { do { byt = in.read(); if (byt < 0) { Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached."); return null; // short circuit if global error handler does not throw exception } buffer.write(byt); } while ((byt & 0x80) == 0); } catch (IOException e) { Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e); return null; // short circuit if global error handler does not throw exception } byte[] bytes = buffer.toByteArray(); //復原最後一個位元組為真實資料 bytes[bytes.length - 1] &= 0x7f; if (bytes[0] == 0) { if (!ByteUtil.isEmpty(bytes)) Global.handleError(FastConstants.R9_STRING_OVERLONG, null); if (bytes.length > 1 && bytes[1] == 0) return new StringValue("\u0000"); return new StringValue(""); } return new StringValue(new String(bytes)); }
位元組流編碼類
org.openfast.template.type.codec.ByteVectorType
注意:位元組流型別不使用停止位
public byte[] encode(ScalarValue value) { byte[] bytes = value.getBytes(); int lengthSize = IntegerCodec.getUnsignedIntegerSize(bytes.length); byte[] encoding = new byte[bytes.length + lengthSize]; byte[] length = TypeCodec.UINT.encode(new IntegerValue(bytes.length)); //資料流所佔長度 System.arraycopy(length, 0, encoding, 0, lengthSize); //資料 System.arraycopy(bytes, 0, encoding, lengthSize, bytes.length); return encoding; } public ScalarValue decode(InputStream in) { //解析位元組流的長度 int length = ((IntegerValue) TypeCodec.UINT.decode(in)).value; byte[] encoding = new byte[length]; //讀取位元組流 for (int i = 0; i < length; i++) try { int nextByte = in.read(); if (nextByte < 0) { Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached."); return null; // short circuit if global error handler does not throw exception } encoding[i] = (byte) nextByte; } catch (IOException e) { Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e); return null; // short circuit if global error handler does not throw exception } return new ByteVectorValue(encoding); }
Unicode字串型別編碼類
org.openfast.template.type.codec.UnicodeString
這個型別和上面的位元組流編碼類邏輯一樣。