1. 程式人生 > >網路程式設計NIO:BIO和NIO

網路程式設計NIO:BIO和NIO

BIO

BIO(Blocking I/O),同步阻塞,實現模式為一個連線一個執行緒,即當有客戶端連線時,伺服器端需為其單獨分配一個執行緒,如果該連線不做任何操作就會造成不必要的執行緒開銷。BIO是傳統的Java io程式設計,其相關的類和介面在java.io 包下。

BIO適用於連線數目較小且固定的架構,對伺服器資源的要求較高,是JDK1.4以前的唯一選擇,但程式簡單易理解。

BIO程式設計流程

  1. 伺服器端啟動一個SeverSocket

  2. 客戶端啟動Socket對伺服器端發起通訊,預設情況下伺服器端需為每個客戶端建立一個執行緒與之通訊

  3. 客戶端發起請求後,先諮詢伺服器端是否有執行緒響應,如果沒有則會等待或被拒絕

  4. 如果有執行緒響應,客戶端執行緒會等待請求結束後,再繼續執行

簡單程式碼實現

//BIO-伺服器端
public class BIOSever {
    public static void main(String[] args) throws IOException {
        //在BIO中,可以使用執行緒池進行優化
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("伺服器已啟動");

        while (true){
            System.out.println("等待客戶端連線.....(阻塞中)");
            Socket socket = serverSocket.accept();
            System.out.println("客戶端連線");
            cachedThreadPool.execute(new Runnable() {
                public void run() {
                    handler(socket);
                }
            });
        }
    }

    //從客服端socket讀取資料
    public static void handler(Socket socket){
        try{
            InputStream inputStream = socket.getInputStream();
            byte[] b = new byte[1024];
            while (true){
                System.out.println("等待客戶端輸入.....(阻塞中)");
                int read = inputStream.read(b);
                if (read != -1){
                    System.out.println(new String(b, 0, read));
                }else {
                    break;
                }
            }
            inputStream.close();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
//BIO-客戶端
public class BIOClient {
    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("localhost", 6666);
        OutputStream outputStream = socket.getOutputStream();
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String message = scanner.nextLine();
            if ("exit".equals(message)) {
                break;
            }
            outputStream.write(message.getBytes());
        }
        outputStream.close();
        socket.close();
    }
}

BIO問題分析

從上面程式碼中可以看出BIO程式設計的兩個問題:

  1. 伺服器端在監聽客戶端連線時(serverSocket.accept()),伺服器端處於阻塞狀態,不能處理其他事務

  2. 伺服器端需要為每個客戶端建立一個執行緒,雖然可以用執行緒池來優化,但在併發較大時,執行緒開銷依舊很大

  3. 當連線的客戶端沒有傳送資料時,伺服器端會阻塞在read操作上,等待客戶端輸入,造成執行緒資源浪費

 

NIO

從JDK1.4開始,java提供了一系列改進輸入/輸出的新特性,統稱為NIO,全稱n為new I/O,是同步非阻塞的,所以也有人稱為non-blocking I/O。NIO的相關類都放在java.nio包或其子包下,並對原先java.io包中許多類進行了改寫。

NIO的三大核心

緩衝區(Buffer)

NIO是面向緩衝區, 或者說是面向塊程式設計的。在NIO的IO傳輸中,資料會先讀入到緩衝區,當需要時再從緩衝區寫出,這樣減少了直接讀寫磁碟的次數,提高了IO傳輸的效率。

緩衝區(buffer)本質上是一個可以讀寫資料的記憶體塊,即在記憶體空間中預留了一定的儲存空間,這些儲存空間用來緩衝輸入和輸出的資料,這部分預留的儲存空間就叫緩衝區。

在NIO程式中,通道channel雖然負責資料的傳輸,但是輸入和輸出的資料都必須經過緩衝區buffer。

在java中,緩衝區的相關類都在java.nio包下,其最頂層的類是 Buffer,它是一個抽象類。

Buffer類的4個重要屬性:

  • mark:標記

  • position:位置,下一個要被讀或寫的元素的索引,每次讀寫緩衝區都會改變該值,為下次讀寫做準備

  • limit:表示緩衝區的終點,不能對緩衝區中超過極限的位置進行讀寫操作,且極限是可修改的

  • capacity:容量,即緩衝區的最多可容納的資料量,該值在建立緩衝區時被設立,且不可修改

Buffer類常用方法:

Buffer的常用子類(它們之間最大區別在於底層實現陣列的資料型別):

