多執行緒設計模式:第三篇 - 生產者-消費者模式和讀寫鎖模式
一,生產者-消費者模式
生產者-消費者模式是比較常見的一種模式,當生產者和消費者都只有一個的時候,這種模式也被稱為 Pipe模式,即管道模式。
生產者-消費者模式中通過 Channel 即通道來互相傳遞資料,那麼資料在通道中以什麼樣的順序傳遞,這裡在設計時需要考慮,一般的實現包括如下三種方式:
- 佇列——順序傳遞
- 棧——倒序傳遞
- 優先佇列——根據權重/優先權來傳遞
Channel 通道可以通過 juc 包中的 BlockingQueue 來實現,這樣省去了自己實現 Queue 時的 wait/notify 操作。一個簡單的生產者-消費者模型舉例:在該例子中,生產者是負責生成蛋糕並放到桌子上,消費者就負責從桌子上拿蛋糕吃,這裡桌子作為資料傳輸的通道,其程式碼實現如下:
/** * @author koma <[email protected]> * @date 2018-10-18 */ public class Table { private final Queue<String> queue; private final int count; public Table(int count) { this.count = count; queue = new LinkedList<>(); } public synchronized void put(String cake) throws InterruptedException { System.out.println(Thread.currentThread().getName()+" puts "+cake); while (queue.size() >= count) { wait(); } queue.offer(cake); notifyAll(); } public synchronized String take() throws InterruptedException { while (queue.size() <= 0) { wait(); } String cake = queue.poll(); notifyAll(); System.out.println(Thread.currentThread().getName()+" takes "+cake); return cake; } }
生產者,消費者以及啟動類程式碼如下:
/** * @author koma <[email protected]> * @date 2018-10-18 */ public class Main { public static void main(String[] args) { Table table = new Table(3); new MakerThread("MakerThread-1", table, 314151).start(); new MakerThread("MakerThread-2", table, 523242).start(); new MakerThread("MakerThread-3", table, 716151).start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } new EaterThread("EaterThread-1", table, 625341).start(); new EaterThread("EaterThread-2", table, 525349).start(); new EaterThread("EaterThread-3", table, 225841).start(); } } public class EaterThread extends Thread { private final Random random; private final Table table; public EaterThread(String name, Table table, long seed) { super(name); this.table = table; this.random = new Random(seed); } @Override public void run() { try { while (true) { String cake = table.take(); Thread.sleep(random.nextInt(1000)); } } catch (InterruptedException e) { e.printStackTrace(); } } } public class MakerThread extends Thread { private final Random random; private final Table table; private static int id = 0; public MakerThread(String name, Table table, long seed) { super(name); this.table = table; this.random = new Random(seed); } @Override public void run() { try { while (true) { Thread.sleep(random.nextInt(1000)); String cake = "[ Cake No."+nextId()+" by "+getName()+" ]"; table.put(cake); } } catch (InterruptedException e) { e.printStackTrace(); } } private static synchronized int nextId() { return id++; } }
如果 Table 類選擇使用 juc 包中的 BlockingQueue 來實現,則非常簡單。常見的 BlockingQueue 包括基於連結串列實現的 LinkedBlockingQueue,基於陣列實現的 ArrayBlockingQueue,以及帶有優先順序的 PriorityBlockingQueue。這裡我們使用基於連結串列的 BlockingQueue,改寫之後程式碼如下:
/**
* @author koma <[email protected]>
* @date 2018-10-18
*/
public class Table {
private final LinkedBlockingQueue<String> queue;
private final int count;
public Table(int count) {
this.count = count;
queue = new LinkedBlockingQueue<>();
}
public synchronized void put(String cake) throws InterruptedException {
System.out.println(Thread.currentThread().getName()+" puts "+cake);
queue.put(cake);
}
public synchronized String take() throws InterruptedException {
System.out.println(Thread.currentThread().getName()+" takes "+cake);
return queue.take();
}
}
二,讀寫鎖模式
讀寫鎖模式是一種把讀操作和寫操作分開來考慮的模式,在這種模式下一個例項包括兩類鎖:讀鎖和寫鎖。寫鎖可由多個執行緒同時獲取,而讀鎖只能由一個執行緒同時獲取。而且規定在執行寫操作時不能進行讀寫,在執行讀操作時不能進行寫。
一般來說,執行互斥處理會降低程式效能,但是如果把讀寫操作分開來考慮則可以提高程式效能。
下面的示例程式碼,使用讀寫鎖模式來實現對 Data 類的讀寫操作,其中最關鍵的是 ReadWriteLock 類,該類使用到了不可變模式。
/**
* @author koma <[email protected]>
* @date 2018-10-18
*/
public class Main {
public static void main(String[] args) {
Data data = new Data(10);
new ReaderThread(data).start();
new ReaderThread(data).start();
new ReaderThread(data).start();
new ReaderThread(data).start();
new ReaderThread(data).start();
new ReaderThread(data).start();
new WriterThread(data, "ABCDEFGHIJKLMNOPQRSTUVWXYZ").start();
new WriterThread(data, "abcdefghijklmnopqrstuvwxyz").start();
}
}
public class Data {
private final char[] buffer;
private final ReadWriteLock lock = new ReadWriteLock();
public Data(int size) {
this.buffer = new char[size];
for (int i = 0; i < size; i++) {
buffer[i] = '*';
}
}
public char[] read() throws InterruptedException {
try {
lock.readLock();
return doRead();
} finally {
lock.readUnlock();
}
}
public void write(char c) throws InterruptedException {
try {
lock.writeLock();
doWrite(c);
} finally {
lock.writeUnlock();
}
}
private void doWrite(char c) {
for (int i = 0; i < buffer.length; i++) {
buffer[i] = c;
slowly();
}
}
private char[] doRead() {
char[] newBuf = new char[buffer.length];
for (int i = 0; i < buffer.length; i++) {
newBuf[i] = buffer[i];
}
slowly();
return newBuf;
}
private void slowly() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ReaderThread extends Thread {
private final Data data;
public ReaderThread(Data data) {
this.data = data;
}
@Override
public void run() {
try {
while (true) {
char[] readBuf = data.read();
System.out.println(Thread.currentThread().getName()+" reads "+String.valueOf(readBuf));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class WriterThread extends Thread {
private final static Random random = new Random();
private final Data data;
private final String filler;
private int index = 0;
public WriterThread(Data data, String filler) {
this.data = data;
this.filler = filler;
}
@Override
public void run() {
try {
while (true) {
char c = nextChar();
data.write(c);
Thread.sleep(random.nextInt(3000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private char nextChar() {
char c = filler.charAt(index);
index++;
if (index >= filler.length()) {
index = 0;
}
return c;
}
}
public final class ReadWriteLock {
private int readingReaders = 0; //正在執行讀操作的執行緒數
private int waitingWriters = 0; //等待寫鎖的執行緒數
private int writingWriters = 0; //正在執行寫操作的執行緒數
private boolean preferWriter = true; //是否寫入優先
public synchronized void readLock() throws InterruptedException {
while (writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
wait();
}
readingReaders++;
}
public synchronized void readUnlock() {
readingReaders--;
preferWriter = true;
notifyAll();
}
public synchronized void writeLock() throws InterruptedException {
waitingWriters++;
try {
while (readingReaders > 0 || writingWriters > 0) {
wait();
}
} finally {
waitingWriters--;
}
writingWriters++;
}
public synchronized void writeUnlock() {
writingWriters--;
preferWriter = false;
notifyAll();
}
}
讀寫鎖模式利用了讀操作不修改例項狀態的特性,這樣多個讀操作執行緒之間就不存在衝突,因此不用做同步處理,從而提供程式效能。但是效能的提升不是絕對的,需要實際測量,同時也需要考慮以下兩個場景要求:
- 讀取操作耗時的操作,當讀取操作很簡單時,單執行緒模式相比而言更加簡單高效
- 讀取頻率比寫入頻率高的操作,當寫入頻率較高時,寫入操作頻繁的打斷讀取操作,讀寫鎖模式的優越性降低
1,鎖的含義
synchronized 是用於獲取例項的鎖。Java 中每個物件的例項都持有一個鎖,但同一個鎖同時只能由一個執行緒持有。這種結構是 Java 規範規定的,JVM 也是這麼實現的。這種鎖稱為物理鎖。
在讀寫鎖模式中,這裡的鎖與 synchronized 獲取的鎖是不一樣的,這並不是 Java 規範規定的鎖,而是由開發人員自己實現的,這種鎖稱為邏輯鎖。
ReadWriteLock 類中有讀鎖和寫鎖,但這是邏輯鎖,這兩個邏輯鎖共用同一個由 synchronized 獲取的 ReadWriteLock 類例項的物理鎖。這就是為什麼我們在 ReadWriteLock 類中的加鎖、解鎖方法上都聲明瞭 synchronized 關鍵字的緣故,因為它們最終要共用同一把物理鎖,而同一把物理鎖同一時間只能由一個執行緒持有,這種規定保證了邏輯鎖的實現。
2,Java 中的讀寫鎖
在 juc 包中提供了實現了讀寫鎖模式的 ReadWriteLock 介面和 ReentrantReadWriteLock 實現類。通過 ReentrantReadWriteLock 改寫之後的 Data 類程式碼如下:
/**
* @author koma <[email protected]>
* @date 2018-10-18
*/
public class Data {
private final char[] buffer;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data(int size) {
this.buffer = new char[size];
for (int i = 0; i < size; i++) {
buffer[i] = '*';
}
}
public char[] read() throws InterruptedException {
readLock.lock();
try {
Thread.sleep(10000);
return doRead();
} finally {
readLock.unlock();
}
}
public void write(char c) throws InterruptedException {
writeLock.lock();
try {
doWrite(c);
} finally {
writeLock.unlock();
}
}
//其它方法不變