Android socket高階用法(自定義協議和Protocol Buffer使用)
前提
之前寫過兩篇關於socket的文章,但是,只是簡單的介紹了一下關於socket Tcp和Udp的簡單使用。如果沒有看過的朋友可以去看看ofollow,noindex">Android Socket程式設計(tcp)初探 和Android Socket程式設計(udp)初探 。相信很多朋友在公司使用socket開發的時候都會自定義協議來傳遞資訊。一方面是為了安全排除髒資料,另一個方面是為了更加高效的處理自己所需要的資料。今天就來介紹一下關於socket自定義協議和使用Protocol Buffer解析資料。
首先
既然說到了Protocol Buffer,那麼我們就簡單介紹一下Protocol Buffer是什麼?並且使用為什麼要使用Protocol Buffer?
- 1、什麼是Protocol Buffer
一種 結構化資料 的資料儲存格式(類似於 XML、Json ),其作用是通過將 結構化的資料 進行 序列化(序列化),從而實現 資料儲存 / RPC 資料交換的功能。至於更詳細的用法和介紹請移步Protocol Buffer 序列化原理大揭祕 - 為什麼Protocol Buffer效能這麼好?
-
2、為什麼要使用Protocol Buffer
在回答這個問題之前,我們還是先給出一個在實際開發中經常會遇到的系統場景。比如:我們的客戶端程式是使用Java開發的,可能執行自不同的平臺,如:Linux、Windows或者是Android,而我們的伺服器程式通常是基於Linux平臺並使用C++或者Python開發完成的。在這兩種程式之間進行資料通訊時存在多種方式用於設計訊息格式,如:
1、 直接傳遞C/C++/Python語言中一位元組對齊的結構體資料,只要結構體的宣告為定長格式,那麼該方式對於C/C++/Python程式而言就非常方便了,僅需將接收到的資料按照結構體型別強行轉換即可。事實上對於變長結構體也不會非常麻煩。在傳送資料時,也只需定義一個結構體變數並設定各個成員變數的值之後,再以char*的方式將該二進位制資料傳送到遠端。反之,該方式對於Java開發者而言就會非常繁瑣,首先需要將接收到的資料存於ByteBuffer之中,再根據約定的位元組序逐個讀取每個欄位,並將讀取後的值再賦值給另外一個值物件中的域變數,以便於程式中其他程式碼邏輯的編寫。對於該型別程式而言,聯調的基準是必須客戶端和伺服器雙方均完成了訊息報文構建程式的編寫後才能展開,而該設計方式將會直接導致Java程式開發的進度過慢。即便是Debug階段,也會經常遇到Java程式中出現各種域欄位拼接的小錯誤。
2、 使用SOAP協議(WebService)作為訊息報文的格式載體,由該方式生成的報文是基於文字格式的,同時還存在大量的XML描述資訊,因此將會大大增加網路IO的負擔。又由於XML解析的複雜性,這也會大幅降低報文解析的效能。總之,使用該設計方式將會使系統的整體執行效能明顯下降。
對於以上兩種方式所產生的問題,Protocol Buffer均可以很好的解決,不僅如此,Protocol Buffer還有一個非常重要的優點就是可以保證同一訊息報文新舊版本之間的相容性。 對於Protocol Buffer具體的用法請移步Protocol Buffer技術詳解(語言規範) 今天主要講解的是socket自定義協議這塊
其次
說了那麼多,我們來看看我們今天的主要內容—自定義socket協議
先看一張心跳返回的圖

