1. 程式人生 > >java執行緒(超時等待+簡單資料庫連線池)

java執行緒(超時等待+簡單資料庫連線池)

超時等待模式在一般的等待/通知機制上新增超時控制,使得方法執行時間過長也不會一直造成阻塞,而是在一段時間後返回。

通常java連線資料庫時會將其執行過程交由一個執行緒進行處理,使得在一些I/O操作或需要一定時間執行的行為在後臺執行而客戶端能立刻對當前的動作做出下一個反應。例如客戶端的檔案下載上傳等往往另開一個執行緒,而主程式繼續監聽將要發生的動作,在與資料庫互動時,每個使用者獲得一個專屬的執行緒對其需要的內容進行修改和同步,在併發量較低的情況下,當然可以依次new出一個執行緒來,但一旦加大了使用者數量,或使用者頻繁地進行操作時,服務端反覆建立和回收也會造成比較大的開銷,加大系統的負載。(且通常會有很多一些小操作需要服務端快速地進行處理)

執行緒池的技術能比較合理地解決此問題,預先建立一定量的執行緒,不由使用者對執行緒直接進行建立控制,而是爭取資源獲得複用執行緒進行操作。可以消除頻繁建立和消亡所帶來的開銷(具體執行緒量和執行速度與多因素有關,在實際使用時應該視情況而定)。

等待超時的模式如下:(虛擬碼)

//同步物件鎖

public synchronized Object get(long mills) throws InterruptedException{

  long remaining=mills;

  long future=System.currentTimeMills()+mills;

......

...

 while((result==null)&&remaining>0){

wait(remaining);

remaining=future-System. currentTimeMills();

 }

return result;

}

remaining<=0 則為已經超時,退出。

————————————————————————————————————————————

簡單資料庫連線池例子:(引自java併發程式設計的藝術第4章)

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

public class ConnectionDriver {
    static class ConnectionHandler implements InvocationHandler {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getName().equals("commit")) {
                TimeUnit.MILLISECONDS.sleep(100);
            }
            return null;
        }
    }
    // 建立一個Connection的代理,在commit時休眠1秒
    public static final Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(), new Class<?>[] { Connection.class },
            new ConnectionHandler());
    }
}
用ConnectionDriver建立返回connect連線,用於加入到執行緒池集合中
import java.sql.Connection;
import java.util.LinkedList;
public class ConnectionPool {
    private LinkedList<Connection> pool = new LinkedList<Connection>();
    public ConnectionPool(int initialSize) {
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }
    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                // 新增後需要進行通知,這樣其他消費者能夠感知到連結池中已經歸還了一個連結
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }
    // 在mills內無法獲取到連線,將會返回null
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool) {
            // 完全超時
            if (mills <= 0) {
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if (!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
}
通過建構函式初始化連線的上限,用鏈式集合進行控制(LinkedList 在處理插入刪除方面具有較快效益)

在每返回一個連線之後,在集合中刪除,而每回收一個連線則通知所有尚在等待的使用者,此刻有空餘連線可供使用

若在超時時間範圍內依舊沒有現有連線可以返回則返回null。

import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class ConnectionPoolTest {
    static ConnectionPool pool  = new ConnectionPool(10);
    // 保證所有ConnectionRunner能夠同時開始
    static CountDownLatch start = new CountDownLatch(1);
    // main執行緒將會等待所有ConnectionRunner結束後才能繼續執行
    static CountDownLatch end;
    
    public static void main(String[] args) throws Exception {
        // 執行緒數量,可以執行緒數量進行觀察
        int threadCount = 50;
        end = new CountDownLatch(threadCount);
        int count = 20;
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new ConnetionRunner(count, got, notGot), "ConnectionRunnerThread");
            thread.start();
        }
        start.countDown();
        end.await();
        System.out.println("total invoke: " + (threadCount * count));
        System.out.println("got connection:  " + got);
        System.out.println("not got connection " + notGot);
    }

    static class ConnetionRunner implements Runnable {
        int           count;
        AtomicInteger got;
        AtomicInteger notGot;

        public ConnetionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        public void run() {
            try {
                start.await();
            } catch (Exception ex) {

            }
            while (count > 0) {
                try {
                    // 從執行緒池中獲取連線,如果1000ms內無法獲取到,將會返回null
                    // 分別統計連接獲取的數量got和未獲取到的數量notGot
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        }  finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                    }
                } catch (Exception ex) {
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}
執行結果:(執行緒數量threadCount與每個訪問次數count不同,會影響結果成功與失敗的比例)

total invoke: 1000
got connection:  836
not got connection 164