1. 程式人生 > >java實現簡單web容器(執行緒池)

java實現簡單web容器(執行緒池)

 執行緒池ThreadPool.java

package webserver;

import java.util.Collection;
import java.util.Iterator;
import java.util.Vector;
/**
 * 執行緒池
 * @author ShaoJiang
 *
 */
public class ThreadPool {

	protected int maxPoolSize;//最大執行緒數
	protected int initPoolSize;//初始化執行緒數
	protected Vector<UserThread> threads = new Vector<UserThread>();//存放執行緒的向量
	protected boolean initialized = false;//執行緒池初始化狀態
	protected boolean hasIdleThread = false;

	/**
	 * 初始化執行緒池
	 * @param maxPoolSize --最大執行緒數
	 * @param initPoolSize --初始化執行緒數
	 */
	public ThreadPool(int maxPoolSize, int initPoolSize) {
		this.maxPoolSize = maxPoolSize;
		this.initPoolSize = initPoolSize;
		System.out.println("建立執行緒池...\r\n最大執行緒數為:"+this.maxPoolSize+"\r\n初始化執行緒數為:"+this.initPoolSize);
	}

	/**
	 * 執行緒池的初始化,建立活動執行緒新增到執行緒向量
	 */
	public void init() {
		initialized = true;//設定執行緒池初始化狀態為true
		for (int i = 0; i < initPoolSize; i++) {
			UserThread thread = new UserThread(this);//初始化使用者執行緒
			thread.start();//啟動使用者執行緒,此時執行緒處於等待狀態
			threads.add(thread);//將活動執行緒新增到執行緒池
		}
		System.out.println("初始化執行緒池...\r\n是否有閒置執行緒:"+hasIdleThread);
	}

	/**
	 * 設定最大連線數
	 * @param maxPoolSize
	 */
	public void setMaxPoolSize(int maxPoolSize) {
		this.maxPoolSize = maxPoolSize;
		if (maxPoolSize < getPoolSize())
			setPoolSize(maxPoolSize);
	}

	/**
	 * 重設當前執行緒數 若需殺掉某執行緒,執行緒不會立刻殺掉,而會等到執行緒中的事務處理完成 但此方法會立刻從執行緒池中移除該執行緒,不會等待事務處理結束
	 * 
	 * @param size
	 */
	public void setPoolSize(int size) {
		if (!initialized) {
			initPoolSize = size;
			return;
		} else if (size > getPoolSize()) {
			for (int i = getPoolSize(); i < size && i < maxPoolSize; i++) {
				UserThread thread = new UserThread(this);
				thread.start();
				threads.add(thread);
			}
		} else if (size < getPoolSize()) {
			while (getPoolSize() > size) {
				UserThread th = (UserThread) threads.remove(0);
				th.kill();
			}
		}
	}

	/**
	 * 獲得執行緒池大小
	 * @return
	 */
	public int getPoolSize() {
		return threads.size();
	}

	/**
	 * 提示有閒置執行緒
	 */
	protected void notifyForIdleThread() {
		hasIdleThread = true;
	}

	protected boolean waitForIdleThread() {
		hasIdleThread = false;
		while (!hasIdleThread && getPoolSize() >= maxPoolSize) {
			try {
				Thread.sleep(5);
			} catch (InterruptedException e) {
				return false;
			}
		}
		return true;
	}

	/**
	 * 獲取閒置執行緒
	 * @return
	 */
	public synchronized UserThread getIdleThread() {
		while (true) {
			for (Iterator<UserThread> itr = threads.iterator(); itr.hasNext();) {
				UserThread th = (UserThread) itr.next();
				if (!th.isRunning())
					return th;
			}
			if (getPoolSize() < maxPoolSize) {
				UserThread thread = new UserThread(this);
				thread.start();
				threads.add(thread);
				return thread;
			}
			if (waitForIdleThread() == false)
				return null;
		}
	}
	