  • ByteBuffer:儲存位元組資料到緩衝區

  • CharBuffer:儲存字元資料到緩衝區

  • IntBuffer:儲存整型資料到緩衝區

  • ShortBuffer:儲存短整型資料到緩衝區

  • LongBuffer:儲存長整型資料到緩衝區

  • FloatBuffer:儲存浮點型資料到緩衝區

  • DoubleBuffer:儲存雙精度浮點型資料到緩衝區

ByteBuffer

在Buffer的所有子類中,最常用的還是ByteBuffer,它的常用方法:

 

通道(Channel)

在NIO程式中伺服器端和客戶端之間的資料讀寫不是通過流,而是通過通道來讀寫的。

通道類似於流,都是用來讀寫資料的,但它們之間也是有區別的:

  • 通道是雙向的,即可以讀也可以寫,而流是單向的,只能讀或寫

  • 通道可以實現非同步讀寫資料

  • 通道可以從緩衝區讀資料,也可以把資料寫入緩衝區

java中channel的相關類在java.nio.channel包下。Channel是一個介面,其常用的實現類有:

  • FileChannel:用於檔案的資料讀寫,其真正的實現類為FileChannelImpl

  • DatagramChannel:用於UDP的資料讀寫,其真正的實現類為DatagramChannelImpl

  • ServerSocketChannel:用於監聽TCP連線,每當有客戶端連線時都會建立一個SocketChannel,功能類似ServerSocket,其真正的實現類為ServerSocketChannelImpl

  • SocketChannel:用於TCP的資料讀寫,功能類似節點流+Socket,其真正的實現類為SocketChannelImpl

FileChannel

FileChannel主要用於對本地檔案進行IO操作,如檔案複製等。它的常用方法有:

在檔案傳輸流中有個屬性channel,它預設是空的,可以通過流中的getChanel()方法根據當前檔案流的屬性生成對應的FileChannel。

public FileChannel getChannel() {
        synchronized (this) {
            if (channel == null) {
                channel = FileChannelImpl.open(fd, path, false, true, append, this);
            }
            return channel;
        }
    }
}

下面是通道使用的程式碼例項:

public class NIOChannel {
    public static void main(String[] args) throws IOException {
    }

    //將資料寫入目標檔案
    public static void writeFile() throws IOException{
        String str = "Hello, gofy";
        //建立檔案輸出流
        FileOutputStream fileOutputStream = new FileOutputStream("f:\\file.txt");
        //根據檔案輸出流生成檔案通道
        FileChannel fileChannel = fileOutputStream.getChannel();
        //建立位元組緩衝區,並將字串轉成位元組存入
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.put(str.getBytes());
        //注意,在存入後需要進行寫出操作時,需將緩衝區翻轉
        byteBuffer.flip();
        //將緩衝區資料寫入通道
        fileChannel.write(byteBuffer);
        //將檔案輸出流關閉(該方法同時會關閉通道)
        fileOutputStream.close();
    }

    //從檔案中讀取資料
    public static void readFile() throws IOException{
        //建立檔案輸入流
        File file = new File("f:\\file.txt");
        FileInputStream fileInputStream = new FileInputStream(file);
        //根據檔案輸入流生成檔案通道
        FileChannel fileChannel = fileInputStream.getChannel();
        //建立位元組緩衝區,大小為檔案大小
        ByteBuffer byteBuffer = ByteBuffer.allocate((int)file.length());
        //將通道資料讀入緩衝區
        fileChannel.read(byteBuffer);
        //同樣,在讀入後需要取出緩衝區內所有資料時,需將緩衝區翻轉
        byteBuffer.flip();
        System.out.println(new String(byteBuffer.array()));
        fileInputStream.close();
    }

    //將檔案資料傳輸到另一個檔案
    public static void readAndWriteFile() throws IOException{
        //建立檔案輸入流和檔案輸出流,並生成對應的通道
        FileInputStream fileInputStream = new FileInputStream("file1.txt");
        FileChannel inputStreamChannel= fileInputStream.getChannel();
        FileOutputStream fileOutputStream = new FileOutputStream("file2.txt");
        FileChannel outputStreamChannel = fileOutputStream.getChannel();
        //建立位元組緩衝區
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        //進行資料讀取
        while (true){
            //在讀取前需清除緩衝區
            byteBuffer.clear();
            //將檔案輸入的通道的資料讀入緩衝區
            int read = inputStreamChannel.read(byteBuffer);
            //當read為-1時,即通道資料已讀取完畢
            if (read == -1){
                break;
            }
            //將緩衝區翻轉後,將緩衝區資料寫入檔案輸出的通道
            byteBuffer.flip();
            outputStreamChannel.write(byteBuffer);
        }
        fileInputStream.close();
        fileOutputStream.close();
    }

