1. 程式人生 > >netty源碼分析

netty源碼分析

分派 ges 作者 boolean () conf jar包 ++ 應用

  1、Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。也就是說,Netty 是一個基於NIO的客戶、服務器端編程框架,使用Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。Netty相當簡化和流線化了網絡應用的編程開發過程,例如,TCP和UDP的socket服務開發。

  2、目前netty有3個版本netty3、netty4、netty5。3個版本的內容有所不同。neety3是核心的代碼介紹。相對於netty4、和netty5的復雜性來說。netty3的源碼是值得學習的。我這裏解析了netty3的一些源碼,僅供大家理解,也是為了方便大家理解做了很多簡化。不代表作者的開發思路。

  3、我們先來看一張圖(這張圖是我在學習源碼的時候扣的,哈哈)

  一、傳統NIO流

  技術分享圖片

  1)一個線程裏面,存在一個selector,當然這個selector也承擔起看大門和服務客人的工作。

  2)這裏不管多少客戶端進來,都是這個selector來處理。這樣就就加大了這個服務員的工作量

  3)為了加入線程池,讓多個selector同時工作,當時目的性都是一樣的。

  4)雖然看大門的和服務客人的都是服務員,但是還是存在差別的。為了更好的處理多個線程的問題。所以這裏netty就誕生了。

二、netty框架

  技術分享圖片

  理解:

  1)netty3的框架也是基於nio流做出來的。所以這裏會詳細介紹netty3框架的思路

  2)將看門的服務員和服務客人的服務員分開。形成兩塊(也就是2個線程池,也就是後面的boss和worker)

  3)當一個客人來的時候,首先boss,進行接待。然後boss分配工作給worker,這個,在兩個線程池的工作下,有條不亂。

  4)原理:就是將看大門的selector和服務客人的selector分開。然後通過boss線程池,下發任務給對應的worker

  4、netty3源碼分析

  1)加入對應的jar包。我這裏為了了解源碼用的是netty3的包。

    <dependency>
            <groupId>io.netty</
groupId> <artifactId>netty</artifactId> <version>3.10.6.Final</version> </dependency>

  2)目錄結構

  技術分享圖片

  說明:

  a、NettyBoss、NettyWork是針對於selector做區分。雖然他們很多共性,我這裏為了好理解,並沒有做抽象類(忽略開發思路)。

  b、ThreadHandle是用來初始化線程池和對應的接口。

  c、Start為啟動類

  3)NettyBoss(看大門的服務員,第一種線程selector)

package com.troy.application.netty;

