1. 程式人生 > >linux多執行緒程式設計(C):訊號量實現的執行緒安全佇列

linux多執行緒程式設計(C):訊號量實現的執行緒安全佇列

用訊號量實現的執行緒安全佇列。 簡單有用的示例程式, 比起互斥量的實現在多執行緒時效率更好。 cir_queue.h
  1. /*
  2.  * \File
  3.  * cir_queue.h
  4.  * \Brief
  5.  * circular queue
  6.  */
  7. #ifndef __CIR_QUEUE_H__
  8. #define __CIR_QUEUE_H__
  9. #define QUE_SIZE 8
  10. typedef int DataType;
  11. typedef struct cir_queue_t
  12. {
  13.   DataType data[QUE_SIZE];
  14.   int front;
  15.   int rear;
  16.   int count;
  17. }cir_queue_t;
  18. extern sem_t queue_sem; 
  19. void init_cir_queue(cir_queue_t* q);
  20. int
     is_empty_cir_queue(cir_queue_t* q);
  21. int is_full_cir_queue(cir_queue_t* q);
  22. void push_cir_queue(cir_queue_t* q, DataType x);
  23. DataType pop_cir_queue(cir_queue_t* q);
  24. DataType top_cir_queue(cir_queue_t* q);
  25. void destroy_cir_queue(cir_queue_t* q);
  26. void print_queue(cir_queue_t* q);
  27. #endif
main.c
  1. /*
  2.  * \File
  3.  * main.c
  4.  * \Breif
  5.  * Thread-safe circular-queue implemented by semaphore
  6.  * \Author
  7.  * Hank.yan
  8.  */
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <unistd.h>
  12. #include <string.h>
  13. #include <pthread.h>
  14. #include <semaphore.h>
  15. #include "cir_queue.h"
  16. void* thread_queue(void *arg);
  17. /*
  18.  * \Func
  19.  * main
  20.  */
  21. int main(int argc, char* argv[])
  22. {
  23.   int res;
  24.   cir_queue_t cq;
  25.   DataType e;
  26.   pthread_t a_thread, b_thread;
  27.   void* thread_result;
  28.   init_cir_queue(&cq);
  29.   push_cir_queue(&cq, 1);
  30.   push_cir_queue(&cq, 2);
  31.   push_cir_queue(&cq, 3);
  32.   print_queue(&cq);
  33.   res = pthread_create(&a_thread, NULL, thread_queue, (void*)&cq);
  34.   if (res != 0)
  35.   {
  36.     perror("Thread creation failed.");
  37.     exit(EXIT_FAILURE);
  38.   }
  39.   e = pop_cir_queue(&cq);    
  40.   e = pop_cir_queue(&cq);    
  41.   print_queue(&cq);
  42.   push_cir_queue(&cq, 9);
  43.   push_cir_queue(&cq, 100);
  44.   print_queue(&cq);
  45.   res = pthread_create(&b_thread, NULL, thread_queue, (void*)&cq);
  46.   if (res != 0)
  47.   {
  48.     perror("Thread creation failed.");
  49.     exit(EXIT_FAILURE);
  50.   }
  51.   e = pop_cir_queue(&cq);    
  52.   push_cir_queue(&cq, 20);
  53.   print_queue(&cq);
  54.   printf("Waiting for thread to finish...\n");
  55.   res = pthread_join(a_thread, &thread_result);
  56.   if (res != 0)
  57.   {
  58.     perror("Thread join failed.");
  59.     exit(EXIT_FAILURE);
  60.   }
  61.   print_queue(&cq);
  62.   printf("Waiting for thread to finish...\n");
  63.   res = pthread_join(b_thread, &thread_result);
  64.   if (res != 0)
  65.   {
  66.     perror("Thread join failed.");
  67.     exit(EXIT_FAILURE);
  68.   }
  69.   destroy_cir_queue(&cq);
  70.   printf("Thread joined, it returned %s\n", (char*)thread_result); 
  71.   exit(EXIT_SUCCESS);
  72. }
  73. void *thread_queue(void *cirqueue)
  74. {
  75.   int flag;
  76.   DataType element;
  77.   print_queue((cir_queue_t*)cirqueue);
  78.   flag = is_empty_cir_queue((cir_queue_t*)cirqueue);
  79.   print_queue((cir_queue_t*)cirqueue);
  80.   element = pop_cir_queue((cir_queue_t*)cirqueue);
  81.   element = pop_cir_queue((cir_queue_t*)cirqueue);
  82.   print_queue((cir_queue_t*)cirqueue);
  83.   push_cir_queue((cir_queue_t*)cirqueue, 5);
  84.   print_queue((cir_queue_t*)cirqueue);
  85.   push_cir_queue((cir_queue_t*)cirqueue, 99);
  86.   push_cir_queue((cir_queue_t*)cirqueue, 1000);
  87.   push_cir_queue((cir_queue_t*)cirqueue, 88);
  88.   print_queue((cir_queue_t*)cirqueue);
  89.   pthread_exit("Thank you for the cpu time.");
  90. }
