1. 程式人生 > >劍指架構師系列-tomcat6通過偽異步實現connector

劍指架構師系列-tomcat6通過偽異步實現connector

偽異步 setname lose ++ tar integer ext mil 開始

首先在StandardService中start接收請求的線程,如下:

 synchronized (connectors) {
            for (int i = 0; i < connectors.length; i++) {
                try {
                    ((Lifecycle) connectors[i]).start();
                } catch (Exception e) {
                    log.error(sm.getString("standardService.connector.startFailed",connectors[i]), e);
                }
            }
        }

然後進入Connector,在這個類中調用了org.apache.coyote.http11.Http11Protocol類

protocolHandler.start();

在Http11Protocol類中又調用了org.apache.tomcat.util.net.JIoEndpoint類

endpoint.start();

下面看一下JIoEndpoint類中的start源代碼,如下:

public void start() throws Exception {
		// Initialize socket if not done before
		if (!initialized) {
			init();
		}
		if (!running) {
			running = true;
			paused = false;

			// Create worker collection
			if (executor == null) {
				workers = new WorkerStack(maxThreads); // maxThreads值為200,可同時處理200個請求
			}

			// Start acceptor threads
			for (int i = 0; i < acceptorThreadCount; i++) {  // acceptorThreadCount值為1,只有一個接收請求的線程
				Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
				acceptorThread.setPriority(threadPriority);
				acceptorThread.setDaemon(daemon);
				acceptorThread.start();
			}
		}
	}

WorkerStack類使用了定長的數組來方便存取worker,也就是真正處理請求的線程。重點看一下Acceptor這個Runnable類的實現。

/**
	 * Server socket acceptor thread.
	 */
	protected class Acceptor implements Runnable {

		/**
		 * The background thread that listens for incoming TCP/IP connections
		 * and hands them off to an appropriate processor.
		 */
		public void run() {

			// Loop until we receive a shutdown command
			while (running) {

				// Loop if endpoint is paused
				while (paused) {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						// Ignore
					}
				}

				// Accept the next incoming connection from the server socket
				try {
					Socket socket = serverSocketFactory.acceptSocket(serverSocket);  // 阻塞接收socket請求
					serverSocketFactory.initSocket(socket);
					// Hand this socket off to an appropriate processor
					if (!processSocket(socket)) {   // 處理socket請求
						// Close socket right away
						try {
							socket.close();  
						} catch (IOException e) {
							// Ignore
						}
					}
				} catch (IOException x) {
					if (running)
						log.error(sm.getString("endpoint.accept.fail"), x);
				} catch (Throwable t) {
					log.error(sm.getString("endpoint.accept.fail"), t);
				}
				// The processor will recycle itself when it finishes ???
			}
		}// end run

	}

最重要的就是processSocket()方法了,看下源代碼:

protected boolean processSocket(Socket socket) {
		try {
			if (executor == null) {  // 默認情況
				getWorkerThread().assign(socket);
			} else {  // 用戶自己指定了執行任務的線程池
				executor.execute(new SocketProcessor(socket));
			}
		} catch (Throwable t) {
			// This means we got an OOM or similar creating a thread, or that
			// the pool and its queue are full
			log.error(sm.getString("endpoint.process.fail"), t);
			return false;
		}
		return true;
	}

在默認情況下,首先要get到一個Worker的Thread,然後才能assign任務。

看一下getWorkerThread()這條邏輯:

/**
	 * Return a new worker thread, and block while to worker is available.
	 */
	protected Worker getWorkerThread() {
		// Allocate a new worker thread
		synchronized (workers) {   // 獲取workers鎖
			Worker workerThread;
			while ((workerThread = createWorkerThread()) == null) {
				try {
					workers.wait();  // 沒有可用線程時釋放workers鎖,等待notify
				} catch (InterruptedException e) {
					// Ignore
				}
			}
			return workerThread;
		}
	}

代碼要通過createWorkerThread()方法來獲取一個workerThread,閱讀如下代碼就可以知道,這個方法有可能返回null。這樣這個線程就需要讓鎖等待了,直到有線程notify。想一下就知道,肯定是分配出去執行任務的線程執行完成後,就可以notify接口請求的線程。接收請求的線程繼續while循環,直到獲取到一個workerThread為止。

createWorkerThread()方法源代碼:

