《實戰Java高併發程式設計》學習總結(2)
第3章 JDK併發包
1 synchronized的功能擴充套件:重入鎖。使用java.util.concurrent.locks.ReentrantLock類來實現。
import java.util.concurrent.locks.ReentrantLock; public class ReenterLock implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static int i = 0; @Override public void run() { // TODO Auto-generated method stub for(int j=0;j<100000000;j++){ lock.lock(); try{ i++; }finally{ lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { ReenterLock tl = new ReenterLock(); Thread t1 = new Thread(tl); Thread t2 = new Thread(tl); t1.start();t2.start(); t1.join();t2.join(); System.out.println(i); } }
重入鎖比synchronized靈活外,還提供一些高階功能
- 中斷響應。synchronized必須要等到鎖才能繼續執行,而重入鎖是支援執行緒可以被中斷。
import java.util.concurrent.locks.ReentrantLock; public class IntLock implements Runnable{ public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public IntLock(int lock){ this.lock = lock; } @Override public void run() { // TODO Auto-generated method stub try{ if(lock == 1){ lock1.lockInterruptibly(); try{ Thread.sleep(500); }catch(InterruptedException e){ } lock2.lockInterruptibly(); }else{ lock2.lockInterruptibly(); try{ Thread.sleep(500); }catch(InterruptedException e){ } lock1.lockInterruptibly(); } }catch(InterruptedException e){ e.printStackTrace(); }finally{ if(lock1.isHeldByCurrentThread()) lock1.unlock(); if(lock2.isHeldByCurrentThread()) lock2.unlock(); System.out.println(Thread.currentThread().getId()+":執行緒退出"); } } public static void main(String[] args) throws InterruptedException{ IntLock r1 = new IntLock(1); IntLock r2 = new IntLock(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start();t2.start(); Thread.sleep(1000); t2.interrupt(); } }
- 鎖申請等待限時,使用tryLock()
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; public class TimeLock implements Runnable { public static ReentrantLock lock = new ReentrantLock(); @Override public void run() { // TODO Auto-generated method stub try{ if(lock.tryLock(5,TimeUnit.SECONDS)){ //5秒內無法申請到鎖則失敗 Thread.sleep(6000); }else{ System.out.println("get lock failed"); } }catch(InterruptedException e){ e.printStackTrace(); }finally{ if(lock.isHeldByCurrentThread()) lock.unlock(); } } public static void main(String[] args){ TimeLock tl = new TimeLock(); Thread t1 = new Thread(tl); Thread t2 = new Thread(tl); t1.start(); t2.start(); } }
- 公平鎖,當引數fair為true時表示鎖為公平。但公平鎖的實現成本高效能相對不佳,預設鎖是非公平的
public ReentrantLock( boolean fair )
import java.util.concurrent.locks.ReentrantLock;
public class FairLock implements Runnable{
public static ReentrantLock fairLock = new ReentrantLock(true);
@Override
public void run() {
// TODO Auto-generated method stub
while(true){
try{
fairLock.lock();
System.out.println(Thread.currentThread().getName()+" 獲得鎖");
}finally{
fairLock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException{
FairLock fl = new FairLock();
Thread t1 = new Thread(fl,"Thread_t1");
Thread t2 = new Thread(fl,"Thread_t2");
t1.start();t2.start();
}
}
輸出 (交替獲取)
Thread_t1 獲得鎖
Thread_t2 獲得鎖
Thread_t1 獲得鎖
Thread_t2 獲得鎖
Thread_t1 獲得鎖
Thread_t2 獲得鎖
Thread_t1 獲得鎖
Thread_t2 獲得鎖
Thread_t1 獲得鎖
Thread_t2 獲得鎖
Thread_t1 獲得鎖
Thread_t2 獲得鎖
Thread_t1 獲得鎖
Thread_t2 獲得鎖
Thread_t1 獲得鎖
Thread_t2 獲得鎖
Thread_t1 獲得鎖
2 重入鎖ReentrantLock的幾個重要方法:
- lock( ) :獲得鎖,如果鎖已經被佔用則等待。
- lockInterruptibly( ):獲得鎖,但優先響應中斷
- tryLock( ):嘗試獲得鎖,如果成功返回true,失敗返回false。不等待立即返回
- tryLock( long time , TimeUnit unit ):在給定時間內嘗試獲得鎖
3 重入鎖的好搭檔:Condition條件。通過Lock介面的Condition newCondition( )方法可以生成一個與當前重入鎖繫結的Condition例項。利用Condition物件,可以讓執行緒在合適的時間等待或得到通知繼續執行。介面基本方法如下
- void await( ) throws InterruptedException // 當前執行緒等待,釋放鎖,其他執行緒用signal()或signalAll()時,執行緒重新獲取鎖繼續執行。或當執行緒被中斷時也能跳出等待
- void awaitUninterruptibly( ) // 跟await()基本相同,只是它不會在等待過程中響應中斷
- void awaitNanos(long nanosTimeout) throws InterruptedException
- boolean await(long time , TimeUnit unit) throws InterruptedException
- boolean awaitUntil(Date deadline) throws InterruptedException
- void signal( ) // 用於喚醒一個在等待中的執行緒
- void signalAll( )
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockCondition implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
@Override
public void run() {
// TODO Auto-generated method stub
try{
lock.lock();
condition.await(); // 等待通知
System.out.println("Thread is going on");
}catch(InterruptedException e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException{
ReentrantLockCondition tl = new ReentrantLockCondition();
Thread t1 = new Thread(tl);
t1.start();
Thread.sleep(2000);
lock.lock(); // 呼叫signal()前要求當前執行緒先獲取相關的鎖
condition.signal();
lock.unlock(); // 呼叫signal()後要求釋放當前鎖謙讓給喚醒的執行緒
}
}
注:呼叫signal()前要求當前執行緒先獲得相關的鎖,呼叫後也要釋放鎖給喚醒的執行緒。
4 允許多個執行緒同時訪問:訊號量(Semaphore) ,可以指定多個執行緒同時訪問某一個資源。函式如下
- public Semaphore(int permits) // 必須指定訊號量的准入數,每個執行緒只能申請一個許可。
- public Semaphore(int permits , boolean fair) // 第二個引數指定是否公平
- public void acquire( ) // 嘗試獲得一個准入許可,無法獲得則等待,直到有執行緒釋放一個許可或當前執行緒被中斷
- public void acquireUninterruptibly( ) // 跟acquire()基本一樣,只是不響應中斷
- public boolean tryAcquire( ) // 嘗試獲得一個准入許可,獲得則返回true,失敗則返回false。不會等待立即返回
- public boolean tryAcquire(long timeout , TimeUnit unit)
- public void release( ) // 用於線上程訪問資源結束後釋放一個許可
5 ReadWriteLock 讀寫鎖,要比重入鎖或內部鎖更針對
6 倒計時器:CountDownLatch ,是一個非常實用的多執行緒控制工具類。通常用來控制執行緒等待,讓一個某個執行緒等待直到倒計時結束再開始執行。建構函式如下
public CountDownLatch(int count) // count為計數器的計數個數
public class CountDownLatchDemo implements Runnable {
// 設定計數10的倒計時器
static final CountDownLatch end = new CountDownLatch(10);
static final CountDownLatchDemo demo = new CountDownLatchDemo();
@Override
public void run() {
// TODO Auto-generated method stub
try{
Thread.sleep(new Random().nextInt(10)*1000);
System.out.println("check complete");
// 倒計時器倒數一次
end.countDown();
}catch(InterruptedException e){
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException{
ExecutorService exec = Executors.newFixedThreadPool(10);
for(int i=0;i<10;i++){
exec.submit(demo);
}
// 倒計時開始,一直等待,只有倒計時器倒數完成後才會往下執行
end.await();
System.out.println("Fire!");
exec.shutdown();
}
}
7 迴圈柵欄:CyclicBarrier,是一種多執行緒併發控制實用工具。跟CountDownLatch類似,但更復雜強大。建構函式如下
public CyclicBarrier( int parties , Runnable barrierAction ) // parties為計數總數,barrierAction為計數完成後要去執行的任務
public class CyclicBarrierDemo {
public static class Soldier implements Runnable {
private String soldier;
private final CyclicBarrier cyclic;
Soldier(CyclicBarrier cyclic,String soldierName){
this.soldier = soldierName;
this.cyclic = cyclic;
}
@Override
public void run() {
// TODO Auto-generated method stub
try{
// 呼叫一次倒計時一次,夠計數總數後,cyclic會自動呼叫BarrierRun,繼續執行下面的操作,即doWork()
cyclic.await();
doWork();
// 又重新倒計時,呼叫一次倒計時一次,夠計數總數後,cyclic會自動呼叫BarrierRun,繼續執行下面的操作,即doWork()
cyclic.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
}
void doWork(){
try{
Thread.sleep(Math.abs(new Random().nextInt()%10000));
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println(soldier+": 任務完成");
}
}
public static class BarrierRun implements Runnable{
boolean flag;
int N;
public BarrierRun(boolean flag,int N){
this.flag = flag;
this.N = N;
}
@Override
public void run() {
// TODO Auto-generated method stub
if(flag){
System.out.println("司令: [士兵"+N+"個,任務完成!]");
}else{
System.out.println("司令: [士兵"+N+"個,集合完畢!]");
flag = true;
}
}
}
public static void main(String[] args) throws InterruptedException {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(10,new BarrierRun(flag,N));
System.out.println("集合隊伍!");
for(int i=0;i<N;i++){
System.out.println("士兵"+i+" 報道!");
allSoldier[i] = new Thread(new Soldier(cyclic,"士兵i"));
allSoldier[i].start();
}
}
}
8 執行緒阻塞工具類:LockSupport,是一種非常方便實用的執行緒阻塞工具。可以線上程內任意位置讓執行緒阻塞。
public class LockSupportDemo {
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");
public static class ChangeObjectThread extends Thread{
public ChangeObjectThread(String name){
super.setName(name);
}
@Override
public void run() {
// TODO Auto-generated method stub
synchronized(u){
System.out.println("in "+getName());
LockSupport.park(); // 阻塞當前執行緒
}
}
}
public static void main(String[] args) throws InterruptedException{
t1.start();
Thread.sleep(100);
t2.start();
LockSupport.unpark(t1); // 繼續執行
LockSupport.unpark(t2); // 繼續執行
t1.join();
t2.join();
}
}
LockSupport類為每個執行緒準備了一個許可,如果許可可用,park()函式會立即返回並消費這個許可,即標記該許可為不可用;如果不可用則會阻塞。unpack()則是將許可變成可用。這樣保證unpack()和pack()的順序總是相對穩定的。
pack(Object)函式還可以為當前執行緒設定一個阻塞物件。
LockSupport.pack()支援中斷,而且還不會丟擲InterruptedException異常。如下
public static class ChangeObjectThread extends Thread{
public ChangeObjectThread(String name){
super.setName(name);
}
@Override
public void run() {
// TODO Auto-generated method stub
synchronized(u){
System.out.println("in "+getName());
LockSupport.park();
if(Thread.interrupted()){
System.out.println(getName()+" 被中斷了");
}
}
System.out.println(getName()+" 執行結束");
}
}
public static void main(String[] args) throws InterruptedException{
t1.start();
Thread.sleep(100);
t2.start();
t1.interrupt();
LockSupport.unpark(t2);
}
}
9 執行緒池為了避免系統頻繁地建立和銷燬執行緒,並且複用建立地執行緒。線上程池中,有幾個活躍地執行緒,當需要執行緒時從中取一個空閒的執行緒。完成任務後再退回到執行緒池中。JDK提供了一套Executor框架幫助執行緒控制,其本質就是一個執行緒池。
以上成員均在java.util.concurrent包中,是JDK併發包的核心類。其中ThreadPoolExecutor表示一個執行緒池。Executors類則扮演執行緒池工廠的角色。通過Executors可以取得一個擁有特定功能的執行緒池。
Executor框架提供了各種型別的執行緒池。主要有以下工廠方法
- public static ExecutorService newFixedThreadPool( int nThreads) //返回一個固定執行緒數量的執行緒池,該執行緒池中的執行緒數量固定不變。有新任務時,池中若有空閒執行緒則立即執行。否則存在任務佇列中排隊等候
- public static ExecutorService newSingleThreadExecutor() //該方法返回一個只有一個執行緒的執行緒池。若多餘一個任務被提交到該執行緒池,任務會儲存到任務佇列排隊執行
- public static ExecutorService newCachedThreadPool() // 該方法返回一個可根據實際情況調整執行緒數量的執行緒池。執行緒數量不確定。如有空閒執行緒可複用優先使用。如沒有空閒則建立新的執行緒執行任務。
- public static ScheduledExecutorService newSingleThreadScheduledThreadPool() // 該方法返回一個ScheduledExecutorService物件,執行緒池大小為1。
- public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize ) // 該方法返回一個ScheduledExecutorService物件,但該執行緒池可以指定執行緒數量
例子
public class ThreadPoolDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println(System.currentTimeMillis()+": Thread ID: "+Thread.currentThread().getId());
try{
Thread.sleep(1000);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
public static void main(String[] args){
MyTask task = new MyTask();
ExecutorService es = Executors.newFixedThreadPool(5);
//ExecutorService es = Executors.newSingleThreadExecutor();
//ExecutorService es = Executors.newCachedThreadPool();
for(int i=0;i<10;i++){
es.submit(task);
}
}
}
// 當使用newFixedThreadPool(5),輸出如下,5個執行緒複用
1535423404418: Thread ID: 9
1535423404418: Thread ID: 12
1535423404418: Thread ID: 11
1535423404418: Thread ID: 10
1535423404418: Thread ID: 8
1535423405420: Thread ID: 8
1535423405420: Thread ID: 9
1535423405420: Thread ID: 10
1535423405420: Thread ID: 11
1535423405420: Thread ID: 12
// 當使用newSingleThreadExecutor(),只有一個執行緒複用,輸出如下
1535423548649: Thread ID: 8
1535423549655: Thread ID: 8
1535423550656: Thread ID: 8
1535423551660: Thread ID: 8
1535423552665: Thread ID: 8
1535423553667: Thread ID: 8
1535423554669: Thread ID: 8
1535423555673: Thread ID: 8
1535423556678: Thread ID: 8
1535423557683: Thread ID: 8
// 當使用newCachedThreadPool(),執行緒不夠會建立新的,輸出如下
1535423614559: Thread ID: 8
1535423614559: Thread ID: 12
1535423614559: Thread ID: 9
1535423614559: Thread ID: 10
1535423614560: Thread ID: 14
1535423614559: Thread ID: 11
1535423614560: Thread ID: 13
1535423614560: Thread ID: 15
1535423614560: Thread ID: 16
1535423614560: Thread ID: 17
10 計劃任務,newScheduledThreadPool(),它返回一個ScheduledExecutorService物件,可以根據時間需要對執行緒進行排程,主要方法如下
public ScheduledFuture<?> schedule(Runnable command , long delay , TimeUnit unit ) // 會在給定時間對任務進行一次排程
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command , long initialDelay , long period , TimeUnit unit ) // 會對任務進行週期性排程,排程的頻率是一定的。是以上一個任務開始執行時間為起點,之後的period時間排程下一次任務
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command , long initialDelay , long delay , TimeUnit unit ) // 會對任務進行週期性排程,在上一個任務結束後再經過delay時間進行任務排程
跟其他執行緒池不用,ScheduledExecutorService並不一定會立即執行任務,它其實是起計劃任務的作用。它會在指定的時間,對任務進行排程
- 如下程式碼,任務會每隔2秒執行,但因為scheduleAtFixedRate()是以上一個任務執行開始時間來算,如果任務執行時間長於排程時間,週期會無效。
public class ScheduledExecutorServiceDemo {
public static void main(String[] args){
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
try{
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}, 0, 2, TimeUnit.SECONDS);
}
}
- 如下程式碼,scheduleWithFixedDelay()是以上一個任務的結束開始算,所以任務間都間隔2秒
public class ScheduledExecutorServiceDemo {
public static void main(String[] args){
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleWithFixedDelay(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
try{
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}, 0, 2, TimeUnit.SECONDS);
}
}
11 無論newFixedThreadPool,newSingleThreadExecutor,newCachedThreadPool()都只是ThreadPoolExecutor類的封裝。ThreadPoolExecutor的建構函式如下
public ThreadPoolExecutor( int corePoolSize , int maximumPoolSize , long keepAliveTime , TimeUnit unit , BlockingQueue<Runnable> workQueue , ThreadFactory threadFactory , RejectedExecutionHandler handler )
- corePoolSize 指定了執行緒池中的執行緒數量
- maximumPoolSize 指定了執行緒池中的最大執行緒數量
- keepAliveTime 當執行緒池執行緒數量超過了corePoolSize時,多餘的空閒執行緒的存活時間
- unit 是keepAliveTime的單位
- workQueue 任務佇列,被提交但尚未被執行的任務,是一個BlockingQueue介面物件,僅用於存放Runnable物件。
- threadFactory 執行緒工廠,用於建立執行緒
- handler 拒絕策略,當任務太多來不及處理,如何拒絕任務
在ThreadPoolExecutor的建構函式中可使用如下幾種BlockingQueue
- 直接提交的佇列,該功能由SynchronousQueue物件提供。SynchronousQueue是一個特殊的BlockingQueue。SynchronousQueue沒有容量,每個插入操作都要等待一個相應的刪除操作。如果使用SynchronousQueue,提交的任務不會被真實的儲存,而總是將新任務提交給執行緒執行,沒有空閒的執行緒則嘗試建立新的程序。當程序資料達到最大值則執行拒絕策略。所以使用SynchronousQueue佇列,通常要設定很大的maximumPoolSize值否則很容易執行拒絕策略。
- 有界的任務佇列,可以用ArrayBlockingQueue,它必須帶有一個表示佇列最大容量的引數。有新任務時,執行緒池的實際執行緒數小於corePoolSize,則優先建立新的執行緒;若大於則將新任務加入等待佇列。佇列若已滿,則在匯流排程數不大於maximumPoolSize的前提下建立新的執行緒執行任務。若大於則執行拒絕策略。
- 無界的任務佇列,可以用LinkedBlockingQueue類實現。除非系統資源耗盡,否則無界的任務佇列不會存在任務入列失敗。有新任務時,執行緒池的實際執行緒數小於corePoolSize,則優先建立新的執行緒;若大於則將新任務加入等待佇列。
- 優先任務佇列,可以用PriorityBlockingQueue類實現。它是一個無界佇列,可以根據任務自身的優先順序順序先後執行。
JDK內建提供了四種拒絕策略,這些策略都實現了RejectedExecutionHandler介面
- AbortPolicy策略:該策略直接丟擲異常,阻止系統正常工作。
- CallerRunsPolicy策略:只要執行緒池未關閉,該策略直接在呼叫者執行緒中,運行當前被丟棄的任務。
- DiscardOledestPolicy策略:丟棄即將被執行的任務,嘗試再次提交當前任務
- DiscardPolicy策略:丟棄無法處理的任務
12 ThreadPoolExecutor也是一個可以擴充套件的執行緒池,提供了beforeExecute(),afterExecute()和terminated()三個介面對執行緒池進行控制。
13 優化執行緒池執行緒數量
14 線上程池中尋找堆疊
public class DivTask implements Runnable {
int a , b;
public DivTask(int a, int b){
this.a = a;
this.b = b;
}
@Override
public void run() {
// TODO Auto-generated method stub
double re = a/b;
System.out.println(re);
}
public static void main(String[] args) throws InterruptedException,ExecutionException{
ThreadPoolExecutor pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,
0L,TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
for(int i=0;i<5;i++){
pools.submit(new DivTask(100,i)); // 第一種,出現異常不會列印異常錯誤
//pools.execute(new DivTask(100,i)); // 第二種,出現異常會列印部分堆疊資訊
/* 第三種,出現異常會列印部分堆疊資訊
Future re = pools.submit(new DivTask(100,i));
re.get();
*/
}
}
}
15 分而治之:Fork / Join框架。在JDK中,有一個ForkJoinPool執行緒池,對於fork()方法並不急於開啟執行緒,而是提交給ForkJoinPool執行緒池進行處理。
ForkJoinPool有一個重要的介面
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
可以向ForkJoinPool執行緒池提交一個ForkJoinTask任務,該任務就是支援fork()分解以及join()等待的任務。ForkJoinTask有兩個重要的子類RecursiveAction和RecursiveTask,分別表示沒有返回值的任務和可以攜帶返回值的任務。
16 JDK的併發容器,大部分在java.util.concurrent包中。
- ConcurrentHashMap:高效的併發HashMap,可以理解為執行緒安全的HashMap
- CopyOnWriteArrayList:在讀多寫少的場合,其效能要遠比Vector好
- ConcurrentLinkedQueue:高效的併發佇列,使用連結串列實現。可以看作一個執行緒安全的LinkedList
- BlockingQueue:這是一個介面,JDK內部通過連結串列,陣列等方式實現這個介面。表示阻塞佇列,非常適合用於資料共享的通道
- ConcurrentSkipListMap:跳錶的實現。這是一個Map,使用跳錶資料結構進行快速查詢。
java.util下的Vector是執行緒安全,另外Collections工具類可以幫助我們將任意集合包裝成執行緒安全的集合。
17 如果需要一個執行緒安全的HashMap,一種可行的方法是使用Collections.synchronizedMap()方法包裝我們的HashMap。如下
public static Map m = Collections.synchronizedMap(new HashMap()) ;
另一種更專業的併發HashMap是ConcurrentHashMap,它更適合多執行緒的場合。
18 ArrayList和Vector都是使用陣列作為內部實現,而Vector是執行緒安全,ArrayList不是。
第4章 鎖的優化及注意事項
1 有助於提高“鎖”效能的幾點建議
- 減少鎖持有時間,有助於降低鎖衝突的可能性
- 減小鎖粒度,即縮小鎖定物件的範圍
- 讀寫分離鎖來替換獨佔鎖
- 鎖分離,如讀寫分離
- 鎖粗化,即將所有的鎖操作整合為鎖的一次請求,減少對鎖的請求同步次數。
2 JDK內部的幾種”鎖“優化策略
- 鎖偏向,一個執行緒獲得了鎖,那麼鎖就進入偏向模式。當這個執行緒再次請求鎖時無須再做任何同步操作。
- 輕量級鎖,只是簡單地將物件頭部作為指標,指向持有鎖的執行緒堆疊的內部,來判斷一個執行緒是否持有物件鎖
- 自旋鎖,讓當前執行緒做幾個空迴圈(即自旋),如果可以得到鎖則進入臨界區,否則掛起
- 鎖清除,去掉不可能存在共享資源競爭的鎖
3 ThreadLocal,可以包裝非執行緒安全的物件
public class Test1 {
static ThreadLocal<SimpleDateFormat> tl = new ThreadLocal<SimpleDateFormat>();
public static class ParseDate implements Runnable{
int i = 0;
public ParseDate(int i){
this.i = i;
}
@Override
public void run() {
// TODO Auto-generated method stub
try{
if(tl.get() == null){
tl.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
Date t = tl.get().parse("2018-08-30 02:38:"+i%60);
System.out.println(i+": "+t);
}catch(ParseException e){
e.printStackTrace();
}
}
}
public static void main(String[] args){
ExecutorService es = Executors.newFixedThreadPool(10);
for(int i=0;i<1000;i++){
es.execute(new ParseDate(i));
}
}
}
4 ThreadLocal模式下會比多執行緒共享一個物件要快很多很多,如下例子快了30多倍
public class Test2 {
public static final int GEN_COUNT = 10000000;
public static final int THREAD_COUNT = 4;
static ExecutorService es = Executors.newFixedThreadPool(THREAD_COUNT);
public static Random rnd = new Random(123);
public static ThreadLocal<Random> tRnd = new ThreadLocal<Random>(){
protected Random initialValue(){
return new Random(123);
}
};
public static class RndTask implements Callable<Long>{
private int mode = 0;
public RndTask(int mode){
this.mode = mode;
}
public Random getRandom(){
if(mode == 0)
return rnd;
else if(mode == 1)
return tRnd.get();
else
return null;
}
@Override
public Long call() throws Exception {
// TODO Auto-generated method stub
long b = System.currentTimeMillis();
for(long i=0;i<GEN_COUNT;i++)
getRandom().nextInt();
long e = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName()+" spend "+(e-b)+" ms");
return e-b;
}
}
public static void main(String[] args) throws InterruptedException,ExecutionException{
Future<Long>[] futs = new Future[THREAD_COUNT];
for(int i =0 ;i<THREAD_COUNT;i++){
futs[i] = es.submit(new RndTask(0));
}
long totalTime = 0;
for(int i=0;i<THREAD_COUNT;i++){
totalTime += futs[i].get();
}
System.out.println("多執行緒訪問同一個Random例項:"+totalTime+" ms");
for(int i=0;i<THREAD_COUNT;i++){
futs[i] = es.submit(new RndTask(1));
}
totalTime = 0;
for(int i=0;i<THREAD_COUNT;i++){
totalTime += futs[i].get();
}
System.out.println("使用ThreadLocal包裝Random例項:"+totalTime+" ms");
es.shutdown();
}
}
輸出結果
pool-1-thread-1 spend 4911 ms
pool-1-thread-4 spend 4959 ms
pool-1-thread-3 spend 4961 ms
pool-1-thread-2 spend 4962 ms
多執行緒訪問同一個Random例項:19793 ms
pool-1-thread-2 spend 130 ms
pool-1-thread-4 spend 131 ms
pool-1-thread-3 spend 133 ms
pool-1-thread-1 spend 134 ms
使用ThreadLocal包裝Random例項:528 ms
5 一種無鎖的策略:比較交換的技術(CAS Compare And Swap) 來鑑別執行緒衝突,一旦檢測到衝突產生就重試當前操作直到沒有衝突為止。
CAS演算法的過程:共三個引數CAS(V , E , N)。V表示要更新的變數,E表示預期值,N表示新值。僅當V值等於E值時,才會將V值設為N。如果V值和E值不同則說明已經有其他執行緒做了更新,而當前執行緒什麼都不做。最後CAS返回當前V的真實值。
6 JDK併發包有一個atomic包,有個最常用的類AtomicInteger,它是可變且執行緒安全。還有AtomicLong,AtomicBoolean,AtomicReference。
public class AtomicIntegerDemo {
static AtomicInteger i = new AtomicInteger();
public static class AddThread implements Runnable{
@Override
public void run() {
// TODO Auto-generated method stub
for(int k=0;k<10000;k++)
i.incrementAndGet();
}
}
public static void main(String[] args) throws InterruptedException{
Thread[] ts = new Thread[10];
for(int k=0;k<10;k++){
ts[k] = new Thread(new AddThread());
}
for(int k=0;k<10;k++)
ts[k].start();
for(int k=0;k<10;k++)
ts[k].join();
System.out.println(i);
}
}
AtomicInteger.incrementAndGet()採用CAS操作給自己加1,同時返回當前值。
7 AtomicReference是指物件引用,它可以保證在修改物件引用時的執行緒安全性
8 原子陣列:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
9 可以讓普通變數也享受原子操作:AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater
public class AtomicIntegerFieldUpdaterDemo {
public static class Candidate{
int id;
volatile int score;
}
public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
public static AtomicInteger allScore = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException{
final Candidate stu = new Candidate();
Thread[] t = new Thread[10000];
for(int i=0;i<10000;i++){
t[i] = new Thread(){
public void run(){
if(Math.random() > 0.4){
scoreUpdater.incrementAndGet(stu);
allScore.incrementAndGet();
}
}
};
t[i].start();
}
for(int i=0;i<10000;i++)
t[i].join();
System.out.println("score="+stu.score);
System.out.println("allScore = "+allScore);
}
}
注意事項:
- Updater只能修改可見氛圍的變數,如score為private則無法修改
- 為了確保變數被正確的讀取,必須是volatile型別
- 由於CAS操作會通過物件例項的偏移量直接進行賦值,所以它不支援static欄位
第5章 並行模式與演算法
1 單例模式是一種物件建立模式,用於產生一個物件的具體例項,確保系統中一個類只產生一個例項。
public class Singleton{
private Singleton(){
System.out.println("Singleton is create");
}
private static Singleton instance = new Singleton();
public static Singleton getInstance(){
return instance;
}
}
注意:
- 建構函式被定義為private,無法隨便建立這個例項。
- instance物件必須是private並且static
- getInstance()定義為靜態函式
上面有一個明顯不足,Singleton建構函式或者是Singleton例項在什麼時候建立是不受控制。如下是改善後的程式碼
public class Singleton {
public static int k = 0;
private Singleton(){
System.out.println("Singleton is created");
}
private static class SingletonHolder{
private static Singleton instance = new Singleton();
}
public static Singleton getInstance(){
return SingletonHolder.instance;
}
public static void main(String[] args){
//Singleton t = Singleton.getInstance();
System.out.println(Singleton.k);
}
}
2 不變模式天生就是多執行緒友好的,它的核心思想是一個物件一旦被建立,則它的內部狀態將永遠不會發生改變。不變模式的主要使用場景需要滿足以下2個條件:
- 當物件建立後,其內部狀態和資料不再發生任何變化
- 物件需要被分享,被多執行緒頻繁訪問
不變模式的實現很簡單,只需要注意以下4點:
- 去除setter方法以及所有修改自身屬性的方法
- 將所有屬性設定為私有,並用final標記確保其不可修改
- 確保沒有子類可以過載修改它的行為
- 有一個可以建立完整物件的建構函式
public final class Product { // 確保無子類
private final String no;
private final String name;
private final double price;
public Product(String no,String name,double price){
this.no = no;
this.name = name;
this.price = price;
}
public String getNo(){
return no;
}
public String getName(){
return name;
}
public double getPrice(){
return price;
}
}
不變模式應用很廣泛,最典型的就是java.lang.String類。此外所有的元資料類包裝類都是不變模式實現的。如java.lang.String,java.lang.Boolean,java.lang.Byte,java.lang.Character,java.lang.Double,java.lang.Float,java.lang.Integer,java.lang.Long,java.lang.Short
3 生產者-消費者模式是一個經典的多執行緒設計模式。生產者執行緒負責提交使用者請求,消費者執行緒則負責具體處理生產者提交的任務。生產者和消費者之間則通過共享記憶體緩衝區進行通訊。生產者-消費者模式的核心元件是共享記憶體緩衝區。
PCData物件表示一個生產任務,或者相關任務資料。生產者物件和消費者物件均引用同一個BlockingQueue例項。生產者負責建立PCData物件,並將它加入BlockingQueue中,消費者則從BlockingQueue佇列中獲取PCData。
public final class PCData {
private final int intData;
public PCData(int d){
intData = d;
}
public PCData(String d){
intData = Integer.valueOf(d);
}
public int getData(){
return intData;
}
public String toString(){
return "data:"+intData;
}
}
//生產者
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<PCData> queue){
this.queue = queue;
}
@Override
public void run() {
// TODO Auto-generated method stub
PCData data = null;
Random r = new Random();
System.out.println("start producer id="+Thread.currentThread().getId());
try{
while(isRunning){
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());
System.out.println(data+" is put into queue");
if(!queue.offer(data,2,TimeUnit.SECONDS)){
System.out.println("failed to put data: " +data);
}
}
}catch(InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop(){
isRunning = false;
}
}
//消費者
public class Consumer implements Runnable {
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<PCData> queue){
this.queue = queue;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("start Consumer id="+Thread.currentThread().getId());
Random r = new Random();
try{
while(true){
PCData data = queue.take();
if(null != data){
int re = data.getData() * data.getData();
System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
}catch(InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
Producer p1 = new Producer(queue);
Producer p2 = new Producer(queue);
Producer p3 = new Producer(queue);
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
ExecutorService es = Executors.newCachedThreadPool();
es.execute(p1);
es.execute(p2);
es.execute(p3);
es.execute(c1);
es.execute(c2);
es.execute(c3);
Thread.sleep(10*1000);
p1.stop();
p2.stop();
p3.stop();
Thread.sleep(3000);
es.shutdown();
}
}
4 BlockingQueue是使用鎖和阻塞等待來實現執行緒間的同步。對於高併發場合,它的效能並不是最優越。而ConcurrentLinkedQueue是一個高效能的佇列,它使用了無鎖的CAS操作。
5 Disruptor是個無鎖的快取框架,是由LMAX公司開發的一款高效的無鎖記憶體佇列。它使用無鎖的方式實現了一個環形佇列(RingBuffer),非常適合實現生產者和消費者模式。
public class PCData2 {
private long value;
public void set(long value){
this.value = value;
}
public long get(){
return value;
}
}
// 工廠
public class PCDataFactory implements EventFactory<PCData2> {
public PCData2 newInstance(){
return new PCData2();
}
}
//生產者
public class Producer2 {
private final RingBuffer<PCData2> ringBuffer;
public Producer2(RingBuffer<PCData2> ringBuffer){
this.ringBuffer = ringBuffer;
}
public void pushData(ByteBuffer bb){
long sequence = ringBuffer.next();
try{
PCData2 event = ringBuffer.get(sequence);
event.set(bb.getLong(0));
}finally{
ringBuffer.publish(sequence);
}
}
}
//消費者
public class Consumer2 implements WorkHandler<PCData2>{
@Override
public void onEvent(PCData2 arg0) throws Exception {
// TODO Auto-generated method stub
System.out.println(Thread.currentThread().getId()+": Event: -- "
+arg0.get()*arg0.get()+" --");
}
}
//執行
public static void main(String[] args) throws Exception{
Executor executor = Executors.newCachedThreadPool();
PCDataFactory factory = new PCDataFactory();
int bufferSize = 1024;
Disruptor<PCData2> disruptor = new Disruptor<PCData2>(factory,bufferSize,executor,
ProducerType.MULTI,new BlockingWaitStrategy());
disruptor.handleEventsWithWorkerPool(
new Consumer2(),
new Consumer2(),
new Consumer2(),
new Consumer2());
disruptor.start();
RingBuffer<PCData2> ringBuffer = disruptor.getRingBuffer();
Producer2 p = new Producer2(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for(long l = 0;true;l++){
bb.putLong(0,l);
p.pushData(bb);
Thread.sleep(100);
System.out.println("add data "+l);
}
}
6 Futur模式,也是一種多執行緒開發常見的模式,它的核心思想是非同步呼叫。當一個函式執行很慢,可以先讓呼叫者立即返回,讓函式在後臺慢慢處理請求。
7 併發流水線
public class Msg {
public double i;
public double j;
public String orgStr = null;
}
public class Plus implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();
@Override
public void run() {
// TODO Auto-generated method stub
while(true){
try{
Msg msg = bq.take();
msg.j = msg.i+msg.j;
Multiply.bq.add(msg);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
public class Multiply implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();
@Override
public void run() {
// TODO Auto-generated method stub
while(true){
try{
Msg msg = bq.take();
msg.i = msg.i*msg.j;
Div.bq.add(msg);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
public class Div implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();
@Override
public void run() {
// TODO Auto-generated method stub
while(true){
try{
Msg msg = bq.take();
msg.i = msg.i/2;
System.out.println(msg.orgStr+"="+msg.i);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
public class PStreamMain {
public static void main(String[] args){
new Thread(new Plus()).start();
new Thread(new Multiply()).start();
new Thread(new Div()).start();
for(int i=1;i<=10;i++){
for(int j=1;j<=10;j++){
Msg msg = new Msg();
msg.i = i;
msg.j = j;
msg.orgStr = "(("+i+"+"+j+")*"+i+")/2";
Plus.bq.add(msg);
}
}
}
}
8 Java NIO是一套新的IO機制。