	/**
	 * 獲取一個閒置執行緒,執行任務
	 * @param task
	 */
	public void processTask(ThreadTask task) {
		UserThread th = getIdleThread();
		System.out.println("執行任務執行緒id:"+th.getId());
		if (th != null) {
			th.putTask(task);//新增任務
			th.startTasks();//結束等待執行任務
		}
		System.out.println("任務執行完畢...\r\n執行緒池當前執行緒數:"+getPoolSize());
	}

	/**
	 * 獲取一個閒置執行緒,執行一組任務
	 * @param tasks
	 */
	public void processTasksInSingleThread(ThreadTask[] tasks) {
		UserThread th = getIdleThread();
		if (th != null) {
			th.putTasks(tasks);
			th.startTasks();
		}
	}
	
	/**
	 * 獲取一個閒置執行緒,執行一組任務
	 * @param tasks
	 */
	public void processTasksInSingleThread(Collection<ThreadTask> tasks) {
		UserThread th = getIdleThread();
		if (th != null) {
			th.putTasks(tasks);
			th.startTasks();
		}
	}
}


使用者執行緒UserThread.java

package webserver;

import java.util.Collection;
import java.util.Vector;

/**
 * 使用者執行緒
 * @author ShaoJiang
 *
 */
public class UserThread extends Thread {

	protected Vector<ThreadTask> tasks = new Vector<ThreadTask>();//任務佇列
	protected boolean running = false;//控制執行緒執行狀態
	protected boolean stopped = false;//任務停止狀態
	protected boolean paused = false;//任務暫停狀態
	protected boolean killed = false;//當前執行緒是否被殺死
	private ThreadPool pool;//執行緒池

	public UserThread(ThreadPool pool) {
		this.pool = pool;
	}

	/**
	 * 新增任務
	 * @param task
	 */
	public void putTask(ThreadTask task) {
		tasks.add(task);
	}

	public void putTasks(ThreadTask[] tasks) {
		for (int i = 0; i < tasks.length; i++)
			this.tasks.add(tasks[i]);
	}

	public void putTasks(Collection<ThreadTask> tasks) {
		this.tasks.addAll(tasks);
	}

	/**
	 * 移除隊頭任務
	 * @return
	 */
	protected ThreadTask popTask() {
		if (tasks.size() > 0)
			return (ThreadTask) tasks.remove(0);
		else
			return null;
	}

	/**
	 * 返回任務執行狀態
	 * @return
	 */
	public boolean isRunning() {
		return running;
	}

	/**
	 * 任務停止狀態
	 */
	public void stopTasks() {
		stopped = true;
	}

	public void stopTasksSync() {
		stopTasks();
		while (isRunning()) {
			try {
				sleep(5);
			} catch (InterruptedException e) {
			}
		}
	}

	/**
	 * 暫停任務
	 */
	public void pauseTasks() {
		paused = true;
	}

	public void pauseTasksSync() {
		pauseTasks();
		while (isRunning()) {
			try {
				sleep(5);
			} catch (InterruptedException e) {
			}
		}
	}

	/**
	 * 殺死當前執行緒
	 */
	public void kill() {
		if (!running)
			interrupt();
		else
			killed = true;
	}

