1. 程式人生 > >C 語言實現MySQL連線池

C 語言實現MySQL連線池

原始碼:連結:https://pan.baidu.com/s/1y0F3YrFfsZgDRe6g6r4RMg 密碼:vg2m
引言:資料庫連線池負責分配、管理和釋放資料庫連線,它允許應用程式重複使用一個現有的資料庫連線,而不是再重新建立一個; 連線池技術大多運用在高併發伺服器的後面;在現有的大型高併發伺服器上,每一次執行緒與資料庫的資料交換都屬於網路連線,頻繁的啟停連線是極不合理的,特別是在大型Web伺服器上過長時間的延遲在使用者體驗上極其糟糕,而重複的建立與斷開資料庫連線對程式的執行也有相當大的影響;

連線池是一個抽象的概念,它包含了一堆資料庫連線、連線管理執行緒、對外提供的介面;
1.執行緒通過呼叫連線池介面獲得一個數據庫連線,使用後無需銷燬連線,只需要在邏輯上放棄連線;當下一個執行緒獲取時仍然能夠正常使用;
2.連線池管理執行緒動態的對資料庫連線進行管理,合理的增刪,在滿足外來呼叫的基礎上保證連線數量的合理利用, 它就像一個吝嗇的僱主; 增加和刪除都會改變連線池的當前的狀態,所以在下圖資料庫連線部池分的互斥鎖將保證對狀態操作的唯一性;
3.資料庫連線應該在使用執行緒上加互斥鎖,即使我們有十個使用了連線的執行緒,但在同一時間對同一個資料的修改我們也應該保證只有一個執行緒在操作,該功能在下圖的執行緒部分中的互斥鎖實現;

該圖為一個簡單的連線池實現模型,我們將以該圖來實現一個最基礎的連線池,該資料庫採用MySQL
這裡寫圖片描述

一、資料庫連線池的抽象結構體(圓圈1)

#define IP_LEN      15
#define DBNAME_LEN  64
#define DBUSER_LEN  64
#define PASSWD_LEN  64
#define POOL_MAX_NUMBER 20
typedef struct _SQL_NODE SQL_NODE;                /* 連線節點 */
typedef struct _SQL_CONN_POOL SQL_CONN_POOL;      /* 連線池 */
/* 連線節點 */ typedef struct _SQL_NODE{ MYSQL fd; /* MYSQL物件檔案描述符 */ MYSQL *mysql_sock; /* 指向已經連線的MYSQL的指標 */ pthread_mutex_t lock; /* 互斥鎖; 用線上程對資料的操作限制*/ int used; /* 使用標誌 */ int index; /* 下標 */
enum{ /* 連線狀態 */ DB_DISCONN, DB_CONN }sql_state; }SQL_NODE; /* 連線池 */ typedef struct _SQL_CONN_POOL{ int shutdown; /* 是否關閉 */ SQL_NODE sql_pool[POOL_MAX_NUMBER]; /* 一堆連線 */ int pool_number; /* 連線數量 */ int busy_number; /* 被獲取了的連線數量 */ char ip[IP_LEN+1]; /* 資料庫的ip */ int port; /* 資料庫的port,一般是3306 */ char db_name[DBNAME_LEN+1]; /* 資料庫的名字 */ char user[DBUSER_LEN+1]; /* 使用者名稱 */ char passwd[PASSWD_LEN+1]; /* 密碼 */ }SQL_CONN_POOL;

二、函式概況 (圓圈3) : 伺服器建立連線池(建立節點),分配連線到執行緒,處理並歸還;

/*建立連線池*/
SQL_CONN_POOL *sql_pool_create(int connect_pool_number, char ip[], int port, 
                               char db_name[], char user[], char passwd[]);
/*節點建立連線*/
int create_db_connect(SQL_CONN_POOL *sp, SQL_NODE *node);
/*銷燬連線池*/
void sql_pool_destroy(SQL_CONN_POOL *sp);
/*取出一個未使用的連線*/
SQL_NODE *get_db_connect(SQL_CONN_POOL *sp);
/*歸回連線*/
void release_node(SQL_CONN_POOL *sp, SQL_NODE *node);
/*增加或刪除連線*/
SQL_CONN_POOL *changeNodeNum(SQL_CONN_POOL *sp, int op);

三、分部實現
3.1 建立連線池函式 ==> 思路:規劃一段記憶體空間存放資料庫的連線,並初始化連線池狀態資訊;

