1. 程式人生 > >Java程式設計思想學習筆記3

Java程式設計思想學習筆記3

本章內容承接“Java程式設計思想 - 18.1 - 位元組流和字元流”。JDK 1.4引入了新的Java IO類庫java.nio.*旨在提高IO速度。筆者愚鈍,認為NIO是Java程式設計思想第18章中最難理解的部分,Eckel在這部分內容的安排上也讓人難以抓到重點,筆者總結主要有以下內容:

  1. Java NIO概述:NIO與IO的區別
  2. 緩衝器Buffer
  3. 通道Channel
  4. 選擇器Selector

特別申明下,本章的大部分內容來源於併發程式設計網的Java NIO系列教程http://ifeve.com/overview/ 。筆者在此基礎上做了整理和總結,並豐富了例項。不足之處,還望賜教!

1. Java NIO概述:NIO與IO的區別

Java NIO由三個核心部分組成:

  • 通道Channel
  • 緩衝器Buffer
  • 選擇器Selector

通道類似於流(Stream),但通道是雙向的(Stream是單向的,輸入流只能輸入,輸出流負責輸出)。它相當於一座資料寶庫,而唯一能與Channel互動的就是緩衝器Buffer,Channel要麼從Buffer讀資料,要麼向Buffer寫資料。
選擇器Selector是負責管理通道的。Java NIO的選擇器允許一個單獨的執行緒來監視多個輸入通道,你可以註冊多個通道使用一個選擇器,然後使用一個單獨的執行緒來“選擇”通道:這些通道里已經有可以處理的輸入,或者選擇已準備寫入的通道。這種選擇機制,使得一個單獨的執行緒很容易來管理多個通道。
問題來了,通過Channel、Buffer、Selector,NIO是如何來提速的呢?
1. NIO是面向緩衝的


Java IO是面向流的,這意味著每次從流中讀入的位元組是固定的,如果你想要位元組多少能夠滑動,你必須先設定一個快取(BufferInputStream),再從快取中讀資料。
Java NiO是面向緩衝的,資料讀取和寫入直接通過一個緩衝區來處理。這就增加了處理過程的靈活性。
2. NIO是非阻塞的
Java IO的流是阻塞的,比如呼叫InputStream.read()方法時是阻塞的,它會一直等到資料到來時(或超時)才會返回。
Java NIO的非阻塞模式,使一個執行緒從某通道傳送請求讀取資料,但是它僅能得到目前可用的資料,如果目前沒有資料可用時,就什麼都不會獲取。非阻塞寫也是如此。一個執行緒請求寫入一些資料到某通道,但不需要等待它完全寫入,這個執行緒同時可以去做別的事情。
3. NIO通過Selector管理多通道

Java NIO的非阻塞模式,需要一個Selector執行緒來靈活的排程各個通道。執行緒通常將非阻塞IO的空閒時間用於在其它通道上執行IO操作,所以一個單獨的執行緒現在可以管理多個輸入和輸出Channel。

2. 緩衝器Buffer

Java NIO 有以下Buffer型別,分別對應不同的Java基礎型別:byte, short, int, long, float, double 和 char。而MappedByteBuffer用於表示記憶體對映檔案,我們將單獨用一小節介紹。

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

Buffer處理資料的一般步驟

Buffer原始碼如下所示:

package java.nio;

public abstract class Buffer {

    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;

    long address;

