1. 程式人生 > >c++實現多對多生產者消費者和socket連用

c++實現多對多生產者消費者和socket連用

背景:服務端實現一個多對多的生產者消費者模式,監聽某個埠,一旦有client連入,將socket存入佇列。通知消費者程序進行消費。在消費者程序中,拿到客戶端的socket,接收客戶端的資訊,並將接收到的資料返回服務端。

難點:鎖,server main函式如何生成多對多的執行緒(這是個大坑,放的位置或邏輯不對極易退化成一對一模式,在實踐中,本人將監聽放入生產者函式中,進行迴圈監聽,main函式類比網上的設計,結果導致多個客戶端連結時,當第一個client處在輸入狀態,另起client時,即使後面的client輸入完成也得不到響應,得等待第一個client輸入處理結束,改了許久才實現,僅以此記錄自己踩過的坑)

pthread_t tid_produce[THREAD_NUM], tid_consume[THREAD_NUM];
for(int i=0; i < THREAD_NUM; i++){
     pthread_create(&tid_produce[i], NULL, &produce, NULL);
}
for(int i=0; i<THREAD_NUM; i++){
    pthread_create(&tid_consume[i], NULL, &consume, NULL);
}
for(int i=0; i<THREAD_NUM; i++){
     pthread_join(tid_produce[i], NULL);
}
for(int i=0; i<THREAD_NUM; i++){
     pthread_join(tid_consume[i], NULL);
}

客戶端程式碼【簡單的socket程式設計】

#include <iostream>
#include <unistd.h>
#include <strings.h>
#include<string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netdb.h>

#define PORT 9090
#define MAXDATASIZE 100

char receiveM[100];
char sendM[100];

int main(int argc, char *argv[]) {
    int fd, numbytes;
    struct hostent *he;
    struct sockaddr_in server;
    if (argc != 2) {
        std::cout<<"Usage args <IP Address>"<<std::endl;
        exit(1);
    }
    // 通過函式 gethostbyname()獲得字串形式的ip地址,並賦給he
    if ((he = gethostbyname(argv[1])) == NULL) {
        std::cout<<"gethostbyname() error"<<std::endl;
        exit(1);
    }
    // 產生套接字fd
    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        std::cout<<"socket() error"<<std::endl;
        exit(1);
    }
    bzero(&server, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(PORT);
    server.sin_addr = *((struct in_addr *) he->h_addr);
    if (connect(fd, (struct sockaddr *) &server, sizeof(struct sockaddr)) == -1) {
        std::cout<<"connect() error"<<std::endl;
        exit(1);
    }
    // 向伺服器傳送資料
    std::cout<<"send message to server:";
    fgets(sendM, 100, stdin);
    int send_le;
    send_le = strlen(sendM);
    sendM[send_le - 1] = '\0';
    send(fd, sendM, strlen(sendM), 0);
    // 從伺服器接收資料
    if ((numbytes = recv(fd, receiveM, MAXDATASIZE, 0)) == -1) {
        std::cout<<"recv() error";
        exit(1);
    }
    std::cout<<"receive message from server:"<<receiveM<<std::endl;
    close(fd);
}

多對多生產者消費者程式碼

#include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

static const int bufferSize = 4; // Item buffer size.
//static const int totalProducts = 10;   // How many items we plan to produce.

struct ItemRepository {
    void *item_buffer[bufferSize];
    size_t read_position;
    size_t write_position;
    size_t produced_item_counter;
    size_t consumed_item_counter;
    std::mutex mtx;
    std::mutex produced_item_counter_mtx;
    std::mutex consumed_item_counter_mtx;
    std::condition_variable repo_not_full;
    std::condition_variable repo_not_empty;
} gItemRepository;

typedef struct ItemRepository ItemRepository;


void ProduceItem(ItemRepository *ir, void *item) {
    std::unique_lock <std::mutex> lock(ir->mtx);
    while (((ir->write_position + 1) % bufferSize) == ir->read_position) {
        std::cout << "Producer is waiting for an empty slot...\n";
        (ir->repo_not_full).wait(lock);
    }

    (ir->item_buffer)[ir->write_position] = item;
    (ir->write_position)++;
    if (ir->write_position == bufferSize)
        ir->write_position = 0;

    (ir->repo_not_empty).notify_all();
    lock.unlock();
}

