1. 程式人生 > >【Netty原始碼分析】資料讀取過程

【Netty原始碼分析】資料讀取過程

首先客戶端連線到服務端時服務端會開啟一個執行緒,不斷的監聽客戶端的操作。

這個執行緒的執行操作在NioEventLoop的run方法中,其實操作是在processSelectedKeys中,監聽是否進行讀操作

protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();
                    processSelectedKeys();
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } 
		}
    }
private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
判斷selectedKeys陣列中是否有值,其實就是read,write或accept事件
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
        }
    }

主要判斷int readyOps = k.readyOps();拿到的值是否是SelectionKey.OP_READ | SelectionKey.OP_ACCEPT

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
            
            }
          
            if (eventLoop != this || eventLoop == null) {
                return;
            }
          
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
        
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
              
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

如果int readyOps = k.readyOps();拿到的值是SelectionKey.OP_READ | SelectionKey.OP_ACCEPT就執行unsafe.read();

具體實現在AbstractNioByteChannel執行read操作

@Override
public final void read() {
	final ChannelConfig config = config();
	final ChannelPipeline pipeline = pipeline();
	final ByteBufAllocator allocator = config.getAllocator();
	final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
	allocHandle.reset(config);

	ByteBuf byteBuf = null;
	boolean close = false;
	try {
		do {
			byteBuf = allocHandle.allocate(allocator);
			allocHandle.lastBytesRead(doReadBytes(byteBuf));
			if (allocHandle.lastBytesRead() <= 0) {
				// nothing was read. release the buffer.
				byteBuf.release();
				byteBuf = null;
				close = allocHandle.lastBytesRead() < 0;
				break;
			}
			allocHandle.incMessagesRead(1);
			readPending = false;
			pipeline.fireChannelRead(byteBuf);
			byteBuf = null;
            } while (allocHandle.continueReading());

		allocHandle.readComplete();
		pipeline.fireChannelReadComplete();

		if (close) {
			closeOnRead(pipeline);
		}
	} catch (Throwable t) {
		handleReadException(pipeline, byteBuf, t, close, allocHandle);
	} 
}

讀取操作是在doReadBytes(byteBuf)中,具體實現是在NioSocketChannel中

@Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }
接下來的操作是在AbstractByteBuf的writeBytes中進行。
@Override
    public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
        ensureAccessible();
        ensureWritable(length);
        int writtenBytes = setBytes(writerIndex, in, length);
        if (writtenBytes > 0) {
            writerIndex += writtenBytes;
        }
        return writtenBytes;
    }
讀取資料操作是在setBytes函式中進行的,一般實現是在PooledUnsafeDirectByteBuf類中
 @Override
    public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
        checkIndex(index, length);
        ByteBuffer tmpBuf = internalNioBuffer();
        index = idx(index);
        tmpBuf.clear().position(index).limit(index + length);
        try {
            return in.read(tmpBuf);
        } catch (ClosedChannelException ignored) {
            return -1;
        }
    }
在這裡我們可以看到操作in.read(tmpBuf),這個操作就是NIO的讀取資料操作了,in是SocketChannel物件,將資料讀取到快取中去,完成資料的讀取過程。

相關推薦

Netty原始碼分析資料讀取過程