    Buffer(int mark, int pos, int lim, int cap) {       
        if (cap < 0)
            throw new IllegalArgumentException("Negative capacity: " + cap);
        this.capacity = cap;
        limit(lim);
        position(pos);
        if (mark >= 0) {
            if (mark > pos)
                throw new IllegalArgumentException("mark > position: (" + mark + " > " + pos + ")");
            this.mark = mark;
        }
    }
    //返回緩衝區容量
    public final int capacity() {
        return capacity;
    }
    //返回position值
    public final int position() {
        return position;
    }
    //設定新的position值
    public final Buffer position(int newPosition) {
        if ((newPosition > limit) || (newPosition < 0))
            throw new IllegalArgumentException();
        position = newPosition;
        if (mark > position) mark = -1;
        return this;
    }
    //返回limit值
    public final int limit() {
        return limit;
    }
    //設定新的limit值
    public final Buffer limit(int newLimit) {
        if ((newLimit > capacity) || (newLimit < 0))
            throw new IllegalArgumentException();
        limit = newLimit;
        if (position > limit) position = limit;
        if (mark > limit) mark = -1;
        return this;
    }
    //將mark值設定為position   
    public final Buffer mark() {
        mark = position;
        return this;
    }
    //將position值復位
    public final Buffer reset() {
        int m = mark;
        if (m < 0)
            throw new InvalidMarkException();
        position = m;
        return this;
    }
    //清空緩衝區,將position值清0,limit值設為容量    
    public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }
    //用於準備從緩衝區讀取已寫入的資料
    public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }
    //Buffer.rewind()將position設回0,所以你可以重讀Buffer中的所有資料。
    public final Buffer rewind() {
        position = 0;
        mark = -1;
        return this;
    }

    public final int remaining() {
        return limit - position;
    }

    //若有介於limit和position之間的元素則返回true
    public final boolean hasRemaining() {
        return position < limit;
    }
    ...
}

筆者認為使用Buffer讀寫資料一般遵循以下幾個步驟:
1. allocate():分配容量
2. read(buffer):讀資料
3. flip():讀寫切換
4. write(buffer):寫資料
5. clear():清空緩衝區,讓它可以再次被寫入
舉個例子:

package c18;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;


public class ByteBufferDemo {
    private static final int BSIZE = 1024;
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.out.println("arguments:sourcefile destfile");
            System.exit(1);
        }
        FileChannel in = new FileInputStream(args[0]).getChannel(),
                out = new FileOutputStream(args[1]).getChannel();
        //靜態的allocate()方法來分配ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(BSIZE);
        //read告知FileChannel向ByteBuffer儲存位元組
        while (in.read(buffer)!= -1) {
            //filp做好讓別人讀取位元組的準備
            buffer.flip();
            out.write(buffer);
            buffer.clear();         
        }
    }
}

緩衝區細節

Buffer可以高效地訪問及操縱資料主要通過四個索引:mark、position、limit和capacity。而緩衝區是如何來處理資料的讀寫的,這個過程我在網上看過幾篇文章,個人認為併發程式設計網上講的是比較清晰的(http://ifeve.com/buffers/):
Buffer讀模式和寫模式

Buffer寫模式:將資料寫到Buffer中,position表示當前的位置。初始的position值為0.當一個byte、long等資料寫到Buffer後,position會向前移動到下一個可插入資料的Buffer單元。limit表示你最多能往Buffer裡寫多少資料。 寫模式下,limit=capacity。
Buffer讀模式:讀取Buffer中的資料,position會被重置為0,limit會被設定成寫模式下的position值。 limit表示你最多能讀到多少資料。(這個過程通過flip()函式來完成)
mark是輔助用的遊標,用於標記position的特殊位置。capacity表示Buffer的固定大小。

Buffer大小的分配

Buffer大小的分配可以通過allocate方法或者allocateDirect方法來實現,二者有什麼區別呢?我們以ByteBuffer來說:

  1. allocate(int capacity):從堆空間中分配一個容量大小為capacity的byte陣列作為緩衝區的byte資料儲存器
public static ByteBuffer allocate(int capacity) {
        if (capacity < 0)
            throw new IllegalArgumentException();
        return new HeapByteBuffer(capacity, capacity);
}
  1. allocateDirect(int capacity):不通過JVM,而是直接通過作業系統來建立記憶體塊用作緩衝區,它與當前作業系統能夠更好的耦合,因此能進一步提高I/O操作速度。但是分配直接緩衝區的系統開銷很大,因此只有在緩衝區較大並長期存在,或者需要經常重用時,才使用這種緩衝區。
public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
}

向Buffer中寫資料和讀資料

有兩種方式向Buffer中寫資料和讀資料:
1. 通過Channel儲存到Buffer,從Buffer中讀取資料到Channel
2. 通過Buffer的put()方法寫到Buffer裡,使用get()方法從Buffer中讀取資料。

public abstract class ByteBuffer extends Buffer implements Comparable {  
    // This is a partial API listing  
    public abstract byte get( );   
    public abstract byte get (int index);   
    public abstract ByteBuffer put (byte b);   
    public abstract ByteBuffer put (int index, byte b);  
}  