/*建立連線池*/
SQL_CONN_POOL *sql_pool_create(int connect_pool_number, char ip[], int port, 
                               char db_name[], char user[], char passwd[])
{
   SQL_CONN_POOL *sp = NULL;
   /* 錯誤輸入檢測 */
   if (connect_pool_number < 1)
   {
      printf("connect_pool_number < 1. defalut 1 \n");
      connect_pool_number = 1;
   }
   /* 為連線池劃分記憶體空間 */
   if ((sp=(SQL_CONN_POOL *)malloc(sizeof(SQL_CONN_POOL))) == NULL)
   {
      printf("malloc SQL_CONN_POOL error.\n");
      return NULL;
   }

   sp->shutdown    = 0;              //開啟連線池
   sp->pool_number = 0;              //連線數量
   sp->busy_number = 0;              //正在使用的連線數
   strcpy(sp->ip, ip);               //資料庫IP
   sp->port = port;                  //資料庫Port
   strcpy(sp->db_name, db_name);     //資料庫名字
   strcpy(sp->user, user);           //使用使用者名稱
   strcpy(sp->passwd, passwd);       //密碼

   /* 建立連線 */
   if (connect_pool_number > POOL_MAX_NUMBER)
     connect_pool_number = POOL_MAX_NUMBER;

   for (int index=0; index < connect_pool_number; index++)
   {
      //建立失敗, 自定義函式,建立節點函式;
      if (0 != create_db_connect(sp, &sp->sql_pool[index]))
      {
         //銷燬連線池,自定義函式,銷燬連線池;
         sql_pool_destroy(sp);
         return NULL;
      }
      //建立成功
      sp->sql_pool[index].index = index;
      sp->pool_number++;
      printf("create database pool connect:-%d-.\n",sp->sql_pool[index].index); 
   }

   return sp;
}

3.2 建立連線節點 ==> 思路:建立連線前加鎖,成功後記得設定該節點的自動連線與超時限定;特別注意當一個連線在一定時間後沒有使用後會斷開連線,所以我們需要設定自動連線,當連線斷開後自動重新連線;

int opt=1; //超時時間
int res=0; //0正常 -1初始化失敗 1 連線失敗

 do
 {
    if (shutdown == 1)  
       return -1;
    /* 節點加鎖 */
    pthread_mutex_init(&node->lock, NULL);

    /* 初始化mysql物件 */
    if (NULL == mysql_init(&node->fd))
    {
      printf("mysql init error. \n");
      res = -1;
      break;
    }
    if (!(node->mysql_sock = mysql_real_connect(
         &node->fd, sp->ip, sp->user, sp->passwd, sp->db_name, sp->port, NULL, 0)))
    {
      printf("can not connect to mysql.\n");
      node->sql_state = DB_DISCONN;
      res = 1;
      break;
    }
    //使用狀態與連線狀態
    node->used = 0;
    node->sql_state = DB_CONN;
    //設定自動連線開啟
    mysql_options(&node->fd, MYSQL_OPT_RECONNECT, &opt);
    opt = 3;
    //設定連線超時時間為3s,3s未連線成功則超時
    mysql_options(&node->fd, MYSQL_OPT_CONNECT_TIMEOUT, &opt);
    res = 0;

 }while(0);

 return res;

3.3 連線池銷燬

void sql_pool_destroy(SQL_CONN_POOL *sp)
{
  printf("destroy sql pool ... ... \n");

  sp->shutdown = 1; //關閉連線池
  for (int index=0; index < sp->pool_number; index++)
  {
     if (NULL != sp->sql_pool[index].mysql_sock)
     {
        mysql_close(sp->sql_pool[index].mysql_sock);
        sp->sql_pool[index].mysql_sock = NULL;
     }
     sp->sql_pool[index].sql_state = DB_DISCONN; 
     sp->pool_number--;
  }
}

3.4 從連線池中取出一個能夠使用的連線;為了保證取出每一個連線的可能性大致相同,這裡將開始訪問地址下標用隨機數;在取出時也做了連線狀態檢測,若斷開則重新建立連線

