1. 程式人生 > >Pika原始碼學習--pika的通訊和執行緒模型

Pika原始碼學習--pika的通訊和執行緒模型

pika的執行緒模型有官方的wiki介紹https://github.com/Qihoo360/pika/wiki/pika-%E7%BA%BF%E7%A8%8B%E6%A8%A1%E5%9E%8B,這裡主要介紹了pika都有哪些執行緒,這些執行緒用來幹嘛。本篇文章主要涉及監聽執行緒DispatchThread、IO工作執行緒WorkerThread和工作執行緒池ThreadPool,結合程式碼介紹裡面實現的一些細節。

  • 1.監聽執行緒DispatchThread

在建立PikaServer的時候,會構造一個PikaDispatchThread,這個PikaDispatchThread,實際上是用了pink網路庫的DispatchThread::DispatchThread

DispatchThread建構函式裡面會初始化好若干個WorkerThread

 DispatchThread繼承自ServerThread,ServerThread繼承了Thread,執行緒啟動時實際上執行的是子類的ThreadMain方法,繼承了Thread類的子類需要有自己的ThreadMain,監聽執行緒start的時候,入口是ServerThread::ThreadMain()。執行緒啟動會先ServerThread::InitHandle(),繫結和監聽埠,下面看看ServerThread::ThreadMain()裡面做了啥。

ServerThread::ThreadMain()主要邏輯是一個epoll,當有新的連線事件來的時候,accept,然後呼叫DispatchThread::HandleNewConn來處理這個新的連線

DispatchThread::HandleNewConn如何處理連線呢?實際上監聽執行緒會把連線分發給IO工作執行緒WorkerThread來處理。每個WorkerThread都有一個PinkEpoll,PinkEpoll有一個notify_queue_,新的連線會以PinkItem的形式push到這個佇列裡面,然後通知WorkerThread來處理。分發的方式類似輪訓,會按順序分發給notify_queue_沒有滿的WorkerThread。

 

那麼監聽執行緒如何通知WorkerThread來處理新的連線呢?使用的是管道的方式,PinkEpoll會建立一個管道用來通知,並且把這個管道加到Epoll裡面。在確定好要分發的WorkerThread後,往這個WorkerThread的管道寫進去一個1位元組的內容,來觸發這個管道的讀事件。

 

 

  • 2.IO工作執行緒WorkerThread

DispatchThread::StartThread的時候會起WorkerThread執行緒,WorkerThread也是繼承了Thread,因此工作執行緒的入口是WorkerThread::ThreadMain。上文說到監聽執行緒把新的連線放到WorkerThread的佇列裡面後,通知了WorkerThread進行處理。下面我們看看WorkerThread怎麼處理的。
WorkerThread同樣是一個Epoll,這裡會處理新連線請求事件和已連線請求的事件,如果Epoll返回的fd是notify_receive_fd,即管道的接收fd,說明是內部的通知事件,一次性讀取多個位元組的內容,因為前面已知每個通知是1個位元組,因此這裡讀到了多少個位元組就說明有多少個通知,然後在一個迴圈裡面處理這些請求。型別為kNotiConnect則是新的連線,這裡會把監聽執行緒push的PinkItem取出來,然後建立一個NewPinkConn,加到conns_裡面,並且把這個fd加到WorkerThread的epoll,後續的訊息事件就可以在這個epoll被處理。這裡conn_factory用的是ClientConnFactory,返回的是PikaClientConn,繼承了pink::RedisConn。

連線繫結到WorkerThread後,已建立連線的客戶端傳送請求過來,則是走的下面的分支,根據fd在conns_裡面找到PinkConn,我們先只看讀請求部分,迴響應部分後面再看。

在conns_裡面找到的是對應fd的PikaClientConn,使用RedisConn::GetRequest來讀取客戶端的的請求,此處有一個細節,如果read_status為kReadAll,則一次完整的請求被讀取,會先把這個請求fd的讀寫事件給刪除。這是為啥呢?刪除了不是後續就處理不了這個請求的讀寫嗎,這個我們後面講到了再說明。
RedisConn::GetRequest裡面,使用RedisParser::ProcessRequestBuffer來解析讀取到的內容,然後有2種處理方式,DealMessage和Complete

