1. 程式人生 > >Android與Java NIO實現簡單Echo伺服器與客戶端

Android與Java NIO實現簡單Echo伺服器與客戶端

上一篇用Java IO來做了個Demo,於是乎進一步,用Java NIO來做一個。

NIO的優勢在於非阻塞。使用Selector在單一執行緒裡進行輪詢,就能夠完成接入與收/發訊息的操作,不需要每建立一個連線就新啟動一個執行緒。

Server端程式碼:

/**
 * Single-threaded echo server built on Java NIO.
 *
 * <p>One {@link Selector} multiplexes the listening channel and every client
 * channel. For each chunk a client sends, the server replies with
 * {@code "response: "} followed by the trimmed text of that chunk.
 *
 * <p>Per-connection state is kept in the {@link SelectionKey} attachment: while a
 * reply is pending, the attachment is the {@link ByteBuffer} still to be written.
 */
public class EchoServer {
    private static final int MAX_SIZE = 256; // capacity in bytes of the receive buffer used for one read
    private static final int PORT = 8999;    // TCP port the server listens on
    private static final Charset mCharSet = Charset.forName("UTF-8"); // charset used to encode and decode messages

    /**
     * Opens the selector, binds the listening channel and runs the event loop.
     *
     * @param args ignored
     */
    public static void main(String args[]) {
        // try-with-resources releases the selector if setup fails or the loop ends.
        try (Selector selector = Selector.open()) {
            initServerChannel(selector);
            startSelector(selector);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Opens a non-blocking listening channel on {@link #PORT} and registers it
     * with the selector for accept events.
     *
     * @param selector selector that will report incoming connections
     * @throws IOException if the channel cannot be opened, configured, bound or registered;
     *                     propagated so that the caller does not start an event loop with
     *                     nothing registered
     */
    private static void initServerChannel(Selector selector) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        try {
            serverChannel.configureBlocking(false); // required before registering with a selector
            serverChannel.socket().bind(new InetSocketAddress(PORT));
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            closeQuietly(serverChannel); // do not leak the channel when setup fails half-way
            throw e;
        }
    }

    /**
     * Runs the select loop: blocks until at least one registered channel is ready
     * and dispatches accept / read / write events. Returns only when
     * {@code select()} fails.
     *
     * @param selector selector holding the server and client registrations
     */
    private static void startSelector(Selector selector) {
        int loopCount = 0;
        while (true) {
            int n;
            try {
                n = selector.select(); // blocks until a registered event is ready
            } catch (IOException e) {
                // Retrying a failed select() would spin at full CPU, so stop the loop.
                e.printStackTrace();
                return;
            }
            if (n == 0) {
                continue;
            }
            System.out.println("loopCount:" + loopCount);
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // The selector never clears the selected-key set itself; remove the key
                // up front so that it is not processed again on the next iteration.
                iterator.remove();
                SelectableChannel selectableChannel = selectionKey.channel();

                // isValid() is re-checked before every readiness test because a handler may
                // cancel the key; testing a cancelled key throws CancelledKeyException.
                if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                    System.out.println("selectionKey isAcceptable");
                    acceptClient(selectionKey, (ServerSocketChannel) selectableChannel);
                }
                if (selectionKey.isValid() && selectionKey.isReadable()) {
                    System.out.println("selectionKey isReadable");
                    readMsg(selectionKey, (SocketChannel) selectableChannel);
                }
                if (selectionKey.isValid() && selectionKey.isWritable()) {
                    System.out.println("selectionKey isWritable");
                    writeMsg(selectionKey, (SocketChannel) selectableChannel);
                }
            }
            loopCount++;
        }
    }