    //檔案的複製貼上
    public static void copyAndPaste() throws IOException{
        //複製的檔案輸入流
        FileInputStream fileInputStream = new FileInputStream("f:\\a.jpg");
        FileChannel srcChannel = fileInputStream.getChannel();
        //貼上的檔案輸出流
        FileOutputStream fileOutputStream = new FileOutputStream("f:\\b.jpg");
        FileChannel targetChannel = fileOutputStream.getChannel();
        //使用transferFrom進行復制貼上
        targetChannel.transferFrom(srcChannel, 0, srcChannel.size());
        fileInputStream.close();
        fileOutputStream.close();
    }
}

 

選擇器(Selector)

在NIO程式中,可以用選擇器Selector實現一個選擇器處理多個通道,即一個執行緒處理多個連線。只要把通道註冊到Selector上,就可以通過Selector來監測通道,如果通道有事件發生,便獲取事件通道然後針對每個事件進行相應的處理。這樣,只有在通道(連線)有真正的讀/寫事件發生時,才會進行讀寫操作,大大減少了系統開銷,並且不必為每個連線建立單獨執行緒,就不用去維護過多的執行緒。

選擇器的相關類在java.nio.channels包和其子包下,頂層類是Selector,它是一個抽象類,它的常用方法有:

通道的註冊

在ServerSocketChannel和SocketChannel類裡都有一個註冊方法 register(Selector sel, int ops),sel為要註冊到的選擇器,ops為該通道監聽的操作事件的型別,可以通過該方法將ServerSocketChannel或SocketChannel註冊到目標選擇器中,該方法會返回一個SelectionKey(真正實現類為SelectionKeyImpl)儲存在註冊的Selector的publicKeys集合屬性裡。SelectionKey儲存了通道的事件型別和該註冊的通道物件,可以通過SelectionKey.channel()方法獲取SelectionKey對應的通道。

每個註冊到選擇器的通道都需定義需進行的操作事件型別,通過檢視SelectionKey類的屬性可以知道操作事件的型別有4種:

public static final int OP_READ = 1 << 0; //讀操作
public static final int OP_WRITE = 1 << 2; //寫操作
public static final int OP_CONNECT = 1 << 3; //連線操作
public static final int OP_ACCEPT = 1 << 4; //接收操作

選擇器的檢查

我們可以通過選擇器的檢查方法,如select()來得知發生事件的通道數量,當該數量大於為0時,即至少有一個通道發生了事件,就可以使用selectedKeys()方法來獲取所有發生事件的通道對應的SelectionKey,通過SelectionKey中的方法來判斷對應通道中需處理的事件型別是什麼,在根據事件做出相應的處理。

public final boolean isReadable() { //判斷是否是讀操作
    return (readyOps() & OP_READ) != 0;
}

public final boolean isWritable() { //判斷是否是寫操作
    return (readyOps() & OP_WRITE) != 0;
}

public final boolean isConnectable() { //判斷是否是連線操作
    return (readyOps() & OP_CONNECT) != 0;
}

public final boolean isAcceptable() { //判斷是否是接收操作
    return (readyOps() & OP_ACCEPT) != 0;
}

 

NIO實現簡單的聊天群

//伺服器端
public class GroupChatSever {
    private final static int PORT = 6666;//監聽埠
    private Selector selector;//選擇器
    private ServerSocketChannel serverSocketChannel;