protected Worker createWorkerThread() {

		synchronized (workers) {
			if (workers.size() > 0) { // 通過WorkerStack提供的方法來操作Worker
				curThreadsBusy++;
				return workers.pop();
			}
			if ((maxThreads > 0) && (curThreads < maxThreads)) { // 保證不能大於指定的最大線程數
				curThreadsBusy++;
				if (curThreadsBusy == maxThreads) {
					log.info(sm.getString("endpoint.info.maxThreads", Integer.toString(maxThreads), address,Integer.toString(port)));
				}
				return (newWorkerThread());
			} else {  
				if (maxThreads < 0) {  // maxThreads小於0時會無限制的new WorkerThread,表示不限制
					curThreadsBusy++;
					return (newWorkerThread());
				} else {  // 當curThreads等於maxThreads或者大於maxThreads且maxThreads大於0的情況
					return (null);
				}
			}
		}

	}  

recycleWorkerThread()方法源代碼:

protected void recycleWorkerThread(Worker workerThread) {
		synchronized (workers) {
			workers.push(workerThread);
			curThreadsBusy--;
			workers.notify();
		}
	}

這個方法被誰調用了呢?當然是被執行任何的線程調用了。  

下面來看一下最重要的Worker類中非常重要的幾個方法,如下:

protected class Worker implements Runnable {

		protected Thread thread = null;
		protected boolean available = false;  // available初始化為false
		protected Socket socket = null;

		/**
		 * The background thread that listens for incoming TCP/IP connections
		 * and hands them off to an appropriate processor.
		 */
		public void run() {

			// Process requests until we receive a shutdown signal
			while (running) {

				// Wait for the next socket to be assigned
				Socket socket = await(); // 1
				if (socket == null)
					continue;

				// Process the request from this socket
				if (!setSocketOptions(socket) || !handler.process(socket)) {
					// Close socket
					try {
						socket.close();
					} catch (IOException e) {
					}
				}
				// Finish up this request
				socket = null;
				recycleWorkerThread(this);
			}

		}

		/**
		 * Start the background processing thread.
		 */
		public void start() {
			thread = new Thread(this);
			thread.setName(getName() + "-" + (++curThreads));
			thread.setDaemon(true);
			thread.start();
		}

	}

這個線程在assign任務之前是start的,看一下run()方法中的第1步調用了await()方法,在await()方法中由於available值默認為false,所以進入了while循環後讓出了線程鎖並等待assign()方法notifyAll()。

/**
		 * Await a newly assigned Socket from our Connector, or
		 * null if we are supposed to shut down.
		 */
		private synchronized Socket await() {

			// Wait for the Connector to provide a new Socket
			while (!available) {
				try {
					wait();
				} catch (InterruptedException e) {
				}
			}

			// Notify the Connector that we have received this Socket
			Socket socket = this.socket;
			available = false;
			notifyAll();

			return (socket);

		}	

當我們assign任務後,調用的assign()方法如下:

/**
		 * Process an incoming TCP/IP connection on the specified socket. Any
		 * exception that occurs during processing must be logged and swallowed.
		 * NOTE: This method is called from our Connector‘s thread. We
		 * must assign it to our own thread so that multiple simultaneous
		 * requests can be handled.
		 */
		synchronized void assign(Socket socket) {

			// Wait for the Processor to get the previous Socket
			while (available) {
				try {
					wait();
				} catch (InterruptedException e) {
				}
			}

			// Store the newly available Socket and notify our thread
			this.socket = socket;
			available = true;
			notifyAll();

		}

沒有進入while循環,置available為true後notifyAll()。這樣await()方法就跳出循環並置available為false後返回一個局部變量socket(為什麽要返回一個局部變量socket呢?),這樣run()方法就可以開始往下走了,完成後調用recycleWorkerThread()方法進行線程回收。 

這個run()方法再次進入while循環,調用await()方法後,由於await()方法在之前跳出循環時將available設置為false,所以就進入了讓鎖等待,等待請求線程調用assign()方法指定任務,這樣就回到了開始敘述的地方了。

為什麽在await()方法中使用局部變量socket呢?

摘自深入剖析Tomcat:因為使用局部變量可以在當前Socket對象處理完之前,繼續接收下一個Socket對象。 

個人認為是怕在run()方法運行的過程中其它線程調用這個Worker對象的assign()方法,畢竟這個對象的引用是可以被其它線程獲取到的。為什麽可以調用assign()方法重新指定呢?因為run()方法沒有加synchronized關鍵字,所以不能與assign()方法互斥訪問socket資源。還是為了安全性吧。

  

  

  

  

  

  

 

劍指架構師系列-tomcat6通過偽異步實現connector