1. 程式人生 > >《Linux高性能服務器編程》學習總結(十三)——多進程編程

《Linux高性能服務器編程》學習總結(十三)——多進程編程

truct 同步 客戶端 event dwr ram author end 讀寫

  在多進程編程中,我們用fork系統調用創建子進程,值得註意的是,fork函數復制當前進程並在內核進程表中創建一個新的表項,其堆、棧指針,標誌寄存器的值都和父進程相同,但是其ppid被設置成父進程pid,信號位圖被清除。而子進程代碼和父進程完全相同,其數據也會復制自父進程,但是其復制過程是寫時復制,即父子任意進程對數據執行寫操作時才會復制,首先是缺頁中斷,然後操作系統給子進程分配空間並復制數據。此外,創建子進程後父進程中打開的文件描述符在子進程中也是默認打開的,其文件描述符引用計數加一,父進程的用戶根目錄,當前工作目錄等變量的引用計數均會加一。

  僵屍進程是多進程編程中需要了解和避免的情況,僵屍進程的產生是由於子進程先於父進程退出,而此時父進程沒有正常接收子進程退出狀態,導致子進程脫離父進程存在於操作系統中,還占據了原有的進程描述符和資源,造成了資源的浪費。避免僵屍進程的產生共有三種方法:第一種是處理SIGCHLD信號,當子進程退出時會向父進程發送SIGCHLD信號,我們可以指明信號處理函數來進行處理;第二種是拖管法,由父進程創建一個中間進程,再由這個中間進程創建子進程後直接退出,這樣子進程變成了孤兒進程,會由init進程托管,由init進程負責回收其資源,但是這樣就破壞了父子進程之間的血緣關系;第三種就是阻塞法,調用wait函數等待子進程退出,缺點就是會使父進程進入阻塞狀態。對於wait函數,其進入阻塞狀態顯然是我們所不允許的,所以waitpid函數就解決了這個問題。

  在多進程編程中,一個最為重要的需要解決的問題就是進程間通信IPC,我們常用的IPC方法有:管道、信號量、共享內存和消息隊列。其系統調用較為簡單,我們只來簡單說明一下這四種方式的區別和異同。首先管道是我們在前文就提到過的,而且講過了再網絡編程中經常使用socketpair函數創建一個雙向管道,管道分為無名管道和有名管道,無名管道只能用於有親緣關系的進程之間進行通信,而且只能用低級文件編程庫中的讀寫函數,而有名管道就是創建一個管道文件,通過文件進行信息交流,所以沒有親緣關系的限制。信號量是用來實現多進程之間的互斥與同步的,其本質是一個整形的數,我們用pv操作來對其進行操作,如果是二進制的信號量,則可以用信號量保證對於臨界資源來講同一時刻只有一段代碼能對其進行訪問。共享內存是在內存空間創建或獲取一段空間來讓各個進程進行共享,通過這段空間進行信息交流,而這種方式就比較靈活,可以自定其數據結構,完成各式各樣的功能。消息隊列就是一個能存放消息的隊列,各個進程將消息發送到消息隊列中,並指明要發送的對象,其他程序可以直接監聽這個隊列,收取發給自己的消息。幾種進程間通信的方式都很普遍,我們來看一個用共享內存實現的聊天室服務器程序:

  

  1 /*************************************************************************
  2     > File Name: 13-4.cpp
  3     > Author: Torrance_ZHANG
  4     > Mail: [email protected]
  5     > Created Time: Wed 14 Feb 2018 04:18:04 PM PST
  6  ***********************************************************************
*/ 7 8 #include"head.h" 9 using namespace std; 10 11 #define USER_LIMIT 5 12 #define BUFFER_SIZE 1024 13 #define FD_LIMIT 65535 14 #define MAX_EVENT_NUMBER 1024 15 #define PROCESS_LIMIT 65536 16 17 struct client_data { 18 sockaddr_in address; 19 int connfd; 20 pid_t pid; 21 int pipefd[2]; 22 }; 23 24 static const char* shm_name = "/my_shm"; 25 int sig_pipefd[2]; 26 int epollfd; 27 int listenfd; 28 int shmfd; 29 char* share_mem = 0; 30 client_data* users = 0; 31 int* sub_process = 0; 32 int user_count = 0; 33 bool stop_child = false; 34 35 int setnonblocking(int fd) { 36 int old_option = fcntl(fd, F_GETFL); 37 int new_option = old_option | O_NONBLOCK; 38 fcntl(fd, F_SETFL, new_option); 39 return old_option; 40 } 41 42 void addfd(int epollfd, int fd) { 43 epoll_event event; 44 event.data.fd = fd; 45 event.events = EPOLLIN | EPOLLET; 46 epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); 47 setnonblocking(fd); 48 } 49 50 void sig_handler(int sig) { 51 int save_errno = errno; 52 int msg = sig; 53 send(sig_pipefd[1], (char*)&msg, 1, 0); 54 errno = save_errno; 55 } 56 57 void addsig(int sig, void(*handler)(int), bool restart = true) { 58 struct sigaction sa; 59 memset(&sa, 0, sizeof(sa)); 60 sa.sa_handler = handler; 61 if(restart) sa.sa_flags |= SA_RESTART; 62 sigfillset(&sa.sa_mask); 63 assert(sigaction(sig, &sa, NULL) != -1); 64 } 65 66 void del_resource() { 67 close(sig_pipefd[0]); 68 close(sig_pipefd[1]); 69 close(listenfd); 70 close(epollfd); 71 shm_unlink(shm_name); 72 delete [] users; 73 delete [] sub_process; 74 } 75 76 void child_term_handler(int sig) { 77 stop_child = true; 78 } 79 80 int run_child(int idx, client_data* users, char* share_mem) { 81 epoll_event events[MAX_EVENT_NUMBER]; 82 int child_epollfd = epoll_create(5); 83 assert(child_epollfd != -1); 84 int connfd = users[idx].connfd; 85 addfd(child_epollfd, connfd); 86 int pipefd = users[idx].pipefd[1]; 87 addfd(child_epollfd, pipefd); 88 int ret; 89 addsig(SIGTERM, child_term_handler, false); 90 91 while(!stop_child) { 92 int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1); 93 if((number < 0) && (errno != EINTR)) { 94 printf("epoll failure\n"); 95 break; 96 } 97 for(int i = 0; i < number; i ++) { 98 int sockfd = events[i].data.fd; 99 if((sockfd == connfd) && (events[i].events & EPOLLIN)) { 100 memset(share_mem + idx * BUFFER_SIZE, 0, BUFFER_SIZE); 101 ret = recv(connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0); 102 if(ret < 0) { 103 if(errno != EAGAIN) { 104 stop_child = true; 105 } 106 } 107 else if(ret == 0) { 108 stop_child = true; 109 } 110 else { 111 send(pipefd, (char*)&idx, sizeof(idx), 0); 112 } 113 } 114 else if((sockfd == pipefd) && (events[i].events & EPOLLIN)) { 115 int client = 0; 116 ret = recv(sockfd, (char*)&client, sizeof(client), 0); 117 if(ret < 0) { 118 if(errno != EAGAIN) { 119 stop_child = true; 120 } 121 } 122 else if(ret == 0) { 123 stop_child = true; 124 } 125 else { 126 send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0); 127 } 128 } 129 else continue; 130 } 131 } 132 close(connfd); 133 close(pipefd); 134 close(child_epollfd); 135 return 0; 136 } 137 138 int main(int argc, char** argv) { 139 if(argc <= 2) { 140 printf("usage: %s ip_address port_number\n", basename(argv[0])); 141 return 1; 142 } 143 const char* ip = argv[1]; 144 int port = atoi(argv[2]); 145 146 int ret = 0; 147 struct sockaddr_in address; 148 bzero(&address, sizeof(address)); 149 address.sin_family = AF_INET; 150 inet_pton(AF_INET, ip, &address.sin_addr); 151 address.sin_port = htons(port); 152 153 listenfd = socket(AF_INET, SOCK_STREAM, 0); 154 assert(listenfd >= 0); 155 156 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 157 assert(ret != -1); 158 159 ret = listen(listenfd, 5); 160 assert(ret != -1); 161 162 user_count = 0; 163 users = new client_data[USER_LIMIT + 1]; 164 sub_process = new int[PROCESS_LIMIT]; 165 for(int i = 0; i < PROCESS_LIMIT; i ++) { 166 sub_process[i] = -1; 167 } 168 epoll_event events[MAX_EVENT_NUMBER]; 169 epollfd = epoll_create(5); 170 assert(epollfd != -1); 171 addfd(epollfd, listenfd); 172 173 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sig_pipefd); 174 assert(ret != -1); 175 setnonblocking(sig_pipefd[1]); 176 addfd(epollfd, sig_pipefd[0]); 177 178 addsig(SIGCHLD, sig_handler); 179 addsig(SIGTERM, sig_handler); 180 addsig(SIGINT, sig_handler); 181 addsig(SIGPIPE, SIG_IGN); 182 bool stop_server = false; 183 bool terminate = false; 184 185 shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666); 186 assert(shmfd != -1); 187 ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE); 188 assert(ret != -1); 189 190 share_mem = (char*)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0); 191 assert(share_mem != MAP_FAILED); 192 close(shmfd); 193 194 while(!stop_server) { 195 int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); 196 if((number < 0) && (errno != EINTR)) { 197 printf("epoll failure\n"); 198 break; 199 } 200 for(int i = 0; i < number; i ++) { 201 int sockfd = events[i].data.fd; 202 if(sockfd == listenfd) { 203 struct sockaddr_in client_address; 204 socklen_t client_addrlength = sizeof(client_address); 205 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 206 if(connfd < 0) { 207 printf("errno is: %d\n", errno); 208 continue; 209 } 210 if(user_count >= USER_LIMIT) { 211 const char* info = "too many users\n"; 212 printf("%s", info); 213 send(connfd, info, strlen(info), 0); 214 close(connfd); 215 continue; 216 } 217 users[user_count].address = client_address; 218 users[user_count].connfd = connfd; 219 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd); 220 assert(ret != -1); 221 pid_t pid = fork(); 222 if(pid < 0) { 223 close(connfd); 224 continue; 225 } 226 else if(pid == 0) { 227 close(epollfd); 228 close(listenfd); 229 close(users[user_count].pipefd[0]); 230 close(sig_pipefd[0]); 231 close(sig_pipefd[1]); 232 run_child(user_count, users, share_mem); 233 munmap((void*)share_mem, USER_LIMIT* BUFFER_SIZE); 234 exit(0); 235 } 236 else { 237 close(connfd); 238 close(users[user_count].pipefd[1]); 239 addfd(epollfd, users[user_count].pipefd[0]); 240 users[user_count].pid = pid; 241 sub_process[pid] = user_count; 242 user_count ++; 243 } 244 } 245 else if((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) { 246 int sig; 247 char signals[1024]; 248 ret = recv(sig_pipefd[0], signals, sizeof(signals), 0); 249 if(ret == -1) continue; 250 else if(ret == 0) continue; 251 else { 252 for(int i = 0; i < ret; i ++) { 253 switch(signals[i]) { 254 case SIGCHLD: { 255 pid_t pid; 256 int stat; 257 while((pid = waitpid(-1, &stat, WNOHANG)) > 0) { 258 int del_user = sub_process[pid]; 259 sub_process[pid] = -1; 260 if((del_user < 0) || (del_user > USER_LIMIT)) { 261 continue; 262 } 263 epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0); 264 close(users[del_user].pipefd[0]); 265 users[del_user] = users[-- user_count]; 266 sub_process[users[del_user].pid] = del_user; 267 } 268 if(terminate && user_count == 0) { 269 stop_server = true; 270 } 271 break; 272 } 273 case SIGTERM: 274 case SIGINT: { 275 printf("kill all the child now\n"); 276 if(user_count == 0) { 277 stop_server = true; 278 break; 279 } 280 for(int i = 0; i < user_count; i ++) { 281 int pid = users[i].pid; 282 kill(pid, SIGTERM); 283 } 284 terminate = true; 285 break; 286 } 287 default : break; 288 } 289 } 290 } 291 } 292 else if(events[i].events & EPOLLIN) { 293 int child = 0; 294 ret = recv(sockfd, (char*)&child, sizeof(child), 0); 295 printf("read data from child accross pipe\n"); 296 if(ret == -1) continue; 297 else if(ret == 0) continue; 298 else { 299 for(int j = 0; j < user_count; j ++) { 300 if(users[j].pipefd[0] != sockfd) { 301 printf("send data to child accross pipe\n"); 302 send(users[j].pipefd[0], (char*)&child, sizeof(child), 0); 303 } 304 } 305 } 306 } 307 } 308 } 309 del_resource(); 310 return 0; 311 }