記憶體對映檔案

本章的內容安排是根據Buffer處理資料的一般步驟展開的。而這一節則是單獨的,用於介紹記憶體對映檔案MappedByteBuffer
記憶體對映檔案允許我們建立和修改那些因為太大而不能放入記憶體的檔案。有了記憶體對映檔案,我們就可以假定整個檔案都放在記憶體中,而且可以完全把它當做非常大的陣列來訪問

舉個例子:

/*
 * 記憶體對映檔案的實現,訪問很大的(128M)的資源
 */
package c18;

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MappedByteBufferDemo {
    static int length = 0x8FFFFFF; // 128M
    public static void main(String[] args) throws Exception{
        /*
         * 指定對映檔案的讀寫許可權、初識位置和對映區域長度
         */
        @SuppressWarnings("resource")
        MappedByteBuffer out = 
                new RandomAccessFile("test.dat", "rw").getChannel()
                .map(FileChannel.MapMode.READ_WRITE, 0, length);
        for (int i = 0; i < length; i++) {
            out.put((byte) 'x');
        }
        System.out.println("Finished writing");
        for (int j = length/2; j < length/2 + 6; j++) {
            System.out.println((char)out.get(j));
        }
    }
}

輸出結果:
Finished writing
x
x
x
x
x
x

記憶體對映檔案訪問可以更加顯著地提高效能,下面的程式進行了簡單的效能比較。

package c18;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.IntBuffer;
import java.nio.channels.FileChannel;

public class TestNIO {
    private static int numOfInts = 4000000;
    private static int numOfbuffInts = 200000;
    /**
     * 模板模式Template
     * 為後面的匿名內部子類定義的test()的各種實現建立了測試框架,每個子類都執行一種測試
     */
    private abstract static class Tester {
        private String name;
        public Tester(String name) {
            this.name = name;
        }
        public void runTest() {
            System.out.println(name + ":");
            try {
                //系統計時器的當前值,以毫微秒為單位。
                long start = System.nanoTime();
                test();
                double duration = System.nanoTime() - start;
                System.out.format("%.2f\n",duration/1.0e9);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }           
        }
        public abstract void test() throws IOException;
    }
    private static Tester[] tests = {
        /*
         * IO Wtrite
         */
        new Tester("Stream Write") {
            public void test() throws IOException {
                DataOutputStream dos = new DataOutputStream(
                        new BufferedOutputStream(
                                new FileOutputStream(new File("temp.tmp"))));
                for(int i = 0; i < numOfInts; i++)
                    dos.writeInt(i);
                dos.close();
            }
        },
        /*
         * NIO Write
         */
        new Tester("Mapped Write") {
            public void test() throws IOException {
                @SuppressWarnings("resource")
                FileChannel fc = new 
                        RandomAccessFile("temp.tmp", "rw").getChannel();
                IntBuffer ib = fc.map(
                        FileChannel.MapMode.READ_WRITE,0,fc.size())
                        .asIntBuffer();
                for(int i = 0; i < numOfInts; i++)
                    ib.put(i);
                fc.close();
            }
        },
        /*
         * IO Read
         */
        new Tester("Stream Read") {
            public void test() throws IOException {
                DataInputStream dis = new DataInputStream(
                        new BufferedInputStream(new FileInputStream("temp.tmp")));
                for(int i = 0; i < numOfInts; i++)
                    dis.readInt();
                dis.close();                        
            }
        },
        /*
         * NIO Read
         */
        new Tester("Map Read") {
            public void test() throws IOException {
                @SuppressWarnings("resource")
                FileChannel fc = new FileInputStream(
                        new File("temp.tmp")).getChannel();
                IntBuffer ib = fc.map(
                        FileChannel.MapMode.READ_ONLY,0,fc.size())
                        .asIntBuffer();
                while(ib.hasRemaining())
                    ib.get();
                fc.close();             
            }
        },
        /*
         * IO Read/Write
         */
        new Tester("Stream Read/Write") {
            public void test() throws IOException {
                RandomAccessFile raf = new RandomAccessFile(
                        new File("temp.tmp"),"rw");
                raf.writeInt(1);
                for(int i = 0; i < numOfbuffInts; i++) {
                    raf.seek(raf.length() - 4);
                    raf.writeInt(raf.readInt());
                }
                raf.close();
            }
        },
        /*
         * NIO Read/Write
         */
        new Tester("Map Read/Write") {
            public void test() throws IOException {
                @SuppressWarnings("resource")
                FileChannel fc = new 
                        RandomAccessFile("temp.tmp", "rw").getChannel();
                IntBuffer ib = fc.map(
                        FileChannel.MapMode.READ_WRITE,0,fc.size())
                        .asIntBuffer();
                for(int i = 1; i < numOfbuffInts; i++)
                    ib.put(ib.get(i - 1));
                fc.close();             
            }
        },
    };
    public static void main(String[] args) {
        for (Tester tester: tests) {
            tester.runTest();
        }
    }

}