    /**
     * Accepts one pending connection and registers it for read events.
     *
     * <p>A failure of {@code accept()} itself cancels the listening key. A failure while
     * setting up the accepted client only closes that client, so the server keeps
     * listening.
     *
     * @param selectionKey  key of the listening channel
     * @param serverChannel listening channel that reported an accept event
     */
    private static void acceptClient(SelectionKey selectionKey, ServerSocketChannel serverChannel) {
        SocketChannel socketChannel;
        try {
            socketChannel = serverChannel.accept();
        } catch (IOException e) {
            e.printStackTrace();
            selectionKey.cancel();
            return;
        }
        if (socketChannel == null) {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        try {
            // The accepted channel starts in blocking mode; switch it before registering.
            socketChannel.configureBlocking(false);
            // Read events for this client are delivered from the next select() onward.
            socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
            System.out.println("connected from:" + socketChannel.getRemoteAddress());
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(socketChannel);
        }
    }

    /**
     * Reads one chunk (at most {@link #MAX_SIZE} bytes) from the client, builds the
     * reply, attaches it to the key and switches the key to write interest.
     * Closes the connection on end-of-stream or on a read error.
     *
     * @param selectionKey  key of the client channel
     * @param socketChannel client channel that reported a read event
     */
    private static void readMsg(SelectionKey selectionKey, SocketChannel socketChannel) {
        if (selectionKey == null || socketChannel == null) {
            return;
        }
        ByteBuffer dataBuffer = ByteBuffer.allocate(MAX_SIZE);
        int count;
        try {
            count = socketChannel.read(dataBuffer);
        } catch (IOException e) {
            e.printStackTrace();
            closeConnection(selectionKey, socketChannel);
            return;
        }
        if (count == -1) {
            // The peer closed the connection.
            closeConnection(selectionKey, socketChannel);
            return;
        }
        if (count == 0) {
            return;
        }

        dataBuffer.flip();
        String data = decode(dataBuffer);
        System.out.println("received: " + data);

        // Queue the reply on the key and wait for the socket to become writable.
        selectionKey.attach(ByteBuffer.wrap(buildResponse(data).getBytes(mCharSet)));
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * Writes as much of the pending reply as the socket accepts right now. When the
     * reply has been written completely, the key goes back to read interest; otherwise
     * it stays on write interest and the remainder is sent on the next write event.
     *
     * @param selectionKey  key of the client channel; its attachment is the pending reply
     * @param socketChannel client channel that reported a write event
     */
    private static void writeMsg(SelectionKey selectionKey, SocketChannel socketChannel) {
        if (selectionKey == null || socketChannel == null) {
            return;
        }
        ByteBuffer pending = (ByteBuffer) selectionKey.attachment();
        if (pending == null) {
            // Nothing queued for this client; go back to waiting for input.
            selectionKey.interestOps(SelectionKey.OP_READ);
            return;
        }
        try {
            // A single non-blocking write: looping until the buffer is empty would stall
            // every other client whenever this client's send buffer is full.
            socketChannel.write(pending);
            if (pending.hasRemaining()) {
                return;
            }
            // pending wraps exactly the reply bytes, so its backing array is the full reply.
            System.out.println("send back: " + new String(pending.array(), mCharSet));
            selectionKey.attach(null);
            selectionKey.interestOps(SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            closeConnection(selectionKey, socketChannel);
        }
    }

    /**
     * Decodes the readable bytes of a buffer (position to limit) as UTF-8 text without
     * changing the buffer's position. Package-private so that it can be unit-tested.
     *
     * @param buffer buffer in read mode (already flipped)
     * @return the decoded text
     */
    static String decode(ByteBuffer buffer) {
        return mCharSet.decode(buffer.duplicate()).toString();
    }

    /**
     * Builds the reply text for a received message. Package-private so that it can be
     * unit-tested.
     *
     * @param data text received from the client
     * @return {@code "response: "} followed by {@code data} with leading and trailing
     *         whitespace and control characters removed
     */
    static String buildResponse(String data) {
        return "response: " + data.trim();
    }

    /** Cancels the key and closes the client channel. */
    private static void closeConnection(SelectionKey selectionKey, SocketChannel socketChannel) {
        selectionKey.cancel();
        closeQuietly(socketChannel);
    }

    /** Closes a channel, logging (not propagating) any failure; used on cleanup paths only. */
    private static void closeQuietly(SelectableChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Android客戶端:

public class MainActivity extends AppCompatActivity {

    private static final String TAG = MainActivity.class.getSimpleName();

    private EditText mIpEt;
    private EditText mPortEt;
    private Button mConnBtn;
    private TextView mScreenTv;
    private EditText mInputEt;

    private Button mSendBtn;

    private SocketThread mSocketThread;
    private static Handler mMainHander;

    private static final int MSG_CONNECT = 0x001;
    private static final int MSG_RECEIVE = 0x002;
    private static final int MSG_SEND_ERROR = 0x003;

    private static final String DATA_RECEIVE = "data_receive";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        mIpEt = findViewById(R.id.main_ip_et);
        mPortEt = findViewById(R.id.main_port_et);
        mConnBtn = findViewById(R.id.main_connect_btn);
        mScreenTv = findViewById(R.id.main_screen_tv);
        mInputEt = findViewById(R.id.main_input_et);
        mSendBtn = findViewById(R.id.main_send_btn);

        // defalut value. Change it to your own server ip
        mIpEt.setText("172.16.62.65");
        mPortEt.setText("8999");

        mConnBtn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                String ip = mIpEt.getText().toString();
                String port = mPortEt.getText().toString();
                if (TextUtils.isEmpty(ip) || TextUtils.isEmpty(port)) {
                    Toast.makeText(MainActivity.this, "ip or port is null", Toast.LENGTH_SHORT).show();
                } else {
                    connectToServer(ip, Integer.valueOf(port));
                }
            }
        });

        mSendBtn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                String data = mInputEt.getText().toString();
                if (!TextUtils.isEmpty(data)) {
                    mSocketThread.sendMsgToServer(data);
                }
            }
        });

        // TODO handler may cause memory leaks
        mMainHander = new Handler() {
            @Override
            public void handleMessage(Message msg) {
                switch (msg.what) {
                    case MSG_CONNECT:
                        Toast.makeText(MainActivity.this, "Connect to Server Success", Toast.LENGTH_SHORT).show();
                        mConnBtn.setText("Connected");
                        mConnBtn.setEnabled(false);
                        break;
                    case MSG_RECEIVE:
                        Bundle data = msg.getData();
                        String dataStr = data.getString(DATA_RECEIVE);
                        Log.i(TAG, "received data:" + dataStr);
                        CharSequence originData = mScreenTv.getText();
                        String result = originData + "\n" + dataStr;
                        mScreenTv.setText(result);
                        break;
                    case MSG_SEND_ERROR:
                        Toast.makeText(MainActivity.this, "Send Error, Connection may be Closed", Toast.LENGTH_SHORT).show();
                        break;
                }
            }
        };
    }

    private void connectToServer(String ip, int port) {
        mSocketThread = new SocketThread(ip, port);
        mSocketThread.start();
    }

    private static class SocketThread extends Thread {
        private static final int MAX_SIZE = 256; // max size 256
        private static Charset mCharSet = Charset.forName("UTF-8"); //encode and decode charset
        private String mIp;
        private int mPort;
        private SocketChannel mClientChannel;
        private Selector mSelector;

        public SocketThread(String ip, int port) {
            this.mIp = ip;
            this.mPort = port;
            try {
                mSelector = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            initClientChannel(mSelector);
            startSelector(mSelector);
        }

        private void initClientChannel(Selector selector) {
            try {
                mClientChannel = SocketChannel.open();
                mClientChannel.configureBlocking(false);
                mClientChannel.connect(new InetSocketAddress(mIp, mPort));
                mClientChannel.register(selector, SelectionKey.OP_CONNECT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void startSelector(Selector selector) {
            while (true) {
                int n = 0;    // Selector輪詢註冊來的Channel, 阻塞到至少有一個通道在你註冊的事件上就緒了。
                try {
                    n = selector.select();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (n == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();    // 獲取SelectionKey

                    // 每一步都要判斷selectionKey.isValid(),避免斷開連線產生的java.nio.channels.CancelledKeyException
                    if (selectionKey.isValid() && selectionKey.isConnectable()) {
                        connectServer(selectionKey);
                    }
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        // a channel is ready for reading
                        readMsg(selectionKey);
                    }
                    if (selectionKey.isValid() && selectionKey.isWritable()) {
                        // a channel is ready for writing
                    }
                    iterator.remove();
                }
            }
        }

        private void connectServer(SelectionKey selectionKey) {
            try {
                mClientChannel.finishConnect();
                mMainHander.sendEmptyMessage(MSG_CONNECT);
                selectionKey.interestOps(SelectionKey.OP_READ);
                Log.i(TAG, "connected to:" + mClientChannel.socket().getInetAddress());
            } catch (IOException e) {
                e.printStackTrace();
                selectionKey.cancel();
                try {
                    mClientChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }

        private void readMsg(SelectionKey selectionKey) {
            if (selectionKey == null || mClientChannel == null) {
                return;
            }
            ByteBuffer dataBuffer = ByteBuffer.allocate(MAX_SIZE); // 從SelectionKey中取出註冊時附加上的DataBuffer物件
            int count = 0;
            try {
                count = mClientChannel.read(dataBuffer);
            } catch (IOException e) {
                e.printStackTrace();
                selectionKey.cancel();
                try {
                    mClientChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if (count > 0) {
                dataBuffer.flip();
                byte[] bytes = new byte[dataBuffer.remaining()];
                dataBuffer.get(bytes);
                String data = new String(bytes, mCharSet);
                Message message = mMainHander.obtainMessage();
                message.what = MSG_RECEIVE;
                Bundle bundle = new Bundle();
                bundle.putString(DATA_RECEIVE, data);
                message.setData(bundle);
                mMainHander.sendMessage(message);
            }
        }

        public void sendMsgToServer(final String data) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    ByteBuffer dataBuffer = ByteBuffer.allocate(data.length());
                    dataBuffer.put(data.getBytes(mCharSet));
                    dataBuffer.flip();
                    //輸出到通道
                    try {
                        while (dataBuffer.hasRemaining()) {
                            mClientChannel.write(dataBuffer);
                        }
                        Log.i(TAG, "send data" + data);
                    } catch (IOException e) {
                        e.printStackTrace();
                        mMainHander.sendEmptyMessage(MSG_SEND_ERROR);
                        try {
                            if (mClientChannel != null) {
                                mClientChannel.close();
                            }
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }
}

MainActivity的佈局檔案:

<?xml version="1.0" encoding="utf-8"?>
<!-- Echo client screen: a vertical form with server address, port, connect button,
     message input, send button, and a scrollable log of received messages. -->
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    android:orientation="vertical"
    android:layout_margin="15dp">

    <!-- Server host name or IP address. -->
    <EditText
        android:id="@+id/main_ip_et"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:hint="ip address"/>

    <!-- Server TCP port; input is restricted to digits. -->
    <EditText
        android:id="@+id/main_port_et"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:hint="port"
        android:inputType="number"/>

    <!-- Starts the connection to the address and port entered above. -->
    <Button
        android:id="@+id/main_connect_btn"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        android:text="Connect"/>

    <!-- Text to send to the server. -->
    <EditText
        android:id="@+id/main_input_et"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:hint="message to server"/>

    <!-- Sends the text entered above. -->
    <Button
        android:id="@+id/main_send_btn"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        android:text="Send"/>

    <!-- Fills the remaining height; the log inside it scrolls as it grows. -->
    <ScrollView
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:layout_marginTop="10dp">
        <!-- Log of messages received from the server. -->
        <TextView
            android:id="@+id/main_screen_tv"
            android:layout_width="match_parent"
            android:layout_height="wrap_content"
            android:text="received message will be shown here"/>
    </ScrollView>

</LinearLayout>

最後,同樣不要忘記在清單檔案(AndroidManifest.xml)中宣告網路許可權:`<uses-permission android:name="android.permission.INTERNET"/>`。

NIO的使用比IO要複雜一些,但是它具有非阻塞的特性,能極大減少執行緒的使用,這些優勢還是很值得研究的。畢竟真正到了專案級別的程式碼,不可能用Java IO去實現,一定是基於NIO的網路框架。

Java NIO相關例子網上不太多,自己做的時候也踩了一些坑,尤其要注意ByteBuffer的使用方式,Java IO資料是面向流的,而NIO是面向Buffer的,Buffer的讀寫是基本功,如果有疑惑可以去查閱相關資料。也歡迎大家一起來探討、學習~

相關推薦

no