多執行緒基礎7 多執行緒與併發庫
多執行緒與併發庫
BlockingQueue佇列
BlockingQueue,如果BlockQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒.同樣,如果BlockingQueue是滿的,任何試圖往裡存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue裡有空間才會被喚醒繼續操作.
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTest { public static void main(String[] args) { final BlockingQueue queue = new ArrayBlockingQueue(3); for(int i=0;i<2;i++){ new Thread(){ public void run(){ while(true){ try { Thread.sleep((long)(Math.random()*1000)); System.out.println(Thread.currentThread().getName() + "準備放資料!"); queue.put(1); System.out.println(Thread.currentThread().getName() + "已經放了資料," + "佇列目前有" + queue.size() + "個數據"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } new Thread(){ public void run(){ while(true){ try { //將此處的睡眠時間分別改為100和1000,觀察執行結果 Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "準備取資料!"); queue.take(); System.out.println(Thread.currentThread().getName() + "已經取走資料," + "佇列目前有" + queue.size() + "個數據"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
ReentrantReadWriteLock讀寫鎖
1WriteLock 是完全互斥的當獲得WirteLock的時候此時其他執行緒不能讀也不能寫
2當獲得ReadLock的時候其他執行緒只能讀不能寫。
ReentrantReadWriteLock 和 ReentrantLock 沒有關係
ReentrantReadWriteLock會使用兩把鎖來解決問題,一個讀鎖,一個寫鎖
執行緒進入讀鎖的前提條件:
沒有其他執行緒的寫鎖,
沒有寫請求或者有寫請求,但呼叫執行緒和持有鎖的執行緒是同一個
執行緒進入寫鎖的前提條件:
沒有其他執行緒的讀鎖
沒有其他執行緒的寫鎖
class Queue3{ private Object data = null;//共享資料,只能有一個執行緒能寫該資料,但可以有多個執行緒同時讀該資料。 private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public void get(){ rwl.readLock().lock();//上讀鎖,其他執行緒只能讀不能寫 System.out.println(Thread.currentThread().getName() + " be ready to read data!"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "have read data :" + data); rwl.readLock().unlock(); //釋放讀鎖,最好放在finnaly裡面 } public void put(Object data){ rwl.writeLock().lock();//上寫鎖,不允許其他執行緒讀也不允許寫 System.out.println(Thread.currentThread().getName() + " be ready to write data!"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); rwl.writeLock().unlock();//釋放寫鎖 } }
簡單快取案例
package com.thread;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class CacheDemo {
private Map<String, Object> map = new HashMap<String, Object>();//快取器
private ReadWriteLock rwl = new ReentrantReadWriteLock();
public static void main(String[] args) {
}
public Object get(String id){
Object value = null;
rwl.readLock().lock();//首先開啟讀鎖,從快取中去取
try{
value = map.get(id);
if(value == null){ //如果快取中沒有釋放讀鎖,上寫鎖
rwl.readLock().unlock();
rwl.writeLock().lock();
try{
if(value == null){
value = "aaa"; //此時可以去資料庫中查詢,這裡簡單的模擬一下
}
}finally{
rwl.writeLock().unlock(); //釋放寫鎖
}
rwl.readLock().lock(); //然後再上讀鎖
}
}finally{
rwl.readLock().unlock(); //最後釋放讀鎖
}
return value;
}
}
CountDownLatch 控制執行緒順序
CountDownLatch是JAVA提供在java.util.concurrent包下的一個輔助類,可以把它看成是一個計數器,其內部維護著一個count計數,只不過對這個計數器的操作都是原子操作,同時只能有一個執行緒去操作這個計數器CountDownLatch cdOrder = new CountDownLatch(count) ; 傳入的count 表示能控制幾個執行緒。 呼叫物件上的cdOrder.await()方法那麼呼叫者就會一直阻塞在這裡。cdOrder .cutDown()方法,來使計數減1;如果count為0時阻塞的執行緒才能繼續執行
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountdownLatchTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch cdOrder = new CountDownLatch(1);
final CountDownLatch cdAnswer = new CountDownLatch(3);
for(int i=0;i<3;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
System.out.println("執行緒" + Thread.currentThread().getName() +
"正準備接受命令");
cdOrder.await();
System.out.println("執行緒" + Thread.currentThread().getName() +
"已接受命令");
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"迴應命令處理結果");
cdAnswer.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將釋出命令");
cdOrder.countDown();
System.out.println("執行緒" + Thread.currentThread().getName() +
"已傳送命令,正在等待結果");
cdAnswer.await();
System.out.println("執行緒" + Thread.currentThread().getName() +
"已收到所有響應結果");
} catch (Exception e) {
e.printStackTrace();
}
service.shutdown();
}
}
CyclicBarrier 允許一些執行緒相互等待
CyclicBarrier 允許一組執行緒互相等待,直到所有執行緒到達公共屏障點(CyclicBarrier .await())後繼續才繼續執行,cb.getNumberWaiting() 返回有幾個執行緒到達 屏障點
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CyclicBarrier cb = new CyclicBarrier(3);
for(int i=0;i<3;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println(cb.getParties()+"執行緒" + Thread.currentThread().getName()+
"即將到達地點1,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
cb.await();
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將到達地點2,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
cb.await();
Thread.sleep((long)(Math.random()*10000));
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}
Exchanger 兩個執行緒交換資料
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
service.execute(new Runnable(){
public void run() {
try {
String data1 = "zxx";
System.out.println("執行緒" + Thread.currentThread().getName() +
"正在把資料" + data1 +"換出去");
Thread.sleep((long)(Math.random()*10000));
String data2 = (String)exchanger.exchange(data1);
System.out.println("執行緒" + Thread.currentThread().getName() +
"換回的資料為" + data2);
}catch(Exception e){
}
}
});
service.execute(new Runnable(){
public void run() {
try {
String data1 = "lhm";
System.out.println("執行緒" + Thread.currentThread().getName() +
"正在把資料" + data1 +"換出去");
Thread.sleep((long)(Math.random()*10000));
String data2 = (String)exchanger.exchange(data1);
System.out.println("執行緒" + Thread.currentThread().getName() +
"換回的資料為" + data2);
}catch(Exception e){
}
}
});
}
}
Semaphore訊號量 控制執行緒併發數量
Semaphore是一件可以容納N人的房間,如果人不滿就可以進去,如果人滿了,就要等待有人出來。對於N=1的情況,稱為binary semaphore。一般的用法是,用於限制對於某一資源的同時訪問。
sp.acquire(); 表示進入 房間 sp.release(); 表示離開房間
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3);
for(int i=0;i<10;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("執行緒" + Thread.currentThread().getName() +
"進入,當前已有" + (3-sp.availablePermits()) + "個併發");
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執行緒" + Thread.currentThread().getName() +
"即將離開");
sp.release();
}
};
service.execute(runnable);
}
}
}
ThreadLocal 執行緒安全
ThreadLocal通過為每個執行緒提供一個獨立的變數副本解決了變數併發訪問的衝突問題。在很多情況下,ThreadLocal比直接使用synchronized同步機制解決執行緒安全問題更簡單,更方便,且結果程式擁有更高的併發性。
public class ThreadLocalTest {
final static ThreadLocal<String> threadLocal=new ThreadLocal<String>();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
threadLocal.set("1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("執行緒1 "+threadLocal.get());
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
threadLocal.set("2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("執行緒2 "+threadLocal.get());
}
}).start();
}
}
執行緒安全的list 和 set
java.util.concurrent.CopyOnWriteArrayList
java.util.concurrent.CopyOnWriteArraySet
這兩個集合 中 原始碼的 add()方法是有加鎖的 讀的時候沒有
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}