1. 程式人生 > >並行原理分析(libcurl+epoll)

並行原理分析(libcurl+epoll)

yar是一個輕量級的php rpc框架。有意思的是它的並行,其實就是libcurl作為網路庫提供http請求,然後用epoll做為事件監聽來實現整個非同步並行呼叫的。在此基礎上,就是如何利用zend api來對整個邏輯的封裝了。我們先拋開zend api,單獨看看libcurl 結合 epoll 是如何來做到非同步並行呼叫的。 
先大致熟悉一下libcurl,官網http://curl.haxx.se/libcurl/c/libcurl.html。 
1、Easy interface 
如何發起一個curl呼叫?直接上碼:

#include<stdio.h>
#include<curl/curl.h>
int main(int argc, char *argv[]) { CURL *curl; CURLcode res; curl_global_init(CURL_GLOBAL_ALL); curl = curl_easy_init(); if(curl) { curl_easy_setopt(curl, CURLOPT_URL, "http://google.com"); res = curl_easy_perform(curl); if(res != CURLE_OK) { fprintf
(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res)); } curl_easy_cleanup(curl); } curl_global_cleanup(); return 0; }

大致3個步驟: 
1)、curl_easy_init :獲得curl的處理控制代碼。 
2)、curl_easy_setopt :用來針對網路傳輸過程中的一些選項設定,比如訪問的地址、超時、設定請求回撥、資料回寫變數等等,詳情去檢視官網。 
3)、curl_easy_perform:執行curl請求。 
到這裡你可以知道,一個curl控制代碼就是一個請求,既然是網路請求,你肯定猜到libcurl都為每個curl控制代碼都維護一個socket fd。 
可以看出這種呼叫方式的特點是最簡單,同步呼叫,但也效率低下。

2、Multi interface 
Multi interface是基於多個curl控制代碼來實現併發的,大致原理是把多個curl_easy_init 建立的curl加入一個stack裡面,然後並行去執行stack裡面的curl控制代碼,來達到並行的目的。 
對於Multi interface,一般有兩種使用方法,一種是curl_multi_perform 結合select / poll 方式呼叫,雖然能實現非同步,但終究還是基於select/poll這種模型,需主動輪詢fd事件,在特定場合下效率還是沒法保證。第二種是”multi_socket”方式,即利用curl_multi_socket_action來替代curl_multi_perform來執行請求,順便可以獲得請求狀態的變化,以方便用事件監聽的方式來掌控請求讀寫狀態,事件監聽的方式有很多成熟的庫,出了名的比如libevent,libev,libuv等,咱們等會兒用epoll來說,yar用的也是epoll。 
咱們來討論multi_socket + epoll方式,直接上碼:

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include <sys/epoll.h>
#include<curl/curl.h>

typedef struct _global_info {
    int epfd;
    CURLM *multi;
} global_info;

typedef struct _easy_curl_data {
    CURL *curl;
    char data[1024] = {0};
} easy_curl_data;

typedef struct _multi_curl_sockinfo {
    curl_socket_t fd;
    CURL *cp;
} multi_curl_sockinfo;

char curl_cb_data[1024] = {0};

static int sock_cb (CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
{
    struct epoll_event ev = {0};

    global_info * g = (global_info *) cbp;
    multi_curl_sockinfo  *fdp = (multi_curl_sockinfo *) sockp;

    if (what == CURL_POLL_REMOVE) {
        if (fdp) {
            free(fdp);
        }
        epoll_ctl(g->epfd, EPOLL_CTL_DEL, s, &ev);
    } else {
        if (what == CURL_POLL_IN) {
            ev.events |= EPOLLIN;
        } else if (what == CURL_POLL_OUT) {
            ev.events |= EPOLLOUT;
        } else if (what == CURL_POLL_INOUT) {
            ev.events |= EPOLLIN | EPOLLOUT;
        }

        if (!fpd) {
            fpd = (multi_curl_sockinfo *)malloc(sizeof(multi_curl_sockinfo));
            fpd->fd = s;
            fpd->cp = e;

            epoll_ctl(g->epfd, EPOLL_CTL_ADD, s, &ev);
            curl_multi_assign(g->multi, s, &ev);
        }

    }
    return 0;
}

static void set_curl_opt(CURL *curl)
{
    //set curl options..
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, curl_cb_data);
    //other options..
}

