Java執行緒安全佇列操作
阿新 • • 發佈:2019-01-10
題目如下:
使用 wait notify 實現一個佇列,佇列有2個方法,add 和 get 。add方法往佇列中新增元素,get方法往佇列中獲得元素。佇列必須是執行緒安全的。如果get執行時,佇列為空,執行緒必須阻塞等待,直到有佇列有資料。如果add時,佇列已經滿,則add執行緒要等待,直到佇列有空閒空間。
實現這麼一個佇列,並寫一個測試程式碼,使他工作在多執行緒的環境下,證明,它的工作是正確的。給出程式和執行的截圖。
問題分析:依據以上題目要求,多執行緒操作時將佇列鎖定即可。本人使用了兩種方式去模擬一個佇列,一種是陣列另一種是連結串列形式的佇列。
測試程式碼如下:
// static ArrayQueue _queue = new ArrayQueue();
static LinkedQueue<Object> _queue = new LinkedQueue<>();
static int maxAccess = 10;
@Test
public void test02() throws Exception {
// 使用 wait notify 實現一個佇列,佇列有2個方法,add 和 get
// 。add方法往佇列中新增元素,get方法往佇列中獲得元素。佇列必須是執行緒安全的。
// 如果get執行時,佇列為空,執行緒必須阻塞等待,直到有佇列有資料。如果add時,佇列已經滿,
// 則add執行緒要等待,直到佇列有空閒空間。
// 實現這麼一個佇列,並寫一個測試程式碼,使他工作在多執行緒的環境下,證明,它的工作是正確的。給出程式和執行的截圖。
Thread a1 = new Thread(new addThread(), "AddThread-1");
Thread a2 = new Thread(new addThread(), "AddThread-2");
Thread a3 = new Thread(new addThread(), "AddThread-3");
a1.start();
a2.start();
a3.start();
Thread g1 = new Thread(new getThread(), "GetThread-1");
Thread g2 = new Thread(new getThread(), "GetThread-2");
Thread g3 = new Thread(new getThread(), "GetThread-3");
g1.start();
g2.start();
g3.start();
a1.join();
a2.join();
a3.join();
g1.join();
g2.join();
g3.join();
}
/**
* 連結串列佇列
* @author sunhf
*
*/
static class LinkedQueue<T> {
private Node<T> head; //頭節點
private Node<T> tail; //尾節點
public void add(T obj) {
if(head == null) {
head = new Node<T>(obj, tail);
tail = head;
} else {
Node<T> next = new Node<T>(obj, null);
tail.setNext(next);
tail = next;
}
}
public Object poll() {
T result = head.getContent();
head = head.getNext();
return result;
}
public int size() {
int length = 0;
Node<T> temp = head;
while(true) {
if(temp == null) {
return length;
} else {
temp = temp.getNext();
length++;
}
}
}
class Node<T> {
private T content;
private Node<T> next; //下一個節點
public Node(T content, Node<T> next) {
super();
this.content = content;
this.next = next;
}
public T getContent() {
return content;
}
public void setContent(T content) {
this.content = content;
}
public Node<T> getNext() {
return next;
}
public void setNext(Node<T> next) {
this.next = next;
}
}
}
/**
* 陣列佇列
* @author sunhf
*
*/
static class ArrayQueue {
private Object[] _queue = new Object[10];
private int index = 0;
public void add(Object obj) {
if(index == _queue.length) {
Object[] temp = new Object[_queue.length];
System.arraycopy(_queue, 0, temp, 0, _queue.length * 2); // 佇列擴容
_queue = temp;
}
_queue[index] = obj;
index++;
}
public Object poll() {
Object rs = _queue[0]; // 消費佇列中第一個元素
Object[] temp = new Object[_queue.length];
System.arraycopy(_queue, 1, temp, 0, _queue.length - 1); // 向前移動佇列
_queue = temp;
index--;
return rs;
}
public int size() {
for(int i = 0; i < _queue.length; i++) {
if(_queue[i] == null) {
return i;
}
}
return _queue.length;
}
}
class addThread implements Runnable {
@Override
public void run() {
while (true) {
synchronized (_queue) {
System.out.println("生產者->" + _queue.size());
if (_queue.size() == maxAccess) {
try {
System.out.println("消費佇列已滿,等待消費……");
_queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
_queue.add(_queue.size() + 1);
_queue.notify();
}
}
}
}
}
class getThread implements Runnable {
@Override
public void run() {
while (true) {
synchronized (_queue) {
if (_queue.size() == 0) {
try {
System.out.println("消費佇列為空,等待生產……");
_queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println("消費者->" + _queue.poll());
_queue.notify();
}
}
}
}
}