技術分享圖片

  服務器接收到客戶端數據時,通過管道告知專門為其他服務器服務的子進程向對應客戶端發送數據。

  我們知道,當父進程創建子進程之後,父進程中打開的文件描述符仍然保持打開,所以文件描述符可以很方便地從父進程傳遞到子進程。需要特別註意的是進程描述符的傳遞並不是單純地傳送一個值,而是要在接收進程中創建一個新的文件描述符,並且該文件描述符和發送進程中被傳遞的文件描述符指向內核中的同一個文件表項。那麽問題就來了,我們如何從子進程將文件描述符傳送給父進程呢?推廣來說,如何在不相關的兩個進程之間傳送文件描述符呢?這時我們就需要利用UNIX域socket在進程間傳遞特殊的輔助數據,我們來看一個例子:

 1 /*************************************************************************
 2     > File Name: 13-5.cpp
 3     > Author: Torrance_ZHANG
 4     > Mail: [email protected]
 5     > Created Time: Tue 27 Feb 2018 03:35:13 AM PST
 6  ************************************************************************/
 7 
 8 #include"head.h"
 9 using namespace std;
10 
11 static const int CONTROL_LEN = CMSG_LEN(sizeof(int));
12 
13 void send_fd(int fd, int fd_to_send) {
14     struct iovec iov[1];
15     struct msghdr msg;
16     char buf[0];
17 
18     iov[0].iov_base = buf;
19     iov[0].iov_len = 1;
20     msg.msg_name = NULL;
21     msg.msg_namelen = 0;
22     msg.msg_iov = iov;
23     msg.msg_iovlen = 1;
24 
25     cmsghdr cm;
26     cm.cmsg_len = CONTROL_LEN;
27     cm.cmsg_level = SOL_SOCKET;
28     cm.cmsg_type = SCM_RIGHTS;
29     *(int *)CMSG_DATA(&cm) = fd_to_send;
30     msg.msg_control = &cm;
31     msg.msg_controllen = CONTROL_LEN;
32 
33     sendmsg(fd, &msg, 0);
34 }
35 
36 int recv_fd(int fd) {
37     struct iovec iov[1];
38     struct msghdr msg;
39     char buf[0];
40 
41     iov[0].iov_base = buf;
42     iov[0].iov_len = 1;
43     msg.msg_name = NULL;
44     msg.msg_namelen = 0;
45     msg.msg_iov = iov;
46     msg.msg_iovlen = 1;
47     
48     cmsghdr cm;
49     msg.msg_control = &cm;
50     msg.msg_controllen = CONTROL_LEN;
51     recvmsg(fd, &msg, 0);
52 
53     int fd_to_read = *(int*)CMSG_DATA(&cm);
54     return fd_to_read;
55 }
56 
57 int main() {
58     int pipefd[2];
59     int fd_to_pass = 0;
60     int ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, pipefd);
61     assert(ret != -1);
62 
63     pid_t pid = fork();
64     assert(pid >= 0);
65 
66     if(pid == 0) {
67         close(pipefd[0]);
68         fd_to_pass = open("test.txt", O_RDWR, 0666);
69         send_fd(pipefd[1], (fd_to_pass > 0) ? fd_to_pass : 0);
70         close(fd_to_pass);
71         exit(0);
72     }
73 
74     close(pipefd[1]);
75     fd_to_pass = recv_fd(pipefd[0]);
76     char buf[1024];
77     memset(buf, 0, sizeof(buf));
78     read(fd_to_pass, buf, 1024);
79     printf("I got fd %d and data %s\n", fd_to_pass, buf);
80     close(fd_to_pass);
81 }

技術分享圖片

《Linux高性能服務器編程》學習總結(十三)——多進程編程