Java基礎之多執行緒及併發庫
實際上關於多執行緒的基礎知識,前面自己已經總結過一部分,但是每一個階段對於同樣知識點的學習側重點是不一樣的,前面的Java基礎總結八之多執行緒(一)和 Java基礎總結九之多執行緒(二)是對JDK5以前多執行緒相關基礎知識的一個簡單總結,今天本文將偏重於JDK5提供的併發庫進行學習總結。
首先,從一個簡單的多執行緒demo引入(包括內容為JDK5之前的synchronized關鍵字及通過wait方法和notify方法控制的等待喚醒機制):
/** * @author: Is-Me-Hl * @Description: 傳統的多執行緒互斥和等待喚醒機制 * 給定兩個執行緒依次迴圈五十次,一個執行緒每次取資料十次,期間不可以被打斷,另一個每次取資料二十次,期間不可打斷,二者交替執行 */ public class Test { private static ThreadTest threadTest = new ThreadTest(); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 50; i++) { threadTest.thread1(i); } } }).start(); new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 50; i++) { threadTest.thread2(i); } } }).start(); } } class ThreadTest { private boolean flag; public synchronized void thread1(int i) { while (flag) { try { this.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } for (int j = 1; j <= 10; j++) { try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "第" + i + "次取到的資料為:" + j); } flag = true; this.notify(); } public synchronized void thread2(int i) { while (!flag) { try { this.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } for (int j = 1; j <= 100; j++) { try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "第" + i + "次取到的資料為:" + j); } flag = false; this.notify(); } }
通過使用synchronized關鍵字保證方法的原子性,通過Object的wait和notify方法的等待與喚醒機制保證其交替執行。以上就是傳統執行緒同步的低配版demo,是入門多執行緒的一個總結。那麼JDK5給開發者提供了些什麼呢?整整一個Concurrent併發庫,有Lock鎖物件,衍生而出的讀鎖,寫鎖,有Executors執行緒池物件等等...下面按照筆者想到什麼就總結什麼的思路來進行知識的總結(皮一下很開心~):
(一)原子操作類:
很經典的一個問題,i++是原子操作嗎?答案:顯然不是。原子操作:通俗的講,操作是不可分割的,要麼執行,要麼不執行,不會說執行到一半而被打斷。那i++為什麼不是原子操作呢?看著不就一條語句嗎?實際上是不是原子操作和是不是一條語句沒有必然的聯絡,i++實際上是多步操作,先把i的值拿過來,然後再加1,然後又賦值給i。所以是非原子操作。那麼如果想要這個語句變成原子操作是不是我這個語句還要加synchronized鎖起來呢?可以這麼做,但是JDK5開始有了更為簡便的方法。
從JDk5開始,Java提供了java.util.concurrent.atomic包供資料進行原子性操作。
(1)關於基本型別的原子更新類:AtomicBoolean、AtomicInteger、AtomicLong,下面以AtomicInteger舉例:
import java.util.concurrent.atomic.AtomicInteger; /** * * @author: Is-Me-Hl * @Description: 關於原子類操作 AtomicInteger類舉例。 */ public class Test2 { public static void main(String[] args) { AtomicInteger ai=new AtomicInteger(100); System.out.println("當前資料為: "+ai.get()); System.out.println("相加得到的資料為: "+ai.addAndGet(100)); System.out.println("當前值減去1返回之前的值為: "+ai.getAndDecrement()); System.out.println("當前值加上1返回之前的值為: "+ai.getAndIncrement()); } } //執行結果: 當前資料為: 100 相加得到的資料為: 200 當前值減去1返回之前的值為: 200 當前值加上1返回之前的值為: 199
上面是一個簡單的舉例,實際上只要有了API文件,裡面的方法自然也就能順利使用了。看到這,我想應該馬上就會有人說你這個demo只是應用,我還是沒有看到其是怎麼保證原子性更新的,請不要急,筆者在讀其他博主文章的時候發現了一篇相對詳細的文章介紹了這個包裡面的所有類,包括基本型別的原子更新和引用型別的原子更新。Java原子操作類彙總
(二)Executors執行緒池:
(1) newFixedThreadPool:建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
(2) newCachedThreadPool:建立一個可快取執行緒池,應用中存在的執行緒數可以無限大。
(3)newSingleThreadExecutor:建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
(4) newScheduledThreadPool:建立一個定長執行緒池,支援定時及週期性任務執行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/*
* Executors使用
*/
public class ThreadPoolTest {
public static void main(String[] args) {
// 第一種執行緒池
// ExecutorService threadPool =
// Executors.newFixedThreadPool(3);//固定大小的執行緒池
// 第二種執行緒池
// ExecutorService threadPool =
// Executors.newCachedThreadPool();//如果沒有可用的執行緒池=會自己建立新的執行緒出來供使用,過一定時間後自己自動銷燬
// 第三種執行緒池
ExecutorService threadPool = Executors.newSingleThreadExecutor();// 只有一個執行緒,類似單執行緒,但是好處是,該池中一直維持著一個執行緒,一個執行緒死了,有馬上產生一個
for (int i = 1; i <= 10; i++) {
final int task = i;
threadPool.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+ " is looping " + i + " for task " + task);
}
}
});
}
// threadPool.shutdownNow();//與shutdown()的區別就是Now指的是馬上關閉執行緒池,不管你執行到哪裡,不帶now的是指等所有任務都執行完之後再關閉它,而不是一直等待任務進來
//可定期執行的
Executors.newScheduledThreadPool(3).schedule(new Runnable() {
@Override
public void run() {
System.out.println("boming");
}
}, 10, TimeUnit.SECONDS);
}
}
(三)Callable 和 Future
/*
* callable和future的應用
*/
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallableAndFuture {
public static void main(String[] args) {
// 使用callable提交一個任務
ExecutorService threadPool = Executors.newSingleThreadExecutor();
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "hello";
}
});
System.out.println("等待結果:");
try {
System.out.println("拿到結果:" + future.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 使用callable一次性提交一組任務
ExecutorService threadPool2 = Executors.newFixedThreadPool(10);// 建立一個固定的大小的執行緒池
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
threadPool2);
for (int i = 1; i <= 10; i++) {
final int taskNum = i;
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(5000));
return taskNum;
}
});
}
for (int i = 1; i <= 10; i++) {
try {
System.out.println(completionService.take().get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
Future是提交了Callable後返回的結果物件。通過get()方法能夠接收到返回的結果,當然在一組任務提交後,Future阻塞得到的結果可能是亂序的,誰先計算好返回給我結果,我就先拿到誰的結果。
(四)lock鎖物件
Lock提供了比使用synchronized方法和語句更廣泛的鎖定操作。它允許更靈活的結構化,可能具有完全不同的屬性,並且可以支援多個相關聯的Condition物件。 鎖是用於控制多個執行緒對共享資源訪問的工具。通常,對共享資源的所有訪問都要求首先獲取鎖,鎖提供了對共享資源的獨佔訪問,一次只能有一個執行緒可以獲取鎖。 但是,一些鎖可能允許併發訪問共享資源,如ReadWriteLock的讀鎖。 首先,下面先給出一個lock鎖的demo,之後再總結讀寫鎖的知識:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
*
* @author: Is-Me-Hl
* @date: 2018年11月8日
* @Description: 主執行緒和子執行緒使用lock鎖保證執行時候的互斥。
*/
public class Test2 {
private static MyThread myThread = new MyThread();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 100; i++) {
myThread.sub(i);
}
}
}).start();
for (int i = 1; i <= 100; i++) {
myThread.main(i);
}
}
}
class MyThread {
private Lock lock = new ReentrantLock();
public void sub(int i) {
lock.lock();
try {
for (int j = 1; j <= 10; j++) {
System.out.println(Thread.currentThread().getName() + "---" + i
+ "---" + j);
}
} finally {
lock.unlock();
}
}
public void main(int i) {
lock.lock();
try {
for (int j = 1; j <= 10; j++) {
System.out.println(Thread.currentThread().getName() + "---" + i
+ "---" + j);
}
} finally {
lock.unlock();
}
}
}
從上面的例子中可以看出來,lock鎖同樣能夠有synchronized的互斥效果,但是如果lock鎖僅僅有這樣的效果JDK為什麼還要再推出來呢?顯然lock鎖還有其他的作用,如能讓多個執行緒同時訪問共享變數的讀鎖。在高併發時,有效的提高了資料共享的效率。下面是讀寫鎖的demo,也算是通過讀寫鎖實現快取機制的一個demo:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/*
* 快取機制Demo,讀鎖與寫鎖的使用,一個不錯的讀寫鎖demo
* 讀鎖和寫鎖機制是什麼呢?讀和讀互不影響,讀和寫互斥,寫和寫互斥,提高讀寫的效率
*/
public class CacheDemo {
private static Map<String, Object> cache = new HashMap<>();
public static void main(String[] args) {
cache.put("key", null);
long start = System.currentTimeMillis();
for (int i = 0; i < 50; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i < 100000; i++) {
System.out.println("執行緒名:"
+ Thread.currentThread().getName()
+ " 執行緒所取得的資料:" + getData("key"));
}
}
}).start();
}
try {
Thread.sleep(240000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("時間花費:" + (end - start));
}
private static ReadWriteLock rwl = new ReentrantReadWriteLock();
public static Object getData(String key) {
// 這裡可以加入snychronized關鍵字進行執行緒互斥。但是這裡有更好的處理方法,
// 就是如果所有的執行緒過來都是讀的話,那麼,加個讀鎖讓所有執行緒進來就可以,沒有必要排斥其他一塊來讀資料的執行緒
// 好方法就是加讀寫鎖就可以了
rwl.readLock().lock();// 一進來就加上讀鎖,這樣所有讀的執行緒就都可以進來
Object value = null;
try {
value = cache.get(key);
if (value == null) {
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
if (value == null) {
value = "Is-Me-Hl";// 這裡模擬的是去從資料庫中取資料,存放到value中
}
} finally {
rwl.writeLock().unlock();
}
rwl.readLock().lock();
}
} finally {
rwl.readLock().unlock();
}
return value;
}
}
由上面的程式碼可以看出來,通過讀寫鎖能很好地實現快取,同時也能提高各路執行緒對共享資料的讀取速率。多個讀鎖不互斥,讀鎖和寫鎖互斥,寫鎖和寫鎖互斥,這個過程是由JVM自己控制的,開發者只需要上號相應的鎖就可以。
(五)Condition條件(條件佇列或條件變數)
Lock替換synchronized方法和語句的使用,Condition取代了物件監視器方法的使用。
Condition的功能類似在傳統執行緒技術中的Object.wait和Object.notify的功能。但是讀者在看到這句話的時候又可能會問,既然是一樣的,出現的意義在哪裡?在傳統的執行緒機制中一個監視器物件上只能有一路等待和通知,要想實現多路通知,必須巢狀多個同步監視器物件。因此Condition物件就派上了用場。(ps:一個Condition例項本質上要繫結到一個鎖。如果要獲得特定Condition例項,請使用lock物件的newCondition()方法。)
一個簡單的Condition應用的demo:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo {
public static void main(String[] args) {
final Business business = new Business();
new Thread(
new Runnable() {
@Override
public void run() {
for(int i=1;i<=50;i++){
business.sub(i);
}
}
}
).start();
for(int i=1;i<=50;i++){
business.main(i);
}
}
static class Business {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
private boolean bShouldSub = true;
public void sub(int i){
lock.lock();
try{
while(!bShouldSub){
try {
condition.await();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for(int j=1;j<=10;j++){
System.out.println("sub thread sequence of " + j + ",loop of " + i);
}
bShouldSub = false;
condition.signal();
}finally{
lock.unlock();
}
}
public void main(int i){
lock.lock();
try{
while(bShouldSub){
try {
condition.await();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for(int j=1;j<=100;j++){
System.out.println("main thread sequence of " + j + ",loop of " + i);
}
bShouldSub = true;
condition.signal();
}finally{
lock.unlock();
}
}
}
}
官方API文件中給出的一個通過lock鎖和Condition條件實現的阻塞佇列demo:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock(); try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally { lock.unlock(); }
}
public Object take() throws InterruptedException {
lock.lock(); try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally { lock.unlock(); }
}
}
需要注意的是官方demo中使用了兩個Condition,有什麼用呢?是多路通知的體現。自己上面所寫的demo實際上是不完善的,假如要喚醒指定的執行緒,官方demo是能做到的,而自己的demo會隨機喚醒一個,這個是不太完善的地方,也是官方文件的官方性的表現。另外一個注意點是Condition需要始終讓他放在迴圈中,防止虛假喚醒。總結:好的Coffee需要細品!(ps:這裡大概讀者就會有個思路,那我終於能理解阻塞佇列的實現了,以後我就這麼寫,那筆者告訴你,大可不必。這只是個demo,官方既然給了你實現,為什麼不給你封裝好個類供你使用呢?BlockingQueue阻塞佇列,這是個大知識點,筆者準備留著以後有時間學的相對完善,有好的demo再總結一下。如果您有需要了解請移步百度)
(六)Semaphere(與synchronized類似)
一個計數訊號量。在概念上,訊號量維持一組許可證。如果有必要,每個acquire()都會阻塞,直到許可證可用,然後才能使用它。每個release()新增許可證,潛在地釋放阻塞獲取方。但是,沒有使用實際的許可證物件; Semaphore只保留可用數量的計數,並相應地執行。
synchronized保證了,我管理的那部分程式碼同一時刻只有一個執行緒能訪問。Semaphore保證了,我管理的那部分程式碼同一時刻最多可以有n個執行緒訪問。Semaphore可以維護當前訪問自身的執行緒個數,並提供了同步機制。例子:停車場總共五個車位,但是我是門口看車場的大叔,我說,外面的十輛車聽著,裡面五個車位車位已經滿了,只有當裡面車位空了出來,你們再進來替換,至於誰進來,我給你們定個規則,公平或者非公平,然後你們依據規則進來,謝謝大家配合。(ps:案例demo略)
(七)CyclicBarrier
CyclicBarrier是一個同步工具類,它允許一組執行緒互相等待,直到到達某個公共屏障點。barrier在釋放等待執行緒後可以重用,所以稱它為迴圈(Cyclic)的屏障(Barrier)。例子:幼稚園小芳老師告訴小朋友們明天去郊遊,早上八點學校出發,那麼小朋友在八點前從自己的家,不同的方向,不同的地點,不同的時間點到達這裡,早到的同學要等晚到的同學,等到了八點,老師帶著同學們到了郊外,說大家自由活動,中午十二點回學校,那麼小朋友們有各自忙各自的,到十二點,大家都相互等著結伴回家。(ps:案例demo略)
(八)CountDownLatch
CountDownLatch是一個同步工具類,它允許一個或多個執行緒等待直到在其他執行緒中執行的一組操作完成的同步輔助。一個CountDownLatch用給定的計數初始化。await方法阻塞,直到由於countDown()方法的呼叫而導致當前計數達到零,之後所有等待執行緒被釋放,並且任何後續的await 呼叫立即返回。這是一個一次性的現象:計數無法重置。如果您需要重置計數的版本,請考慮使用CyclicBarrier 。舉例:一個賽跑比賽:裁判員吹響哨子,運動員們就開始跑,這是倒計時。在終點處,裁判一個人也可以等到所有的運動員們衝過終點,再報出比賽中的冠亞季軍。實際上,這個例子:也就說明多個執行緒可以等一個執行緒一聲號令就開始行動,那麼一個執行緒也可以等所以執行緒都已經下達了命令,該執行緒開始行動。(ps:案例demo略)
(九)Exchanger
Exchanger是一個同步工具類,執行緒可以在成對內配對和交換元素的同步點。每個執行緒在輸入exchange方法時提供一些物件,與合作者執行緒匹配,並在返回時接收其合作伙伴的物件。交換器可以被視為一個的雙向形式SynchronousQueue。交換器在諸如遺傳演算法和管道設計的應用中可能是有用的。 舉例:一個黑幫大哥和軍販子要交易,一個帶上錢一個帶上武器,約定好某個時間點在某個地方,啪,瞬間交換。(ps:案例demo略)
(十)同步集合類
在之前的Java基礎總結五之集合中總結樂意系列集合,典型的有hashMap,hashSet等等,但是之前也強調了這些是執行緒不安全的,也介紹了要獲得執行緒安全的集合我們可以使用Collections工具類,將執行緒不安全的集合傳入,得到的就是安全的集合:實現方式是通過代理的方式:(原始碼如下)
public static <T> Collection<T> synchronizedCollection(Collection<T> c) {
return new SynchronizedCollection<>(c);
}
static <T> Collection<T> synchronizedCollection(Collection<T> c, Object mutex) {
return new SynchronizedCollection<>(c, mutex);
}
/**
* @serial include
*/
static class SynchronizedCollection<E> implements Collection<E>, Serializable {
private static final long serialVersionUID = 3053995032091335093L;
final Collection<E> c; // Backing Collection
final Object mutex; // Object on which to synchronize
SynchronizedCollection(Collection<E> c) {
this.c = Objects.requireNonNull(c);
mutex = this;
}
SynchronizedCollection(Collection<E> c, Object mutex) {
this.c = Objects.requireNonNull(c);
this.mutex = Objects.requireNonNull(mutex);
}
public int size() {
synchronized (mutex) {return c.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return c.isEmpty();}
}
public boolean contains(Object o) {
synchronized (mutex) {return c.contains(o);}
}
public Object[] toArray() {
synchronized (mutex) {return c.toArray();}
}
public <T> T[] toArray(T[] a) {
synchronized (mutex) {return c.toArray(a);}
}
public Iterator<E> iterator() {
return c.iterator(); // Must be manually synched by user!
}
public boolean add(E e) {
synchronized (mutex) {return c.add(e);}
}
public boolean remove(Object o) {
synchronized (mutex) {return c.remove(o);}
}
public boolean containsAll(Collection<?> coll) {
synchronized (mutex) {return c.containsAll(coll);}
}
public boolean addAll(Collection<? extends E> coll) {
synchronized (mutex) {return c.addAll(coll);}
}
public boolean removeAll(Collection<?> coll) {
synchronized (mutex) {return c.removeAll(coll);}
}
public boolean retainAll(Collection<?> coll) {
synchronized (mutex) {return c.retainAll(coll);}
}
public void clear() {
synchronized (mutex) {c.clear();}
}
public String toString() {
synchronized (mutex) {return c.toString();}
}
// Override default methods in Collection
@Override
public void forEach(Consumer<? super E> consumer) {
synchronized (mutex) {c.forEach(consumer);}
}
@Override
public boolean removeIf(Predicate<? super E> filter) {
synchronized (mutex) {return c.removeIf(filter);}
}
@Override
public Spliterator<E> spliterator() {
return c.spliterator(); // Must be manually synched by user!
}
@Override
public Stream<E> stream() {
return c.stream(); // Must be manually synched by user!
}
@Override
public Stream<E> parallelStream() {
return c.parallelStream(); // Must be manually synched by user!
}
private void writeObject(ObjectOutputStream s) throws IOException {
synchronized (mutex) {s.defaultWriteObject();}
}
}
那麼JDK5之後,併發庫提供了滿足併發安全需求的集合:
這些集合都是安全的,也在JDK5之後併發中常用。至於使用的demo,我想讀者如果能學到併發這裡,自己看看API文件,百度一下的技能還是具備的,作為自己總結的筆記,自己就懶懶地忽略掉了~
注:以上文章僅是個人學習過程總結,若有不當之處,望不吝賜教。