1. 程式人生 > >android socket連結 NIO非阻塞方式

android socket連結 NIO非阻塞方式

最近在研究android的推送,一開始準備自己搭建推送伺服器,並在android機上建立一個socket長連線,由於之前一直是用c++,在網上搜索一些資料後,臨時寫了一個基於NIO模式的客戶端socket連結類,測試後能使用,當然還有很多問題,沒有去修改了,因為最後發現,現在國內已經有現成的第三方推送平臺,基於專案的開發時間有限,準備使用第三方推送平臺,廢話不多說,直接貼程式碼,說明都在程式碼中.

package cn.kidstone.cartoon.net;

import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

/**
 * 通過NIO方式進行socket連結
 * @author He Zhongqiu
 *
 */
public class TCPClient {
	/** 通道選擇器 */
	private Selector mSelector;
	
	/** 伺服器通訊的通道 */
	private SocketChannel mChannel;
	
	/** 遠端伺服器ip地址 */
	private String mRemoteIp;
	
	/** 遠端伺服器埠 */
	private int mPort;
	
	/** 是否載入過的標識 */
	private boolean mIsInit = false;
	
	/** 單鍵例項 */
	private static TCPClient gTcp;
	
	private TCPClientEventListener mEventListener;
	
	/** 預設連結超時時間 */
	public static final int TIME_OUT = 10000;
	
	/** 讀取buff的大小 */
	public static final int READ_BUFF_SIZE = 1024;
	
	/** 訊息流的格式 */
	public static final String BUFF_FORMAT = "utf-8";
	
	public static synchronized TCPClient instance() {
		if ( gTcp == null ) {
			gTcp = new TCPClient();
		}
		return gTcp;
	}
	
	private TCPClient() {
		
	}
	
	/**
	 * 連結遠端地址
	 * @param remoteIp
	 * @param port
	 * @param TCPClientEventListener
	 * @return
	 */
	public void connect( String remoteIp, int port, TCPClientEventListener tcel ) {
		mRemoteIp = remoteIp;
		mPort = port;
		mEventListener = tcel;
		connect();
	}
	
	/**
	 * 連結遠端地址
	 * @param remoteIp
	 * @param port
	 * @return
	 */
	public void connect( String remoteIp, int port ) {
		connect(remoteIp,port,null);
	}
	
	private void connect() {
		//需要在子執行緒下進行連結
		MyConnectRunnable connect = new MyConnectRunnable();
		new Thread(connect).start();
	}
	
	/**
	 * 傳送字元
	 * @param msg
	 * @return
	 */
	public boolean sendMsg(String msg) {
		boolean bRes = false;
		try {
			bRes = sendMsg(msg.getBytes(BUFF_FORMAT));
		} catch ( Exception e ) {
			e.printStackTrace();
		}
		
		return bRes;
	}
	
	/**
	 * 傳送資料,此函式需要在獨立的子執行緒中完成,可以考慮做一個傳送佇列
	 * 自己開一個子執行緒對該佇列進行處理,就好像connect一樣
	 * @param bt
	 * @return
	 */
	public boolean sendMsg( byte[] bt ) {
		boolean bRes = false;
		if ( !mIsInit ) {
			
			return bRes;
		}
		try {
			ByteBuffer buf = ByteBuffer.wrap(bt);
			int nCount = mChannel.write(buf);
			if ( nCount > 0 ) {
				bRes = true;
			}
		} catch ( Exception e ) {
			e.printStackTrace();
		}
		
		return bRes;
	}
	
	public Selector getSelector() {
		return mSelector;
	}
	
	/**
	 * 是否連結著
	 * @return
	 */
	public boolean isConnect() {
		if ( !mIsInit ) {
			return false;
		}
		return mChannel.isConnected();
	}
	