首先客戶端連線到服務端時服務端會開啟一個執行緒,不斷的監聽客戶端的操作。這個執行緒的執行操作在NioEventLoop的run方法中,其實操作是在processSelectedKeys中,監聽是否進行讀操作protected void run() { for

Netty原始碼分析傳送資料過程

future.channel().writeAndFlush("Hello Netty Server ,I am a common client"); 呼叫AbstractChannel的writeAndFlush函式@Override public ChannelFutu

Netty原始碼分析Netty服務端bind埠過程

這一篇部落格我們介紹一下Netty服務端繫結埠的過程,我們通過跟蹤程式碼一直到NIO原生繫結埠的操作。繫結埠操作ChannelFuture future = serverBootstrap.bind(8080).sync();AbstractBootstrap中bind操作p

NCNN原始碼分析1.基本資料型別

對於NCNN而言,核心在於網路的前向推理過程(Inference),其主要資料型別為mat,該資料型別以類的形式定義在src/mat.h中,其中包含了mat的建構函式、解構函式、常見的運算過程。 #if

NCNN原始碼分析3.基本資料結構分析

對於NCNN,在網路層傳遞的過程中,進行資料流動的方式是通過自定義的blob實現的,對於blob通過生產者編號和消費者編號進行定義,producer表示輸出該blob的網路層編號,consumers表示

Pig原始碼分析談談Pig的資料模型

1. 資料模型 Schema Pig Latin表示式操作的是relation,FILTER、FOREACH、GROUP、SPLIT等關係操作符所操作的relation就是bag,bag為tuple的集合,tuple為有序的field列表集合,而field表示資料塊(A field is a piece of

go原始碼分析go原始碼之slice原始碼分析

Go 語言切片是對陣列的抽象。 Go 陣列的長度不可改變,與陣列相比切片的長度是不固定的,可以追加元素,在追加時可能使切片的容量增大。 len() 和 cap() 函式     切片是可索引的,並且可以由 len() 方法獲取長度。    

go原始碼分析go原始碼之list原始碼分析

本文針對go 1.11版本,路徑src/container/list/list.go 資料結構 Element結構體 Value 前驅 後繼 // Element is an element of a linked list. type Element st

java原始碼分析Map中的hash演算法分析

全網把Map中的hash()分析的最透徹的文章,別無二家。 2018年05月09日 09:08:08 閱讀數:957 你知道HashMap中hash方法的具體實現嗎?你知道HashTable、ConcurrentHashMap中hash方法

ArcGIS|空間分析資料互動(Google Earth與ArcGIS,ENVI與ArcGIS)

文章目錄 Google Earth資料與ArcGIS資料互動 ENVI資料與ArcGIS資料互動 ENVI資料匯入ArcGIS ArcGIS資料匯入ENVI Google Earth資料與ArcGIS資料互動

spring原始碼分析IOC容器初始化(二)

前言:在【spring原始碼分析】IOC容器初始化(一)中已經分析了匯入bean階段,本篇接著分析bean解析階段。 1.解析bean程式呼叫鏈 同樣,先給出解析bean的程式呼叫鏈: 根據程式呼叫鏈,整理出在解析bean過程中主要涉及的類和相關方法。 2.解析bean原始碼分

Go 原始碼分析從 sort.go 看排序演算法的工程實踐

go version go1.11 darwin/amd64file: src/sort/sort.go 排序演算法有很多種類,比如快排、堆排、插入排序等。各種排序演算法各有其優劣性,在實際生產過程中用到的排序演算法(或者說 Sort 函式)通常是由幾種排序演算法組

PHP7原始碼分析如何理解PHP虛擬機器(一)

順風車運營研發團隊 李樂 1.從物理機說起 虛擬機器也是計算機,設計思想和物理機有很多相似之處; 1.1馮諾依曼體系結構 馮·諾依曼是當之無愧的數字計算機之父,當前計算機都採用的是馮諾依曼體系結構;設計思想主要包含以下幾個方面: 指令和資料不加區別混合儲存在同一個儲

Mybatis原始碼分析13-記一次PageHelper reasonable引數使用不當造成的死迴圈

問題描述及原因 使用Mybatis PageHelper外掛做了表的分頁查詢,要求查詢符合某一條件的所有記錄做處理,就寫了一個迭代器在while迴圈裡對每條記錄做處理,直到符合條件的記錄都處理完程式返回。程式碼如下 public class ReconPaymentI

Tomcat9原始碼分析元件與框架概述

1 元件與框架介紹 Server:代表整個Catalina Servlet容器,可以包含一個或多個Service Service:包含Connector和Container的集合,Service用適當的Connector接收使用者的請求,

E2LSH原始碼分析p穩定分佈LSH演算法初探

上一節,我們分析了LSH演算法的通用框架,主要是建立索引結構和查詢近似最近鄰。這一小節,我們從p穩定分佈LSH(p-Stable LSH)入手,逐漸深入學習LSH的精髓,進而靈活應用到解決大規模資料的

MyBatis原始碼分析TypeHandler解析屬性配置元素詳述及相關列舉使用高階進階

TypeHandler解析接著看一下typeHandlerElement(root.evalNode("typeHandlers"));方法,這句讀取的是<configuration>下的<typeHandlers>節點,程式碼實現為:private

Java8原始碼分析併發包-AtomicInteger

AtomicInteger類是實現了原子操作的Integer,以往對於保證int、double、float等基礎型別的運算原子性,需要採用加鎖的方式。但是為了一個簡單的運算操作採用鎖,在多執行緒競爭嚴重的情況下,會導致效能降低,所以在java1.5推出了Atom

proxyConfig原始碼分析proxyTargetClassexposeProxy

本文轉載自shysheng:spring proxyConfig原始碼分析 我們知道,ProxyConfig是所有產生Spring AOP代理物件的基類,它是一個配置源,主要為其AOP代理物件工廠實現類提供基本的配置屬性。 它一共包含5個屬性,本文主要就是分析這5個屬性在產生代理物件過

OkHttp3原始碼分析(一)Request的execute

簡單使用OkHttp3 閱讀本文需要對OkHttp3的使用有一定了解。 首先我們先看看如何簡單進行一個get請求的Request。 Request qqRequest = new Request.Builder()