1. 程式人生 > >co_routine.cpp/.h/inner.h(第四部分:定時器和事件迴圈)—— libco原始碼分析、學習筆記

co_routine.cpp/.h/inner.h(第四部分:定時器和事件迴圈)—— libco原始碼分析、學習筆記

由於本原始碼蠻長的,所以按照功能劃分模組來分析,分為若干部分,詳見二級目錄↑

定時器和事件迴圈

libco管理定時事件便是使用時間輪這種資料結構

定時器前驅知識本篇只稍微提一下,具體知識請參考《Linux高效能伺服器程式設計 遊雙 著》第11章,或其他方式學習。

先說有序連結串列方式:

將事件按照超時時間以升序的方式串起來(很顯然新增一個事件的複雜度為O(n)),然後連結串列頭部的事件就是最接近超時的事件的定時器。

時間堆:

時間堆是對有序連結串列的優化,利用堆我們可以O(logn)插入,O(1)取最接近超時的事件。除了用堆優化了以外其他跟上面差不多。

時間輪:(以下大部分描述來自於《Linux高效能伺服器程式設計》)

時間輪是對有序連結串列的一種優化,通過一種hash的思想使得新增定時事件的時間複雜度降到接近O(1),是有序連結串列管理方式的一種改進,大大提高了效率。 

pic

上圖方格代表事件(的定時器),輪盤上槽的數字=超時時間%(輪子轉一圈所需時間)  (%為取餘運算)。

輪子中間的箭頭是一個指標,每次輪子轉動它都會指向下一格,而每次轉動的時間(槽間隔ti)稱為一個滴答時間,它實際上就是一個心博時間。該時間輪共有N個槽,因此它執行一週的時間為N×si。輪盤上每個槽都串了一個連結串列(無序),表示餘數為槽內數值的定時器的集合,每個槽的連結串列(也就是這個集合)上所有的定時器具有相同的特徵:它們的定時時間相差N×si的整數倍。時間輪正是利用這個關係將定時器雜湊到不同連結串列中的。假設我們現在指標指向槽cs,我們要新增一個定時時間為ti的定時器,則該定時器將被插入到槽ts對應的連結串列中:

ts=(cs+(ti/si))%N

很顯然,對於時間輪而言,要提高定時精度,就要使si足夠小;要提高執行效率,則要N值足夠大。

上述圖是簡單時間輪(只有一個輪子)。而複雜實踐論可能有多個輪子,不同輪子擁有不同粒度。相鄰兩個輪子,精度高的轉一圈精度低的僅移動一槽,就像水錶一樣。

先說周邊函式

定時器總是離不開定時的。

先說一種計時方法。

static unsigned long long getCpuKhz(); 作用是獲得cpu頻率,如果你看不懂getCpuKhz這個函式,可以開啟/proc/cpuinfo看一眼,就可以知道這裡記載的是cpu的動態資訊。

counter()主要是呼叫rdtscp這條彙編指令,將計數(來一個時鐘脈衝+1)讀出來。 將總共的時鐘脈衝數讀出再除以cpu的頻率(每秒時鐘脈衝)就是時間。

雖然對於現代計算機都是可以變頻的,但這裡不會有影響,查閱INTEL,有

1.The time stamp counter in newer processors may support an enhancement, referred to as invariant TSC. 

Processor’s support for invariant TSC is indicated by CPUID.80000007H:EDX[8].   

  2.The invariant TSC will run at a constant rate in all ACPI P-, C-. and Tstates. This is the architectural behavior 

moving forward. On processors with invariant TSC support, the OS may use the TSC for wall clock timer 

services (instead of ACPI or HPET timers). TSC reads are much more efficient and do not incur the overhead 

associated with a ring transition or access to a platform resource.  

也就是說rdtsc或rdtscp不用考慮CPU的變頻問題,他會以固定的速率增加。

static unsigned long long counter(void)
{
	register uint32_t lo, hi;
	register unsigned long long o;
	__asm__ __volatile__ (
			"rdtscp" : "=a"(lo), "=d"(hi)::"%rcx"
			);//eax暫存器的值賦給lo,edx賦給hi
	o = hi;//o為64位,將hi先放在低32位
	o <<= 32;//移到高位
	return (o | lo);//將lo放在低32位return

}

方法1:主要是使用counter將總共的時鐘脈衝數讀出再除以cpu的頻率(每秒時鐘脈衝)就是時間

方法2:gettimeofday自然不用說,好處是跨平臺不用切換到核心態。讀取1970年1月1日到現在的時間。

static unsigned long long GetTickMS()//返回值單位ms
{
#if defined( __LIBCO_RDTSCP__) 
	static uint32_t khz = getCpuKhz();
	return counter() / khz;
#else
	struct timeval now = { 0 };
	gettimeofday( &now,NULL );
	unsigned long long u = now.tv_sec;
	u *= 1000;
	u += now.tv_usec / 1000;
	return u;
#endif
}