int main(int argc, char *argv[])
{
    char *urls[3] = {"https://google.com", "http://qq.com", "http://xxx.com"};

    curl_global_init(CURL_GLOBAL_ALL);
    global_info g;
    memset(&g, 0, sizeof(global_info));
    g.epfd = epoll_create(10);
    g.multi = curl_multi_init();

    int i=0;
    for(;i<3;i++) {
        CURL *curl;
        curl = curl_easy_init();
        set_curl_opt(curl);
        curl_multi_add_handle(g.multi, curl);
    }

    curl_multi_setopt(multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
    curl_multi_setopt(multi->cm, CURLMOPT_SOCKETDATA, &g);

    int running_count;
    struct epoll_event events[10]
    while (CURLM_CALL_MULTI_PERFORM == curl_multi_socket_action(g.multi, CURL_SOCKET_TIMEOUT, 0, &running_count));

    if (running_count) {
        do {
            nfds = epoll_wait(g.epfd, events, 10, 500);
            if(nfds > 0) {
                int z=0;
                for (;z<nfds; z++) {
                    if (events[i].events & EPOLLIN) {
                        curl_multi_socket_action(g.multi, CURL_CSELECT_IN, events[i].data.fd, &running_count);
                    } else if (events[i].events & EPOLLOUT) {
                        curl_multi_socket_action(g.multi, CURL_CSELECT_OUT, events[i].data.fd, &running_count);
                    }
                }
            }
        } while (running_count);
    }

    curl_global_cleanup();
    return 0;
}

以上程式碼就是libcurl + epoll實現非同步並行呼叫的模板,可以稍作修改就能放到你的專案中執行。 
如果你對上面的程式碼看完後沒啥感覺,這裡重點說一下: 
curl_multi_init 建立multi curl 控制代碼,這個控制代碼是來管理easy_curl_init 建立 curl控制代碼的,你可以理解為一個大池子,這個池子裡面存放著多個curl控制代碼。一個multi curl代表一個池子,多個multi curl 互不影響。之後multi curl 也需要設定一些選項,通過curl_multi_setopt 函式來設定,最關鍵的2個分別是CURLMOPT_SOCKETFUNCTION 選項和 CURLMOPT_TIMERFUNCTION選項。這兩個選項設定了sock calback 和 timer callback ,看名字你也大概猜出各自代表的意思,沒錯,我們知道一個curl控制代碼裡面維護著一個socket fd ,當這個fd狀態變化的時候是靠這個sock callback來通知的,而timer callback ,是libcurl用來向呼叫方通知超時回撥的。只需要設定一下函式指標就好。具體用法看官網。 
我們知道通過CURLMOPT_SOCKETFUNCTION 選項來設定sock callback來獲知sock的狀態了,這點非常重要,但是對於這個fd是何時讀、寫,對於libcurl來說它是無能為力的。我們需要一個能夠監聽事件的庫,這時候epoll就華麗登場了。epoll在這這裡如何使用?使其就是把sock calback 通知的sock fd 狀態,同步一份給epoll就行了(其實就是上面程式碼的sock_cb函式)。然後,剩下的就是epoll event loop了,epoll_wait 返回可用fd後,你就可以進行相關的讀寫操作了,讀寫操作用這個curl_multi_socket_action來搞定。還記得之前說的curl_easy_init 建立curl控制代碼吧?建立的時候是通過curl_easy_setopt 來設定選項的,其中CURLOPT_WRITEFUNCTION 和 CURLOPT_WRITEDATA 分別設定當curl執行完成後回撥函式和寫入變數,你可以在這個回撥函式裡面進行curl執行結果的處理。所以,當呼叫curl_multi_socket_action的時候,就會觸發這2個選項的作用了。一個完整的sock_multi + epoll的非同步執行呼叫就是這樣。

看到這裡,再回去看上面的程式碼,估計問題不大了。

OK,剩下的事情就是把這塊的邏輯整合zend api ,如果你熟悉php擴充套件的編寫,熟悉zend api,再結合上面的分析,然後去看yar原始碼,就暢通無阻了。想想要不要完整的把yar的原始碼分析寫出來,但一想到一堆zend api , 想想還是算了…不過有個很好的編寫PHP擴充套件的模板,請參照這個專案https://github.com/linkaisheng/edge,這是一個介面平臺的php框架,純C編寫,裡面用到了各種zend api ,包括類的建立、屬性操作、函式呼叫、垃圾回收、返回值、物件複製等等等等,為了弄清楚這些用法,我親身經歷了N多個segfault 才搞清楚並且執行正常起來,親測可用..你可以大膽參照上面的來。