輸出結果:
Stream Write:
0.36
Mapped Write:
0.02
Stream Read:
0.34
Map Read:
0.01
Stream Read/Write:
7.59
Map Read/Write:
0.01

到這裡Buffer基本上已經講完了,最後補充一個問題,clear()compact()方法的區別是什麼?

clear():如果呼叫的是clear()方法,position將被設回0,limit被設定成 capacity的值。換句話說,Buffer 被清空了。Buffer中的資料並未清除,只是這些標記告訴我們可以從哪裡開始往Buffer裡寫資料。
compact(): 如果Buffer中仍有未讀的資料,且後續還需要這些資料,但是此時想要先先寫些資料,那麼使用compact()方法。compact()方法將所有未讀的資料拷貝到Buffer起始處。然後將position設到最後一個未讀元素正後面。limit屬性依然像clear()方法一樣,設定成capacity。現在Buffer準備好寫資料了,但是不會覆蓋未讀的資料。

3. 通道Channel

NIO中關鍵的通道Channel實現包括:

  • FileChannel:從檔案中讀寫資料
  • DatagramChannel:通過UDP讀寫網路中的資料
  • SocketChannel:通過TCP讀寫網路中的資料
  • ServerSocketChannel:監聽新進來的TCP連線,像Web伺服器那樣。對每一個新進來的連線都會建立一個SocketChannel

FileChannel

FileChannel是一個連線到檔案的通道。無法設定為非阻塞模式,總是執行在阻塞模式下。
我們無法通過FileChannel直接開啟一個檔案,需要通過InputStream、OutputStream或RandomAccessFile來獲取一個FileChannel例項。

下面給出FileChannel的一個例子:

/*
 * FileChannel的讀寫
 */
package c18;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FileChannelDemo {
    private static final int BSIZE = 1024;

    public static void main(String[] args) throws IOException {

        String newData = "FileChannelDemo!";
        @SuppressWarnings("resource")
        FileChannel fc = new RandomAccessFile("newData.txt", "rw").getChannel();

        ByteBuffer buffer = ByteBuffer.allocate(BSIZE);
        buffer.put(newData.getBytes());
        buffer.flip();
        while (buffer.hasRemaining()) {
            fc.write(buffer);           
        }
        fc.close();
    }
}

開啟檔案newData.txt
內容為FileChannelDemo!

DatagramChannel

DatagramChannel是一個能收發UDP包的通道。它有兩種使用方式。

receive/send

通過receive方法接受資料,send方法傳送資料。舉個例子:

package c18;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

public class DatagramChannelDemo1 {
    public static void main(String[] args)  {
        try {
            DatagramChannel channel = DatagramChannel.open();
            //DatagramChannel可以在UDP埠8080上接收資料包
            channel.socket().bind(new InetSocketAddress(8080));

            String newData = "DatagramChannelDemo1!";

            ByteBuffer buffer = ByteBuffer.allocate(48);
            buffer.clear();
            buffer.put(newData.getBytes());
            buffer.flip();
            //DatagramChannel向本機80埠傳送資料
            int bytesSent = channel.send(buffer, new InetSocketAddress("localhost",80));

            System.out.println(bytesSent);
            channel.close();

        } catch (IOException e) {
            e.printStackTrace();
        }       
    }
}

輸出結果:
21

read/write