    public GroupChatSever(){
        try{
            selector = Selector.open();//開啟選擇器
            serverSocketChannel = ServerSocketChannel.open();//開啟通道
            serverSocketChannel.configureBlocking(false);//將通道設為非阻塞狀態
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));//通道繫結監聽埠
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//將通道註冊到選擇器上,事件型別為接收
            listen();
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    //對埠進行監聽
    public void listen(){
        try {
            while (true){
                //檢查註冊通道是否有事件發生,檢查時長為2秒
                int count = selector.select(2000);
                if (count > 0){//如果註冊通道有事件發生則進行處理
                    //獲取所有發生事件的通道對應的SelectionKey
                    Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                    while (keyIterator.hasNext()){
                        SelectionKey key = keyIterator.next();
                        if (key.isAcceptable()){//判斷該key對應的通道是否需進行接收操作
                            //雖然accept()方法是阻塞的,但是因為對通道進行過判斷,
                            //可以確定是有客戶端連線的,所以此時呼叫accept並不會阻塞
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            socketChannel.configureBlocking(false);
                            //接收後,將獲取的客戶端通道註冊到選擇器上,事件型別為讀
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            System.out.println(socketChannel.getRemoteAddress() + "上線!");
                        }
                        if (key.isReadable()){//判斷該key對應的通道是否需進行讀操作
                            readFromClient(key);
                        }
                        //注意當處理完一個通道key時,需將它從迭代器中移除
                        keyIterator.remove();
                    }
                }
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    /**
     * 讀取客戶端發來的訊息
     * @param key 需讀取的通道對應的SelectionKey
     */
    public void readFromClient(SelectionKey key){
        SocketChannel socketChannel = null;
        try{
            //通過SelectionKey獲取對應通道
            socketChannel = (SocketChannel)key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int read = socketChannel.read(byteBuffer);
            if (read > 0){
                String message = new String(byteBuffer.array());
                System.out.println("客戶端: " + message);
                sendToOtherClient(message, socketChannel);
            }
        }catch (IOException e){
            //這裡做了簡化,將所有異常都當做是客戶端斷開連線觸發的異常,實際專案中請不要這樣做
            try{
                System.out.println(socketChannel.getRemoteAddress() + "下線");
                key.cancel();//將該SelectionKey撤銷
                socketChannel.close();//再關閉對應通道
            }catch (IOException e2){
                e2.printStackTrace();
            }
        }
    }

    /**
     * 將客戶端傳送的訊息轉發到其他客戶端
     * @param message 轉發的訊息
     * @param from 傳送訊息的客戶端通道
     * @throws IOException
     */
    public void sendToOtherClient(String message, SocketChannel from) throws IOException{
        System.out.println("訊息轉發中......");
        for (SelectionKey key : selector.keys()){//遍歷選擇器中所有SelectionKey
            Channel channel = key.channel();//根據SelectionKey獲取對應通道
            //排除掉髮送訊息的通道,將訊息寫入到其他客戶端通道
            if (channel instanceof SocketChannel && channel != from){
                SocketChannel socketChannel = (SocketChannel)channel;
                ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes());
                socketChannel.write(byteBuffer);
            }
        }
    }

    public static void main(String[] args) {
        GroupChatSever groupChatSever = new GroupChatSever();
    }
}
//客戶端
public class GroupChatClient {
    private final static String SEVER_HOST = "127.0.0.1";//連線的客戶端主機
    private final static int SEVER_PORT = 6666;//連線的客戶端埠
    private Selector selector;//選擇器
    private SocketChannel socketChannel;
    private String username;//儲存客戶端ip地址

    public GroupChatClient(){
        try {
            selector = Selector.open();//開啟選擇器
            socketChannel = SocketChannel.open(new InetSocketAddress(SEVER_HOST, SEVER_PORT));//開啟通道
            socketChannel.configureBlocking(false);//將通道設為非阻塞
            socketChannel.register(selector, SelectionKey.OP_READ);//將通道註冊在選擇器上,事件型別為讀
            username = socketChannel.getLocalAddress().toString().substring(1);//獲取客戶端ip地址
            String message = " 進入聊天群!";
            sendMessage(message);
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    //傳送訊息
    public void sendMessage(String message){
        message = username+": "+message;
        try{
            ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes());
            socketChannel.write(byteBuffer);
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    //讀取從伺服器轉發送過來的訊息
    public void readMessage(){
        try{
            int read = selector.select();
            if (read > 0){
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()){
                    SelectionKey key = keyIterator.next();
                    if (key.isReadable()){
                        SocketChannel socketChannel = (SocketChannel)key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        socketChannel.read(byteBuffer);
                        System.out.println(new String(byteBuffer.array()));
                    }
                    keyIterator.remove();
                }
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        final GroupChatClient groupChatClient = new GroupChatClient();
        //客戶端開啟一個執行緒來監聽是否有伺服器轉發來訊息
        new Thread(){
            @Override
            public void run() {
                while (true){
                    groupChatClient.readMessage();
                    try {
                        Thread.currentThread().sleep(1000);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            }
        }.start();
        
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String message = scanner.nextLine();
            groupChatClient.sendMessage(message);
        }
    }
}

&n