void *ConsumeItem(ItemRepository *ir) {
    void *data;
    std::unique_lock <std::mutex> lock(ir->mtx);
    while (ir->write_position == ir->read_position) {
        std::cout << "Consumer is waiting for items...\n";
        (ir->repo_not_empty).wait(lock);
    }

    data = (ir->item_buffer)[ir->read_position];
    (ir->read_position)++;
    if (ir->read_position >= bufferSize)
        ir->read_position = 0;
    (ir->repo_not_full).notify_all();
    lock.unlock();

    return data;
}

void *ProducerTask(void *args) {
    std::unique_lock <std::mutex> lock(gItemRepository.produced_item_counter_mtx);
    if (gItemRepository.produced_item_counter < totalProducts) {
        ++(gItemRepository.produced_item_counter);
        ProduceItem(&gItemRepository, args);
        std::cout << "Producer thread  is producing the " << gItemRepository.produced_item_counter
                  << "^th item" << std::endl;
    }
    lock.unlock();
}

void *ConsumerTask(void *args) {
    void *item;
    std::unique_lock <std::mutex> lock(gItemRepository.consumed_item_counter_mtx);
    if (gItemRepository.consumed_item_counter < totalProducts) {
        item = ConsumeItem(&gItemRepository);
        ++(gItemRepository.consumed_item_counter);
        std::cout << "Consumer thread  is consuming the " << gItemRepository.consumed_item_counter << "^th item"
                  << std::endl;
    }
    lock.unlock();
    int fd = *(int *) item;
    char buf[100];
    if (recv(fd, buf, 100, 0) == -1) {//receive data
        exit(1);
    }
    std::cout << "receive msg**************" << buf << std::endl;
    std::string res = buf;
    send(fd, res.c_str(), res.length(), 0);
    std::cout << "send finished..." << std::endl;
}

void InitItemRepository(ItemRepository *ir) {
    ir->write_position = 0;
    ir->read_position = 0;
    ir->produced_item_counter = 0;
    ir->consumed_item_counter = 0;
}

server函式端

#include <iostream>
#include <string.h>
#include <strings.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include<pthread.h>
#include "configParams.h"
#include "producerConsumerModel.h"

#define BACKLOG 1
using std::cout;
using std::endl;
using std::string;
using std::exception;

static const int THREAD_NUM = 4;
int main(int argc, char *argv[]) {
    string ip = "*.*.*.*";// change it
    int port = 9090;
    cout << ip << "  " << port << endl;
    int listenfd;
    struct sockaddr_in server;
    int connectfd;
    struct sockaddr_in client;
    socklen_t sin_size;
    sin_size = sizeof(struct sockaddr_in);

    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        exit(1);
    }
    int opt = SO_REUSEADDR;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    bzero(&server, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(port);
    server.sin_addr.s_addr = htonl(INADDR_ANY);//local any ip(one machine has many network card,one card has many ip)
    // bind
    if (bind(listenfd, (struct sockaddr *) &server, sizeof(struct sockaddr)) == -1) {
        exit(1);
    }
    // listen
    if (listen(listenfd, BACKLOG) == -1) {
        exit(1);
    }
    //----------must notice as follow way------------------------
    InitItemRepository(&gItemRepository);
    while (1) {
        // accept
        pthread_t pdthread, cmthread; //define a pthread
        if ((connectfd = accept(listenfd, (struct sockaddr *) &client, &sin_size)) == -1) {
            exit(1);
        }
        pthread_create(&pdthread, NULL, ProducerTask, (void *) &connectfd);
        pthread_create(&cmthread, NULL, ConsumerTask, NULL);
    }
    close(listenfd);

}

參考文獻

http://www.cnblogs.com/haippy/p/3252092.html
https://github.com/forhappy/Cplusplus-Concurrency-In-Practice/blob/master/zh/chapter11-Application/11.1%20Producer-Consumer-solution.md