可以將DatagramChannel“連線”到網路中的特定地址的。由於UDP是無連線的,連線到特定地址並不會像TCP通道那樣建立一個真正的連線。而是鎖住DatagramChannel ,讓其只能從特定地址收發資料。

當連線後,也可以使用read()和write()方法,就像在用傳統的通道一樣。舉個例子:

package c18;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

public class DatagramChannelDemo2 {
    public static void main(String[] args)  {
        try {
            DatagramChannel channel = DatagramChannel.open();
            channel.connect(new InetSocketAddress("localhost",80));

            String newData = "DatagramChannelDemo1!";

            ByteBuffer buffer = ByteBuffer.allocate(48);
            buffer.clear();
            buffer.put(newData.getBytes());
            buffer.flip();

            channel.write(buffer);  
            channel.close();        
        } catch (IOException e) {
            e.printStackTrace();
        }       
    }
}

SocketChannel

SocketChannel可以設定為非阻塞模式。

SocketChannel阻塞模式

沒什麼好講的,和其他類的實現基本一樣,主要方法包括connect()連線地址,、read()讀資料、write()寫資料。直接上例子:

package c18;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class SocketChannelBlockDemo {

    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost",80));

        String newData = "SocketChannelBlockDemo!";

        ByteBuffer buffer = ByteBuffer.allocate(48);
        buffer.clear();
        buffer.put(newData.getBytes());
        buffer.flip();

        socketChannel.write(buffer);
        socketChannel.close();      
    }
}

SocketChannel非阻塞模式

SocketChannel通過configureBlocking方法來設定非阻塞模式(預設為true,設定為false啟用非阻塞模式)。需要注意的是,如果SocketChannel在非阻塞模式下,為了保證在連線未建立前可以做其他事情,可以呼叫finishConnect()方法

舉個例子:

package c18;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class SocketChannelUnBlockDemo {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("localhost",80));
        while (!socketChannel.finishConnect()) {
            //wait, or do something else...

        }
        String newData = "SocketChannelBlockDemo!";

        ByteBuffer buffer = ByteBuffer.allocate(48);
        buffer.clear();
        buffer.put(newData.getBytes());
        buffer.flip();

        socketChannel.write(buffer);    
        socketChannel.close();
    }
}

ServerSocketChannel

通過 ServerSocketChannel可以監聽新進來的TCP連線。同樣包括了阻塞和非阻塞兩種方式。

ServerSocketChannel阻塞方式

通過 ServerSocketChannel.accept() 方法監聽新進來的連線。當 accept()方法返回的時候,它返回一個包含新進來的連線的 SocketChannel。因此, accept()方法會一直阻塞到有新連線到達。

舉個例子:

package c18;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ServerSocketChannelBlockDemo {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.socket().bind(new InetSocketAddress("localhost",80));
        /**
         * 當接受到新的tcp連線後,才能將newData內容寫入tcp連線
         */
        while (true) {
            SocketChannel socketChannel = 
                    channel.accept();

            String newData = "SocketChannelBlockDemo!";

            ByteBuffer buffer = ByteBuffer.allocate(48);
            buffer.clear();
            buffer.put(newData.getBytes());
            buffer.flip();

            socketChannel.write(buffer);    
            socketChannel.close();
            channel.close();
        }
    }
}

ServerSocketChannel非阻塞方式

ServerSocketChannel可以設定成非阻塞模式。在非阻塞模式下,accept() 方法會立刻返回,如果還沒有新進來的連線,返回的將是null。 因此,需要檢查返回的SocketChannel是否是null。

舉個例子:

package c18;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ServerSocketChannelUnBlockDemo {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.socket().bind(new InetSocketAddress("localhost",80));
        //設定為非阻塞模式
        channel.configureBlocking(false);
        /**
         * 當接受到新的tcp連線後,才能將newData內容寫入tcp連線
         */
        while (true) {
            SocketChannel socketChannel = 
                    channel.accept();

            if(socketChannel != null){
                String newData = "SocketChannelBlockDemo!";

                ByteBuffer buffer = ByteBuffer.allocate(48);
                buffer.clear();
                buffer.put(newData.getBytes());
                buffer.flip();

                socketChannel.write(buffer);    
                socketChannel.close();
                channel.close();
            }           
        }
    }
}