再看時間輪(成員)函式和結構

成員加括號的原因是:時間輪不是以類的形式給出的。libco整體上的風格都是手動實現面向物件的操作。

struct stTimeout_t:時間輪結構體

struct stTimeout_t
{
	stTimeoutItemLink_t *pItems;//時間輪盤陣列(連續地址的連結串列),只有兩個成員:head,tail。
//尤其注意即使用了連結串列的資料結構,但是地址連續,可以使用基址+下標訪問。
	int iItemSize; //時間輪尺寸(槽個數N)也是最大計時時間。

	unsigned long long ullStart;//時間輪起始時間(時間基址)
	long long llStartIdx;//時間輪指標當前指向的位置。但是這個指標要對iItemSize取餘才是真正的指標。
};

定時器(事件)結構體。

struct stTimeoutItem_t
{

	enum
	{
		eMaxTimeout = 40 * 1000 //40s
	};
	stTimeoutItem_t *pPrev; //連結串列結構
	stTimeoutItem_t *pNext;
	stTimeoutItemLink_t *pLink;

	unsigned long long ullExpireTime;//超時的絕對時間

	OnPreparePfn_t pfnPrepare;//兩個處理函式,互斥
	OnProcessPfn_t pfnProcess;

	void *pArg; // routine 
	bool bTimeout;//是否超時。1=超時
};

TakeAllTimeout():讓時間輪轉動一下,並取出所有因此timeout的時間(定時器)。

好吧,看這個函式的時候本來很順利地看了大部分然後看到join的操作之後把之前的思路全盤推翻......

事情是這樣的:

libco中的時間輪跟上面講解的不太一樣。libco的時間輪有一個最大計時時間:itemsize(從這裡可以看出每個槽對應時間為1)。超過這個時間的時間會向co_log_err中寫一條錯誤資訊,然後把超時時間強行賦值為itemsize-1(最大超時時間)。

由於GetTickMS()返回值單位是ms,所以時間輪時間單位也是ms。

inline void TakeAllTimeout( stTimeout_t *apTimeout,
        unsigned long long allNow,stTimeoutItemLink_t *apResult )
{
	if( apTimeout->ullStart == 0 )//第一次使用初始化
	{
		apTimeout->ullStart = allNow;
		apTimeout->llStartIdx = 0;
	}

	if( allNow < apTimeout->ullStart )//error
	{
		return ;
	}
	int cnt = allNow - apTimeout->ullStart + 1;//計算時間差
	if( cnt > apTimeout->iItemSize ) 
	{//若時間差過大,應該是error了,但是為了保證健壯性則 假裝 只過了允許的最大超時時間活在過去。
		cnt = apTimeout->iItemSize;
    //經過幾次最大超時時間後應該可以攆上正確的時間。
	}
	if( cnt < 0 )
	{//當前時間比開始時間還早就肯定是error了,說明傳參出了問題,取不到任何事件。
		return;
	}
	for( int i = 0;i<cnt;i++)
	{//摘下所有超時的事件。把每個時間節點的超時事件連結串列摘下加入到result連結串列。
		int idx = ( apTimeout->llStartIdx + i) % apTimeout->iItemSize;
		Join<stTimeoutItem_t,stTimeoutItemLink_t>( apResult,apTimeout->pItems + idx  );
	}
	apTimeout->ullStart = allNow;//讓起始時間=當前時間。
	apTimeout->llStartIdx += cnt - 1;//但是這個指標要對iItemSize取餘才是真正的指標。


}

再來看看如何往時間輪中加入定時器(事件),跟取出相對應比較相似。前面健壯性描述就不再重複了。

引數1:時間輪

引數2:插入的定時器。包含超時的絕對時間點

引數3:當前時間。(用超時的絕對時間點-當前時間=相對超時時間)