心跳返回.png
- 1、Protobuf協議
- 假設客戶端請求包體資料協議如下
request.proto
syntax = "proto3"; // 登入的包體資料 message Request { int32uid = 0; stringapi_token = 1; }
傳送的格式:
{包頭}{命令}{包體}
{包頭} -> 包體轉成protubuf的長度
{命令} -> 對應功能的命令字引數
{包體} -> 對應的protubuf資料
- 假設服務端返回包體資料協議
response.proto
syntax = "proto3"; // 登入成功後伺服器返回的包體資料 message Response { int32login = 1; }
伺服器返回的格式:
{包頭}{命令}{狀態碼}{包體}
{包頭} -> 包體轉成protubuf的長度
{命令} -> 對應功能的命令字引數
{狀態碼} -> 對應狀態的狀態碼
{包體} -> 對應的protubuf資料
-
2、客戶端socket寫法
-
分析:試想一下,要socket不會因為手機螢幕的熄滅或者其他什麼的而斷開,我們應該把socket放到哪裡去寫,又要怎麼保證socket的連線狀態呢?對於Android來說放到 service裡面去是最合適的,並且為了保證連線狀態。那麼,就要傳送一個心跳包保證連線狀態。既然這樣,那麼我們來寫service和socket。
-
3、service寫法
public class SocketService extends Service { Thread mSocketThread; Socket mSocket; InetSocketAddress mSocketAddress; //心跳執行緒 Thread mHeartThread; //接收執行緒 Thread mReceiveThread; //登入執行緒 Thread mLoginThread; boolean isHeart = false; boolean isReceive = false; SocketBinder mBinder = new SocketBinder(this); public SocketService() { } @Override public void onCreate() { super.onCreate(); createConnection(); receiveMsg(); isHeart = true; isReceive = true; } @Override public IBinder onBind(Intent intent) { return mBinder; } @Override public int onStartCommand(Intent intent, int flags, int startId) { startGps(); sendHeart(); if (!TextUtils.isEmpty(intent.getStringExtra(AppConfig.SERVICE_TAG))) { String TAG = intent.getStringExtra(AppConfig.SERVICE_TAG); switch (TAG) { case AppConfig.STOP_SERVICE_VALUE: {//停止服務 ClientSocket.getsInstance().shutDownConnection(mSocket); stopSelf(); mSocket = null; mHeartThread = null; mReceiveThread = null; mLoginThread = null; mSocketThread = null; isHeart = false; isReceive = false; break; } default: break; } } return super.onStartCommand(intent, flags, startId); } /** * 傳送心跳包 */ private void sendHeart() { mHeartThread = new Thread(new Runnable() { @Override public void run() { while (isHeart) { ClientSocket.getsInstance().sendHeart(mSocket, SocketStatus.HEART_CODE); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }); mHeartThread.start(); } /** * 登入 */ private void login(final double mLatitude, final double mLongitude) { mLoginThread = new Thread(new Runnable() { @Override public void run() { if (PreferencesUtils.getInt(SocketService.this, Constants.USER_ID) != 0 && !TextUtils.isEmpty(PreferencesUtils.getString(SocketService.this, Constants.USER_TOKEN))) { Request.Request requestLogin = Request.Request.newBuilder() .setUid(PreferencesUtils.getInt(SocketService.this, Constants.USER_ID)) .setApiToken(PreferencesUtils.getString(SocketService.this, Constants.USER_TOKEN).trim()) .build(); ClientSocket.getsInstance().sendLogin(mSocket, requestLogin, SocketStatus.LOGIN_CODE); } } }); mLoginThread.start(); } /** * 建立連線 * * @return */ public void createConnection() { mSocketThread = new Thread(new Runnable() { @Override public void run() { try { mSocket = new Socket(); mSocketAddress = new InetSocketAddress(AppConfig.TCP_IP, AppConfig.TCP_PORT); mSocket.connect(mSocketAddress, 20 * 1000); // 設定 socket 讀取資料流的超時時間 mSocket.setSoTimeout(20 * 1000); // 傳送資料包,預設為 false,即客戶端傳送資料採用 Nagle 演算法; // 但是對於實時互動性高的程式,建議其改為 true,即關閉 Nagle // 演算法,客戶端每傳送一次資料,無論資料包大小都會將這些資料傳送出去 mSocket.setTcpNoDelay(true); // 設定客戶端 socket 關閉時,close() 方法起作用時延遲 30 秒關閉,如果 30 秒內儘量將未傳送的資料包傳送出去 // socket.setSoLinger(true, 30); // 設定輸出流的傳送緩衝區大小,預設是4KB,即4096位元組 mSocket.setSendBufferSize(10 * 1024); // 設定輸入流的接收緩衝區大小,預設是4KB,即4096位元組 mSocket.setReceiveBufferSize(10 * 1024); // 作用:每隔一段時間檢查伺服器是否處於活動狀態,如果伺服器端長時間沒響應,自動關閉客戶端socket // 防止伺服器端無效時,客戶端長時間處於連線狀態 mSocket.setKeepAlive(true); } catch (UnknownHostException e) { Logger.e(e.getMessage() + "========+UnknownHostException"); e.printStackTrace(); } catch (IOException e) { createConnection(); Logger.e(e.getMessage() + "========IOException"); e.printStackTrace(); } catch (NetworkOnMainThreadException e) { Logger.e(e.getMessage() + "========NetworkOnMainThreadException"); e.printStackTrace(); } } }); mSocketThread.start(); } /** * 接收 */ private void receiveMsg() { mReceiveThread = new Thread(new Runnable() { @Override public void run() { while (isReceive) { try { if (mSocket != null && mSocket.isConnected()) { DataInputStream dis = ClientSocket.getsInstance().getMessageStream(mSocket); ByteArrayOutputStream bos = new ByteArrayOutputStream(); if (dis != null) { int length = 0; int head = 0; int buffer_size = 4; byte[] headBuffer = new byte[4]; byte[] cmdBuffer = new byte[4]; byte[] stateBuffer = new byte[4]; length = dis.read(headBuffer, 0, buffer_size); if (length == 4) { bos.write(headBuffer, 0, length); System.arraycopy(bos.toByteArray(), 0, headBuffer, 0, buffer_size); head = ByteUtil.bytesToInt(headBuffer, 0); length = dis.read(cmdBuffer, 0, buffer_size); bos.write(cmdBuffer, 0, length); System.arraycopy(bos.toByteArray(), 4, cmdBuffer, 0, buffer_size); int cmd = ByteUtil.hexStringToAlgorism(ByteUtil.str2HexStr(ByteUtil.byte2hex(cmdBuffer))); int heartNumber = ByteUtil.hexStringToAlgorism(ByteUtil.str2HexStr(SocketStatus.HEART)); String discover = Integer.toHexString(0x0101); int discoverNumber = ByteUtil.hexStringToAlgorism(ByteUtil.str2HexStr(SocketStatus.DISCOVER)); int giftNumber = ByteUtil.hexStringToAlgorism(ByteUtil.str2HexStr(SocketStatus.GIFT)); if (cmd == heartNumber) { length = dis.read(stateBuffer, 0, buffer_size); bos.write(stateBuffer, 0, length); System.arraycopy(bos.toByteArray(), 8, stateBuffer, 0, buffer_size); switch (ByteUtil.bytesToInt(stateBuffer, 0)) { case SocketStatus.LOGIN_SUCCESS: {//登入成功 Logger.e("登入成功"); mLoginValue = "1"; EventUtils.sendEvent(new Event<>(Constants.MSG_LOGIN_SUCCESS)); break; } case SocketStatus.HEART_SUCCESS: {//心跳返回 if (ByteUtil.bytesToInt(stateBuffer, 0) == 200 && Integer.toHexString(ByteUtil.bytesToInt(cmdBuffer, 0)) .equals(discover)) { byte[] buffer = new byte[head]; length = dis.read(buffer, 0, head); bos.write(buffer, 0, length); Response.Response response = Response. Response.parseFrom(buffer); Logger.e(responseExplore.getNickname() + responseExplore.getAvatar()); //傳送到activity中對資料進行處理 EventUtils.sendEvent(new Event<>(Constants.MSG_START_DISCOVER_RESULT, responseExplore)); Logger.e(responseExplore + "=======response"); } else { Logger.e("心跳返回"); } break; } default: break; } } } else { //出錯重連 ClientSocket.getsInstance().shutDownConnection(mSocket); createConnection(); } } else { createConnection(); } } } catch (IOException ex) { ex.printStackTrace(); } try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } } }); mReceiveThread.start(); } @Override public void onDestroy() { super.onDestroy(); ClientSocket.getsInstance().shutDownConnection(mSocket); stopSelf(); mHeartThread = null; mReceiveThread = null; mLoginThread = null; mSocketThread = null; mStopDiscoverThread = null; isHeart = false; isReceive = false; } /** * Binder */ public class SocketBinder extends Binder { private SocketService mService; public OnServiceCallBack mCallBack; public SocketBinder(SocketService mService) { this.mService = mService; } /** * 傳送方法 * * @param object */ public void sendMethod(Object object) { mService.sendMsg(object); mCallBack.onService(object); } /** * 設定回撥 * * @param callBack */ public void setOnServiceCallBack(OnServiceCallBack callBack) { this.mCallBack = callBack; } } }
-
分析
上面的service中首先建立socket,然後連線,在socket發生錯誤的時候(比如網路異常)重新進行建立在連線。然後,開一個接收執行緒一直接收,每次接收都是接收4個位元組的int值進行判斷是否可以進入到下一步,如果可以則繼續向下。讀取4個位元組的包頭 然後讀取4個位元組的命令 再讀取4個位元組的狀態碼 最後讀取4個位元組的包體 ,包體就包含我們所需要返回的資料。並且,在剛開始的時候就開啟了一個接收執行緒每隔50毫秒接收一次資料,這樣不僅可以讀取到心跳包還可以讀取到我們需要的資料。在最後,server生命週期結束的時候停止所有的執行緒。
-
4、傳送資料的類
public class ClientSocket { private DataOutputStream out = null; private DataInputStream getMessageStream; private static ClientSocket sInstance; private ClientSocket() { } /** * 單例 * * @return */ public static ClientSocket getsInstance() { if (sInstance == null) { synchronized (ClientSocket.class) { if (sInstance == null) { sInstance = new ClientSocket(); } } } return sInstance; } /** * 登入 * * @return */ public void sendLogin(Socket socket, Request.RequestLogin requestLogin, int code) { byte[] data = requestLogin.toByteArray(); byte[] head = ByteUtil.intToBytes(data.length); byte[] cmd = ByteUtil.intToBytes(code); byte[] bytes = addBytes(head, cmd, data); if (socket != null) { if (socket.isConnected()) { try { OutputStream os = socket.getOutputStream(); os.write(bytes); os.flush(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 心跳 * * @param code 關鍵字(命令) * @return */ public boolean sendHeart(Socket socket, int code) { boolean isSuccess; byte[] head = ByteUtil.intToBytes(0); byte[] cmd = ByteUtil.intToBytes(code); byte[] bytes = addBytes(head, cmd); if (socket.isConnected()) { try { out = new DataOutputStream(socket.getOutputStream()); out.write(bytes); out.flush(); isSuccess = true; } catch (IOException e) { e.printStackTrace(); isSuccess = false; } } else { isSuccess = false; } return isSuccess; } /** * 斷開連線 */ public void shutDownConnection(Socket socket) { try { if (out != null) { out.close(); } if (getMessageStream != null) { getMessageStream.close(); } if (socket != null) { socket.close(); } } catch (IOException e) { e.printStackTrace(); } } /** * 獲取伺服器返回的流 * * @param socket * @return */ public DataInputStream getMessageStream(Socket socket) { if (socket == null) { return null; } if (socket.isClosed()) { return null; } if (!socket.isConnected()) { return null; } try { getMessageStream = new DataInputStream(new BufferedInputStream( socket.getInputStream())); } catch (IOException e) { e.printStackTrace(); if (getMessageStream != null) { try { getMessageStream.close(); } catch (IOException e1) { e1.printStackTrace(); } } } return getMessageStream; } }
-
分析:
這裡使用了單例模式,保證了資料的唯一性,不會重複建立,可以看到登入傳送了包頭、命令和資料長度,而心跳只是包頭和命令,因為包體長度為空,所以不用傳送,最後轉成4個位元組的二進位制資料進行傳送。這樣,proto buffer的優點就體現出來了,方便客戶端和服務端的解析。
-
二進位制轉換工具類
public class ByteUtil { /** * 將2個byte陣列進行拼接 */ public static byte[] addBytes(byte[] data1, byte[] data2) { byte[] data3 = new byte[data1.length + data2.length]; System.arraycopy(data1, 0, data3, 0, data1.length); System.arraycopy(data2, 0, data3, data1.length, data2.length); return data3; } /** * 將3個byte陣列進行拼接 */ public static byte[] addBytes(byte[] data1, byte[] data2, byte[] data3) { byte[] data4 = new byte[data1.length + data2.length + data3.length]; System.arraycopy(data1, 0, data4, 0, data1.length); System.arraycopy(data2, 0, data4, data1.length, data2.length); System.arraycopy(data3, 0, data4, data1.length + data2.length, data3.length); return data4; } /** * int轉byte{} */ public static byte[] intToBytes(int value, ByteOrder mode) { byte[] src = new byte[4]; if (mode == ByteOrder.LITTLE_ENDIAN) { src[3] = (byte) ((value >> 24) & 0xFF); src[2] = (byte) ((value >> 16) & 0xFF); src[1] = (byte) ((value >> 8) & 0xFF); src[0] = (byte) (value & 0xFF); } else { src[0] = (byte) ((value >> 24) & 0xFF); src[1] = (byte) ((value >> 16) & 0xFF); src[2] = (byte) ((value >> 8) & 0xFF); src[3] = (byte) (value & 0xFF); } return src; } /** * 16進製表示的字串轉換為位元組陣列 * * @param s 16進製表示的字串 * @return byte[] 位元組陣列 */ public static byte[] hexStringToByteArray(String s) { int len = s.length(); byte[] b = new byte[len / 2]; for (int i = 0; i < len; i += 2) { // 兩位一組,表示一個位元組,把這樣表示的16進位制字串,還原成一個位元組 b[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + Character .digit(s.charAt(i + 1), 16)); } return b; } /** * byte陣列中取int數值,本方法適用於(低位在前,高位在後)的順序,和和intToBytes()配套使用 * * @param srcbyte陣列 * @param offset 從陣列的第offset位開始 * @return int數值 */ public static int bytesToInt(byte[] src, int offset) { int value; value = (int) ((src[offset] & 0xFF) | ((src[offset + 1] & 0xFF) << 8) | ((src[offset + 2] & 0xFF) << 16) | ((src[offset + 3] & 0xFF) << 24)); return value; } /** * byte陣列中取int數值,本方法適用於(低位在後,高位在前)的順序。和intToBytes2()配套使用 */ public static int bytesToInt2(byte[] src, int offset) { int value; value = (int) (((src[offset] & 0xFF) << 24) | ((src[offset + 1] & 0xFF) << 16) | ((src[offset + 2] & 0xFF) << 8) | (src[offset + 3] & 0xFF)); return value; } /** * 將int數值轉換為佔四個位元組的byte陣列,本方法適用於(低位在前,高位在後)的順序。 和 bytesToInt()配套使用 * * @param value 要轉換的int值 * @return byte陣列 */ public static byte[] intToBytes(int value) { byte[] src = new byte[4]; src[3] = (byte) ((value >> 24) & 0xFF); src[2] = (byte) ((value >> 16) & 0xFF); src[1] = (byte) ((value >> 8) & 0xFF); src[0] = (byte) (value & 0xFF); return src; } /** * 將int數值轉換為佔四個位元組的byte陣列,本方法適用於(高位在前,低位在後)的順序。和 bytesToInt2()配套使用 */ public static byte[] intToBytes2(int value) { byte[] src = new byte[4]; src[0] = (byte) ((value >> 24) & 0xFF); src[1] = (byte) ((value >> 16) & 0xFF); src[2] = (byte) ((value >> 8) & 0xFF); src[3] = (byte) (value & 0xFF); return src; } /** * 將位元組轉換為二進位制字串 * * @param bytes 位元組陣列 * @return 二進位制字串 */ public static String byteToBit(byte... bytes) { StringBuffer sb = new StringBuffer(); int z, len; String str; for (int w = 0; w < bytes.length; w++) { z = bytes[w]; z |= 256; str = Integer.toBinaryString(z); len = str.length(); sb.append(str.substring(len - 8, len)); } return sb.toString(); } /** * 位元組陣列轉為普通字串(ASCII對應的字元) * * @param bytearray byte[] * @return String */ public static String byte2String(byte[] bytearray) { String result = ""; char temp; int length = bytearray.length; for (int i = 0; i < length; i++) { temp = (char) bytearray[i]; result += temp; } return result; } /** * 二進位制字串轉十進位制 * * @param binary 二進位制字串 * @return 十進位制數值 */ public static int binaryToAlgorism(String binary) { int max = binary.length(); int result = 0; for (int i = max; i > 0; i--) { char c = binary.charAt(i - 1); int algorism = c - '0'; result += Math.pow(2, max - i) * algorism; } return result; } /** * 位元組陣列轉換為十六進位制字串 * * @param b byte[] 需要轉換的位元組陣列 * @return String 十六進位制字串 */ public static String byte2hex(byte b[]) { if (b == null) { throw new IllegalArgumentException( "Argument b ( byte array ) is null! "); } String hs = ""; String stmp = ""; for (int n = 0; n < b.length; n++) { stmp = Integer.toHexString(b[n] & 0xff); if (stmp.length() == 1) { hs = hs + "0" + stmp; } else { hs = hs + stmp; } } return hs.toUpperCase(); } /** * 十六進位制字串轉換十進位制 * * @param hex 十六進位制字串 * @return 十進位制數值 */ public static int hexStringToAlgorism(String hex) { hex = hex.toUpperCase(); int max = hex.length(); int result = 0; for (int i = max; i > 0; i--) { char c = hex.charAt(i - 1); int algorism = 0; if (c >= '0' && c <= '9') { algorism = c - '0'; } else { algorism = c - 55; } result += Math.pow(16, max - i) * algorism; } return result; } /** * 字串轉換成十六進位制字串 * * @param str 待轉換的ASCII字串 * @return String 每個Byte之間空格分隔,如: [61 6C 6B] */ public static String str2HexStr(String str) { char[] chars = "0123456789ABCDEF".toCharArray(); StringBuilder sb = new StringBuilder(""); byte[] bs = str.getBytes(); int bit; for (int i = 0; i < bs.length; i++) { bit = (bs[i] & 0x0f0) >> 4; sb.append(chars[bit]); bit = bs[i] & 0x0f; sb.append(chars[bit]); sb.append(' '); } return sb.toString().trim(); } /** * 16進位制轉換成字串 * * @param hexStr * @return */ public static String hexStr2Str(String hexStr) { String str = "0123456789ABCDEF"; char[] hexs = hexStr.toCharArray(); byte[] bytes = new byte[hexStr.length() / 2]; int n; for (int i = 0; i < bytes.length; i++) { n = str.indexOf(hexs[2 * i]) * 16; n += str.indexOf(hexs[2 * i + 1]); bytes[i] = (byte) (n & 0xff); } return new String(bytes); } /** * 重寫了Inpustream 中的skip(long n) 方法, * 將資料流中起始的n 個位元組跳過 */ public static long skipBytesFromStream(InputStream inputStream, long n) { long remaining = n; // SKIP_BUFFER_SIZE is used to determine the size of skipBuffer int SKIP_BUFFER_SIZE = 2048; // skipBuffer is initialized in skip(long), if needed. byte[] skipBuffer = null; int nr = 0; if (skipBuffer == null) { skipBuffer = new byte[SKIP_BUFFER_SIZE]; } byte[] localSkipBuffer = skipBuffer; if (n <= 0) { return 0; } while (remaining > 0) { try { nr = inputStream.read(localSkipBuffer, 0, (int) Math.min(SKIP_BUFFER_SIZE, remaining)); } catch (IOException e) { e.printStackTrace(); } if (nr < 0) { break; } remaining -= nr; } return n - remaining; } }
最後
對於socket和 proto buffer來說,能靈活運用,首先要感謝我們公司的同事,沒有他們提供思路,估計很難靈活運用socket和proto buffer。其次,要感謝之前公司的大佬,還有給我提供寶貴意見的各位好友。還有要感謝自己,能靜下心來,堅持不懈,克服proto buffer和socket相結合的寫法。