先看下這兩個函式的初始化,DealMessage對應著ParserDealMessageCb,Complete對應著ParserCompleteCb

 我們看這兩個方法,原來一個是同步處理,一個是非同步,同步的話就是一個個命令呼叫DealMessage來處理,非同步的話是解析完合成一組命令統一調Complete處理。非同步的處理方式是將請求的命令提交給執行緒池來處理PikaClientConn::AsynProcessRedisCmds,怎麼提交的我們在工作執行緒池裡面介紹。

 

  • 3.工作執行緒池ThreadPool

PikaServer構造的時候會建立一個PikaClientProcessor,PikaClientProcessor裡面有一個ThreadPool,ThreadPool啟動時會建立Worker執行緒,Worker執行緒實際的處理函式是ThreadPool::runInThread()

前面講到WorkerThread解析完redis命令後會把命令提交給ThreadPool來處理,實際上是呼叫了執行緒池的ThreadPool::Schedule方法,Schedule需要一個TaskFunc來真正處理命令,這裡使用的是DoBackgroundTask

ThreadPool::Schedule裡面,把引數封裝成Task,然後push到執行緒池的任務佇列,接著通知執行緒池處理,這裡WorkerThread是生產者,執行緒池是消費者。

 而執行緒池的工作執行緒,則是不斷地在佇列裡面取出Task進行處理。

 

  • 4.命令處理和響應流程

執行緒池裡面實際處理命令的是DoBackgroundTask,我們先來看看命令是怎麼被處理的。DoBackgroundTask裡面呼叫的是PikaClientConn::BatchExecRedisCmd

 BatchExecRedisCmd裡面是命令一個一個取出來ExecRedisCmd,然後PikaClientConn::DoCmd,響應訊息先塞到resp_array,在TryWriteResp裡面又把響應一個個取出來塞到response_裡,並且把is_reply_置為true,然後做了一個NotifyEpoll的操作。

 可以看到,這裡把處理結果又封裝成一個PinkItem,然後和前面介紹的監聽執行緒把連線請求分發給WorkerThread一樣,把PinkItem放到PinkEpoll的佇列裡面,然後通過在管道里面寫了一個位元組的字元觸發epoll的讀事件。所以我們回過頭來看看WorkerThread的處理WorkerThread::ThreadMain

 這裡的流程和前面介紹的差不多,可以看到這裡把這個連線的fd的讀寫事件重新加到epoll裡面,前面我們留了一個疑問,在一次命令讀取結束後,把連線fd的讀寫事件從epoll裡面刪除了,這是為啥呢?這裡我們看到命令處理結束後又把讀寫事件加回來了。應該是因為pika用的是非同步處理,一個連線的命令是非同步地交給執行緒池處理,如果同個連線發了2個命令,因為是非同步處理,沒有辦法保證2個命令滿足FIFO,即先來的命令需要先回復,後來的命令後回覆,redis是單執行緒模型,因此天然滿足,pika是多執行緒非同步處理,所以這裡在讀取了第一個命令後,把連線的讀寫事件刪除了,等前一個命令處理完了才加回來,讀取第二個命令來處理。

連線的fd加進epoll後,fd可寫了,那麼epoll會返回可寫事件,用RedisConn::SendReply來發送響應給客戶端,如果寫完了會把fd的寫事件給刪掉,如果沒寫完,則等fd可寫了會繼續觸發寫事件來寫回復。

 

  • 5.總結

通過上面的分析可以知道,監聽執行緒是用來監聽新的連線,連線來了會交由WorkerThread處理,已建立連線的請求會由WorkerThread封裝成Task交給執行緒池ThreadPool處理,ThreadPool處理完了後,還是由WorkerThread來回復。WorkerThread就是做接收訊息,回覆訊息的,而ThreadPool只是處理訊息,不涉及接收和回覆的IO操作。這3者的關係大概如下圖所示:

&n