/*取出一個未使用的連線*/
SQL_NODE *get_db_connect(SQL_CONN_POOL *sp)
{
  //獲取一個未使用的連線,用隨機值訪問index,保證每次訪問每個節點的概率基本相同
  int start_index = 0, index = 0, i;
  int ping_res;     

  if (shutdown == 1)
     return NULL;

  srand((int)time(0)); //根據當前時間生成隨機數
  start_index = rand() % sp->pool_number; //訪問的開始地址

  for (i=0; i < sp->pool_number; i++)
  {
    index = (start_index + i) % sp->pool_number;

    if (!pthread_mutex_trylock(&sp->sql_pool[index].lock))
    {
       if (DB_DISCONN == sp->sql_pool[index].sql_state)
       {
          //重新連線
          if (0 != create_db_connect(sp, &(sp->sql_pool[index])))
          {
            //重新連線失敗
            release_node(sp, &(sp->sql_pool[index]));
            continue;
          }
       }
       //檢查伺服器是否關閉了連線
       ping_res = mysql_ping(sp->sql_pool[index].mysql_sock);
       if (0 != ping_res)
       {
         printf("mysql ping error.\n");
         sp->sql_pool[index].sql_state = DB_DISCONN;
         release_node(sp, &(sp->sql_pool[index]));        //釋放連線
       }
       else
       {
         sp->sql_pool[index].used = 1;
         sp->busy_number++;              //被獲取的數量增1
         break ;                         //只需要一個節點
       }
    }
  }

  if (i == sp->pool_number)
    return NULL;
  else
    return &(sp->sql_pool[index]);

}

3.5 迴歸連線

/*歸回連線*/
void release_node(SQL_CONN_POOL *sp, SQL_NODE *node)
{
  node->used = 0;
  sp->busy_number--;
  pthread_mutex_unlock(&node->lock);
}

四、連線池管理(圓圈2)
這裡我的實現並沒有採用一個自動的方案,只是提供了一個介面;有需求的朋友可以自行實現;我提一個解決方案:在初始化連線池階段建立一個管理者執行緒,該執行緒隔一段時間就去檢查連線池,通過一組邏輯程式碼去決定是否新增或刪除連線;

/*增加或刪除連線*/
SQL_CONN_POOL *changeNodeNum(SQL_CONN_POOL *sp, int op)  //增加或減少5個連線
{
   int Num = 5;      /* 此處可以用巨集代替,根據自己的需求來實現 */
   int index;        
   int endindex;

   if (op == 1)  //增加    0減少
   {
     endindex = sp->pool_number + Num;
     /*建立連線*/
     for (index=sp->pool_number; index < endindex; index++)
     {
         //建立失敗
         if (0 != create_db_connect(sp, &sp->sql_pool[index]))
         {
            //銷燬連線池
            sql_pool_destroy(sp);
            return NULL;
         }
         //建立成功
         sp->sql_pool[index].index = index;                                                                   
         sp->pool_number++;
         printf("create database pool connect:-%d-.\n",sp->sql_pool[index].index); 
      }
   }
   else if (op == 0)
   {
      endindex = sp->pool_number - Num -1;
      //減少連線
      for (index=sp->pool_number-1; index>endindex && index>=0; index--)
      {       
          if (NULL != sp->sql_pool[index].mysql_sock)
          {
             mysql_close(sp->sql_pool[index].mysql_sock);
             sp->sql_pool[index].mysql_sock = NULL;
          }
          sp->sql_pool[index].sql_state = DB_DISCONN; 
          sp->pool_number--;
          printf("delete database pool connect:-%d-.\n",sp->sql_pool[index].index);
      }
   }

   return sp;
}

五、mian函式呼叫測試

int main()  
{  
    SQL_CONN_POOL *sp = 
    sql_pool_create(10, "localhost", 3306, "ceshi", "root", "qc123456");  
    SQL_NODE *node  = get_db_connect(sp);  
    SQL_NODE *node2 = get_db_connect(sp);

    if (NULL == node)  
    {  
       printf("get sql pool node error.\n");  
       return -1;  
    } 
    printf("--%d-- \n", node->index);
    printf("busy--%d--\n", sp->busy_number);

    if (mysql_query(&(node->fd), "select * from c1"))
    {                                                    
        printf("query error.\n");                                     
        return -1;                                      
    }
    else  
        printf("succeed!\n");  

    changeNodeNum(sp, 0);//減少
    changeNodeNum(sp, 1);//增加
    sql_pool_destroy(sp);

    return 0;  
} 

感謝你看完我的文章,希望你的疑惑能夠得到解答