1. 程式人生 > >java之實現自己的資料庫連線池

java之實現自己的資料庫連線池

最近仿mybatis寫了一個自己的orm框架 專案已上傳到github上 https://github.com/skybluehhx/MYORM.git,既然是orm框架肯定需要事務管理器和資料庫連線池,下面將介紹我自己實現一個連線池 (主要藉助阻塞佇列)

首先定義一個介面,給出執行緒池的基本功能

package Pools;

import java.sql.Connection;

/**
 * Created by zoujianglin
 * 2018/8/25 0025.
 */
public interface Pool {

    //獲取連線池中的“連線”
    PoolConnection getPoolConnection();


    boolean relasePoolConnection(PoolConnection connection);

    //連線池大小
    int getPoolSize();

    //銷燬連線池
    boolean destroy();

    //釋放連線,歸還連線,使用該方法並不是銷燬連線而是
    //重新歸回執行緒池,共其他執行緒複用
    boolean relaseConnection(Connection connection);

}

在這裡我將連線池中的連線做了一個抽象,主要是方便後續擴充套件,如標定每個連線所屬的事務管理器等等

具體定義如下

package Pools;


import java.sql.Connection;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by zoujianglin
 * 2018/8/25 0025.
 * PoolConnection為連線池對連線的抽象
 */
public class PoolConnection {

    //表示連線的唯一標識,
    private long id = 0;

    private AtomicLong Iid = new AtomicLong(0);
    // 維持著資料庫連線
    private Connection connection;
    //標誌著該連線是否被其他執行緒使用
    // false表示連線不可用,表明連線已經被佔用
    //true表示該連線未被佔用,可以正常使用
    private volatile boolean isAvailable;
    //管理該連線的事務管理器,只有從事務管理器中獲取的連線才會被設定 暫未使用
    // private TransactionManage transactionManage;

    //建立時,不帶引數預設沒有被使用
    public PoolConnection(Connection connection) {
        this(connection, true);
    }

    public PoolConnection(Connection connection, boolean isAvailable) {
        this.connection = connection;
        this.isAvailable = isAvailable;
        this.id = Iid.getAndIncrement();
    }

    @Override
    public String toString() {
        return "PoolConnection{" +
                "connection=" + connection +
                ", isAvailable=" + isAvailable + "connection=" + id +
                '}';
    }

    public long getId() {
        return id;
    }

    public Connection getConnection() {
        return connection;
    }

    public boolean isAvailable() {
        return isAvailable;
    }


}

//接著是basePoolt提供了對連線池的基本實現 該類有點複雜,比較重要的方法一般都做了註釋,具體實現如下

package Pools;

import ORMException.DestoryPoolException;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by zoujianglin
 * 2018/8/25 0025.
 * 提供連線池的基本功能
 */
@Component
public abstract class BasePool implements Pool {

    private static Logger logger = Logger.getLogger(BasePool.class);
    //資料來源,獲取連線資料庫的基本資訊
    @Autowired
    private DataSource dataSource;
    //用來持有連線池連線
    private final BlockingQueue<PoolConnection> blockingQueue;

    //資料庫持有連線數量,初始值為0;
    private AtomicInteger poolSize = new AtomicInteger(0);
    //擴容標誌,當為true時,表明連線池正在擴容,
    //一個執行緒池我們設定只有一個執行緒進行擴容,後面想了想多個執行緒同時擴容也沒事,所以註釋掉
    //private AtomicBoolean isDilatation = new AtomicBoolean(false);

    //初始化標誌 只能被初始化一次
    private AtomicBoolean isInitialization;
    //銷燬標誌
    private AtomicBoolean isDestroy = new AtomicBoolean(false);
    //預設重試次數,當資料庫獲取連線失敗時,將會重試,預設
    //重試次數為連線池規定連線數,當為0時將不會重試,
    private int tryTimes;
    //增長的步伐,從連線池中獲取連線時,如果沒有
    //獲取到連線而連線數小於最大連線時將以該步長增長連線數
    //預設步長為4
    private int step;

    public BasePool(DataSource dataSource, BlockingQueue blockingQueue) {
        this(dataSource, blockingQueue, dataSource.getMinConnection(), 4);
    }

    public BasePool(DataSource dataSource, BlockingQueue blockingQueue, int step) {
        this(dataSource, blockingQueue, dataSource.getMinConnection(), step);
    }