int AddTimeout( stTimeout_t *apTimeout,stTimeoutItem_t *apItem ,
    unsigned long long allNow )
{
	if( apTimeout->ullStart == 0 )//init
	{
	{
		apTimeout->ullStart = allNow;
		apTimeout->llStartIdx = 0;
	}
	if( allNow < apTimeout->ullStart )//err
	{
		co_log_err("CO_ERR: AddTimeout line %d allNow %llu apTimeout->ullStart %llu",
					__LINE__,allNow,apTimeout->ullStart);

		return __LINE__;
	}
	if( apItem->ullExpireTime < allNow )//err
	{
		co_log_err("CO_ERR: AddTimeout line %d apItem->ullExpireTime %llu allNow %llu apTimeout->ullStart %llu",
					__LINE__,apItem->ullExpireTime,allNow,apTimeout->ullStart);

		return __LINE__;
	}
	unsigned long long diff = apItem->ullExpireTime - apTimeout->ullStart;
//相對超時時間(偏移時間)

	if( diff >= (unsigned long long)apTimeout->iItemSize )
	{//diff超過了最大計時時間,error,但是為了保證健壯性將diff變為最大計時時間。
/* 這裡要注意的是,雖然這裡超出了允許的最大時間且報error,但是在eventloop中有相應的設計使得這個計時器可以真正地在應該超時的地方超時,不會出錯。做法是:這個計時器可以被提前取出,但是取出後發現它還沒有超時,就再將它放回去,如此經過幾次進進出出後就可以延遲到它應該到達的時間點。感慨一下工程師設計精妙吧。
*/
		diff = apTimeout->iItemSize - 1;
		co_log_err("CO_ERR: AddTimeout line %d diff %d",
					__LINE__,diff);

		//return __LINE__;
	}
	AddTail( apTimeout->pItems + ( apTimeout->llStartIdx + diff ) % apTimeout->iItemSize , apItem );
//將定時器加入到時間輪相應位置。

	return 0;
}

事件迴圈

先看資料結構。順便感慨一下萬物皆物件。 

事件迴圈結構體。

struct stCoEpoll_t
{
	int iEpollFd; //epoll檔案號,後面用co_epollwait函式從中取active事件。
	static const int _EPOLL_SIZE = 1024 * 10;

	struct stTimeout_t *pTimeout;//時間輪

	struct stTimeoutItemLink_t *pstTimeoutList;//timeout連結串列   結構體:只有head,tail。

	struct stTimeoutItemLink_t *pstActiveList;
//active連結串列,每次eventloop後把所有active事件處理完畢後此連結串列為空,包含兩部分事件:1.epoll就緒事件、2.時間輪超時事件。

	co_epoll_res *result; 

};

然後是事件迴圈。

引數1:事件迴圈結構體。

引數2:函式結束之前呼叫了一下(如果不為NULL的話),推測是某個管理函式。

引數3:pfn的引數。

void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
	if( !ctx->result )//若沒有分配空間,就分配
	{
		ctx->result =  co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
	}
	co_epoll_res *result = ctx->result;


	for(;;)
	{
		int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
//從epoll結構中選取active事件摘下存放在result連結串列裡。
		stTimeoutItemLink_t *active = (ctx->pstActiveList);
//active事件連結串列,該連結串列同時包含epoll函式active事件和實踐論上timeout的事件。
		stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);//timeout事件連結串列,連結串列為空。

		memset( timeout,0,sizeof(stTimeoutItemLink_t) );
        //之前的timeout事件已經全都處理完畢了(連結串列為空),把結構清空(head和tail清空)。

		for(int i=0;i<ret;i++)
		{
			stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
            //從連結串列result中取出一個active事件。
			if( item->pfnPrepare )
//若有預處理函式就呼叫預處理函式,就不進一步操作了。可見pfnPrepare和pfnProcess是互斥關係。
			{
				item->pfnPrepare( item,result->events[i],active );
			}
			else
			{
				AddTail( active,item );//沒有的話就把它加入到active連結串列中。
			}
		}


		unsigned long long now = GetTickMS();
//獲取當前絕對時間,與時間輪起始時間的時間差作為一個滴答時間
		TakeAllTimeout( ctx->pTimeout,now,timeout );
        //讓時間輪轉動一下,並取出所有因此timeout的事件(定時器)。

		stTimeoutItem_t *lp = timeout->head;
		while( lp )
		{
			//printf("raise timeout %p\n",lp);
			lp->bTimeout = true;//標記為超時
			lp = lp->pNext;
		}

		Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
        //把超時事件連結串列也新增到active連結串列裡。
		lp = active->head;
		while( lp )
		{

			PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
        //將一個active事件取下來
            if (lp->bTimeout && now < lp->ullExpireTime) 
			{
//如果是超時事件,而且時間沒超時。
 /*觸發條件:如果向時間輪加入的計時器超時時間過大且超過了最大計時時間,然後雖然會報error但是還會
    加入到時間輪中。後來將它取出時,它沒有真正的超時,也就是絕對超時時間大於當前時間,就會觸發這個條件。
這樣的話,我們將它重新加入到時間輪中繼續等待,如此迴圈,直到真正滿足它的時間點後就不會觸發這個條,
這樣就很完美地解決了時間過長無法存入的問題。
這裡不得不感嘆一下騰訊工程師設計的精妙。
*/
				int ret = AddTimeout(ctx->pTimeout, lp, now);
				if (!ret) //ret==0表示AddTimeout成功(時間超過最大超時時間也會返回0)。
				{
					lp->bTimeout = false;//重置計時器為未超時。
					lp = active->head;
					continue;
				}
        //因為程式碼裡寫的時間已經是滿足ret==0的條件了,若這樣還是不成功,那就是天意了。
			}
			if( lp->pfnProcess )//若存在就執行一下這個函式。
			{
				lp->pfnProcess( lp );
			}

			lp = active->head;
		}
		if( pfn )
		{
			if( -1 == pfn( arg ) )
			{
				break;
			}
		}

	}
}

看了eventloop後,是不是感覺很精巧啊。