1. 程式人生 > >libev+nanomsg實現多執行緒通訊及事件輪詢例項demo

libev+nanomsg實現多執行緒通訊及事件輪詢例項demo

概述:

       在我們剛接觸程式編碼的時候,我們要輪詢資料有沒有發過來,我們最多的可能還是使用while+sleep這樣的組合,這對於處理來說是一個效率很低的方法同時還消耗cpu,那麼在多執行緒程式設計中使用libev+nanomsg會不會提高效率呢。下面例項主要工作如下:A B C 三個執行緒通過nanomsg通訊,A執行緒作為主執行緒,控制中樞,B C請求均通過A.那麼在實際應用中,比如B模組是接收客戶請求並解析傳送控制命令的,C模組是負責幹活的,A是總控,所有控制命令通過A下發到別的模組,這樣維護都很方便。那看下怎麼實現的吧。

例項Demo:

#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <ev.h>
#include <pthread.h>
#include <nanomsg/pair.h>
#include <nanomsg/nn.h>


/***************************************************************************************************
動態庫:libev nanomsg
概述:A B C 三個執行緒通過nanomsg通訊,A執行緒作為主執行緒,控制中樞,B C請求均通過A.
demo示範:
    A為指令處理模組
    B為指令接收模組
    C為指令執行模組

    B -> A   開燈
    A -> C   開燈
    C :      執行開燈
    C -> A   OK
    A -> B   OK
總結:
    這只是簡單的測試使用例子,你可以通過在這個框架的基礎上做更多的功能,對於多執行緒程式設計這將是一個不
錯的選擇.
***************************************************************************************************/

typedef struct {
    int n;          //nanomsg socket
    int s;          //nanomsg recieve fd
}nanomsg_info_t;

typedef struct {
    nanomsg_info_t ab;
    nanomsg_info_t ac;
}Aloop_ctrl_t;

typedef struct {
    nanomsg_info_t ba;
}Bloop_ctrl_t;

typedef struct {
    nanomsg_info_t ca;
}Cloop_ctrl_t;

/*獲取系統時間列印*/
uint32_t print_timenow()
{
    time_t now;
    struct tm *tm_now;
    time(&now);
    tm_now = localtime(&now);
    uint32_t times = tm_now->tm_hour * 3600 + tm_now->tm_min * 60 + tm_now->tm_sec;
    printf("[%02d:%02d:%02d]\r\n", tm_now->tm_hour, tm_now->tm_min, tm_now->tm_sec);
    return times;
}

/*****************************************子執行緒C相關**********************************************/

static void watcher_c_cb (struct ev_loop *loop ,struct ev_io *w, int revents)
{
    void *user_data = ev_userdata(loop);
    Cloop_ctrl_t *Cloop_ctrl = (Cloop_ctrl_t *)user_data;
    uint8_t *dat = NULL;
    uint32_t bytes = nn_recv(Cloop_ctrl->ca.n, &dat, NN_MSG, NN_DONTWAIT);
    if (bytes <= 0) {
        return;
    }
    printf("C:%s (A->C)\r\n", (char *)dat);
    nn_freemsg(dat);
    //接收成功,傳送OK
    char *str = "OK";
    uint8_t *udata = nn_allocmsg(3, 0);
    if (NULL != udata) {
        memcpy(udata, str, 3);
        nn_send(Cloop_ctrl->ca.n, &udata, NN_MSG, NN_DONTWAIT);
    }
    
}

int C_nanomsg_init(Cloop_ctrl_t *Cloop_ctrl)
{
    Cloop_ctrl->ca.n = nn_socket(AF_SP, NN_PAIR);
    if (Cloop_ctrl->ca.n < 0) {
        return -1;
    }
    if (nn_connect(Cloop_ctrl->ca.n, "inproc://c2a_loop") < 0) {
        return -1;
    }
    size_t size = sizeof(size_t);
    if (nn_getsockopt(Cloop_ctrl->ca.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&Cloop_ctrl->ca.s, &size) < 0) {
        return -1;
    }
    return 0;
}

struct ev_loop* C_loop_init(Cloop_ctrl_t *Cloop_ctrl)
{
    static struct ev_io watcher_c;
    struct ev_loop *loop = ev_loop_new(EVBACKEND_EPOLL);
    if (NULL == loop) {
        printf("create C loop failed\r\n");
        return NULL;
    }
    ev_io_init (&watcher_c, watcher_c_cb, Cloop_ctrl->ca.s, EV_READ);
    ev_io_start (loop, &watcher_c);
    return loop;
}