非阻塞方式需要與Selector配合使用才能起到更好的效果,下一節我們將介紹排程器Selector。

通道間資料傳輸

對於FileChannel而言,可以通過transferFrom()transferTo()將一個通道與另一個通道相連:

public abstract long transferFrom(ReadableByteChannel src,long 
                                  position, long count) throws IOException;
public abstract long transferTo(long position, long count,
                                    WritableByteChannel target) throws IOException;

舉個例子:
package c18;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class TransferDemo {
    @SuppressWarnings("resource")
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.out.println("arguments:sourcefile destfile");
            System.exit(1);
        }
        FileChannel in = new FileInputStream(args[0]).getChannel(),
                out = new FileOutputStream(args[1]).getChannel();
        in.transferTo(0, in.size(), out);
        //out.transferFrom(in, 0, in.size());
    }
}

4. 排程器Selector

Selector(選擇器)是Java NIO中能夠檢測一到多個NIO通道,並能夠知曉通道是否為諸如讀寫事件做好準備的元件。這樣,一個單獨的執行緒可以管理多個Channel,從而管理多個網路連線。Channel結合Selector才能更好實現非阻塞的傳輸方式。

為什麼使用Selector?

僅用單個執行緒來處理多個Channels的好處是,只需要更少的執行緒來處理通道。事實上,可以只用一個執行緒處理所有的通道。對於作業系統來說,執行緒之間上下文切換的開銷很大,而且每個執行緒都要佔用系統的一些資源(如記憶體)。因此,使用的執行緒越少越好
但是,需要記住,現代的作業系統和CPU在多工方面表現的越來越好,所以多執行緒的開銷隨著時間的推移,變得越來越小了。實際上,如果一個CPU有多個核心,不使用多工可能是在浪費CPU能力。不管怎麼說,關於那種設計的討論應該放在另一篇不同的文章中。在這裡,只要知道使用Selector能夠處理多個通道就足夠了。

Selector的建立和註冊

需要注意的是Selector只能在Channel非阻塞模式下才能使用,這意味著FileChannel是不能使用Selector的,而套接字通道都是可以的(包括TCP和UDP)。

//Selector建立
Selector selector = Selector.open();
//Channel設定非阻塞模式
channel.configureBlocking(false);
//Selector註冊
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);

其中,註冊register方法的第二個引數表示”interest”集合,意思是在通過Selector監聽Channel時感興趣的事件。包括:

  • SelectionKey.OP_CONNECT:連線就緒
  • SelectionKey.OP_ACCEPT:接受就緒,一個server socket channel準備好接收新進入的連線
  • SelectionKey.OP_READ:讀就緒,一個有資料可讀的通道
  • SelectionKey.OP_WRITE:寫就緒,等待寫資料的通道
    如果你對不止一種事件感興趣,那麼可以用“位或”操作符將常量連線起來,如下:
int interestSet = SelectionKey.OP_READ|SelectionKey.OP_WRITE;

SelectionKey物件的處理

在上一小節中,當向Selector註冊Channel時,register()方法會返回一個SelectionKey物件。包括:

  • interest集合:你所選擇的感興趣的事件集合。

  • ready集合:通道是否已經準備就緒的操作的集合。可以通過以下四個方法來判斷:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
  • Channel
Channel  channel  = selectionKey.channel();
  • Selector
Selector selector = selectionKey.selector();

通過Selector選擇通道

一旦向Selector註冊了一或多個通道,就可以呼叫幾個過載的select()方法。這些方法返回你所感興趣的事件(如連線、接受、讀或寫)已經準備就緒的那些通道

//select()阻塞到至少有一個通道在你註冊的事件上就緒了
public abstract int select() throws IOException;
//最長會阻塞timeout毫秒
public abstract int select(long timeout) throws IOException;
//selectNow()非阻塞的選擇操作。如果自從前一次選擇操作後,沒有通道變成可選擇的,則此方法直接返回零。
public abstract int selectNow() throws IOException;

一旦呼叫了select()方法,並且返回值表明有一個或更多個通道就緒了,然後可以通過呼叫selector的selectedKeys()方法,訪問“已選擇鍵集(selected key set)”中的就緒通道

Set selectedKeys = selector.selectedKeys();

