1. 程式人生 > >jdk提供的阻塞佇列BlockingQueue下的五個實現簡單操作以及介紹

jdk提供的阻塞佇列BlockingQueue下的五個實現簡單操作以及介紹

package cn.yarne.com.base.test;

import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.Vector; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock;

import cn.yarne.com.base.test.BlockingQueueTest.Comp; import cn.yarne.com.base.test.BlockingQueueTest.DelayTest;

public class BlockingQueueTest {

    /**      * ArrayBlockingQueue是一個有界佇列 需要了解的是put,add,offer的區別:      * put 如果加不進去會阻塞,直到加進去為止 add 如果加不進去會丟擲錯誤 offer 如果加不進去會返回false      * @throws InterruptedException      */     private static void ArrayBlockingQueueTest() throws InterruptedException {         ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(5);         arrayBlockingQueue.add("a");         arrayBlockingQueue.add("b");         arrayBlockingQueue.add("c");         arrayBlockingQueue.add("d");         arrayBlockingQueue.add("e");         new Thread(() -> {             try {                 Thread.sleep(3000);                 String take = arrayBlockingQueue.take();                 System.out.println(take);             } catch (InterruptedException e) {                 // TODO Auto-generated catch block                 e.printStackTrace();             }         }).start();         arrayBlockingQueue.put("f");

        for (Iterator iterator = arrayBlockingQueue.iterator(); iterator.hasNext();) {             String string = (String) iterator.next();             System.out.println(string);         }     }

    /**      * LinkedBlockingQueueTest是一個無界的阻塞佇列,但是也可以給界限,其他的和ArrayBlockingQueue相似      */     public static void LinkedBlockingQueueTest() {         LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>();         linkedBlockingQueue.add("a");         linkedBlockingQueue.add("b");         linkedBlockingQueue.add("c");         boolean offer = linkedBlockingQueue.offer("d");         System.out.println(offer);         linkedBlockingQueue.offer("e");     }

    /**      * SynchronousQueue是一個沒有緩衝的佇列,這個佇列生產者生產的資料會直接被消費者進行消費      * 也就是說,如果沒有消費者(take)方法,那麼這個生產者是不能生產(put)資料的,會一直阻塞      */     public static void SynchronousQueueTest() throws InterruptedException {         SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();         System.out.println(synchronousQueue.size());         new Thread(() -> {             try {                 Thread.sleep(3000);                 String take = synchronousQueue.take();                 System.out.println(take + "取出");             } catch (InterruptedException e) {                 // TODO Auto-generated catch block                 e.printStackTrace();             }

        }).start();         new Thread(() -> {             try {                 synchronousQueue.put("新增");                 System.out.println(synchronousQueue.size());             } catch (InterruptedException e) {                 // TODO Auto-generated catch block                 e.printStackTrace();             }

        }).start();     }

    class Comp implements Comparable<Comp> {         private String name;         private int id;

        public Comp(String name, int id) {             super();             this.name = name;             this.id = id;         }

        public String getName() {             return name;         }

        public void setName(String name) {             this.name = name;         }

        public int getId() {             return id;         }

        public void setId(int id) {             this.id = id;         }

        @Override         public int compareTo(Comp o) {             return this.id - o.getId();         }

        @Override         public String toString() {             return "Comp [name=" + name + ", id=" + id + "]";         }

    }

    /**      * PriorityBlockingQueue 是一個基於優先順序的阻塞佇列      * 說到優先順序肯定是需要排序,所以說想要實現排序,傳入的物件必須實現Comparable介面      * 這個佇列的排序是在add或者take的時候,每次把最高優先順序的一個放在第一位,準備被下一次take出去      *       *       * @throws InterruptedException      */     public static void PriorityBlockingQueueTest() throws InterruptedException {         BlockingQueueTest blockingQueueTest = new BlockingQueueTest();         PriorityBlockingQueue<Comp> priorityBlockingQueue = new PriorityBlockingQueue<>();

        Comp comp2 = blockingQueueTest.new Comp("李四", 8);         Comp comp1 = blockingQueueTest.new Comp("張三", 3);         Comp comp3 = blockingQueueTest.new Comp("王五", 6);         Comp comp4 = blockingQueueTest.new Comp("王五", 7);

        priorityBlockingQueue.add(comp1);         priorityBlockingQueue.add(comp2);         priorityBlockingQueue.add(comp3);         priorityBlockingQueue.add(comp4);

        for (Iterator iterator = priorityBlockingQueue.iterator(); iterator.hasNext();) {             Comp comp = (Comp) iterator.next();             System.out.println(comp.toString());         }

        Comp take = priorityBlockingQueue.take();         Comp take2 = priorityBlockingQueue.take();         System.out.println(take.toString());         System.out.println(take2.toString());

    }

    class DelayTest implements Delayed {         private int id;         private long time;

        public DelayTest(int id, long time) {             super();             this.id = id;             this.time = time;         }

        public int getId() {             return id;         }

        public void setId(int id) {             this.id = id;         }

        public long getTime() {             return time;         }

        public void setTime(long time) {             this.time = time;         }

        @Override         public int compareTo(Delayed o) {             DelayTest delayTest = (DelayTest) o;             return this.id - delayTest.id;         }

        @Override         public long getDelay(TimeUnit unit) {             long ti=this.time-System.currentTimeMillis();             System.out.println(ti);             return ti;         }

        @Override         public String toString() {             return "DelayTest [id=" + id + ", time=" + time + "]";         }     }

    /**      * DelayQueue 是一個帶有延時時間的佇列,元素只有時間到了,take的時候才能取到元素      * 所以DelayQueue中存入的元素必須實現Delayed介面,主要判斷時間是否到期的方法是      * getDelay() 所以這個方法中的返回值必須是一個不斷減少的值,例如lthis.time-System.currentTimeMillis();      * 否則的話會導致take方法等不到過期的元素,所以會阻塞      * 這是一個沒有大小限制的佇列,使用場景很多,比如快取超時處理,任務超時的處理,空閒連線的關閉      * @throws InterruptedException      */     public static void DelayQueueTest() throws InterruptedException {         DelayQueue<DelayTest> delayQueue = new DelayQueue<>();         BlockingQueueTest blockingQueueTest = new BlockingQueueTest();         DelayTest delayTest1 = blockingQueueTest.new DelayTest(3, System.currentTimeMillis()+3000);         DelayTest delayTest2 = blockingQueueTest.new DelayTest(5, System.currentTimeMillis()+5000);         DelayTest delayTest3 = blockingQueueTest.new DelayTest(10, System.currentTimeMillis()+10000);         delayQueue.add(delayTest2);         delayQueue.add(delayTest3);         delayQueue.add(delayTest1);

        for (Iterator iterator = delayQueue.iterator(); iterator.hasNext();) {             DelayTest delayTest = (DelayTest) iterator.next();             System.out.println(delayTest.toString());         }         DelayTest take = delayQueue.take();         System.out.println(take.toString());         DelayTest take1 = delayQueue.take();         System.out.println(take1.toString());         DelayTest take2 = delayQueue.take();         System.out.println(take2.toString());     }          public static void main(String[] args) throws InterruptedException {              }

}