void *C_thread(void *arg)
{
    Cloop_ctrl_t Cloop_ctrl;
    if (C_nanomsg_init(&Cloop_ctrl) < 0) {
        printf("nanomsg init failed\r\n");
        return ;
    }
    struct ev_loop* Cloop = C_loop_init(&Cloop_ctrl);
    if (NULL == Cloop) {
        printf("Cloop init failed\r\n");
        return ;
    }
    ev_set_userdata(Cloop, &Cloop_ctrl);
    ev_run (Cloop, 0);
    return ;
}

/*****************************************子執行緒B相關**********************************************/

static void watcher_b_cb (struct ev_loop *loop ,struct ev_io *w, int revents)
{
    void *user_data = ev_userdata(loop);
    Bloop_ctrl_t *Bloop_ctrl = (Bloop_ctrl_t *)user_data;
    uint8_t *dat = NULL;
    uint32_t bytes = nn_recv(Bloop_ctrl->ba.n, &dat, NN_MSG, NN_DONTWAIT);
    if (bytes <= 0) {
        return;
    }
    printf("B:%s (A->B)\r\n\r\n", (char *)dat);
    nn_freemsg(dat);
}

static void watcher_timer_cb (struct ev_loop *loop ,struct ev_timer *w, int revents)
{
    static int i = 1;
    char send_data[128] = {0};
    void *user_data = ev_userdata(loop);
    Bloop_ctrl_t *Bloop_ctrl = (Bloop_ctrl_t *)user_data;
    sprintf(send_data, "Please turn on LED[%d]", i);
    i ++;
    int length = strlen(send_data) + 1;
    uint8_t *udata = nn_allocmsg(length, 0);
    if (NULL != udata) {
        memcpy(udata, send_data, length);
        nn_send(Bloop_ctrl->ba.n, &udata, NN_MSG, NN_DONTWAIT);
    }
    //如果定時器不重設,就會預設1秒進入一次回撥
    w->repeat = 10;
    ev_timer_again(loop, w);
}


int B_nanomsg_init(Bloop_ctrl_t *Bloop_ctrl)
{
    Bloop_ctrl->ba.n = nn_socket(AF_SP, NN_PAIR);
    if (Bloop_ctrl->ba.n < 0) {
        return -1;
    }
    if (nn_connect(Bloop_ctrl->ba.n, "inproc://b2a_loop") < 0) {
        return -1;
    }
    size_t size = sizeof(size_t);
    if (nn_getsockopt(Bloop_ctrl->ba.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&Bloop_ctrl->ba.s, &size) < 0) {
        return -1;
    }
    return 0;
}

struct ev_loop* B_loop_init(Bloop_ctrl_t *Bloop_ctrl)
{
    static struct ev_io watcher_b;
    static struct ev_timer watcher_timer;
    struct ev_loop *loop = ev_loop_new(EVBACKEND_EPOLL);
    if (NULL == loop) {
        printf("create loop failed\r\n");
        return NULL;
    }
    ev_io_init (&watcher_b, watcher_b_cb, Bloop_ctrl->ba.s, EV_READ);
    ev_timer_init(&watcher_timer, watcher_timer_cb, 5, 1);
    ev_io_start (loop, &watcher_b);
    ev_timer_start (loop, &watcher_timer);
    return loop;
}

void *B_thread(void *arg)
{
    Bloop_ctrl_t Bloop_ctrl;
    if (B_nanomsg_init(&Bloop_ctrl) < 0) {
        printf("nanomsg init failed\r\n");
        return ;
    }
    struct ev_loop* Bloop = B_loop_init(&Bloop_ctrl);
    if (NULL == Bloop) {
        printf("Bloop init failed\r\n");
        return ;
    }
    ev_set_userdata(Bloop, &Bloop_ctrl);
    ev_run (Bloop, 0);
    return ;
}

/*****************************************主執行緒A相關**********************************************/

