Android與Java NIO實現簡單Echo伺服器與客戶端
阿新 • • 發佈:2019-01-19
上一篇用Java IO來做了個Demo,於是乎進一步,用Java NIO來做一個。
NIO的優勢在於非阻塞。使用了Selector在一個執行緒裡進行輪詢,就能夠完成接入、收\發訊息的操作,不需要每建立一個連線都新啟動一個執行緒的方式。
Server端程式碼:
public class EchoServer { private static final int MAX_SIZE = 256; // max size 256 private static Charset mCharSet = Charset.forName("UTF-8"); //encode and decode charset public static void main(String args[]) { try { Selector selector = Selector.open(); // init an selector initServerChannel(selector); startSelector(selector); } catch (IOException e) { e.printStackTrace(); } } private static void initServerChannel(Selector selector) { ServerSocketChannel serverChannel = null; try { serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); // not blocking serverChannel.socket().bind(new InetSocketAddress(8999)); serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 將ServerChannl註冊為accept感興趣型別 } catch (IOException e) { e.printStackTrace(); } } private static void startSelector(Selector selector) { int loopCount = 0; while (true) { int n = 0; // Selector輪詢註冊來的Channel, 阻塞到至少有一個通道在你註冊的事件上就緒了。 try { n = selector.select(); } catch (IOException e) { e.printStackTrace(); } if (n == 0) { continue; } System.out.println("loopCount:" + loopCount); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 獲取SelectionKey SocketChannel socketChannel = null; SelectableChannel selectableChannel = selectionKey.channel(); // 每一步都要判斷selectionKey.isValid(),避免斷開連線產生的java.nio.channels.CancelledKeyException if (selectionKey.isValid() && selectionKey.isAcceptable()) { System.out.println("selectionKey isAcceptable"); acceptClient(selectionKey, (ServerSocketChannel) selectableChannel); } if (selectionKey.isValid() && selectionKey.isReadable()) { // a channel is ready for reading System.out.println("selectionKey isReadable"); socketChannel = (SocketChannel) selectableChannel;// 返回為之建立此鍵的通道。 readMsg(selectionKey, socketChannel); } if (selectionKey.isValid() && selectionKey.isWritable()) { // a channel is ready for writing System.out.println("selectionKey isWritable"); socketChannel = (SocketChannel) selectableChannel; writeMsg(selectionKey, socketChannel); } iterator.remove(); } loopCount++; } } private static void acceptClient(SelectionKey selectionKey, ServerSocketChannel serverChannel) { // 此方法返回的套接字通道(如果有)將處於阻塞模式。 try { SocketChannel socketChannel = serverChannel.accept(); socketChannel.configureBlocking(false); // 向Selector註冊Channel,設定讀取為感興趣操作,此類操作將會在下一次選擇器select操作時被交付。同時附加byteBuffer物件作為資料傳遞的容器 socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ); System.out.println("connected from:" + socketChannel.getRemoteAddress()); } catch (IOException e) { e.printStackTrace(); selectionKey.cancel(); } } private static void readMsg(SelectionKey selectionKey, SocketChannel socketChannel) { if (selectionKey == null || socketChannel == null) { return; } ByteBuffer dataBuffer = ByteBuffer.allocate(MAX_SIZE); int count = 0; try { count = socketChannel.read(dataBuffer); } catch (IOException e) { e.printStackTrace(); selectionKey.cancel(); try { socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } } if (count > 0) { dataBuffer.flip(); byte[] bytes = new byte[dataBuffer.remaining()]; dataBuffer.get(bytes); String data = new String(bytes, mCharSet); System.out.println("received: " + data); selectionKey.attach(dataBuffer); // 給SelectionKey附加上的DataBuffer物件 selectionKey.interestOps(SelectionKey.OP_WRITE); // 讀取完,設定寫入為感興趣操作,這樣在接下來一個迴圈會觸發到服務端寫操作,給使用者返回資料 } if (count == -1) { selectionKey.cancel(); try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } private static void writeMsg(SelectionKey selectionKey, SocketChannel socketChannel) { if (selectionKey == null || socketChannel == null) { return; } ByteBuffer dataBuffer = (ByteBuffer) selectionKey.attachment(); String data = new String(dataBuffer.array(), mCharSet); String result = "response: " + data.trim(); dataBuffer.flip(); System.out.println("send back: " + result); dataBuffer = ByteBuffer.wrap(result.getBytes(mCharSet)); //輸出到通道 try { while (dataBuffer.hasRemaining()) { socketChannel.write(dataBuffer); } //將緩衝區的當前位置和界限之間的位元組(如果有)複製到緩衝區的開始處 dataBuffer.compact(); selectionKey.interestOps(SelectionKey.OP_READ); // 寫入完畢,設定讀取為感興趣操作 } catch (IOException e) { e.printStackTrace(); selectionKey.cancel(); try { socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } } } }
Android客戶端:
public class MainActivity extends AppCompatActivity { private static final String TAG = MainActivity.class.getSimpleName(); private EditText mIpEt; private EditText mPortEt; private Button mConnBtn; private TextView mScreenTv; private EditText mInputEt; private Button mSendBtn; private SocketThread mSocketThread; private static Handler mMainHander; private static final int MSG_CONNECT = 0x001; private static final int MSG_RECEIVE = 0x002; private static final int MSG_SEND_ERROR = 0x003; private static final String DATA_RECEIVE = "data_receive"; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); mIpEt = findViewById(R.id.main_ip_et); mPortEt = findViewById(R.id.main_port_et); mConnBtn = findViewById(R.id.main_connect_btn); mScreenTv = findViewById(R.id.main_screen_tv); mInputEt = findViewById(R.id.main_input_et); mSendBtn = findViewById(R.id.main_send_btn); // defalut value. Change it to your own server ip mIpEt.setText("172.16.62.65"); mPortEt.setText("8999"); mConnBtn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { String ip = mIpEt.getText().toString(); String port = mPortEt.getText().toString(); if (TextUtils.isEmpty(ip) || TextUtils.isEmpty(port)) { Toast.makeText(MainActivity.this, "ip or port is null", Toast.LENGTH_SHORT).show(); } else { connectToServer(ip, Integer.valueOf(port)); } } }); mSendBtn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { String data = mInputEt.getText().toString(); if (!TextUtils.isEmpty(data)) { mSocketThread.sendMsgToServer(data); } } }); // TODO handler may cause memory leaks mMainHander = new Handler() { @Override public void handleMessage(Message msg) { switch (msg.what) { case MSG_CONNECT: Toast.makeText(MainActivity.this, "Connect to Server Success", Toast.LENGTH_SHORT).show(); mConnBtn.setText("Connected"); mConnBtn.setEnabled(false); break; case MSG_RECEIVE: Bundle data = msg.getData(); String dataStr = data.getString(DATA_RECEIVE); Log.i(TAG, "received data:" + dataStr); CharSequence originData = mScreenTv.getText(); String result = originData + "\n" + dataStr; mScreenTv.setText(result); break; case MSG_SEND_ERROR: Toast.makeText(MainActivity.this, "Send Error, Connection may be Closed", Toast.LENGTH_SHORT).show(); break; } } }; } private void connectToServer(String ip, int port) { mSocketThread = new SocketThread(ip, port); mSocketThread.start(); } private static class SocketThread extends Thread { private static final int MAX_SIZE = 256; // max size 256 private static Charset mCharSet = Charset.forName("UTF-8"); //encode and decode charset private String mIp; private int mPort; private SocketChannel mClientChannel; private Selector mSelector; public SocketThread(String ip, int port) { this.mIp = ip; this.mPort = port; try { mSelector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { initClientChannel(mSelector); startSelector(mSelector); } private void initClientChannel(Selector selector) { try { mClientChannel = SocketChannel.open(); mClientChannel.configureBlocking(false); mClientChannel.connect(new InetSocketAddress(mIp, mPort)); mClientChannel.register(selector, SelectionKey.OP_CONNECT); } catch (IOException e) { e.printStackTrace(); } } private void startSelector(Selector selector) { while (true) { int n = 0; // Selector輪詢註冊來的Channel, 阻塞到至少有一個通道在你註冊的事件上就緒了。 try { n = selector.select(); } catch (IOException e) { e.printStackTrace(); } if (n == 0) { continue; } Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 獲取SelectionKey // 每一步都要判斷selectionKey.isValid(),避免斷開連線產生的java.nio.channels.CancelledKeyException if (selectionKey.isValid() && selectionKey.isConnectable()) { connectServer(selectionKey); } if (selectionKey.isValid() && selectionKey.isReadable()) { // a channel is ready for reading readMsg(selectionKey); } if (selectionKey.isValid() && selectionKey.isWritable()) { // a channel is ready for writing } iterator.remove(); } } } private void connectServer(SelectionKey selectionKey) { try { mClientChannel.finishConnect(); mMainHander.sendEmptyMessage(MSG_CONNECT); selectionKey.interestOps(SelectionKey.OP_READ); Log.i(TAG, "connected to:" + mClientChannel.socket().getInetAddress()); } catch (IOException e) { e.printStackTrace(); selectionKey.cancel(); try { mClientChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } } } private void readMsg(SelectionKey selectionKey) { if (selectionKey == null || mClientChannel == null) { return; } ByteBuffer dataBuffer = ByteBuffer.allocate(MAX_SIZE); // 從SelectionKey中取出註冊時附加上的DataBuffer物件 int count = 0; try { count = mClientChannel.read(dataBuffer); } catch (IOException e) { e.printStackTrace(); selectionKey.cancel(); try { mClientChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } } if (count > 0) { dataBuffer.flip(); byte[] bytes = new byte[dataBuffer.remaining()]; dataBuffer.get(bytes); String data = new String(bytes, mCharSet); Message message = mMainHander.obtainMessage(); message.what = MSG_RECEIVE; Bundle bundle = new Bundle(); bundle.putString(DATA_RECEIVE, data); message.setData(bundle); mMainHander.sendMessage(message); } } public void sendMsgToServer(final String data) { new Thread(new Runnable() { @Override public void run() { ByteBuffer dataBuffer = ByteBuffer.allocate(data.length()); dataBuffer.put(data.getBytes(mCharSet)); dataBuffer.flip(); //輸出到通道 try { while (dataBuffer.hasRemaining()) { mClientChannel.write(dataBuffer); } Log.i(TAG, "send data" + data); } catch (IOException e) { e.printStackTrace(); mMainHander.sendEmptyMessage(MSG_SEND_ERROR); try { if (mClientChannel != null) { mClientChannel.close(); } } catch (IOException e1) { e1.printStackTrace(); } } } }).start(); } } }
MainActivity的佈局檔案:
<?xml version="1.0" encoding="utf-8"?> <LinearLayout xmlns:android="http://schemas.android.com/apk/res/android" android:layout_width="match_parent" android:layout_height="match_parent" android:orientation="vertical" android:layout_margin="15dp"> <EditText android:id="@+id/main_ip_et" android:layout_width="match_parent" android:layout_height="wrap_content" android:hint="ip address"/> <EditText android:id="@+id/main_port_et" android:layout_width="match_parent" android:layout_height="wrap_content" android:hint="port" android:inputType="number"/> <Button android:id="@+id/main_connect_btn" android:layout_width="wrap_content" android:layout_height="wrap_content" android:layout_gravity="center" android:text="Connect"/> <EditText android:id="@+id/main_input_et" android:layout_width="match_parent" android:layout_height="wrap_content" android:hint="message to server"/> <Button android:id="@+id/main_send_btn" android:layout_width="wrap_content" android:layout_height="wrap_content" android:layout_gravity="center" android:text="Send"/> <ScrollView android:layout_width="match_parent" android:layout_height="match_parent" android:layout_marginTop="10dp"> <TextView android:id="@+id/main_screen_tv" android:layout_width="match_parent" android:layout_height="wrap_content" android:text="received message will be shown here"/> </ScrollView> </LinearLayout>
最後,同樣不要忘記清單檔案中宣告網路許可權。
NIO的使用比IO要複雜一些,但是它非阻塞的特性,極大減少執行緒使用這些優勢,還是很值得研究的。畢竟真正到了專案級別的程式碼,不可能用Java IO去實現,一定是基於NIO的網路框架。
Java NIO相關例子網上不太多,自己做的時候也踩了一些坑,尤其要注意ByteBuffer的使用方式,Java IO資料是面向流的,而NIO是面向Buffer的,Buffer的讀寫是基本功,如果有疑惑可以去查閱相關資料。也歡迎大家一起來探討、學習~