1. 程式人生 > >【從入門到放棄-Java】併發程式設計-NIO-Channel

【從入門到放棄-Java】併發程式設計-NIO-Channel

簡介

channel即通道,可以用來讀、寫資料,它是全雙工的可以同時用來讀寫操作。這也是它與stream流的最大區別。

channel需要與buffer配合使用,channel通道的一端是buffer,一端是資料來源實體,如檔案、socket等。在nio中,通過channel的不同實現來處理 不同實體與資料buffer中的資料傳輸。

channel介面:

package java.nio.channels;

import java.io.IOException;
import java.io.Closeable;


/**
 * A nexus for I/O operations.
 *
 * <p> A channel represents an open connection to an entity such as a hardware
 * device, a file, a network socket, or a program component that is capable of
 * performing one or more distinct I/O operations, for example reading or
 * writing.
 *
 * <p> A channel is either open or closed.  A channel is open upon creation,
 * and once closed it remains closed.  Once a channel is closed, any attempt to
 * invoke an I/O operation upon it will cause a {@link ClosedChannelException}
 * to be thrown.  Whether or not a channel is open may be tested by invoking
 * its {@link #isOpen isOpen} method.
 *
 * <p> Channels are, in general, intended to be safe for multithreaded access
 * as described in the specifications of the interfaces and classes that extend
 * and implement this interface.
 *
 *
 * @author Mark Reinhold
 * @author JSR-51 Expert Group
 * @since 1.4
 */

public interface Channel extends Closeable {

    /**
     * Tells whether or not this channel is open.
     *
     * @return <tt>true</tt> if, and only if, this channel is open
     */
    public boolean isOpen();

    /**
     * Closes this channel.
     *
     * <p> After a channel is closed, any further attempt to invoke I/O
     * operations upon it will cause a {@link ClosedChannelException} to be
     * thrown.
     *
     * <p> If this channel is already closed then invoking this method has no
     * effect.
     *
     * <p> This method may be invoked at any time.  If some other thread has
     * already invoked it, however, then another invocation will block until
     * the first invocation is complete, after which it will return without
     * effect. </p>
     *
     * @throws  IOException  If an I/O error occurs
     */
    public void close() throws IOException;

}

常見的channel實現有:

  • FileChannel:檔案讀寫資料通道
  • SocketChannel:TCP讀寫網路資料通道
  • ServerSocketChannel:服務端網路資料讀寫通道,可以監聽TCP連線。對每一個新進來的連線都會建立一個SocketChannel。
  • DatagramChannel:UDP讀寫網路資料通道

FileChannel

FileChannel是一個抽象類,它繼承了AbstractInterruptibleChannel類,並實現了 SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel介面。

具體的實現類主要是sun.nio.ch.FileChannelImpl。下面詳細分析下FileChannelImpl中每個方法的具體實現。

open

private FileChannelImpl(FileDescriptor var1, String var2, boolean var3, boolean var4, boolean var5, Object var6) {
    //主要記載作業系統維護的檔案描述符
    this.fd = var1;
    
    //是否可讀
    this.readable = var3;
    
    //是否可寫
    this.writable = var4;
    
    //是否以追加的方式開啟
    this.append = var5;
    
    this.parent = var6;
    this.path = var2;
    
    //底層使用native的read和write來處理檔案的
    this.nd = new FileDispatcherImpl(var5);
}

//FileInputStream::getChannel 呼叫 FileChannelImpl.open(fd, path, true, false, this) 獲取只讀channel
public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, Object var4) {
    return new FileChannelImpl(var0, var1, var2, var3, false, var4);
}

