1. 程式人生 > >基於Netty3的RPC架構筆記3之執行緒模型原始碼分析

基於Netty3的RPC架構筆記3之執行緒模型原始碼分析

      隨著使用者量上升,專案的架構也在不斷的升級,由最開始的MVC的垂直架構(傳統專案)到RPC架構(webservice,rest,netty,mina),再到SOA模型(dubbo),再到最近的微服務,又比如Tomcat6之前的IO模型都是BIO 也就是阻塞IO,到後來變成多路複用,也是阻塞IO。到非阻塞NIO,再到非同步非阻塞AIO,

     言歸正傳,接著談netty,傳統IO是一個執行緒服務一個客戶,後來通過netty,可以一個執行緒服務多個客戶,下面的那個圖展示的是netty的NIO通過引入多執行緒來提高效能,既一個執行緒負責一片使用者

直接上程式碼

package com.cn;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import com.cn.pool.NioSelectorRunnablePool;
/**
 * 啟動函式
 *
 */
public class Start {

	public static void main(String[] args) {

		
		//初始化執行緒
		NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
		
		//獲取服務類
		ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);
		
		//繫結埠
		bootstrap.bind(new InetSocketAddress(10101));
		
		System.out.println("start");
	}

}
package com.cn.pool;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import com.cn.NioServerBoss;
import com.cn.NioServerWorker;
/**
 * selector執行緒管理者
 *
 */
public class NioSelectorRunnablePool {

	/**
	 * boss執行緒陣列
	 */
	private final AtomicInteger bossIndex = new AtomicInteger();
	private Boss[] bosses;

	/**
	 * worker執行緒陣列
	 */
	private final AtomicInteger workerIndex = new AtomicInteger();
	private Worker[] workeres;

	
	public NioSelectorRunnablePool(Executor boss, Executor worker) {
		initBoss(boss, 1);
		initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);
	}

	/**
	 * 初始化boss執行緒
	 * @param boss
	 * @param count
	 */
	private void initBoss(Executor boss, int count) {
		this.bosses = new NioServerBoss[count];
		for (int i = 0; i < bosses.length; i++) {
			bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);
		}

	}

	/**
	 * 初始化worker執行緒
	 * @param worker
	 * @param count
	 */
	private void initWorker(Executor worker, int count) {
		this.workeres = new NioServerWorker[count];
		for (int i = 0; i < workeres.length; i++) {
			workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this);
		}
	}

	/**
	 * 獲取一個worker
	 * @return
	 */
	public Worker nextWorker() {
		 return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)];

	}

	/**
	 * 獲取一個boss
	 * @return
	 */
	public Boss nextBoss() {
		 return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
	}

}

package com.cn;

import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;

import com.cn.pool.Boss;
import com.cn.pool.NioSelectorRunnablePool;
/**
 * 服務類
 *
 */
public class ServerBootstrap {

private NioSelectorRunnablePool selectorRunnablePool;
	
	public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {
		this.selectorRunnablePool = selectorRunnablePool;
	}
	
