Java NIO之通道Channel分析
目錄
簡介
通道Channel是NIO裡面的一個創新點,用於緩衝區和檔案或者套接字之間的資料傳輸。通道Channel的繼承體系相對比較複雜,主要在java.nio.channels,部分的channels類還會依賴java.nio.channels.spi子包。頂層的channe介面定義判斷通道是否開啟和關閉通道的方法。
package java.nio.channels;
public interface Channel extends Closeable {
public boolean isOpen();
public void close() throws IOException;
}
我們知道傳統的IO流都是單向的,比如FileInputStream只能讀取資料和FileOutputStream只能寫入資料。而通道Channel可以是單向的,也可以是雙向的,如果一個channel類實現了定義read()方法的ReadableByteChannel介面或者實現了定義write()方法的WritableByteChannel介面都只是單向的,但同時實現了這兩個介面就是雙向通道(既可以讀,又可以寫),ByteChannel就是這樣一個介面,繼承了ReadableByteChannel介面和WritableByteChannel介面,所以ByteChannel子類通道都是雙向的。
通道可以分為兩種型別,一種是檔案(file)通道和套接字(socket)通道。都屬於ByteChannel子類,所以是雙向的通道。
a.檔案(file)通道
- FileChannel
b.套接字(socket)通道
- SocketChannel
- ServerSocketChannel
- DatagramChannel
檔案通道FileChannel
1.FileChannel介紹
檔案通道FileChannel裡面定義常用的read,write,scatter/gatter操作,同時提供了操作檔案的方法。檔案通道FileChannel不能直接建立,只能通過開啟的RandomAccessFile,FileInputStream,FileOutputStream物件上呼叫getChannel()方法獲取。呼叫getChannel()所獲取的FileChannel()物件與流物件連線的是同一個檔案,並且具有相同訪問許可權。
所以儘管FileChannel定義上是雙向通道,但一個FileInputStream物件上獲取的FileChannel物件只能讀取資料,不能寫入資料。FileChannel總是在阻塞模式下,不能設定為非阻塞模式。
//將位元組序列讀取到給定的緩衝區
public abstract int read(ByteBuffer dst)
//將位元組序列讀取到給定的緩衝區
public final long read(ByteBuffer[] dsts)
//將位元組序列讀取到給定的緩衝區指定位置
public abstract long read(ByteBuffer[] dsts, int offset, int length)
//從給定的檔案位置position開始,從通道中讀取資料到緩衝區中
public abstract int read(ByteBuffer dst, long position)
//將給定的緩衝區中的位元組序列寫到通道中
public abstract int write(ByteBuffer src)
//將給定的多個緩衝區中位元組序列寫到通道中
public final long write(ByteBuffer[] srcs)
//將給定的緩衝區中位元組序列寫到通道中,檔案位置從position開始
public abstract int write(ByteBuffer src, long position)
//將給定的多個緩衝區指定位置位元組序列寫到通道中
public abstract long write(ByteBuffer[] srcs, int offset, int length)
//返回此通道中檔案位置
public abstract long position()
//設定此通道中檔案位置
public abstract FileChannel position(long newPosition)
//將通道中的檔案擷取為指定的大小
public abstract FileChannel truncate(long size)
//強制將通道中的檔案寫到包含該檔案的儲存裝置中
public abstract void force(boolean metaData)
訪問檔案的方法中可以看到不管read()還會write()方法都有position的方法,position在此處並不是緩衝區Buffer裡面的索引位置,而是檔案中讀取或者寫入的位置。可以將檔案看做的是一個龐大的位元組陣列,利用position就可以實現將資料寫到指定的檔案位置或者讀取檔案中指定位置資料,操作類似於緩衝區。此外還提供了truncate()方法來丟棄檔案中超過引數size以外的資料,force()方法會將通道中待修改的資料強制應用到磁碟上,做到對檔案待定修改及時同步到磁碟,force(Boolean metaData)方法中布林型別引數表示的是是否將元資料(檔案所有者,訪問許可權,最後修改時間等資訊)也進行同步。
2.檔案鎖
JDK1.4版本開始提供檔案鎖功能,檔案鎖有獨佔鎖和共享鎖兩種。
//獲取通道檔案中指定位置的獨佔鎖
public abstract FileLock lock(long position, long size, boolean shared)
//獲取通道檔案的獨佔鎖
public final FileLock lock()
//嘗試獲取通道檔案中指定位置的獨佔鎖
public abstract FileLock tryLock(long position, long size, boolean shared)
//嘗試獲取通道檔案的獨佔鎖
public final FileLock tryLock()
可以看到檔案鎖有兩種型別,lock()和tryLock()兩種型別。
lock()除非檔案指定位置鎖定,通道關閉或者執行緒中斷其中一種情況,否則方法將會一直阻塞。帶有引數的lock()方法指定檔案內部鎖定的開始位置position以及鎖定區域的size,shared引數表示想獲取鎖是否是共享的。
tryLock()方法,不管能否成功獲取鎖,將會立即返回,方法不會阻塞。獲取一個其他程式已經佔用的鎖,將返回null,其他原因導致的獲取鎖失敗,將會丟擲異常。
有些作業系統不支援共享鎖,所以請求共享鎖將會自動轉化成獨佔鎖,一個請求過來獲取的鎖是共享還是獨佔的可通過呼叫FileLock中isShared()方法檢視。鎖的範圍不一定是限制在size範圍內,可以鎖定未包含任何的內容的區域,這樣下次寫入資料時,對應區域就可以受到檔案鎖保護,如果只是鎖定檔案中某塊內容,那麼下次寫到檔案中的資料將會超出鎖定區域,不會受到檔案鎖的保護。
需要注意的是檔案鎖物件FileLock與FileChannel例項關聯,但鎖的物件是檔案,而不是通道和執行緒,所以使用完鎖後要及時釋放,不然會導致衝突或者死鎖。
3.記憶體對映檔案
呼叫FileChannel中的map()方法可以得到MappedByteBuffer物件(ByteBuffer的子類),一般讀取檔案簡易過程是先將資料讀取到核心記憶體緩衝區中臨時存放,緩衝區滿後,核心會將資料複製到使用者空間的緩衝區中。記憶體對映大致就是將使用者空間的緩衝區和核心空間緩衝區對映到同一個地方,但是實際儲存資料的地方是磁碟上,這樣省略了中間複製到核心緩衝區的過程。對於頻繁訪問檔案或者更改比較大的檔案時,使用記憶體對映檔案將會極大提高效率。
public abstract MappedByteBuffer map (MapMode mode, long position,long size)
map()方法需要傳三個引數,mode有三種模式:MapMode.READ_ONLY(只讀模式),MapMode.READ_WRITE(讀寫模式), MapMode.PRIVATE(寫時拷貝,呼叫put()修改的資料最終不會寫到檔案中,只能通過MappedByteBuffer實時呼叫get()檢視)。
Socket通道
socket通道是以非阻塞模式執行,依靠他們共同的超父類是java.nio.channelss.spi中AbstractSelectableChannel,由此得到一種通道的選擇機制,socket通道和選擇器Selector聯合使用,Selector可以看做用來管理通道的類,多個通道可以註冊到選擇器上,通過輪詢來檢視通道是否就緒狀態(是否準備好讀寫操作)。設定通道的阻塞模式只要呼叫configureBlocking()方法即可。此外每個socket通道都關聯一個java.net.socket物件,但通過傳統方式建立Socket物件,不會關聯到通道。
1.SocketChannel
SocketChannel通道針對的是點對點的連線,類似於TCP/IP,面向流的連線。只有連線成功情況下,才可以接收到資料或者傳送資料給連線的地址。建立SocketChannel方式有兩種:
- 呼叫SocketChannel中open()方法開啟通道並連線到指定地址。
- ServerSocketChannel用來監聽連線,連線過來時會建立SocketChannel。
開啟SocketChannel通道
SocketChannel socketChannel = SocketChannel.open( );
socketChannel.connect (new InetSocketAddress ("address", port));
關閉SocketChannel通道
socketChannel.close()
讀取資料
SocketChannel類中提供了多個read()方法:read(ByteBuffer dst),read(ByteBuffer[] dsts, int offset, int length),
read(ByteBuffer[] dsts)。可以看到引數都與緩衝區ByteBuffer關聯,從socketChannel中讀取的資料都會先放到緩衝區Buffer中。
ByteBuffer buffer = ByteBuffer.allocate(100);
socketChannel.read(buffer);
寫入資料
SocketChnnel類提供了多個write()方法:write(ByteBuffer src), write(ByteBuffer[] srcs, int offset, int length), write(ByteBuffer[] srcs)。寫入SocketChannel通道的資料,同樣是先放到緩衝區中,然後再寫入通道中。
ByteBuffer buffer = ByteBuffer.allocate(50);
buffer.put("hello".getBytes());
buffer.flip();
socketChannel.write(buffer);
2.ServerSocketChannel
ServerSocketChannel沒有提供傳送和接收資料的方法,只是用於監聽TCP連線過來的通道。ServerSocketChannel類提供bind()方法或者呼叫socket()方法獲取對應的ServerSocket物件來繫結指定地址/埠。
開啟通道
呼叫靜態方法open()建立一個未繫結的通道serverSocketChannel物件,呼叫bind()或者socket()物件獲取ServerSocketChannel()物件繫結地址/埠。
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
//或者獲取對應ServerSocket物件繫結
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
關閉通道
serverSocketChannel.close()
監聽連線
呼叫accept()會返回一個包含了新連線的SocketChannel物件,可以在非阻塞模式下執行。如果以非阻塞模式下執行,呼叫accept()方法時還沒有連線進來,會立即返回null。
while(true){
SocketChannel socketChannel=serverSocketChannel.accept();
....
}
3.DatagramChannel
DatagramChannel模擬的是包導向的連線(UDP/IP),也就是不需要連線,就可以傳送資料給不同目的地,也可以接受任意地址過來的資料(資料裡面包含地址資訊)。DatagramChannel可以通過呼叫靜態方法open()開啟通道,呼叫socket()方法可以得到對應的DatagramSocket物件。DatagramChannel既可以充當伺服器(監聽端)又可以充當客戶端(接收端),用於監聽的時候,需要將通道繫結到一個地址或者埠上面。呼叫send()方法並不能保證資料能夠傳送到另外一端,因為資料在傳送過程會被拆分成多個數據碎片,到目的端再組合起來,如果其中一個碎片丟失了,將會導致整個資料報丟失。
DatagramChannel可以進行任意次數的斷開和連線,應用場景就是客戶端/服務端的方式,connect()方法中如果傳入指定地址,就可以只接收指定地址過來的資料忽略其他來源的資料。
開啟DatagramChannel通道
呼叫靜態open()建立一個通道,監聽時需要呼叫bind()方法繫結地址/埠。
DatagramChannel channel = DatagramChannel.open();
DatagramSocket socket = channel.socket();
//DatagramChannel用於監聽時,需要繫結埠
socket.bind(new InetSocketAddress(port));
接收資料
阻塞模式下,receive()方法會一直等待,直到接收到資料。非阻塞模式下,如果沒有接收到資料時,會返回null。如果接收到的資料超出了緩衝區的容量,會丟棄超出的部分。
ByteBuffer buffer = ByteBuffer.allocate(100);
channel.receive(buffer);
傳送資料
呼叫send()方法會將緩衝區Buffer中的資料傳送到DatagramChannel繫結的地址和埠上,如果通道處於阻塞模式下,呼叫執行緒可能會休眠知直到資料報加入到傳送的佇列中。如果處於非阻塞模式下,將返回緩衝區中位元組數或者是“0”。
ByteBuffer buffer = ByteBuffer.allocate(48);
buffer.put("hello".getBytes());
buf.flip();
int bytesSent = channel.send(buf, new InetSocketAddress("address", port));
特定連線
DatagramChannel中提供了傳入地址的connect()方法,但是功能並不是和指定的地址建立連線,而是鎖住了DatagramChannel,只接收指定地址過來的資料,而不接收其他地址過來的資料。
channel.connect(new InetSocketAddress("address", port));
其他內容
1.管道
Pipe類在java.nio.channels.spi包下面,呼叫靜態方法open()建立通道Pipe物件。在Pipe物件上呼叫sink()方法可以得到Pipe.sinkChannel()(管道寫入端),呼叫source()方法可以得到Pipe.sourceChannel(管道讀取端)。功能類似於java.io.PipedOutputStream和java.io.PipedInputStream。
//建立管道物件
Pipe pipe = Pipe.open();
//寫入資料管道
Pipe.SinkChannel sinkChannel = pipe.sink();
//讀取資料管道
Pipe.SourceChannel sourceChannel = pipe.source();
2.通道工具類
在java.nio.channels包下面提供了通道的工具類Channels.如下方法都是靜態的方法,直接通過Channels.xxxx()直接呼叫。
方法 |
返回 |
描述 |
---|---|---|
newChannel(final InputStream in) |
ReadableByteChannel |
構造一個將從給定的輸入流讀取資料的通道。 |
newChannel(final OutputStream out) |
WritableByteChannel |
構造一個將向給定的輸出流寫入資料的通道。 |
newReader(ReadableByteChannel ch, CharsetDecoder dec,int minBufferCap) |
Reader |
從給定的通道讀取位元組並依據提供的CharsetDecoder 對讀取到的位元組進行解碼 |
newReader(ReadableByteChannel ch, String csName) |
Reader |
從給定的通道讀取位元組並依據提供的字符集名稱將讀取到的位元組解碼成字元。 |
newWriter(final WritableByteChannel ch, final CharsetEncoder enc, final int minBufferCap) |
Writer |
使用給定的 CharsetEncoder 物件對字元編碼後寫到給定的通道中。 |
newWriter(WritableByteChannel ch, String csName) |
Writer |
使用給定的字符集名稱對字元編碼後寫到給定的通道中。 |
newInputStream(ReadableByteChannel ch) |
InputStream |
構造一個從給定的通道讀取位元組的流。 |
newOutputStream(final WritableByteChannel ch) |
OutputStream |
構造一個將向給定的通道寫入位元組的流。 |
3.Scatter/Gather
Scatter就是將從Channel讀取的資料按順序放到(分散)多個緩衝區Buffer中,會順序填滿每個緩衝區直到通道中資料讀取完。
Gather就是將多個緩衝區中順序抽取(聚集)到同一個Channel通道中。
使用場景一般是需要將資料分開進行傳送,比如訊息頭和訊息頭分開發送,方便資料的處理。如下:從通道接收到位元組為38個,那麼會先header緩衝區填滿,剩餘28個位元組會被放到body緩衝區中。聚集過程是將資料順序抽取,先將header中的10個位元組傳送到通道中,然後是body中位元組傳送到通道中。
//scatter分散資料
ByteBuffer header = ByteBuffer.allocateDirect (10);
ByteBuffer body = ByteBuffer.allocateDirect (100);
ByteBuffer [] buffers = { header, body };
int bytesRead = channel.read (buffers);
//gather聚集資料
ByteBuffer header = ByteBuffer.allocate(10);
ByteBuffer body = ByteBuffer.allocate(100);
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);
案例
測試案例所有流關閉正常應該通過try{}catch{}finally在finally中關閉,而不是直接丟擲異常的方式。
1.FileChannel相關
public class FileChannelDemo {
public static void main(String[] args) throws IOException {
testFileChannel();
long start = System.currentTimeMillis();
copyFileByFileChannel();
long end = System.currentTimeMillis();
copyFileByFileChannel2();
long end1 = System.currentTimeMillis();
copyFileByMappedByteBuffer();
long end2 = System.currentTimeMillis();
System.out.println("Copy by FileChannel+Buffer----"+(end-start));
System.out.println("Copy by FileChannel+transferTo()----"+(end1-end));
System.out.println("Copy by MapByteBuffer---"+(end2-end1));
}
/**通過MappedByteBuffer實現檔案複製
*/
private static void copyFileByMappedByteBuffer() throws FileNotFoundException, IOException {
FileInputStream fis = new FileInputStream(new File("D:\\java.txt"));
FileChannel fc = fis.getChannel();
MappedByteBuffer mapBuffer =fis.getChannel().map(MapMode.READ_ONLY,0,fc.size());
FileOutputStream fos = new FileOutputStream(new File("D:\\map.txt"));
FileChannel channel = fos.getChannel();
channel.write(mapBuffer);
fis.close();
fos.close();
}
/**
* 通過FileChannel讀取檔案
*/
private static void testFileChannel() throws IOException {
FileInputStream fis = new FileInputStream(new File("D:\\java.txt"));
FileChannel channel = fis.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int i ;
while((i=channel.read(buffer))!=-1) {
buffer.flip();
System.out.println(Charset.forName(System.getProperty("file.encoding")).decode(buffer));
buffer.clear();
}
channel.close();
fis.close();
}
/**使用fileChannel結合Buffer實現檔案的複製
*/
private static void copyFileByFileChannel() throws IOException {
FileInputStream fis = new FileInputStream(new File("D:\\java.txt"));
FileOutputStream fos = new FileOutputStream(new File("D:\\copy.txt"));
FileChannel in = fis.getChannel();
FileChannel out = fos.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while(in.read(buffer)!=-1) {
buffer.flip();
out.write(buffer);
buffer.clear();
}
fos.close();
fis.close();
}
/**
* 呼叫FileChannel中transferTo()可以實現兩個通道的連線。
*/
private static void copyFileByFileChannel2() throws IOException {
FileInputStream fis = new FileInputStream(new File("D:\\java.txt"));
FileOutputStream fos = new FileOutputStream(new File("D:\\copy.txt"));
FileChannel in = fis.getChannel();
FileChannel out = fos.getChannel();
in.transferTo(0, in.size(), out);
fos.close();
fis.close();
}
}
執行結果:
Copy by FileChannel+Buffer----11
Copy by FileChannel+transferTo()----3
Copy by MapByteBuffer---0
2.Scatter/Gatter案例
public class MappedByteBufferDemo {
private static final String OUTPUT_FILE = "D:\\info.txt";
private static final String INPUT_FILE = "D:\\java.txt";
private static final String LINE_SEP = "\r\n";
private static final String SERVER_ID = "Server: Ronsoft Dummy Server";
private static final String HTTP_LINE = "HTTP/1.0 200 OK" + LINE_SEP + SERVER_ID + LINE_SEP;
public static void main(String[] argv) throws Exception {
ByteBuffer requestHeader = ByteBuffer.allocate(128);
ByteBuffer requestLine = ByteBuffer.wrap(bytes(HTTP_LINE));
//請求報文包含line,header,body。將三部分緩衝區內容寫到檔案中
ByteBuffer[] gather = {requestLine, requestHeader, null};
String contentType = "unknown/unknown";
long contentLength = -1;
try {
FileInputStream fis = new FileInputStream(INPUT_FILE);
FileChannel fc = fis.getChannel();
MappedByteBuffer requestBody = fc.map(MapMode.READ_ONLY, 0, fc.size());// MappedByteBuffer只讀模式
gather[2] = requestBody;
contentLength = fc.size();
contentType = URLConnection.guessContentTypeFromName(INPUT_FILE);
} catch (IOException e) {
}
StringBuffer sb = new StringBuffer();
sb.append("Content-Length: " + contentLength);
sb.append(LINE_SEP);
sb.append("Content-Type: ").append(contentType);
sb.append(LINE_SEP).append(LINE_SEP);
requestHeader.put(bytes(sb.toString()));
requestHeader.flip();
FileOutputStream fos = new FileOutputStream(OUTPUT_FILE);
FileChannel out = fos.getChannel();
while (out.write(gather) > 0) {
}
out.close();
}
private static byte[] bytes(String string) throws Exception {
return string.getBytes("UTF-8");
}
}
3.Channels工具類的使用
public static void main(String[] args) throws IOException {
ReadableByteChannel source = Channels.newChannel(System.in);
WritableByteChannel dest = Channels.newChannel(System.out);
channelCopy(source,dest);
source.close();
dest.close();
}
private static void channelCopy(ReadableByteChannel source,WritableByteChannel dest) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
while(source.read(buffer)!=-1) {
buffer.flip();
dest.write(buffer);
buffer.clear();
}
}
}
總結
1.通道分為檔案通道和套接字通道,都是雙向通道,即可以寫又可以讀取。需要注意的是檔案通道FileChannel沒有直接建立的方法,只能在RandomAcessFile,FileInputStream,FileInputStream物件上呼叫getChannel()獲取,所以會受到對應流物件的限制,比如FileInputStream物件上獲取的FileChannel也只能讀取資料,呼叫write()將會丟擲異常。
2.檔案通道里面提供了檔案鎖以及記憶體對映等功能,實現了檔案安全以及高效率訪問。
2.SocketChannel是面向連線的,類似於TCP/IP,只能傳送和接收指定地址的資料。而DatagramChannel面向無連線,類似於UDP/IP,可以接收和傳送到任意地址的資料。ServerSocketChannel本身沒有提供讀取方法,只是用於監聽連線。