cir_queue.c
  1. /*
  2.  * \File
  3.  * cir_queue.c
  4.  */
  5. #include <stdio.h>
  6. #include <stdlib.h>
  7. #include <unistd.h>
  8. #include <string.h>
  9. #include <pthread.h>
  10. #include <semaphore.h>
  11. #include "cir_queue.h"
  12. sem_t queue_sem;
  13. /*
  14.  * \Func
  15.  *
  16.  */
  17. void init_cir_queue(cir_queue_t *q)
  18. {    
  19.   int res;
  20.   /* Create semaphore */
  21.   res = sem_init(&queue_sem, 0, QUE_SIZE);
  22.   if (res != 0)
  23.   {
  24.     perror("Semaphore init failed.\n");
  25.     exit(EXIT_FAILURE);
  26.   }
  27.   memset(q->data, 0, QUE_SIZE*sizeof(DataType));
  28.   q->front = q->rear = 0;
  29.   q->count = 0;
  30. }
  31. /*
  32.  * \Func
  33.  *
  34.  */
  35. int is_empty_cir_queue(cir_queue_t *q)
  36. {
  37.   int empty_flag;
  38.   sem_wait(&queue_sem);    
  39.   empty_flag = q->front == q->rear;
  40.   sem_post(&queue_sem);
  41.   return empty_flag;
  42. }
  43. /*
  44.  * \Func
  45.  *
  46.  */
  47. int is_full_cir_queue(cir_queue_t *q)
  48. {
  49.   int full_flag;
  50.   sem_wait(&queue_sem);    
  51.   full_flag = q->rear == QUE_SIZE - 1 + q->front;
  52.   sem_post(&queue_sem);
  53.   return full_flag;
  54. }
  55. /*
  56.  * \Func
  57.  *
  58.  */
  59. void push_cir_queue(cir_queue_t *q, DataType x)
  60. {
  61.   if (is_full_cir_queue(q))
  62.   {
  63.     printf("queue overflow.\n");
  64.     return ;
  65.   }
  66.   sem_wait(&queue_sem);    
  67.   q->count++;
  68.   q->data[q->rear] = x;
  69.   q->rear = (q->rear+1) % QUE_SIZE;
  70.   sem_post(&queue_sem);
  71. }
  72. /*
  73.  * \Func
  74.  *
  75.  */
  76. DataType pop_cir_queue(cir_queue_t *q)
  77. {
  78.   DataType temp;
  79.   if (is_empty_cir_queue(q))
  80.   {
  81.     printf("queue empty.\n");
  82.     return 0;
  83.   }
  84.   sem_wait(&queue_sem);    
  85.   temp = q->data[q->front];
  86.   q->data[q->front] = 0;
  87.   q->count--;
  88.   q->front = (q->front+1) % QUE_SIZE;
  89.   sem_post(&queue_sem);
  90.   return temp;
  91. }
  92. /*
  93.  * \Func
  94.  *
  95.  */
  96. DataType top_cir_queue(cir_queue_t *q)
  97. {
  98.   DataType x; 
  99.   if (is_empty_cir_queue(q))
  100.   {
  101.     printf("queue is empty.\n");
  102.     return 0;
  103.   }
  104.   sem_wait(&queue_sem);    
  105.   x = q->data[q->front];
  106.   sem_post(&queue_sem);
  107.   return x;
  108. }
  109. void destroy_cir_queue(cir_queue_t *q)
  110. { 
  111.   sem_destroy(&queue_sem);
  112.   return;    
  113. }
  114. void print_queue(cir_queue_t* q)
  115. {
  116.   int index;
  117.   if (is_empty_cir_queue(q))
  118.   {
  119.     printf("queue is empty.\n");
  120.     return;
  121.   }
  122.   sem_wait(&queue_sem);    
  123.   printf("QUEUE: ");
  124.   for (index = 0; index < QUE_SIZE; index++)
  125.   {
  126.     printf(" %d ", q->data[index]);
  127.   }
  128.   printf("\n");
  129.   sem_post(&queue_sem);
  130.   return;
  131. }

makefile
  1. OBJECTS = main.o cir_queue.o
  2. CC = gcc
  3. CFLAGS = -D_REENTRANT -lpthread --Wall
  4. thrd_safe_queue: $(OBJECTS)
  5.   $(CC) $(CFLAGS) -o thrd_safe_queue $(OBJECTS)
  6. main.o: cir_queue.h
  7. cir_queue.o: cir_queue.h
  8. .PHONY:clean
  9. clean:
  10.   rm thrd_safe_queue $(OBJECTS)