java實現簡單web容器(執行緒池)
阿新 • • 發佈:2019-02-10
執行緒池ThreadPool.java
package webserver; import java.util.Collection; import java.util.Iterator; import java.util.Vector; /** * 執行緒池 * @author ShaoJiang * */ public class ThreadPool { protected int maxPoolSize;//最大執行緒數 protected int initPoolSize;//初始化執行緒數 protected Vector<UserThread> threads = new Vector<UserThread>();//存放執行緒的向量 protected boolean initialized = false;//執行緒池初始化狀態 protected boolean hasIdleThread = false; /** * 初始化執行緒池 * @param maxPoolSize --最大執行緒數 * @param initPoolSize --初始化執行緒數 */ public ThreadPool(int maxPoolSize, int initPoolSize) { this.maxPoolSize = maxPoolSize; this.initPoolSize = initPoolSize; System.out.println("建立執行緒池...\r\n最大執行緒數為:"+this.maxPoolSize+"\r\n初始化執行緒數為:"+this.initPoolSize); } /** * 執行緒池的初始化,建立活動執行緒新增到執行緒向量 */ public void init() { initialized = true;//設定執行緒池初始化狀態為true for (int i = 0; i < initPoolSize; i++) { UserThread thread = new UserThread(this);//初始化使用者執行緒 thread.start();//啟動使用者執行緒,此時執行緒處於等待狀態 threads.add(thread);//將活動執行緒新增到執行緒池 } System.out.println("初始化執行緒池...\r\n是否有閒置執行緒:"+hasIdleThread); } /** * 設定最大連線數 * @param maxPoolSize */ public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; if (maxPoolSize < getPoolSize()) setPoolSize(maxPoolSize); } /** * 重設當前執行緒數 若需殺掉某執行緒,執行緒不會立刻殺掉,而會等到執行緒中的事務處理完成 但此方法會立刻從執行緒池中移除該執行緒,不會等待事務處理結束 * * @param size */ public void setPoolSize(int size) { if (!initialized) { initPoolSize = size; return; } else if (size > getPoolSize()) { for (int i = getPoolSize(); i < size && i < maxPoolSize; i++) { UserThread thread = new UserThread(this); thread.start(); threads.add(thread); } } else if (size < getPoolSize()) { while (getPoolSize() > size) { UserThread th = (UserThread) threads.remove(0); th.kill(); } } } /** * 獲得執行緒池大小 * @return */ public int getPoolSize() { return threads.size(); } /** * 提示有閒置執行緒 */ protected void notifyForIdleThread() { hasIdleThread = true; } protected boolean waitForIdleThread() { hasIdleThread = false; while (!hasIdleThread && getPoolSize() >= maxPoolSize) { try { Thread.sleep(5); } catch (InterruptedException e) { return false; } } return true; } /** * 獲取閒置執行緒 * @return */ public synchronized UserThread getIdleThread() { while (true) { for (Iterator<UserThread> itr = threads.iterator(); itr.hasNext();) { UserThread th = (UserThread) itr.next(); if (!th.isRunning()) return th; } if (getPoolSize() < maxPoolSize) { UserThread thread = new UserThread(this); thread.start(); threads.add(thread); return thread; } if (waitForIdleThread() == false) return null; } } /** * 獲取一個閒置執行緒,執行任務 * @param task */ public void processTask(ThreadTask task) { UserThread th = getIdleThread(); System.out.println("執行任務執行緒id:"+th.getId()); if (th != null) { th.putTask(task);//新增任務 th.startTasks();//結束等待執行任務 } System.out.println("任務執行完畢...\r\n執行緒池當前執行緒數:"+getPoolSize()); } /** * 獲取一個閒置執行緒,執行一組任務 * @param tasks */ public void processTasksInSingleThread(ThreadTask[] tasks) { UserThread th = getIdleThread(); if (th != null) { th.putTasks(tasks); th.startTasks(); } } /** * 獲取一個閒置執行緒,執行一組任務 * @param tasks */ public void processTasksInSingleThread(Collection<ThreadTask> tasks) { UserThread th = getIdleThread(); if (th != null) { th.putTasks(tasks); th.startTasks(); } } }
使用者執行緒UserThread.java
package webserver; import java.util.Collection; import java.util.Vector; /** * 使用者執行緒 * @author ShaoJiang * */ public class UserThread extends Thread { protected Vector<ThreadTask> tasks = new Vector<ThreadTask>();//任務佇列 protected boolean running = false;//控制執行緒執行狀態 protected boolean stopped = false;//任務停止狀態 protected boolean paused = false;//任務暫停狀態 protected boolean killed = false;//當前執行緒是否被殺死 private ThreadPool pool;//執行緒池 public UserThread(ThreadPool pool) { this.pool = pool; } /** * 新增任務 * @param task */ public void putTask(ThreadTask task) { tasks.add(task); } public void putTasks(ThreadTask[] tasks) { for (int i = 0; i < tasks.length; i++) this.tasks.add(tasks[i]); } public void putTasks(Collection<ThreadTask> tasks) { this.tasks.addAll(tasks); } /** * 移除隊頭任務 * @return */ protected ThreadTask popTask() { if (tasks.size() > 0) return (ThreadTask) tasks.remove(0); else return null; } /** * 返回任務執行狀態 * @return */ public boolean isRunning() { return running; } /** * 任務停止狀態 */ public void stopTasks() { stopped = true; } public void stopTasksSync() { stopTasks(); while (isRunning()) { try { sleep(5); } catch (InterruptedException e) { } } } /** * 暫停任務 */ public void pauseTasks() { paused = true; } public void pauseTasksSync() { pauseTasks(); while (isRunning()) { try { sleep(5); } catch (InterruptedException e) { } } } /** * 殺死當前執行緒 */ public void kill() { if (!running) interrupt(); else killed = true; } public void killSync() { kill(); while (isAlive()) { try { sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 啟動任務,喚醒執行緒 */ public synchronized void startTasks() { running = true; this.notify(); } /** * 執行執行緒執行任務 */ public synchronized void run() { try { while (true) { if (!running || tasks.size() == 0) { pool.notifyForIdleThread(); this.wait(); } else { ThreadTask task; while ((task = popTask()) != null) { task.run(); if (stopped) { stopped = false; if (tasks.size() > 0) { tasks.clear(); System.out.println(Thread.currentThread() .getId() + ": Tasks are stopped"); break; } } if (paused) { paused = false; if (tasks.size() > 0) { System.out.println(Thread.currentThread() .getId() + ": Tasks are paused"); break; } } } running = false; } if (killed) { killed = false; break; } } } catch (InterruptedException e) { return; } } }
任務介面ThreadTask.java
package webserver;
/**
* 執行緒任務介面
* @author ShaoJiang
*
*/
public interface ThreadTask {
public void run();
}
連線任務ConnectionTask .java
package webserver; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.PrintStream; import java.net.Socket; /** * 連線任務 * @author ShaoJiang * */ class ConnectionTask implements ThreadTask { Socket client; // 連線Web瀏覽器的socket字 public ConnectionTask(Socket cl) { client = cl; } @SuppressWarnings("deprecation") public void run() { try { PrintStream outstream = new PrintStream(client.getOutputStream()); DataInputStream instream = new DataInputStream(client.getInputStream()); String inline = instream.readLine(); // 讀取Web瀏覽器提交的請求資訊,主要是這裡,獲取協議 System.out.println("Received:" + inline); if (getrequest(inline)) { // 如果是GET請求 String filename = getFilename(inline); File file = new File(filename); System.out.println("File Path:"+file.getAbsolutePath()); if (file.exists()) { // 若檔案存在,則將檔案送給Web瀏覽器 outstream.println("HTTP/1.0 200 OK"); outstream.println("MIME_version:1.0"); outstream.println("Content_Type:text/html"); int len = (int) file.length(); outstream.println("Content_Length:" + len); outstream.println(""); sendFile(outstream, file); // 傳送檔案 outstream.flush(); } else { // 檔案不存在時 String notfound = "<html><head><title>Not Found</title></head><body><h1>Error 404-file not found</h1></body></html>"; outstream.println("HTTP/1.0 404 no found"); outstream.println("Content_Type:text/html"); outstream.println("Content_Length:" + notfound.length() + 2); outstream.println(""); outstream.println(notfound); outstream.flush(); } } client.close(); } catch (IOException e) { e.printStackTrace(); } } //獲取請求型別是否為“GET” boolean getrequest(String s) { if (s.length() > 0) { if (s.substring(0, 3).equalsIgnoreCase("GET")) { return true; } } return false; } //獲取要訪問的檔名 String getFilename(String s) { String f = s.substring(s.indexOf(' ') + 1); f = f.substring(0, f.indexOf(' ')); try { if (f.charAt(0) == '/') { f = f.substring(1); } } catch (StringIndexOutOfBoundsException e) { System.out.println("Exception:" + e); } if (f.equals("")) { f = "index.html";//設定預設首頁 } return f; } //把指定檔案傳送給Web瀏覽器 void sendFile(PrintStream outs, File file) { try { DataInputStream in = new DataInputStream(new FileInputStream(file)); int len = (int) file.length(); byte buf[] = new byte[len]; in.readFully(buf); outs.write(buf, 0, len); outs.flush(); in.close(); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } }
測試程式:
package webserver;
import java.net.ServerSocket;
import java.net.Socket;
public class WebServer {
public static void main(String args[]) {
int PORT = 8070;
ServerSocket server = null;
Socket client = null;
ThreadPool pool = new ThreadPool(3, 2);
pool.init();
try {
server = new ServerSocket(PORT);
System.out.println("伺服器正在監聽埠:"+ server.getLocalPort());
while(true) {
client = server.accept(); // 接受客戶機的連線請求
ThreadTask task = new ConnectionTask(client);
pool.processTask(task);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}