	/**
	 * 繫結埠
	 * @param localAddress
	 */
	public void bind(final SocketAddress localAddress){
		try {
			// 獲得一個ServerSocket通道
			ServerSocketChannel serverChannel = ServerSocketChannel.open();
			// 設定通道為非阻塞
			serverChannel.configureBlocking(false);
			// 將該通道對應的ServerSocket繫結到port埠
			serverChannel.socket().bind(localAddress);
			
			//獲取一個boss執行緒
			Boss nextBoss = selectorRunnablePool.nextBoss();
			//向boss註冊一個ServerSocket通道
			nextBoss.registerAcceptChannelTask(serverChannel);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
package com.cn.pool;

import java.nio.channels.SocketChannel;
/**
 * worker介面
 *
 */
public interface Worker {
	
	/**
	 * 加入一個新的客戶端會話
	 * @param channel
	 */
	public void registerNewChannelTask(SocketChannel channel);

}
package com.cn.pool;

import java.nio.channels.ServerSocketChannel;
/**
 * boss介面
 *
 */
public interface Boss {
	
	/**
	 * 加入一個新的ServerSocket
	 * @param serverChannel
	 */
	public void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.cn;

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import com.cn.pool.NioSelectorRunnablePool;

/**
 * 抽象selector執行緒類
 * 
 * 
 */
public abstract class AbstractNioSelector implements Runnable {

	/**
	 * 執行緒池
	 */
	private final Executor executor;

	/**
	 * 選擇器
	 */
	protected Selector selector;

	/**
	 * 選擇器wakenUp狀態標記
	 */
	protected final AtomicBoolean wakenUp = new AtomicBoolean();

	/**
	 * 任務佇列
	 */
	private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();

	/**
	 * 執行緒名稱
	 */
	private String threadName;
	
	/**
	 * 執行緒管理物件
	 */
	protected NioSelectorRunnablePool selectorRunnablePool;

	AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
		this.executor = executor;
		this.threadName = threadName;
		this.selectorRunnablePool = selectorRunnablePool;
		openSelector();
	}

	/**
	 * 獲取selector並啟動執行緒
	 */
	private void openSelector() {
		try {
			this.selector = Selector.open();
		} catch (IOException e) {
			throw new RuntimeException("Failed to create a selector.");
		}
		executor.execute(this);
	}

	@Override
	public void run() {
		
		Thread.currentThread().setName(this.threadName);

		while (true) {
			try {
				wakenUp.set(false);

				select(selector);

				processTaskQueue();

				process(selector);
			} catch (Exception e) {
				// ignore
			}
		}

	}

	/**
	 * 註冊一個任務並激活selector
	 * 
	 * @param task
	 */
	protected final void registerTask(Runnable task) {
		taskQueue.add(task);

		Selector selector = this.selector;

		if (selector != null) {
			if (wakenUp.compareAndSet(false, true)) {
				selector.wakeup();
			}
		} else {
			taskQueue.remove(task);
		}
	}

	/**
	 * 執行佇列裡的任務
	 */
	private void processTaskQueue() {
		for (;;) {
			final Runnable task = taskQueue.poll();
			if (task == null) {
				break;
			}
			task.run();
		}
	}
	
	/**
	 * 獲取執行緒管理物件
	 * @return
	 */
	public NioSelectorRunnablePool getSelectorRunnablePool() {
		return selectorRunnablePool;
	}

	/**
	 * select抽象方法
	 * 
	 * @param selector
	 * @return
	 * @throws IOException
	 */
	protected abstract int select(Selector selector) throws IOException;

	/**
	 * selector的業務處理
	 * 
	 * @param selector
	 * @throws IOException
	 */
	protected abstract void process(Selector selector) throws IOException;

}
package com.cn;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

import com.cn.pool.Boss;
import com.cn.pool.NioSelectorRunnablePool;
import com.cn.pool.Worker;
/**
 * boss實現類
 *
 */
public class NioServerBoss extends AbstractNioSelector implements Boss{

	public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
		super(executor, threadName, selectorRunnablePool);
	}

	@Override
	protected void process(Selector selector) throws IOException {
		Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey key = i.next();
            i.remove();
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
    		// 新客戶端
    		SocketChannel channel = server.accept();
    		// 設定為非阻塞
    		channel.configureBlocking(false);
    		// 獲取一個worker
    		Worker nextworker = getSelectorRunnablePool().nextWorker();
    		// 註冊新客戶端接入任務
    		nextworker.registerNewChannelTask(channel);
    		
    		System.out.println("新客戶端連結");
        }
	}
	
	
	public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){
		 final Selector selector = this.selector;
		 registerTask(new Runnable() {
			@Override
			public void run() {
				try {
					//註冊serverChannel到selector
					serverChannel.register(selector, SelectionKey.OP_ACCEPT);
				} catch (ClosedChannelException e) {
					e.printStackTrace();
				}
			}
		});
	}
	
	@Override
	protected int select(Selector selector) throws IOException {
		return selector.select();
	}

}
package com.cn;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

import com.cn.pool.NioSelectorRunnablePool;
import com.cn.pool.Worker;
/**
 * worker實現類
 *
 */
public class NioServerWorker extends AbstractNioSelector implements Worker{

	public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
		super(executor, threadName, selectorRunnablePool);
	}

	@Override
	protected void process(Selector selector) throws IOException {
		Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
		while (ite.hasNext()) {
			SelectionKey key = (SelectionKey) ite.next();
			// 移除,防止重複處理
			ite.remove();
			
			// 得到事件發生的Socket通道
			SocketChannel channel = (SocketChannel) key.channel();
			
			// 資料總長度
			int ret = 0;
			boolean failure = true;
			ByteBuffer buffer = ByteBuffer.allocate(1024);
			//讀取資料
			try {
				ret = channel.read(buffer);
				failure = false;
			} catch (Exception e) {
				// ignore
			}
			//判斷是否連線已斷開
			if (ret <= 0 || failure) {
				key.cancel();
				System.out.println("客戶端斷開連線");
	        }else{
	        	 System.out.println("收到資料:" + new String(buffer.array()));
	        	 
	     		//回寫資料
	     		ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
	     		channel.write(outBuffer);// 將訊息回送給客戶端
	        }
		}
	}

	/**
	 * 加入一個新的socket客戶端
	 */
	public void registerNewChannelTask(final SocketChannel channel){
		 final Selector selector = this.selector;
		 registerTask(new Runnable() {
			@Override
			public void run() {
				try {
					//將客戶端註冊到selector中
					channel.register(selector, SelectionKey.OP_READ);
				} catch (ClosedChannelException e) {
					e.printStackTrace();
				}
			}
		});
	}

	@Override
	protected int select(Selector selector) throws IOException {
		return selector.select(500);
	}
	
}

上面的例子是直接引用jar,也可以通過引用專案netty的原始碼從而理解netty工作原理

試想我們如何提高NIO的工作效率,一個NIO是不是隻能有一個selector?當然不是,一個系統可以有多個selector

selector可以註冊多個ServerSocketChannel

我們如何去看一個開源框架的程式碼

一斷點(多執行緒的情況下可以設定斷點的條件,指定列印某個執行緒)
二列印
三看呼叫棧
四搜尋