	/**
	 * 關閉連結
	 */
	public void close() {
		mIsInit = false;
		mRemoteIp = null;
		mPort = 0;
		try {
			if ( mSelector != null ) {
				mSelector.close();
			}
			if ( mChannel != null ) {
				mChannel.close();
			}
		} catch ( Exception e ) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 重連
	 * @return
	 */
	public void reConnect() {
		close();
		connect();
	}
	
	/**
	 * 傳送一個測試資料到伺服器,檢測伺服器是否關閉
	 * @return
	 */
	public boolean canConnectServer() {
		boolean bRes = false;
		if ( !isConnect() ) {
			return bRes;
		}
		try {
			mChannel.socket().sendUrgentData(0xff);
		} catch ( Exception e ) {
			e.printStackTrace();
		}
		return bRes;
	}
	
	/**
	 * 每次讀完資料後,需要重新註冊selector讀取資料
	 * @return
	 */
	private synchronized boolean repareRead() {
		boolean bRes = false;
		try {
			//開啟並註冊選擇器到通道
			mSelector = Selector.open();
			if ( mSelector != null ) {
				mChannel.register(mSelector, SelectionKey.OP_READ);
				bRes = true;
			}
		} catch ( Exception e ) {
			e.printStackTrace();
		} 
		return bRes;
	}
	
	public void revMsg() {
		if ( mSelector == null ) {
			return;
		}
		boolean bres = true;
		while ( mIsInit ) {
			if ( !isConnect() ) {
				bres = false;
			}
			if ( !bres ) {
				try {
					Thread.sleep(100);
				} catch ( Exception e ) {
					e.printStackTrace();
				}
				
				continue;
			}
			
			try {
				//有資料就一直接收
				while (mIsInit && mSelector.select() > 0) {
					for ( SelectionKey sk : mSelector.selectedKeys() ) {
						//如果有可讀資料
						if ( sk.isReadable() ) {
							//使用NIO讀取channel中的資料
							SocketChannel sc = (SocketChannel)sk.channel();
							//讀取快取
							ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFF_SIZE);
							//實際的讀取流
							ByteArrayOutputStream read = new ByteArrayOutputStream();
							int nRead = 0;
							int nLen = 0;
							//單個讀取流
							byte[] bytes;
							//讀完為止
							while ( (nRead = sc.read(readBuffer) ) > 0 ) {
								//整理
								readBuffer.flip();
								bytes = new byte[nRead];
								nLen += nRead;
								//將讀取的資料拷貝到位元組流中
								readBuffer.get(bytes);
								//將位元組流新增到實際讀取流中
								read.write(bytes);
								/////////////////////////////////////
								//@ 需要增加一個解析器,對資料流進行解析
								
								/////////////////////////////////////
								
								readBuffer.clear();
							}
							if ( nLen > 0 ) {
								if ( mEventListener != null ) {
									mEventListener.recvMsg(read);
								} else {
									String info = new String(read.toString(BUFF_FORMAT));
									System.out.println("rev:"+info);
								}
							}
							
							//為下一次讀取做準備
							sk.interestOps(SelectionKey.OP_READ);
						}
						
						//刪除此SelectionKey
						mSelector.selectedKeys().remove(sk);
					}
				}
			} catch ( Exception e ) {
				e.printStackTrace();
			}
		}
		
	}
	
	public interface TCPClientEventListener {
		/**
		 * 多執行緒下接收到資料
		 * @param read
		 * @return 
		 */
		void recvMsg(ByteArrayOutputStream read);
	}
	
	/**
	 * 連結執行緒
	 * @author HeZhongqiu
	 *
	 */
	private class MyConnectRunnable implements Runnable {

		@Override
		public void run() {
			// TODO Auto-generated method stub
			try {
				//開啟監聽通道,並設定為非阻塞模式
				SocketAddress ad = new InetSocketAddress(mRemoteIp, mPort);
				mChannel = SocketChannel.open( ad );
				if ( mChannel != null ) {
					mChannel.socket().setTcpNoDelay(false);
					mChannel.socket().setKeepAlive(true);
					
					//設定超時時間
					mChannel.socket().setSoTimeout(TIME_OUT);
					mChannel.configureBlocking(false);
					
					mIsInit = repareRead();
					
					//建立讀執行緒
					RevMsgRunnable rev = new RevMsgRunnable();
					new Thread(rev).start();
				}
			} catch ( Exception e ) {
				e.printStackTrace();
			} finally {
				if ( !mIsInit ) {
					close();
				}
			}
		}
	}
	
	private class RevMsgRunnable implements Runnable {

		@Override
		public void run() {
			// TODO Auto-generated method stub
			revMsg();
		}
		
	}
}

其中,還需要完善的是:

1.傳送訊息,需要自己再建立一個訊息佇列,在另外的一個執行緒處理,如果在主執行緒下,會丟擲異常,

2.自定義訊息解析器,解析器的基類我就沒定義了,可根據實際需求進行新增,根據自己的訊息結構體,對訊息資料流進行解析

3.連結斷開後,收訊息的輪詢處理需要退出