因此,選擇一個可用的通道過程如下:

Set selectedKeys = selector.selectedKeys();
    Iterator keyIterator = selectedKeys.iterator();
    while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if(key.isAcceptable()) {
            // a connection was accepted by a ServerSocketChannel.
        } else if (key.isConnectable()) {
            // a connection was established with a remote server.
        } else if (key.isReadable()) {
            // a channel is ready for reading
        } else if (key.isWritable()) {
            // a channel is ready for writing
        }
        keyIterator.remove();
}

這個迴圈遍歷已選擇鍵集中的每個鍵,並檢測各個鍵所對應的通道的就緒事件。
注意每次迭代末尾的keyIterator.remove()呼叫。Selector不會自己從已選擇鍵集中移除SelectionKey例項。必須在處理完通道時自己移除。下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中。

到此,我們可以寫一個完整的Selector例項了:

NIO服務端:

package c18;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOServer {
    // 通道管理器  
    private Selector selector;  

    public void initServer(int port) throws Exception {  
        // 獲得一個ServerSocket通道  
        ServerSocketChannel serverChannel = ServerSocketChannel.open();  
        // 設定通道為 非阻塞  
        serverChannel.configureBlocking(false);  
        // 將該通道對於的serverSocket繫結到port埠  
        serverChannel.socket().bind(new InetSocketAddress(port));  
        // 獲得一通道管理器  
        this.selector = Selector.open();  

        // 將通道管理器和該通道繫結,併為該通道註冊selectionKey.OP_ACCEPT事件  
        // 註冊該事件後,當事件到達的時候,selector.select()會返回,  
        // 如果事件沒有到達selector.select()會一直阻塞  

        serverChannel.register(selector, SelectionKey.OP_ACCEPT);  
    }  

    // 採用輪訓的方式監聽selector上是否有需要處理的事件,如果有,進行處理  
    public void listen() throws Exception {  
        System.out.println("start server");  
        // 輪詢訪問selector  
        while (true) {  
            // 當註冊事件到達時,方法返回,否則該方法會一直阻塞  
            selector.select();  
            // 獲得selector中選中的相的迭代器,選中的相為註冊的事件  
            Iterator ite = this.selector.selectedKeys().iterator();  
            while (ite.hasNext()) {  
                SelectionKey key = (SelectionKey) ite.next();  
                // 刪除已選的key 以防重負處理  
                ite.remove();  
                // 客戶端請求連線事件  
                if (key.isAcceptable()) {  
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();  
                    // 獲得和客戶端連線的通道  
                    SocketChannel channel = server.accept();  
                    // 設定成非阻塞  
                    channel.configureBlocking(false);  
                    // 在這裡可以傳送訊息給客戶端  
                    channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));  
                    // 在客戶端 連線成功之後,為了可以接收到客戶端的資訊,需要給通道設定讀的許可權  
                    channel.register(this.selector, SelectionKey.OP_READ);  
                    // 獲得了可讀的事件  

                } else if (key.isReadable()) {  
                    read(key);  
                }  

            }  
        }  
    }  

    // 處理 讀取客戶端發來的資訊事件  
    private void read(SelectionKey key) throws Exception {  
        // 伺服器可讀訊息,得到事件發生的socket通道  
        SocketChannel channel = (SocketChannel) key.channel();  
        // 穿件讀取的緩衝區  
        ByteBuffer buffer = ByteBuffer.allocate(10);  
        channel.read(buffer);  
        byte[] data = buffer.array();  
        String msg = new String(data).trim();  
        System.out.println("server receive from client: " + msg);  
        ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());  
        channel.write(outBuffer);  
    }  

    public static void main(String[] args) throws Throwable {  
        NIOServer server = new NIOServer();  
        server.initServer(8989);  
        server.listen();  
    }  

}

NIO客戶端:

package c18;

import java.io.IOException;  
import java.net.InetSocketAddress;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.SocketChannel;  
import java.util.Iterator;  

public class NIOClient {  

    // 通道管理器  
    private Selector selector;  

    /** 
     * * // 獲得一個Socket通道,並對該通道做一些初始化的工作 * @param ip 連線的伺服器的ip // * @param port 
     * 連線的伺服器的埠號 * @throws IOException