import java.io.IOException;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class NettyBoss {

    //線程池
    public final Executor executor;
    //boss選擇器
    protected Selector selector;
    //原子變量,主要是用來保護線程安全。當本線程執行的時候,排除其他線程的執行
    protected final AtomicBoolean wakenUp = new AtomicBoolean();
    //隊列,線程安全隊列。
    public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    //線程處理,這裏主要是拿到work的線程池
    protected ThreadHandle threadHandle;

    //初始化
    public NettyBoss(Executor executor,ThreadHandle threadHandle) {
        //賦值
        this.executor = executor;
        this.threadHandle = threadHandle;
        try {
            //每一個線程選擇器
            this.selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        //從線程中獲取一個線程執行以下內容
        executor.execute(() -> {
            while (true) {
                try {
                    //這裏的目前就是排除其他線程同事執行,false因為這裏處於阻塞狀態,不用開啟
                    wakenUp.set(false);
                    //選擇器阻塞
                    selector.select();
                    //運行隊列中的任務
                    while (true) {
                        final Runnable task = taskQueue.poll();
                        if (task == null) {
                            break;
                        }
                        //如果任務存在開始運行
                        task.run();
                    }
                    //對進來的進行處理
                    this.process(selector);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey key = i.next();
            i.remove();
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            // 新客戶端
            SocketChannel channel = server.accept();
            // 設置為非阻塞
            channel.configureBlocking(false);
            // 獲取一個worker
            NettyWork nextworker = threadHandle.workeres[Math.abs(threadHandle.workerIndex.getAndIncrement() % threadHandle.workeres.length)];
            // 註冊新客戶端接入任務
            Runnable runnable = () -> {
                try {
                    //將客戶端註冊到selector中
                    channel.register(nextworker.selector, SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            };
            //添加到work的隊列中
            nextworker.taskQueue.add(runnable);
            if (nextworker.selector != null) {
                //這裏的目前就是開啟執行過程
                if (nextworker.wakenUp.compareAndSet(false, true)) {
                    //放開本次阻塞,進行下一步執行
                    nextworker.selector.wakeup();
                }
            } else {
                //任務完成移除線程
                taskQueue.remove(runnable);
            }
            System.out.println("新客戶端鏈接");
        }
    }
}

  解釋:

  a、初始化的時候,賦值線程池,和線程處理類(線程處理類目的是獲取worker的工作線程)

  b、executor為線程池的執行過程。

  c、selector.select()為形成阻塞,wakenUp為了線程安全考核。在接入客戶端的時候用selector.wakeup()來放開本次阻塞(很重要)。

  d、然後在worker安全隊列中執行對應工作。(taskQueue的目前在boss和worker中的作用都是為了考慮線程安全,這裏采用線程安全隊列的目的是為了不直接操作其他線程)

  e、wakenUp.compareAndSet(false, true),這裏是考慮並發問題。在本線程運行的時候,其他線程處於等待狀態。這裏也是為了線程安全考慮。

  4)NettyWork(服務客人的服務員,第二種selector)

package com.troy.application.netty;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class NettyWork {
    //線程池
    public final Executor executor;
    //boss選擇器
    protected Selector selector;
    //原子變量,主要是用來保護線程安全。當本線程執行的時候,排除其他線程的執行
    protected final AtomicBoolean wakenUp = new AtomicBoolean();
    //隊列,線程安全隊列。
    public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();

    //初始化
    public NettyWork(Executor executor) {
        this.executor = executor;
        try {
            //每一個work也需要一個選擇器用來管理通道
            this.selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        //從線程池中獲取一個線程開始執行
        executor.execute(() -> {
            while (true) {
                try {
                    //阻塞狀態排除問題
                    wakenUp.set(false);
                    //阻塞
                    selector.select();
                    //處理work任務
                    while (true) {
                        final Runnable task = taskQueue.poll();
                        if (task == null) {
                            break;
                        }
                        //存在work任務開始執行
                        task.run();
                    }
                    //處理任務
                    this.process(selector);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
        while (ite.hasNext()) {
            SelectionKey key = (SelectionKey) ite.next();
            // 移除,防止重復處理
            ite.remove();
            // 得到事件發生的Socket通道
            SocketChannel channel = (SocketChannel) key.channel();
            // 數據總長度
            int ret = 0;
            boolean failure = true;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //讀取數據
            try {
                ret = channel.read(buffer);
                failure = false;
            } catch (Exception e) {
                // ignore
            }
            //判斷是否連接已斷開
            if (ret <= 0 || failure) {
                key.cancel();
                System.out.println("客戶端斷開連接");
            }else{
                System.out.println("收到數據:" + new String(buffer.array()));
                //回寫數據
                ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
                channel.write(outBuffer);// 將消息回送給客戶端
            }
        }
    }
}

  解釋:

  a、worker的執行方式基本上面和boss的方式是一樣的,只不夠是處理方式不一樣

  b、這裏需要註意的是,都是考慮線程隊列執行。

  3)ThreadHandle(線程處理,這裏主要是啟動需要的東西)

package com.troy.application.netty;


import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadHandle {

    public final AtomicInteger bossIndex = new AtomicInteger();
    public static NettyBoss[] bosses;
    public final AtomicInteger workerIndex = new AtomicInteger();
    public static NettyWork[] workeres;

    public ThreadHandle(ExecutorService boss,ExecutorService work) {
        this.bosses = new NettyBoss[1];
        //初始化boss線程池
        for (int i = 0; i < bosses.length; i++) {
            bosses[i] = new NettyBoss(boss,this);
        }
        this.workeres = new NettyWork[Runtime.getRuntime().availableProcessors() * 2];
        //初始化work線程池
        for (int i = 0; i < workeres.length; i++) {
            workeres[i] = new NettyWork(work);
        }
    }

    public void bind(InetSocketAddress inetSocketAddress) {
        try {
            // 獲得一個ServerSocket通道
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // 設置通道為非阻塞
            serverChannel.configureBlocking(false);
            // 將該通道對應的ServerSocket綁定到port端口
            serverChannel.socket().bind(inetSocketAddress);
            //獲取一個boss線程
            NettyBoss nextBoss = bosses[Math.abs(bossIndex.getAndIncrement() % workeres.length)];
            //向boss註冊一個ServerSocket通道
            Runnable runnable = () -> {
                try {
                    //註冊serverChannel到selector
                    serverChannel.register(nextBoss.selector, SelectionKey.OP_ACCEPT);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            };
            //加入任務隊列
            nextBoss.taskQueue.add(runnable);
            if (nextBoss.selector != null) {
                //排除其他任務處理
                if (nextBoss.wakenUp.compareAndSet(false, true)) {
                    //放開阻塞
                    nextBoss.selector.wakeup();
                }
            } else {
                //移除任務
                nextBoss.taskQueue.remove(runnable);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  解釋:

  a、這裏采用數組的形式,主要目的是考慮多個看門的,和多個服務客人的線程。為了好控制,好選擇,哪一個來執行。

  b、端口的註冊,在NettyBoss裏面進行初始化的的原理都是一樣的。

  4)start

package com.troy.application.netty;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Start {

    public static void main(String[] args) {
        //聲明線程池
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService work = Executors.newCachedThreadPool();
        //初始化線程池
        ThreadHandle threadHandle = new ThreadHandle(boss,work);
        //聲明端口
        threadHandle.bind(new InetSocketAddress(9000));
        System.out.println("start");
    }
}

  說明一下流程

  a、初始化boss和work。讓boss線程池加入設定第一種boss的selector,並且處於阻塞狀態。work的初始化也基本上是一樣的,只不過換成了第二種selector線程池,處於阻塞狀態。

  b、當線程處理類初始化監聽端口的時候。就是選擇boss中其中一個selector。聲明一個線程先監聽,加入boss的線程安全隊列中。然後放開boss阻塞,向下執行。線程執行會監聽對應端口並阻塞。

  c、當一個客戶端接入的時候,boss中的selector會監聽到對應端口。然後選擇work線程中的一個selector給work分派任務。

  d、最後work中的selector來處理事務。

  4、源碼下載:https://pan.baidu.com/s/1pKIxuMf

  5、本代碼只是用於理解netty的實現過程,不代表開發思路。其中我為了簡化代碼,做了很多調整。目的就是壓縮代碼,方便理解。

netty源碼分析