    public BasePool(DataSource dataSource, BlockingQueue<PoolConnection> blockingQueue, int times, int step) {
        this.dataSource = dataSource;
        this.blockingQueue = blockingQueue;
        this.tryTimes = times;
        this.isInitialization = new AtomicBoolean(false);
        this.step = step;
        init();
    }
/**
 public PoolConnection getConnection(long timeout, TimeUnit unit) {

 return null;
 }
 **/
    /**
     * 獲取連線,獲取連線時並不能保證當連線池尺寸小於設定的最大連線數時
     * 能立馬獲取到連線,但能保證最終獲取連線
     *
     * @return
     */
    public PoolConnection getPoolConnection() {
        if (!isDestroy.get()) {
            try {

                PoolConnection poolConnection = blockingQueue.poll();
                if (poolConnection == null) { //判斷執行緒池大小是否達到最大
                    if (poolSize.get() >= dataSource.getMaxConnection()) {//達到最大阻塞等待
                        poolConnection = blockingQueue.take();
                    } else {

                        //if (isDilatation.compareAndSet(false, true)) {
                        //進行判斷,並確保每次只有一個執行緒進行擴增操作
                        int currentSize;
                        int afterSize;
                        int newSize;
                        while (true) {
                            currentSize = poolSize.get();
                            afterSize = currentSize + step;
                            newSize = afterSize > dataSource.getMaxConnection() ? dataSource.getMaxConnection() : afterSize;
                            int addNum = newSize - currentSize;
                            //確保只有一個執行緒進行擴容執行緒池
                            if (poolSize.compareAndSet(currentSize, newSize)) {
                                for (int i = 0; i < addNum; i++) {
                                    //這裡為提高效率可以返回第一個PoolConnecton,其他連線操作可以交由另一個執行緒操作
                                    blockingQueue.add(getNewOneConnection(dataSource.getUrl(),
                                            dataSource.getUserName(), dataSource.getPassword()));
                                }
                                break;
                            }
                            //isDilatation.set(false);看前面關於isDilatation屬性的介紹,後發現多執行緒擴容也沒事

                        }
                        return blockingQueue.take();
                        //} else { //有執行緒在擴容,直接阻塞等待返回
                        //  return blockingQueue.take();
                        //}
                    }

                }
                return poolConnection;

            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
        throw new DestoryPoolException("連線池已被毀壞");

    }


    public int getPoolSize() {
        return poolSize.get();
    }

    public int getFreeConnectioNums() {
        return blockingQueue.size();
    }

    /**
     *
     * 歸還連線,
     *
     * @param connection
     * @return
     */
    public boolean relasePoolConnection(PoolConnection connection) {

        blockingQueue.add(connection);
        return true;
    }

    //釋放連線,正如你所料的 我們依靠poolSize來確保連線池大小
    //所以在釋放連線失敗時,poolSize的值應該減一,為了避免被錯誤的
    //使用,我們增加限制,傳入空值時,將會丟擲異常,這裡並沒有
    //做強制的保證,如果使用者歸還的連線不是連線池中的連線 我們也會
    //確保它歸還成功,錯誤的使用該方法該會造成連線池實際數量
    //大於現有數量,其實可以使用一個ThrealLocal 儲存一個標誌
    //當執行緒獲取連線時,將標誌設定為true,只有帶有標誌的執行緒才能歸還
    //並且歸還後需要重新置為false,考慮到該執行緒池,是內建使用,
    //這裡並沒有實現該功能,如果需求特殊,後續考慮補加
    public boolean relaseConnection(Connection connection) {
        if (connection == null) {
            throw new RuntimeException("請確保釋放的連線不為空");
        }
        //在歸還前 確保歸還的連線可用,
        boolean falg = false; //poolSize 減一是否成功的標誌
        try {

            if (connection.isClosed()) { //連線已關閉,但連線池大小小於規定最小數目
                //連線釋放失敗,直接尺寸減一
                poolSize.getAndDecrement();
                falg = true;
                return false;
            }
        } catch (SQLException e) {
            e.printStackTrace();

        } finally {
            if (!falg) {
                poolSize.getAndDecrement();
            }
        }
        return relasePoolConnection(new PoolConnection(connection));
    }

    public boolean destroy() {
        return false;
    }


    public void init() {
        //沒有被初始化才進行初始化
        if (isInitialization.compareAndSet(false, true)) {
            logger.error("開始初始化執行緒池");
            try {
                Class driver = Class.forName(dataSource.getDriverClassName());
                DriverManager.registerDriver((Driver) driver.newInstance());
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                throw new RuntimeException(e + "資料庫驅動類錯誤");
            } catch (SQLException e) {
                e.printStackTrace();
                throw new RuntimeException(e + "註冊資料庫驅動失敗");
            } catch (IllegalAccessException e) {
                e.printStackTrace();
                throw new RuntimeException(e + "註冊資料庫驅動失敗");
            } catch (InstantiationException e) {
                e.printStackTrace();
                throw new RuntimeException(e + "註冊資料庫驅動失敗");
            }
            String url = dataSource.getUrl();
            String userName = dataSource.getUserName();
            String password = dataSource.getPassword();
            for (int i = 0; i < dataSource.getMinConnection(); i++) {
                try {
                    Connection connection = DriverManager.getConnection(url, userName, password);
                    blockingQueue.add(new PoolConnection(connection));
                    poolSize.getAndIncrement();
                    //將連線放入阻塞佇列
                } catch (SQLException e) {
                    logger.error("獲取一條資料庫連線失敗", e);
                    //補入重試機制,確保資料庫連線能夠完成
                    while (tryTimes > 0) {
                        try {
                            Connection connection = DriverManager.getConnection(url, userName, password);
                            blockingQueue.add(new PoolConnection(connection));
                            poolSize.getAndIncrement();
                        } catch (SQLException e1) {
                            e1.printStackTrace();
                            logger.error("重試時獲取一條資料庫連線失敗", e);
                        }
                        tryTimes--;
                        logger.error("重試次數剩餘" + tryTimes, e);
                    }

                }


            }


        } else {
            logger.warn("執行緒池正在被或已經被初始化,一個執行緒池只能被初始化一次");
            throw new RuntimeException("請確保執行緒池只被初始化一次");
        }

    }

    private PoolConnection getNewOneConnection(String url, String userName, String password) {

        try {
            Connection connection = DriverManager.getConnection(url, userName, password);
            return new PoolConnection(connection);
        } catch (SQLException e) {
            e.printStackTrace();
            while (tryTimes > 0) { //重試
                try {
                    Connection connection = DriverManager.getConnection(url, userName, password);
                    return new PoolConnection(connection);
                } catch (SQLException e1) {
                    e1.printStackTrace();
                    logger.error("重試時獲取一條資料庫連線失敗", e);
                }
                tryTimes--;
                logger.error("重試次數剩餘" + tryTimes, e);
            }
            throw new RuntimeException("重試次數用光,獲取連線失敗");
        }

    }

}

其中該類中涉及到的資料來源結構如下

package Pools;

/**
 * Created by zoujianglin
 * 2018/8/25 0025.
 * <p>
 * 資料來源 保留著資料庫連線的基本配置
 */
public class DataSource {

    private  String driverClassName;
    private  String userName;
    private  String password;
    private  String url;
    private int maxConnection = 10;
    private int minConnection = 20;
    private int timeout;


    public DataSource(){

    }



    protected DataSource(String driverClassName, String userName, String password, String url) {
        this.driverClassName = driverClassName;
        this.userName = userName;
        this.password = password;
        this.url = url;
    }

    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getDriverClassName() {
        return driverClassName;
    }

    public String getUserName() {
        return userName;
    }

    public String getPassword() {
        return password;
    }

    public String getUrl() {
        return url;
    }

    public int getMaxConnection() {
        return maxConnection;
    }

    public void setMaxConnection(int maxConnection) {
        this.maxConnection = maxConnection;
    }

    public int getMinConnection() {
        return minConnection;
    }

    public void setMinConnection(int minConnection) {
        this.minConnection = minConnection;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

}

最後給出資料庫連線池的具體實現,預設連線池採用無界阻塞佇列

package Pools;

import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingQueue;

/**
 * Created by zoujianglin
 * 2018/8/25 0025.
 */

public class DefaultPool extends BasePool {


    public DefaultPool(DataSource dataSource) {
        super(dataSource, new LinkedBlockingQueue());
    }

    public DefaultPool(DataSource dataSource, int times) {
        super(dataSource, new LinkedBlockingQueue(), times);

    }


}