	public void killSync() {
		kill();
		while (isAlive()) {
			try {
				sleep(5);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * 啟動任務,喚醒執行緒
	 */
	public synchronized void startTasks() {
		running = true;
		this.notify();
	}

	/**
	 * 執行執行緒執行任務
	 */
	public synchronized void run() {
		try {
			while (true) {
				if (!running || tasks.size() == 0) {
					pool.notifyForIdleThread();	
					this.wait();
				} else {
					ThreadTask task;
					while ((task = popTask()) != null) {
						task.run();
						if (stopped) {
							stopped = false;
							if (tasks.size() > 0) {
								tasks.clear();
								System.out.println(Thread.currentThread()
										.getId() + ": Tasks are stopped");
								break;
							}
						}
						if (paused) {
							paused = false;
							if (tasks.size() > 0) {
								System.out.println(Thread.currentThread()
										.getId() + ": Tasks are paused");
								break;
							}
						}
					}
					running = false;
				}
				if (killed) {
					killed = false;
					break;
				}
			}
		} catch (InterruptedException e) {
			return;
		}
	}
}

任務介面ThreadTask.java 

package webserver;

/**
 * 執行緒任務介面
 * @author ShaoJiang
 *
 */
public interface ThreadTask {
	public void run();
}

連線任務ConnectionTask .java

package webserver;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.Socket;

/**
 * 連線任務
 * @author ShaoJiang
 *
 */
class ConnectionTask implements ThreadTask {

    Socket client; // 連線Web瀏覽器的socket字
    public ConnectionTask(Socket cl) {
        client = cl;
    }
    
    @SuppressWarnings("deprecation")
	public void run() {
        try {
            PrintStream outstream = new PrintStream(client.getOutputStream());
            DataInputStream instream = new DataInputStream(client.getInputStream());
            String inline = instream.readLine(); // 讀取Web瀏覽器提交的請求資訊,主要是這裡,獲取協議
            System.out.println("Received:" + inline);
            if (getrequest(inline)) { // 如果是GET請求
                String filename = getFilename(inline);
                File file = new File(filename);
                System.out.println("File Path:"+file.getAbsolutePath());
                if (file.exists()) {
                    // 若檔案存在,則將檔案送給Web瀏覽器
                    outstream.println("HTTP/1.0 200 OK");
                    outstream.println("MIME_version:1.0");
                    outstream.println("Content_Type:text/html");
                    int len = (int) file.length();
                    outstream.println("Content_Length:" + len);
                    outstream.println("");
                    sendFile(outstream, file); // 傳送檔案
                    outstream.flush();
                } else {
                    // 檔案不存在時
                    String notfound = "<html><head><title>Not Found</title></head><body><h1>Error 404-file not found</h1></body></html>";
                    outstream.println("HTTP/1.0 404 no found");
                    outstream.println("Content_Type:text/html");
                    outstream.println("Content_Length:" + notfound.length() + 2);
                    outstream.println("");
                    outstream.println(notfound);
                    outstream.flush();
                }
            }    
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //獲取請求型別是否為“GET”
    boolean getrequest(String s) {
        if (s.length() > 0) {
            if (s.substring(0, 3).equalsIgnoreCase("GET")) {
                return true;
            }
        }
        return false;
    }

    //獲取要訪問的檔名 
    String getFilename(String s) {
        String f = s.substring(s.indexOf(' ') + 1);
        f = f.substring(0, f.indexOf(' '));
        try {
            if (f.charAt(0) == '/') {
                f = f.substring(1);
            }
        } catch (StringIndexOutOfBoundsException e) {
            System.out.println("Exception:" + e);
        }
        if (f.equals("")) {
            f = "index.html";//設定預設首頁
        }
        return f;
    }

    //把指定檔案傳送給Web瀏覽器 
    void sendFile(PrintStream outs, File file) {
        try {
            DataInputStream in = new DataInputStream(new FileInputStream(file));
            int len = (int) file.length();
            byte buf[] = new byte[len];
            in.readFully(buf);
            outs.write(buf, 0, len);
            outs.flush();
            in.close();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

測試程式:

package webserver;

import java.net.ServerSocket;
import java.net.Socket;

public class WebServer {
    public static void main(String args[]) {
        int PORT = 8070;
        ServerSocket server = null;
        Socket client = null;
        ThreadPool pool = new ThreadPool(3, 2);
        pool.init();
        try {
            server = new ServerSocket(PORT);
            System.out.println("伺服器正在監聽埠:"+ server.getLocalPort());
            while(true) {
            	client = server.accept(); // 接受客戶機的連線請求
            	ThreadTask task = new ConnectionTask(client); 
            	pool.processTask(task); 
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}