//FileOutputStream::getChannel 呼叫 FileChannelImpl.open(fd, path, false, true, append, this) 獲取只寫channel
public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, boolean var4, Object var5) {
    return new FileChannelImpl(var0, var1, var2, var3, var4, var5);
}
private FileChannelImpl(FileDescriptor fd, String path, boolean readable,
                        boolean writable, boolean direct, Object parent)
{
    this.fd = fd;
    
    //是否可讀
    this.readable = readable;
    
    //是否可寫
    this.writable = writable;
    
    //對於從流建立的channel,在結束時要做不同的清理動作,(openJDK中才有,sun的jdk中沒有)
    this.parent = parent;
    
    //原始檔的path
    this.path = path;
    
    //是否使用DirectIO
    this.direct = direct;
    
    this.nd = new FileDispatcherImpl();
    if (direct) {
        assert path != null;
        this.alignment = nd.setDirectIO(fd, path);
    } else {
        this.alignment = -1;
    }

    //當parent不存在時,則註冊一個cleaner,否則交由parent做清理動作。
    // Register a cleaning action if and only if there is no parent
    // as the parent will take care of closing the file descriptor.
    // FileChannel is used by the LambdaMetaFactory so a lambda cannot
    // be used here hence we use a nested class instead.
    this.closer = parent != null ? null :
        CleanerFactory.cleaner().register(this, new Closer(fd));
}

// Used by FileInputStream.getChannel(), FileOutputStream.getChannel
// and RandomAccessFile.getChannel()
public static FileChannel open(FileDescriptor fd, String path,
                               boolean readable, boolean writable,
                               boolean direct, Object parent)
{
    return new FileChannelImpl(fd, path, readable, writable, direct, parent);
}
  • open方法主要是返回一個新new的FileChannelImpl物件,初始化時設定fileDescriptor、readable、writable、append、parent、path等屬性,看變數名很容易理解,在此不贅述變數含義。

read

//實現自SeekableByteChannel介面的方法,將檔案中的內容讀取到給定的byteBuffer中
public int read(ByteBuffer dst) throws IOException {
    //保證讀寫時,channel處於開啟狀態
    ensureOpen();
    
    //判斷是否可讀
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        if (direct)
            Util.checkChannelPositionAligned(position(), alignment);
        int n = 0;
        int ti = -1;
        try {
        
            //開始阻塞,並註冊為Interruptible,可以被中斷
            beginBlocking();
            
            //將當前執行緒新增到NativeThreadSet中,並返回索引,方便後續操作。
            //NativeThreadSet是一個執行緒安全的本地執行緒集合,方便管理,用來發送訊號
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                //當未被系統中斷(即讀取完畢)或channel未被關閉,則一直讀,將內容寫入到byteBuffer(dst)中
                n = IOUtil.read(fd, dst, -1, direct, alignment, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            //把當前執行緒從set中移出
            threads.remove(ti);
            
            //結束,釋放鎖
            endBlocking(n > 0);
            assert IOStatus.check(n);
        }
    }
}

//實現自ScatteringByteChannel介面的方法,將檔案中的內容依次讀取到給定的byteBuffer陣列中。
public long read(ByteBuffer[] dsts, int offset, int length)
    throws IOException
{
    if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
        throw new IndexOutOfBoundsException();
    //保證讀寫時,channel處於開啟狀態
    ensureOpen();
    
    //判斷是否可讀
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        if (direct)
            Util.checkChannelPositionAligned(position(), alignment);
        long n = 0;
        int ti = -1;
        try {
            //開始阻塞,並註冊為Interruptible,可以被中斷
            beginBlocking();
            
            //將當前執行緒新增到NativeThreadSet中,並返回索引,方便後續操作。
            //NativeThreadSet是一個執行緒安全的本地執行緒集合,方便管理,用來發送訊號
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                //當未被系統中斷(即讀取完畢)或channel未被關閉,則一直讀,將內容寫入到byteBuffer(dst)中
                n = IOUtil.read(fd, dsts, offset, length,
                        direct, alignment, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            //把當前執行緒從set中移出
            threads.remove(ti);
            
            //結束,釋放鎖
            endBlocking(n > 0);
            assert IOStatus.check(n);
        }
    }
}

write

