c++實現多對多生產者消費者和socket連用
阿新 • • 發佈:2018-12-04
背景:服務端實現一個多對多的生產者消費者模式,監聽某個埠,一旦有client連入,將socket存入佇列。通知消費者程序進行消費。在消費者程序中,拿到客戶端的socket,接收客戶端的資訊,並將接收到的資料返回服務端。
難點:鎖,server main函式如何生成多對多的執行緒(這是個大坑,放的位置或邏輯不對極易退化成一對一模式,在實踐中,本人將監聽放入生產者函式中,進行迴圈監聽,main函式類比網上的設計,結果導致多個客戶端連結時,當第一個client處在輸入狀態,另起client時,即使後面的client輸入完成也得不到響應,得等待第一個client輸入處理結束,改了許久才實現,僅以此記錄自己踩過的坑)
pthread_t tid_produce[THREAD_NUM], tid_consume[THREAD_NUM]; for(int i=0; i < THREAD_NUM; i++){ pthread_create(&tid_produce[i], NULL, &produce, NULL); } for(int i=0; i<THREAD_NUM; i++){ pthread_create(&tid_consume[i], NULL, &consume, NULL); } for(int i=0; i<THREAD_NUM; i++){ pthread_join(tid_produce[i], NULL); } for(int i=0; i<THREAD_NUM; i++){ pthread_join(tid_consume[i], NULL); }
客戶端程式碼【簡單的socket程式設計】
#include <iostream> #include <unistd.h> #include <strings.h> #include<string.h> #include <sys/types.h> #include <sys/socket.h> #include <stdlib.h> #include <netinet/in.h> #include <netdb.h> #define PORT 9090 #define MAXDATASIZE 100 char receiveM[100]; char sendM[100]; int main(int argc, char *argv[]) { int fd, numbytes; struct hostent *he; struct sockaddr_in server; if (argc != 2) { std::cout<<"Usage args <IP Address>"<<std::endl; exit(1); } // 通過函式 gethostbyname()獲得字串形式的ip地址,並賦給he if ((he = gethostbyname(argv[1])) == NULL) { std::cout<<"gethostbyname() error"<<std::endl; exit(1); } // 產生套接字fd if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { std::cout<<"socket() error"<<std::endl; exit(1); } bzero(&server, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(PORT); server.sin_addr = *((struct in_addr *) he->h_addr); if (connect(fd, (struct sockaddr *) &server, sizeof(struct sockaddr)) == -1) { std::cout<<"connect() error"<<std::endl; exit(1); } // 向伺服器傳送資料 std::cout<<"send message to server:"; fgets(sendM, 100, stdin); int send_le; send_le = strlen(sendM); sendM[send_le - 1] = '\0'; send(fd, sendM, strlen(sendM), 0); // 從伺服器接收資料 if ((numbytes = recv(fd, receiveM, MAXDATASIZE, 0)) == -1) { std::cout<<"recv() error"; exit(1); } std::cout<<"receive message from server:"<<receiveM<<std::endl; close(fd); }
多對多生產者消費者程式碼
#include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
static const int bufferSize = 4; // Item buffer size.
//static const int totalProducts = 10; // How many items we plan to produce.
struct ItemRepository {
void *item_buffer[bufferSize];
size_t read_position;
size_t write_position;
size_t produced_item_counter;
size_t consumed_item_counter;
std::mutex mtx;
std::mutex produced_item_counter_mtx;
std::mutex consumed_item_counter_mtx;
std::condition_variable repo_not_full;
std::condition_variable repo_not_empty;
} gItemRepository;
typedef struct ItemRepository ItemRepository;
void ProduceItem(ItemRepository *ir, void *item) {
std::unique_lock <std::mutex> lock(ir->mtx);
while (((ir->write_position + 1) % bufferSize) == ir->read_position) {
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock);
}
(ir->item_buffer)[ir->write_position] = item;
(ir->write_position)++;
if (ir->write_position == bufferSize)
ir->write_position = 0;
(ir->repo_not_empty).notify_all();
lock.unlock();
}
void *ConsumeItem(ItemRepository *ir) {
void *data;
std::unique_lock <std::mutex> lock(ir->mtx);
while (ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock);
}
data = (ir->item_buffer)[ir->read_position];
(ir->read_position)++;
if (ir->read_position >= bufferSize)
ir->read_position = 0;
(ir->repo_not_full).notify_all();
lock.unlock();
return data;
}
void *ProducerTask(void *args) {
std::unique_lock <std::mutex> lock(gItemRepository.produced_item_counter_mtx);
if (gItemRepository.produced_item_counter < totalProducts) {
++(gItemRepository.produced_item_counter);
ProduceItem(&gItemRepository, args);
std::cout << "Producer thread is producing the " << gItemRepository.produced_item_counter
<< "^th item" << std::endl;
}
lock.unlock();
}
void *ConsumerTask(void *args) {
void *item;
std::unique_lock <std::mutex> lock(gItemRepository.consumed_item_counter_mtx);
if (gItemRepository.consumed_item_counter < totalProducts) {
item = ConsumeItem(&gItemRepository);
++(gItemRepository.consumed_item_counter);
std::cout << "Consumer thread is consuming the " << gItemRepository.consumed_item_counter << "^th item"
<< std::endl;
}
lock.unlock();
int fd = *(int *) item;
char buf[100];
if (recv(fd, buf, 100, 0) == -1) {//receive data
exit(1);
}
std::cout << "receive msg**************" << buf << std::endl;
std::string res = buf;
send(fd, res.c_str(), res.length(), 0);
std::cout << "send finished..." << std::endl;
}
void InitItemRepository(ItemRepository *ir) {
ir->write_position = 0;
ir->read_position = 0;
ir->produced_item_counter = 0;
ir->consumed_item_counter = 0;
}
server函式端
#include <iostream>
#include <string.h>
#include <strings.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include<pthread.h>
#include "configParams.h"
#include "producerConsumerModel.h"
#define BACKLOG 1
using std::cout;
using std::endl;
using std::string;
using std::exception;
static const int THREAD_NUM = 4;
int main(int argc, char *argv[]) {
string ip = "*.*.*.*";// change it
int port = 9090;
cout << ip << " " << port << endl;
int listenfd;
struct sockaddr_in server;
int connectfd;
struct sockaddr_in client;
socklen_t sin_size;
sin_size = sizeof(struct sockaddr_in);
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
exit(1);
}
int opt = SO_REUSEADDR;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&server, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr.s_addr = htonl(INADDR_ANY);//local any ip(one machine has many network card,one card has many ip)
// bind
if (bind(listenfd, (struct sockaddr *) &server, sizeof(struct sockaddr)) == -1) {
exit(1);
}
// listen
if (listen(listenfd, BACKLOG) == -1) {
exit(1);
}
//----------must notice as follow way------------------------
InitItemRepository(&gItemRepository);
while (1) {
// accept
pthread_t pdthread, cmthread; //define a pthread
if ((connectfd = accept(listenfd, (struct sockaddr *) &client, &sin_size)) == -1) {
exit(1);
}
pthread_create(&pdthread, NULL, ProducerTask, (void *) &connectfd);
pthread_create(&cmthread, NULL, ConsumerTask, NULL);
}
close(listenfd);
}
參考文獻
http://www.cnblogs.com/haippy/p/3252092.html
https://github.com/forhappy/Cplusplus-Concurrency-In-Practice/blob/master/zh/chapter11-Application/11.1%20Producer-Consumer-solution.md