1. 程式人生 > >IO與NIO對於非同步Socket的處理

IO與NIO對於非同步Socket的處理

以下的內容以程式碼為主,簡單的展示了傳統IO流和NIO流對Socket請求的處理。

簡單來說,傳統IO流想要處理多個客戶端的Socket請求,它必須要不斷的建立新的執行緒來專門為連入的Socket請求進行處理,如果連入的Socket請求很多,並且來自不同的IP或者埠就必須要不斷的建立執行緒,對系統資源會造成很大的佔用。

下面就是傳統IO流非同步處理Socket請求的程式碼:

package com.firstdata.IOSocket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketCase {

    public static final int Port = 4495;
    public static ServerSocket server = null;

    public static void main(String[] args) throws IOException {
        // TODO Auto-generated method stub
        init();
        System.out.println("Finish init!");
        while (true) {
            Socket socket = server.accept();
            System.out.println("Client connected!" + socket.getPort());
            try {
                SocketProcess socketProcess = new SocketProcess(socket);
                System.out.println("Start thread!");
                // Thread thread = new Thread(socketCase);
                Thread thread = new Thread(socketProcess);
                thread.start();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    }

    private static void init() throws IOException {
        server = new ServerSocket();
        server.setSoTimeout(0);
        server.setReuseAddress(true);
        server.bind(new InetSocketAddress(4495));

    }

}

除了上面這個主函式和初始化方法,還需要下面這個實現多執行緒的類:

package com.firstdata.IOSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class SocketProcess implements Runnable {

    private static Socket socket = null;

    public SocketProcess(Socket socket) {
        SocketProcess.socket = socket;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            handlerSocket();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void handlerSocket() {
        InputStream rd = null;
        OutputStream bw = null;
        try {
            rd = getBufferedReader(socket);
            bw = getBufferedWriter(socket);
            byte[] ReqBuff = new byte[1000];
            System.out.println("Ready receive the request message!");
            while (rd.read(ReqBuff) > 0) {
                System.out.println("start");
                System.out.println(new String(ReqBuff));
                System.out.println("finish");
            }
        } catch (Exception e) {
            e.printStackTrace();
            try {
                rd.close();
                bw.close();
                socket.close();
            } catch (IOException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }
    }

    private static InputStream getBufferedReader(Socket socket) throws IOException {
        InputStream in = socket.getInputStream();
        return in;
    }

    private static OutputStream getBufferedWriter(Socket socket) throws IOException {
        OutputStream out = socket.getOutputStream();
        return out;
    }

}

有上面2個類就能夠構成一個簡單的非同步Socket服務端類

對於NIO而已,NIO會有一個選擇器的出現,有點類似在傳統IO流的處理之後加上一塊分配資料流的工作,這樣能夠使得Socket在不建立執行緒的任務下處理多個來自不同地方的Socket請求,對於NIO原理的介紹,網上有很多很多,下面是個簡單的NIO類:

package com.firstdata.NIOSocket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SocketCase {

    // 通道管理器
    private Selector selector;

    /**
     * 獲得一個ServerSocket通道,並對該通道做一些初始化的工作
     * 
     * @param port 繫結的埠號
     * @throws IOException
     */
    public void initServer(int port) throws IOException {
        // 獲得一個ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 設定通道為非阻塞
        serverChannel.configureBlocking(false);
        // 將該通道對應的ServerSocket繫結到port埠
        serverChannel.socket().bind(new InetSocketAddress(port));
        // 獲得一個通道管理器
        this.selector = Selector.open();
        // 將通道管理器和該通道繫結,併為該通道註冊SelectionKey.OP_ACCEPT事件,註冊該事件後,
        // 當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("OP_ACCEPT");
    }

    /**
     * 採用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則進行處理
     * 
     * @throws IOException
     */
    public void listen() {
        System.out.println("服務端啟動成功!");
        // 輪詢訪問selector
        while (true) {
            // 當註冊的事件到達時,方法返回;否則,該方法會一直阻塞
            try {
                selector.select();
                // 獲得selector中選中的項的迭代器,選中的項為註冊的事件
                Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
                while (ite.hasNext()) {
                    SelectionKey key = (SelectionKey) ite.next();
                    // 刪除已選的key,以防重複處理
                    ite.remove();
                    // 客戶端請求連線事件
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        // 獲得和客戶端連線的通道
                        SocketChannel channel = server.accept();
                        // 設定成非阻塞
                        channel.configureBlocking(false);

                        // 在這裡可以給客戶端傳送資訊哦
                        channel.write(ByteBuffer.wrap(new String("向客戶端傳送了一條資訊").getBytes()));
                        // 在和客戶端連線成功之後,為了可以接收到客戶端的資訊,需要給通道設定讀的許可權。
                        channel.register(this.selector, SelectionKey.OP_READ);

                        // 獲得了可讀的事件
                    } else if (key.isReadable()) {
                        read(key);
                    }

                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 處理讀取客戶端發來的資訊 的事件
     * 
     * @param key
     * @throws IOException
     */
    public void read(SelectionKey key) throws IOException {
        try {
            // 伺服器可讀取訊息:得到事件發生的Socket通道
            SocketChannel channel = (SocketChannel) key.channel();
            // 建立讀取的緩衝區
            ByteBuffer buffer = ByteBuffer.allocate(100);
            int statusCode = channel.read(buffer);
            System.out.println("length:" + statusCode);
            if (statusCode != -1) {
                byte[] data = buffer.array();
                String msg = new String(data).trim();
                System.out.println("服務端收到資訊:" + msg);
                ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
                channel.write(outBuffer);// 將訊息回送給客戶端
            } else {
                channel.close();
            }
        } catch (Exception e) {

        }
    }

    /**
     * 啟動服務端測試
     * 
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        SocketCase server = new SocketCase();
        server.initServer(4495);
        server.listen();
    }

}

本人只是程式碼的搬運工以及測試員。