1. 程式人生 > >Java中Semaphore(信號量) 數據庫連接池

Java中Semaphore(信號量) 數據庫連接池

each jdb 同步方法 [] pop 線程 emp use builder

計數信號量用來控制同時訪問某個特定資源的操作數或同時執行某個指定操作的數量

A counting semaphore.Conceptually, a semaphore maintains a set of permits. Each acquire blocks if necessary until a permit is available, and then takes it. Each release adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.

從概念上來說,Semaphore中維護了一組許可,許可的數量在構造函數中指定。acquire方法將獲取一個可用的許可,如果沒有可用的許可,該方法會被阻塞,直到Semaphore中有可用的許可。release方法釋放一個許可,如果此時存在阻塞中的acqure方法,將釋放一個阻塞中的acquire

事實上,Semaphore中只維護可用請求數量,並不包含實際的請求對象

示例一:數據庫連接池

在初始化Semaphore時可以設置其公平性,如果為公平Semaphore,則按照請求時間獲得許可,即先發送的請求先獲得許可,如果為非公平Semaphore,則先發送的請求未必先獲得許可,這有助於提高程序的吞吐量,但是有可能導致某些請求始終獲取不到許可(tryAcquire方法不使用公平性設置)

[java] view plain copy
  1. import java.sql.Connection;
  2. import java.sql.DriverManager;
  3. import java.util.HashMap;
  4. import java.util.LinkedList;
  5. import java.util.Map;
  6. import java.util.concurrent.Semaphore;
  7. public class MyConnPool {
  8. private LinkedList<Connection> unusedConns =
  9. new LinkedList<Connection>();
  10. //釋放連接時對查找性能要求較高,故使用哈希表
  11. private Map<Connection,String> usedConns =
  12. new HashMap<Connection,String>();
  13. private final Semaphore available;
  14. public MyConnPool(int size) throws Exception{
  15. StringBuilder builder = new StringBuilder();
  16. builder.append("-----pool-----\n");
  17. available = new Semaphore(size, true);//公平性Semaphore
  18. String url = "jdbc:mysql://ip:port/name?user=user&password=pwd";
  19. for(int i = 0 ; i < size ; i++){
  20. Connection conn = DriverManager.getConnection(url);
  21. unusedConns.add(conn);
  22. builder.append("conn-" + i + ":" + conn.hashCode() + "\n");
  23. }
  24. builder.append("--------------\n");
  25. System.out.print(builder.toString());
  26. }
  27. public Connection getConn() throws InterruptedException{
  28. //獲取Semaphore中的許可
  29. available.acquire();
  30. Connection conn = null;
  31. synchronized(this){
  32. conn = unusedConns.removeFirst();
  33. usedConns.put(conn, "");
  34. System.out.println(Thread.currentThread().getName()
  35. + ":" + conn.hashCode() + "[got]");
  36. System.out.println(display());
  37. }
  38. return conn;
  39. }
  40. public void close(Connection conn){
  41. synchronized(this){
  42. if(usedConns.containsKey(conn)){
  43. usedConns.remove(conn);
  44. unusedConns.addLast(conn);
  45. System.out.println(Thread.currentThread().getName()
  46. + ":" + conn.hashCode() + "[closed]");
  47. System.out.println(display());
  48. }
  49. }
  50. //釋放線程獲取的許可
  51. available.release();
  52. }
  53. private final synchronized String display(){
  54. String str = "";
  55. if(unusedConns.size() > 0){
  56. str = "";
  57. for(Connection conn : unusedConns){
  58. str += conn.hashCode() + "|";
  59. }
  60. }
  61. if(!str.equals(""))
  62. return str;
  63. else
  64. return "empty";
  65. }
  66. }
[java] view plain copy
  1. import java.sql.Connection;
  2. import java.util.concurrent.CountDownLatch;
  3. public class Test implements Runnable{
  4. private static CountDownLatch latch
  5. = new CountDownLatch(1);
  6. private MyConnPool pool;
  7. public Test(MyConnPool pool){
  8. this.pool = pool;
  9. }
  10. @Override
  11. public void run(){
  12. try {
  13. latch.await();
  14. Connection conn = pool.getConn();
  15. Thread.sleep(1*1000);
  16. pool.close(conn);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. public static void main(String[] args) throws Exception{
  22. MyConnPool pool = new MyConnPool(2);
  23. for(int i = 0 ; i < 4 ; i++){
  24. Thread t = new Thread(new Test(pool));
  25. t.start();
  26. }
  27. //保證4個線程同時運行
  28. latch.countDown();
  29. }
  30. }

運行結果如下:

[plain] view plain copy
  1. -----pool-----
  2. conn-0:11631043
  3. conn-1:14872264
  4. --------------
  5. Thread-4:11631043[got]
  6. 14872264|
  7. Thread-1:14872264[got]
  8. empty
  9. Thread-4:11631043[closed]
  10. 11631043|
  11. Thread-2:11631043[got]
  12. empty
  13. Thread-1:14872264[closed]
  14. 14872264|
  15. Thread-3:14872264[got]
  16. empty
  17. Thread-2:11631043[closed]
  18. 11631043|
  19. Thread-3:14872264[closed]
  20. 11631043|14872264|

特別註意如果getConn方法和close方法都為同步方法,將產生死鎖:

[java] view plain copy
  1. public synchronized Connection getConn() throws InterruptedException{
  2. ......
  3. }
  4. public synchronized void close(Connection conn){
  5. ......
  6. }

同一時刻只能有一個線程調用連接池的getConn方法或close方法,當Semaphore中沒有可用的許可,並且此時恰好有一個線程成功調用連接池的getConn方法,則該線程將一直阻塞在acquire方法上,其它線程將沒有辦法獲取連接池上的鎖並調用close方法釋放許可,程序將會卡死

阻塞方法上不要加鎖,否則將導致鎖長時間不釋放,如果該鎖為互斥鎖,將導致程序卡住

acquire方法本身使用樂觀鎖實現,也不需要再加互斥鎖

示例二:不可重入互斥鎖

[java] view plain copy
  1. import java.util.concurrent.CountDownLatch;
  2. import java.util.concurrent.Semaphore;
  3. public class Test implements Runnable{
  4. private static CountDownLatch latch =
  5. new CountDownLatch(1);
  6. private static Semaphore lock =
  7. new Semaphore(1, true);
  8. @Override
  9. public void run(){
  10. try {
  11. latch.await();
  12. this.work();
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. private void work() throws InterruptedException{
  18. lock.acquire();
  19. System.out.println("Locking by "
  20. + Thread.currentThread().getName());
  21. Thread.sleep(1*1000);
  22. lock.release();
  23. }
  24. public static void main(String[] args) throws Exception{
  25. for(int i = 0 ; i < 4 ; i++){
  26. Thread t = new Thread(new Test());
  27. t.start();
  28. }
  29. //保證4個線程同時運行
  30. latch.countDown();
  31. }
  32. }

運行結果如下:

[plain] view plain copy
  1. Locking by Thread-3
  2. Locking by Thread-0
  3. Locking by Thread-1
  4. Locking by Thread-2

Java中Semaphore(信號量) 數據庫連接池