1. 程式人生 > >Java執行緒安全佇列操作

Java執行緒安全佇列操作

題目如下:
使用 wait notify 實現一個佇列,佇列有2個方法,add 和 get 。add方法往佇列中新增元素,get方法往佇列中獲得元素。佇列必須是執行緒安全的。如果get執行時,佇列為空,執行緒必須阻塞等待,直到有佇列有資料。如果add時,佇列已經滿,則add執行緒要等待,直到佇列有空閒空間。
實現這麼一個佇列,並寫一個測試程式碼,使他工作在多執行緒的環境下,證明,它的工作是正確的。給出程式和執行的截圖。

問題分析:依據以上題目要求,多執行緒操作時將佇列鎖定即可。本人使用了兩種方式去模擬一個佇列,一種是陣列另一種是連結串列形式的佇列。
測試程式碼如下:

//   static ArrayQueue _queue = new ArrayQueue();
static LinkedQueue<Object> _queue = new LinkedQueue<>(); static int maxAccess = 10; @Test public void test02() throws Exception { // 使用 wait notify 實現一個佇列,佇列有2個方法,add 和 get // 。add方法往佇列中新增元素,get方法往佇列中獲得元素。佇列必須是執行緒安全的。 // 如果get執行時,佇列為空,執行緒必須阻塞等待,直到有佇列有資料。如果add時,佇列已經滿,
// 則add執行緒要等待,直到佇列有空閒空間。 // 實現這麼一個佇列,並寫一個測試程式碼,使他工作在多執行緒的環境下,證明,它的工作是正確的。給出程式和執行的截圖。 Thread a1 = new Thread(new addThread(), "AddThread-1"); Thread a2 = new Thread(new addThread(), "AddThread-2"); Thread a3 = new Thread(new addThread(), "AddThread-3"); a1.start(); a2.start(); a3.start(); Thread g1 = new
Thread(new getThread(), "GetThread-1"); Thread g2 = new Thread(new getThread(), "GetThread-2"); Thread g3 = new Thread(new getThread(), "GetThread-3"); g1.start(); g2.start(); g3.start(); a1.join(); a2.join(); a3.join(); g1.join(); g2.join(); g3.join(); } /** * 連結串列佇列 * @author sunhf * */ static class LinkedQueue<T> { private Node<T> head; //頭節點 private Node<T> tail; //尾節點 public void add(T obj) { if(head == null) { head = new Node<T>(obj, tail); tail = head; } else { Node<T> next = new Node<T>(obj, null); tail.setNext(next); tail = next; } } public Object poll() { T result = head.getContent(); head = head.getNext(); return result; } public int size() { int length = 0; Node<T> temp = head; while(true) { if(temp == null) { return length; } else { temp = temp.getNext(); length++; } } } class Node<T> { private T content; private Node<T> next; //下一個節點 public Node(T content, Node<T> next) { super(); this.content = content; this.next = next; } public T getContent() { return content; } public void setContent(T content) { this.content = content; } public Node<T> getNext() { return next; } public void setNext(Node<T> next) { this.next = next; } } } /** * 陣列佇列 * @author sunhf * */ static class ArrayQueue { private Object[] _queue = new Object[10]; private int index = 0; public void add(Object obj) { if(index == _queue.length) { Object[] temp = new Object[_queue.length]; System.arraycopy(_queue, 0, temp, 0, _queue.length * 2); // 佇列擴容 _queue = temp; } _queue[index] = obj; index++; } public Object poll() { Object rs = _queue[0]; // 消費佇列中第一個元素 Object[] temp = new Object[_queue.length]; System.arraycopy(_queue, 1, temp, 0, _queue.length - 1); // 向前移動佇列 _queue = temp; index--; return rs; } public int size() { for(int i = 0; i < _queue.length; i++) { if(_queue[i] == null) { return i; } } return _queue.length; } } class addThread implements Runnable { @Override public void run() { while (true) { synchronized (_queue) { System.out.println("生產者->" + _queue.size()); if (_queue.size() == maxAccess) { try { System.out.println("消費佇列已滿,等待消費……"); _queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { _queue.add(_queue.size() + 1); _queue.notify(); } } } } } class getThread implements Runnable { @Override public void run() { while (true) { synchronized (_queue) { if (_queue.size() == 0) { try { System.out.println("消費佇列為空,等待生產……"); _queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println("消費者->" + _queue.poll()); _queue.notify(); } } } } }