static void watcher_ab_cb (struct ev_loop *loop ,struct ev_io *w, int revents)
{
    void *user_data = ev_userdata(loop);
    Aloop_ctrl_t *Aloop_ctrl = (Aloop_ctrl_t *)user_data;
    uint8_t *dat = NULL;
    uint32_t bytes = nn_recv(Aloop_ctrl->ab.n, &dat, NN_MSG, NN_DONTWAIT);
    if (bytes <= 0) {
        return;
    }
    //轉發到C
    printf("A:%s (B->A)\r\n", (char *)dat);
    nn_send(Aloop_ctrl->ac.n, &dat, NN_MSG, NN_DONTWAIT);
}

static void watcher_ac_cb (struct ev_loop *loop ,struct ev_io *w, int revents)
{
    void *user_data = ev_userdata(loop);
    Aloop_ctrl_t *Aloop_ctrl = (Aloop_ctrl_t *)user_data;
    uint8_t *dat = NULL;
    uint32_t bytes = nn_recv(Aloop_ctrl->ac.n, &dat, NN_MSG, NN_DONTWAIT);
    if (bytes <= 0) {
        return;
    }
    //轉發到B
    printf("A:%s (C->A)\r\n", (char *)dat);
    nn_send(Aloop_ctrl->ab.n, &dat, NN_MSG, NN_DONTWAIT);
}

/*主事件nanomsg初始化*/
int A_nanomsg_init(Aloop_ctrl_t *Aloop_ctrl)
{
    //ab通訊的nanomsg初始化
    Aloop_ctrl->ab.n = nn_socket(AF_SP, NN_PAIR);
    if (Aloop_ctrl->ab.n < 0) {
        return -1;
    }
    if (nn_bind(Aloop_ctrl->ab.n, "inproc://b2a_loop") < 0) {
        return -1;
    }
    //獲取此埠的接收資料fd描述符
    size_t size = sizeof(size_t);
    if (nn_getsockopt(Aloop_ctrl->ab.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&Aloop_ctrl->ab.s, &size) < 0) {
        return -1;
    }
    //ac通訊的nanomsg初始化
    Aloop_ctrl->ac.n = nn_socket(AF_SP, NN_PAIR);
    if (Aloop_ctrl->ac.n < 0) {
        return -1;
    }
    if (nn_bind(Aloop_ctrl->ac.n, "inproc://c2a_loop") < 0) {
        return -1;
    }
    //獲取此埠的接收資料fd描述符
    if (nn_getsockopt(Aloop_ctrl->ac.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&Aloop_ctrl->ac.s, &size) < 0) {
        return -1;
    }
    return 0;
}

/*主事件迴圈初始化*/
struct ev_loop* A_loop_init(Aloop_ctrl_t *Aloop_ctrl)
{
    static struct ev_io watcher_ab, watcher_ac;
    struct ev_loop *loop = ev_loop_new(EVBACKEND_EPOLL);
    if (NULL == loop) {
        printf("create loop failed\r\n");
        return NULL;
    }
    //傳參
    ev_set_userdata(loop, Aloop_ctrl);
    //初始化
    ev_io_init (&watcher_ab, watcher_ab_cb, Aloop_ctrl->ab.s, EV_READ);
    ev_io_init (&watcher_ac, watcher_ac_cb, Aloop_ctrl->ac.s, EV_READ);
    ev_io_start (loop, &watcher_ab);
    ev_io_start (loop, &watcher_ac);
    return loop;
}

/**************************************************************************************************/

int main()
{
    pthread_t pb,pc;
    Aloop_ctrl_t Aloop_ctrl;
    if (A_nanomsg_init(&Aloop_ctrl) < 0) {
        printf("nanomsg init failed\r\n");
        return -1;
    }
    struct ev_loop* Aloop = A_loop_init(&Aloop_ctrl);
    if (NULL == Aloop) {
        printf("Aloop init failed\r\n");
        return -1;
    }
    //建立執行緒B
    if (0 != pthread_create(&pb, NULL, B_thread, NULL)) {
        printf("create pthread B failed\r\n");
        return -1;
    }
    //建立執行緒C
    if (0 != pthread_create(&pc, NULL, C_thread, NULL)) {
        printf("create pthread C failed\r\n");
        return -1;
    }
    //執行
    ev_run(Aloop, 0);
    return 0;
}

編譯執行:

//編譯
gcc -o ev_nanomsg ev_nanomsg.c -lev -lnanomsg -lpthread

//執行結果
A:Please turn on LED[1] (B->A)
C:Please turn on LED[1] (A->C)
A:OK (C->A)
B:OK (A->B)