1. 程式人生 > >Go語言之並發示例-Pool(一)

Go語言之並發示例-Pool(一)

go pool 並發

這篇文章演示使用有緩沖的通道實現一個資源池,這個資源池可以管理在任意多個goroutine之間共享的資源,比如網絡連接、數據庫連接等,我們在數據庫操作的時候,比較常見的就是數據連接池,也可以基於我們實現的資源池來實現。


可以看出,資源池也是一種非常流暢性的模式,這種模式一般適用於在多個goroutine之間共享資源,每個goroutine可以從資源池裏申請資源,使用完之後再放回資源池裏,以便其他goroutine復用。


好了,老規矩,我們先構建一個資源池結構體,然後再賦予一些方法,這個資源池就可以幫助我們管理資源了。


//一個安全的資源池,被管理的資源必須都實現io.Close接口

type Pool struct
{ m sync.Mutex res chan io.Closer factory func() (io.Closer,error) closed bool}


這個結構體Pool有四個字段,其中m是一個互斥鎖,這主要是用來保證在多個goroutine訪問資源時,池內的值是安全的。


res字段是一個有緩沖的通道,用來保存共享的資源,這個通道的大小,在初始化Pool的時候就指定的。註意這個通道的類型是io.Closer接口,所以實現了這個io.Closer接口的類型都可以作為資源,交給我們的資源池管理。


factory這個是一個函數類型,它的作用就是當需要一個新的資源時,可以通過這個函數創建,也就是說它是生成新資源的,至於如何生成、生成什麽資源,是由使用者決定的,所以這也是這個資源池靈活的設計的地方。


closed字段表示資源池是否被關閉,如果被關閉的話,再訪問是會有錯誤的。


現在先這個資源池我們已經定義好了,也知道了每個字段的含義,下面就開時具體使用。剛剛我們說到關閉錯誤,那麽我們就先定義一個資源池已經關閉的錯誤。


var ErrPoolClosed = errors.New("資源池已經關閉。")


非常簡潔,當我們從資源池獲取資源的時候,如果該資源池已經關閉,那麽就會返回這個錯誤。單獨定義它的目的,是和其他錯誤有一個區分,這樣需要的時候,我們就可以從眾多的error類型裏區分出來這個ErrPoolClosed


下面我們就該為創建Pool專門定一個函數了,這個函數就是工廠函數,我們命名為New


//創建一個資源池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) { if size <= 0 { return nil, errors.New("size的值太小了。") } return &Pool{ factory: fn, res: make(chan io.Closer, size), }, nil

}


這個函數創建一個資源池,它接收兩個參數,一個fn是創建新資源的函數;還有一個size是指定資源池的大小。


這個函數裏,做了size大小的判斷,起碼它不能小於或者等於 0 ,否則就會返回錯誤。如果參數正常,就會使用size創建一個有緩沖的通道,來保存資源,並且返回一個資源池的指針。


有了創建好的資源池,那麽我們就可以從中獲取資源了。


//從資源池裏獲取一個資源
func (p *Pool) Acquire() (io.Closer,error) { select { case r,ok := <-p.res: log.Println("Acquire:共享資源") if !ok { return nil,ErrPoolClosed } return r,nil default: log.Println("Acquire:新生成資源") return p.factory() }

}


Acquire方法可以從資源池獲取資源,如果沒有資源,則調用factory方法生成一個並返回。


這裏同樣使用了select的多路復用,因為這個函數不能阻塞,可以獲取到就獲取,不能就生成一個。


這裏的新知識是通道接收的多參返回,如果可以接收的話,第一參數是接收的值,第二個表示通道是否關閉。例子中如果ok值為false表示通道關閉,如果為true則表示通道正常。所以我們這裏做了一個判斷,如果通道關閉的話,返回通道關閉錯誤。


有獲取資源的方法,必然還有對應的釋放資源的方法,因為資源用完之後,要還給資源池,以便復用。在講解釋放資源的方法前,我們先看下關閉資源池的方法,因為釋放資源的方法也會用到它。


關閉資源池,意味著整個資源池不能再被使用,然後關閉存放資源的通道,同時釋放通道裏的資源。


//關閉資源池,釋放資源
func (p *Pool) Close() { p.m.Lock() defer p.m.Unlock() if p.closed { return } p.closed = true //關閉通道,不讓寫入了 close(p.res) //關閉通道裏的資源 for r:=range p.res { r.Close() }
}


這個方法裏,我們使用了互斥鎖,因為有個標記資源池是否關閉的字段closed需要再多個goroutine操作,所以我們必須保證這個字段的同步。這裏把關閉標誌置為true


然後我們關閉通道,不讓寫入了,而且我們前面的Acquire也可以感知到通道已經關閉了。同比通道後,就開始釋放通道中的資源,因為所有資源都實現了io.Closer接口,所以我們直接調用Close方法釋放資源即可。


關閉方法有了,我們看看釋放資源的方法如何實現。


func (p *Pool) Release(r io.Closer){   
     //保證該操作和Close方法的操作是安全的
    p.m.Lock()    
    defer p.m.Unlock() 
    //資源池都關閉了,就省這一個沒有釋放的資源了,釋放即可
    if p.closed {
        r.Close()        
        return
    }
    select {
    case p.res <- r:
        log.Println("資源釋放到池子裏了")    
    default:
        log.Println("資源池滿了,釋放這個資源吧")
        r.Close()
    }
}


釋放資源本質上就會把資源再發送到緩沖通道中,就是這麽簡單,不過為了更安全的實現這個方法,我們使用了互斥鎖,保證closed標誌的安全,而且這個互斥鎖還有一個好處,就是不會往一個已經關閉的通道發送資源。


這是為什麽呢?因為Close和Release這兩個方法是互斥的,Close方法裏對closed標誌的修改,Release方法可以感知到,所以就直接return了,不會執行下面的select代碼了,也就不會往一個已經關閉的通道裏發送資源了。


如果資源池沒有被關閉,則繼續嘗試往資源通道發送資源,如果可以發送,就等於資源又回到資源池裏了;如果發送不了,說明資源池滿了,該資源就無法重新回到資源池裏,那麽我們就把這個需要釋放的資源關閉,拋棄了。


Go語言之並發示例-Pool(一)