1. 程式人生 > >JAVA併發程式設計:阻塞佇列-ArrayBlockingQueue

JAVA併發程式設計:阻塞佇列-ArrayBlockingQueue

生活

有很多的不快樂,其實是源自不滿足,而不滿足,很多時候是源自於心不定,而心不定則是因為不清楚究竟自己要什麼,不清楚要什麼的結果就是什麼都想要,結果什麼都沒得到。

生產者消費者模式

生產者和消費者問題是執行緒模型中一個經典問題:
生產者和消費者在同一個時間段內共用一塊記憶體區域,由生產者在這塊記憶體區域建立消費者需要的資料,由消費者取走資料並消費。

對於生產者消費者模型的應用例項,JDK1.5提供了阻塞佇列。
下面來看下ArrayBlockingQueue,這是一個有界的,陣列結構的阻塞佇列。

ArrayBlockingQueue 成員組成

先來看下ArrayBlockingQueue的成員組成:

	//元素容器
    final Object[] items;
	//出隊索引
    int takeIndex;
	//入隊索引
    int putIndex;
    //佇列中元素個數
    int count;
    //鎖
    final ReentrantLock lock;
	//出隊條件
    private final Condition notEmpty;
	//入隊條件
    private final Condition notFull;

組成明瞭,出隊和入隊各有一個條件,互不干擾。

ArrayBlockingQueue 之建立

//建立指定長度的非公平 有界陣列阻塞佇列
   public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }


//可以指定fair為true建立公平的佇列
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

//可以傳入集合 構建一個初始化佇列
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

ArrayBlockingQueue 之生產

不阻塞生產:

//呼叫offer,如果滿了就報錯
    public boolean add(E e) {
        return super.add(e);
    }
    //
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
           //滿了就返回false;
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

阻塞生產:

//阻塞一定時間,不能入隊就返回false,可以中斷
 public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

// 一直阻塞直到入隊或者被中斷
 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
        //當佇列滿時等待
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

核心入隊方法:

private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        //入隊成功後喚醒等到在NotEmpty上的執行緒
        notEmpty.signal();
    }

ArrayBlockingQueue 之消費

不阻塞消費:

//  沒有元素返回null,有就返回並移除佇列中該元素
 public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

 

//不阻塞返回資料 不移除佇列
public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : itemAt(takeIndex);
        } finally {
            lock.unlock();
        }
    }

阻塞消費:

//一直阻塞值到出隊 或者被中斷
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }
    //阻塞消費,超時後返回null,可以中斷
 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }
    

出隊核心方法:

 private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        //移除元素後並沒有整體往前挪,只是索引+1
        takeIndex = inc(takeIndex);
        --count;
        //出隊成功後喚醒等待在NotFull上的執行緒
        notFull.signal();
        return x;
    }

例項

public class ABQTest {

    public static class Bread{
        String name;

        String price;

        public Bread(String name, String price) {
            this.name = name;
            this.price = price;
        }

        @Override
        public String toString() {
            return String.format("[麵包:%s,價格:%s]",name,price);
        }
    }


    public static class Producer implements  Runnable{
        private ArrayBlockingQueue<Bread> queue;
        private Bread bread;
        private String name;
        private CountDownLatch latch;


        @Override
        public void run() {

            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                Thread.sleep(Long.valueOf(new Random().nextInt(1000)));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                queue.put(bread);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(String.format("name:%s,生產:%s",name,bread.toString()));
        }

        public Producer(ArrayBlockingQueue<Bread> queue, Bread bread, String name, CountDownLatch latch) {
            this.queue = queue;
            this.bread = bread;
            this.name = name;
            this.latch = latch;
        }
    }

    public static class Consumer implements  Runnable{
        private ArrayBlockingQueue<Bread> queue;
        private String name;
        private CountDownLatch latch;


        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(Long.valueOf(new Random().nextInt(1000)));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Bread bread = null;
            try {
                bread=queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(String.format("name:%s,買了:%s",name,bread.toString()));
        }

        public Consumer(ArrayBlockingQueue<Bread> queue, String name, CountDownLatch latch) {
            this.queue = queue;
            this.name = name;
            this.latch = latch;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        ArrayBlockingQueue<Bread> queue = new ArrayBlockingQueue<Bread>(4);
        new Thread(new Producer(queue,new Bread("肉鬆","10"),"趙",latch)).start();
        new Thread(new Producer(queue,new Bread("蛋黃","7"),"錢",latch)).start();
        new Thread(new Producer(queue,new Bread("火腿","15"),"孫",latch)).start();
        new Thread(new Producer(queue,new Bread("蔬菜","5"),"李",latch)).start();
        new Thread(new Producer(queue,new Bread("香腸","8"),"周",latch)).start();
        new Thread(new Consumer(queue,"A",latch)).start();
        new Thread(new Consumer(queue,"B",latch)).start();
        new Thread(new Consumer(queue,"C",latch)).start();
        new Thread(new Consumer(queue,"D",latch)).start();
        new Thread(new Consumer(queue,"E",latch)).start();

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        latch.countDown();

    }

}

name:錢,生產:[麵包:蛋黃,價格:7]
name:周,生產:[麵包:香腸,價格:8]
name:A,買了:[麵包:蛋黃,價格:7]
name:B,買了:[麵包:香腸,價格:8]
name:趙,生產:[麵包:肉鬆,價格:10]
name:E,買了:[麵包:肉鬆,價格:10]
name:孫,生產:[麵包:火腿,價格:15]
name:C,買了:[麵包:火腿,價格:15]
name:D,買了:[麵包:蔬菜,價格:5]
name:李,生產:[麵包:蔬菜,價格:5]