//實現自SeekableByteChannel介面的方法,將byteBuffer中的內容寫入到檔案中
public int write(ByteBuffer src) throws IOException {
    //保證寫時,channel處於開啟狀態
    ensureOpen();
    
    //判斷是否可寫
    if (!writable)
        throw new NonWritableChannelException();
    synchronized (positionLock) {
        if (direct)
            Util.checkChannelPositionAligned(position(), alignment);
        int n = 0;
        int ti = -1;
        try {
            //開始阻塞,並註冊為Interruptible,可以被中斷
            beginBlocking();
            
            //將當前執行緒新增到NativeThreadSet中,並返回索引,方便後續操作。
            //NativeThreadSet是一個執行緒安全的本地執行緒集合,方便管理,用來發送訊號
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                //當未被系統中斷(即寫入完畢)或channel未被關閉,則一直寫,將內容寫入到檔案中
                n = IOUtil.write(fd, src, -1, direct, alignment, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            //把當前執行緒從set中移出
            threads.remove(ti);
            
            //結束,釋放鎖
            assert IOStatus.check(n);
        }
    }
}

//實現自GatheringByteChannel介面的方法,將byteBuffer陣列中的內容依次寫入到檔案中
public long write(ByteBuffer[] srcs, int offset, int length)
    throws IOException
{
    if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
        throw new IndexOutOfBoundsException();
        //保證寫時,channel處於開啟狀態
        ensureOpen();
        
        //判斷是否可寫
    if (!writable)
        throw new NonWritableChannelException();
    synchronized (positionLock) {
        if (direct)
            Util.checkChannelPositionAligned(position(), alignment);
        long n = 0;
        int ti = -1;
        try {
            //開始阻塞,並註冊為Interruptible,可以被中斷
            beginBlocking();
            
            //將當前執行緒新增到NativeThreadSet中,並返回索引,方便後續操作。
            //NativeThreadSet是一個執行緒安全的本地執行緒集合,方便管理,用來發送訊號
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                //當未被系統中斷(即寫入完畢)或channel未被關閉,則一直寫,將內容寫入到檔案中
                n = IOUtil.write(fd, srcs, offset, length,
                        direct, alignment, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            //把當前執行緒從set中移出
            threads.remove(ti);
            
            //結束,釋放鎖
            assert IOStatus.check(n);
        }
    }
}

position

//實現自SeekableByteChannel介面的方法,獲取當前channel的position
public long position() throws IOException {
    ensureOpen();
    synchronized (positionLock) {
        long p = -1;
        int ti = -1;
        try {
            beginBlocking();
            ti = threads.add();
            if (!isOpen())
                return 0;
            boolean append = fdAccess.getAppend(fd);
            do {
                //append模式下,position在channel的末尾
                // in append-mode then position is advanced to end before writing
                p = (append) ? nd.size(fd) : nd.seek(fd, -1);
            } while ((p == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(p);
        } finally {
            threads.remove(ti);
            endBlocking(p > -1);
            assert IOStatus.check(p);
        }
    }
}

//實現自SeekableByteChannel介面的方法,設定當前channel的position為newPosition
public FileChannel position(long newPosition) throws IOException {
    ensureOpen();
    if (newPosition < 0)
        throw new IllegalArgumentException();
    synchronized (positionLock) {
        long p = -1;
        int ti = -1;
        try {
            beginBlocking();
            ti = threads.add();
            if (!isOpen())
                return null;
            do {
                //設定當前position為newPosition
                p = nd.seek(fd, newPosition);
            } while ((p == IOStatus.INTERRUPTED) && isOpen());
            return this;
        } finally {
            threads.remove(ti);
            endBlocking(p > -1);
            assert IOStatus.check(p);
        }
    }
}

size

實現自SeekableByteChannel介面的方法,返回當前實體(檔案)的大小

truncate

實現自SeekableByteChannel介面的方法,用來擷取檔案至newSize大小

force

實現自SeekableByteChannel介面的方法,用來將channel中尚未寫入磁碟的資料強制落盤

transferTo

將fileChannel中的資料傳遞至另一個channel

transferFrom

從其它channel讀取資料至fileChannel

SocketChannel

open

/**
 * Opens a socket channel.
 *
 * <p> The new channel is created by invoking the {@link
 * java.nio.channels.spi.SelectorProvider#openSocketChannel
 * openSocketChannel} method of the system-wide default {@link
 * java.nio.channels.spi.SelectorProvider} object.  </p>
 *
 * @return  A new socket channel
 *
 * @throws  IOException
 *          If an I/O error occurs
 */
public static SocketChannel open() throws IOException {
    return SelectorProvider.provider().openSocketChannel();
}

open方法是呼叫SelectorProvider中實現了java.nio.channels.spi.SelectorProvider#openSocketChannel的方法,底層實際是new SocketChannelImpl,呼叫native方法建立socket

connect

public boolean connect(SocketAddress sa) throws IOException {
    //校驗Address是否合法
    InetSocketAddress isa = Net.checkAddress(sa);
    
    //獲取系統安全管理器
    SecurityManager sm = System.getSecurityManager();
    if (sm != null)
        
        //校驗IP和埠是否被允許連線
        sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

    InetAddress ia = isa.getAddress();
    
    //如果是本機地址,則獲取本機的host
    if (ia.isAnyLocalAddress())
        ia = InetAddress.getLocalHost();

    try {
        //加讀鎖
        readLock.lock();
        try {
            //加寫鎖
            writeLock.lock();
            try {
                int n = 0;
                
                //是否阻塞
                boolean blocking = isBlocking();
                try {
                    //開啟connect前的校驗並設定為ST_CONNECTIONPENDING,如果blocking是true 即阻塞模式,則記錄當前執行緒的ID,以便接收訊號處理。
                    beginConnect(blocking, isa);
                    do {
                        //呼叫native connect方法
                        n = Net.connect(fd, ia, isa.getPort());
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } finally {
                    //結束連線
                    endConnect(blocking, (n > 0));
                }
                assert IOStatus.check(n);
                return n > 0;
            } finally {
                //釋放寫鎖
                writeLock.unlock();
            }
        } finally {
            //釋放讀鎖
            readLock.unlock();
        }
    } catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throw SocketExceptions.of(ioe, isa);
    }
}

configureBlocking

實現自SelectableChannel的介面方法,呼叫native方法設定socket的阻塞狀態

register

在AbstractSelectableChannel中定義,註冊要監聽的事件。

public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException
{
    if ((ops & ~validOps()) != 0)
        throw new IllegalArgumentException();
    if (!isOpen())
        throw new ClosedChannelException();
    synchronized (regLock) {
        if (isBlocking())
            throw new IllegalBlockingModeException();
        synchronized (keyLock) {
            // re-check if channel has been closed
            if (!isOpen())
                throw new ClosedChannelException();
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.attach(att);
                k.interestOps(ops);
            } else {
                // 向Selector中註冊事件
                // New registration
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
            return k;
        }
    }
}

read

//實現自ReadableByteChannel介面的方法,從socket中讀取資料至ByteBuffer
@Override
public int read(ByteBuffer buf) throws IOException {
    Objects.requireNonNull(buf);

    readLock.lock();
    try {
        boolean blocking = isBlocking();
        int n = 0;
        try {
            //檢查channel是否開啟並已經是connected的狀態。如果blocking是true 即阻塞模式,則記錄當前執行緒的ID,以便接收訊號處理。
            beginRead(blocking);

            // check if input is shutdown
            if (isInputClosed)
                return IOStatus.EOF;

            //如果是阻塞模式,則一直讀取直到資料讀取完畢;非阻塞模式則直接呼叫native方法不需要等待。
            if (blocking) {
                do {
                    n = IOUtil.read(fd, buf, -1, nd);
                } while (n == IOStatus.INTERRUPTED && isOpen());
            } else {
                n = IOUtil.read(fd, buf, -1, nd);
            }
        } finally {
            endRead(blocking, n > 0);
            if (n <= 0 && isInputClosed)
                return IOStatus.EOF;
        }
        return IOStatus.normalize(n);
    } finally {
        readLock.unlock();
    }
}

//實現自ScatteringByteChannel介面的方法,從socket中依次讀取資料至ByteBuffer陣列
@Override
public long read(ByteBuffer[] dsts, int offset, int length)
    throws IOException
{
    Objects.checkFromIndexSize(offset, length, dsts.length);

    readLock.lock();
    try {
        boolean blocking = isBlocking();
        long n = 0;
        try {
            beginRead(blocking);

            // check if input is shutdown
            if (isInputClosed)
                return IOStatus.EOF;

            //如果是阻塞模式,則一直讀取直到資料讀取完畢;非阻塞模式則直接呼叫native方法不需要等待。
            if (blocking) {
                do {
                    n = IOUtil.read(fd, dsts, offset, length, nd);
                } while (n == IOStatus.INTERRUPTED && isOpen());
            } else {
                n = IOUtil.read(fd, dsts, offset, length, nd);
            }
        } finally {
            endRead(blocking, n > 0);
            if (n <= 0 && isInputClosed)
                return IOStatus.EOF;
        }
        return IOStatus.normalize(n);
    } finally {
        readLock.unlock();
    }
}

write

//實現自ReadableByteChannel介面的方法,將ByteBuffer中的資料寫入socket
@Override
public int write(ByteBuffer buf) throws IOException {
    Objects.requireNonNull(buf);

    writeLock.lock();
    try {
        boolean blocking = isBlocking();
        int n = 0;
        try {
            beginWrite(blocking);
            //如果是阻塞模式,則一直讀取直到資料讀取完畢;非阻塞模式則直接呼叫native方法不需要等待。
            if (blocking) {
                do {
                    n = IOUtil.write(fd, buf, -1, nd);
                } while (n == IOStatus.INTERRUPTED && isOpen());
            } else {
                n = IOUtil.write(fd, buf, -1, nd);
            }
        } finally {
            endWrite(blocking, n > 0);
            if (n <= 0 && isOutputClosed)
                throw new AsynchronousCloseException();
        }
        return IOStatus.normalize(n);
    } finally {
        writeLock.unlock();
    }
}

@Override
public long write(ByteBuffer[] srcs, int offset, int length)
    throws IOException
{
    Objects.checkFromIndexSize(offset, length, srcs.length);

    writeLock.lock();
    try {
        boolean blocking = isBlocking();
        long n = 0;
        try {
            beginWrite(blocking);
            //如果是阻塞模式,則一直等待直到資料寫入完畢;非阻塞模式則直接呼叫native方法不需要等待。
            if (blocking) {
                do {
                    n = IOUtil.write(fd, srcs, offset, length, nd);
                } while (n == IOStatus.INTERRUPTED && isOpen());
            } else {
                n = IOUtil.write(fd, srcs, offset, length, nd);
            }
        } finally {
            endWrite(blocking, n > 0);
            if (n <= 0 && isOutputClosed)
                throw new AsynchronousCloseException();
        }
        return IOStatus.normalize(n);
    } finally {
        writeLock.unlock();
    }
}

//實現自ReadableByteChannel介面的方法,將ByteBuffer陣列中的資料依次寫入socket
/**
 * Writes a byte of out of band data.
 */
int sendOutOfBandData(byte b) throws IOException {
    writeLock.lock();
    try {
        boolean blocking = isBlocking();
        int n = 0;
        try {
            beginWrite(blocking);
            //如果是阻塞模式,則一直等待直到資料寫入完畢;非阻塞模式則直接呼叫native方法不需要等待。
            if (blocking) {
                do {
                    n = sendOutOfBandData(fd, b);
                } while (n == IOStatus.INTERRUPTED && isOpen());
            } else {
                n = sendOutOfBandData(fd, b);
            }
        } finally {
            endWrite(blocking, n > 0);
            if (n <= 0 && isOutputClosed)
                throw new AsynchronousCloseException();
        }
        return IOStatus.normalize(n);
    } finally {
        writeLock.unlock();
    }
}

ServerSocketChannel

socket

@Override
public ServerSocket socket() {
    synchronized (stateLock) {
        if (socket == null)
            socket = ServerSocketAdaptor.create(this);
        return socket;
    }
}

bind

@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (stateLock) {
        ensureOpen();
        if (localAddress != null)
            throw new AlreadyBoundException();
        InetSocketAddress isa = (local == null)
                                ? new InetSocketAddress(0)
                                : Net.checkAddress(local);
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkListen(isa.getPort());
            
        //繫結前做一些前置處理,如將tcp socket檔案描述符轉換成SDP
        NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
        
        //繫結IP和地址
        Net.bind(fd, isa.getAddress(), isa.getPort());
        
        //開始監聽,設定socket上最多可以掛起backlog個連線,若backlog小於1 則預設設定50個
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        
        localAddress = Net.localAddress(fd);
    }
    return this;
}

accept

@Override
public SocketChannel accept() throws IOException {
    acceptLock.lock();
    try {
        int n = 0;
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        boolean blocking = isBlocking();
        try {
            begin(blocking);
            do {
                //阻塞等待接收客戶端連結
                n = accept(this.fd, newfd, isaa);
            } while (n == IOStatus.INTERRUPTED && isOpen());
        } finally {
            end(blocking, n > 0);
            assert IOStatus.check(n);
        }

        if (n < 1)
            return null;

        //新接收的socket初始設定為阻塞模式(因此非阻塞模式的每次需要顯示設定)
        // newly accepted socket is initially in blocking mode
        IOUtil.configureBlocking(newfd, true);

        InetSocketAddress isa = isaa[0];
        
        //用新接收的socket建立SocketChannel
        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

        // check permitted to accept connections from the remote address
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        return sc;

    } finally {
        acceptLock.unlock();
    }
}

ServerSocketChannel並沒有read和write方法,只是繼承了AbstractSelectableChannel,以便在selector中使用

DatagramChannel

open

public DatagramChannelImpl(SelectorProvider sp)
    throws IOException
{
    super(sp);
    ResourceManager.beforeUdpCreate();
    try {
        //如果不支援IPv6則使用IPv4
        this.family = Net.isIPv6Available()
                ? StandardProtocolFamily.INET6
                : StandardProtocolFamily.INET;
                
        //設定非流式的socket(tcp是流模式協議,udp是資料報模式協議)
        this.fd = Net.socket(family, false);
        this.fdVal = IOUtil.fdVal(fd);
    } catch (IOException ioe) {
        ResourceManager.afterUdpClose();
        throw ioe;
    }
}

receive

public SocketAddress receive(ByteBuffer dst) throws IOException {
    if (dst.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");

    readLock.lock();
    try {
        boolean blocking = isBlocking();
        int n = 0;
        ByteBuffer bb = null;
        try {
            SocketAddress remote = beginRead(blocking, false);
            boolean connected = (remote != null);
            SecurityManager sm = System.getSecurityManager();
            if (connected || (sm == null)) {
                // connected or no security manager
                do {
                    n = receive(fd, dst, connected);
                } while ((n == IOStatus.INTERRUPTED) && isOpen());
                if (n == IOStatus.UNAVAILABLE)
                    return null;
            } else {
                // Cannot receive into user's buffer when running with a
                // security manager and not connected
                bb = Util.getTemporaryDirectBuffer(dst.remaining());
                for (;;) {
                    do {
                        n = receive(fd, bb, connected);
                    } while ((n == IOStatus.INTERRUPTED) && isOpen());
                    if (n == IOStatus.UNAVAILABLE)
                        return null;
                    InetSocketAddress isa = (InetSocketAddress)sender;
                    try {
                        sm.checkAccept(isa.getAddress().getHostAddress(),
                                       isa.getPort());
                    } catch (SecurityException se) {
                        // Ignore packet
                        bb.clear();
                        n = 0;
                        continue;
                    }
                    bb.flip();
                    dst.put(bb);
                    break;
                }
            }
            
            //sender:傳送方地址, Set by receive0 (## ugh)
            assert sender != null;
            return sender;
        } finally {
            if (bb != null)
                Util.releaseTemporaryDirectBuffer(bb);
            endRead(blocking, n > 0);
            assert IOStatus.check(n);
        }
    } finally {
        readLock.unlock();
    }
}

send

public int send(ByteBuffer src, SocketAddress target)
        throws IOException
{
    Objects.requireNonNull(src);
    InetSocketAddress isa = Net.checkAddress(target, family);

    writeLock.lock();
    try {
        boolean blocking = isBlocking();
        int n = 0;
        try {
            //當connect後,remote會設定為連線的地址
            SocketAddress remote = beginWrite(blocking, false);
            if (remote != null) {
                // connected
                if (!target.equals(remote)) {
                    throw new AlreadyConnectedException();
                }
                do {
                    n = IOUtil.write(fd, src, -1, nd);
                } while ((n == IOStatus.INTERRUPTED) && isOpen());
            } else {
                // not connected
                SecurityManager sm = System.getSecurityManager();
                if (sm != null) {
                    InetAddress ia = isa.getAddress();
                    if (ia.isMulticastAddress()) {
                        sm.checkMulticast(ia);
                    } else {
                        sm.checkConnect(ia.getHostAddress(), isa.getPort());
                    }
                }
                do {
                    n = send(fd, src, isa);
                } while ((n == IOStatus.INTERRUPTED) && isOpen());
            }
        } finally {
            endWrite(blocking, n > 0);
            assert IOStatus.check(n);
        }
        return IOStatus.normalize(n);
    } finally {
        writeLock.unlock();
    }
}

connect

@Override
public DatagramChannel connect(SocketAddress sa) throws IOException {
    InetSocketAddress isa = Net.checkAddress(sa, family);
    SecurityManager sm = System.getSecurityManager();
    if (sm != null) {
        InetAddress ia = isa.getAddress();
        if (ia.isMulticastAddress()) {
            sm.checkMulticast(ia);
        } else {
            sm.checkConnect(ia.getHostAddress(), isa.getPort());
            sm.checkAccept(ia.getHostAddress(), isa.getPort());
        }
    }

    readLock.lock();
    try {
        writeLock.lock();
        try {
            synchronized (stateLock) {
                ensureOpen();
                if (state == ST_CONNECTED)
                    throw new AlreadyConnectedException();

                int n = Net.connect(family,
                                    fd,
                                    isa.getAddress(),
                                    isa.getPort());
                if (n <= 0)
                    throw new Error();      // Can't happen

                // connected
                remoteAddress = isa;
                state = ST_CONNECTED;

                // refresh local address
                localAddress = Net.localAddress(fd);

                // flush any packets already received.
                boolean blocking = isBlocking();
                if (blocking) {
                    IOUtil.configureBlocking(fd, false);
                }
                try {
                    ByteBuffer buf = ByteBuffer.allocate(100);
                    while (receive(buf) != null) {
                        buf.clear();
                    }
                } finally {
                    if (blocking) {
                        IOUtil.configureBlocking(fd, true);
                    }
                }
            }
        } finally {
            writeLock.unlock();
        }
    } finally {
        readLock.unlock();
    }
    return this;
}

udp是資料報模式的協議,是沒有connect的。這裡的connect實際上是在底層忽略了與其他地址的資料傳輸。
在connect後,就可以像socketChannel似得使用read和write了

總結

本文學習了各種channel的實現,主要是對底層native方法的一些封裝,針對不同屬性的實體(檔案、socket),使用對應的channel與byteBuffer傳輸資料。再通過byteBuffer與byte資料進行轉換。
channel的實現中,封裝了大量的native方法,重要的底層實現全在native中,後續可以深入學習下。

本文中出現的byteBuffer和selector將在接下來的文章中,單獨分析。

作者:aloof_

原文連結

本文為雲棲社群原創內容,未經