1. 程式人生 > >多執行緒設計模式——Producer-Consumer生產者消費者模式

多執行緒設計模式——Producer-Consumer生產者消費者模式

這些都是根據我最近看的《Java實戰指南多執行緒程式設計(設計模式篇)》所得整理。

模式名稱

Producer-Consumer生產者消費者模式

模式面對的問題

有的執行緒的結果是另外一些執行緒的原料,也就是說,一些執行緒是生產者,另外一些執行緒是消費者,消費者需要生產者生產的東西才能正常執行,協調兩者的關係成了一個大的問題。

解決思路

有一箇中間的儲存位置,用來儲存生產者生產出來的東西,稱之為通道。
Producer生產者
Product生產者所生產的任務
Channel通道的抽象
BlockingQueueChannel基於阻塞佇列的Channel實現
Consumer消費者

Created with Raphaël 2.1.0ClientClientProducterProducterproductproductChannelChannel1service()2creat3put()45

例子程式碼

某內容管理系統需要支援對文件附件中的檔案進行全文檢索。改系統中,附件會被上傳到專用的檔案伺服器上,對附件進行全文檢索的功能模組也是部署在檔案伺服器上的。
模式主類

public class AttachmentProcessor {
    private final String ATTACHMENT_STORE_BASE_DIR = "/home/viscent/tmp/attachments/"
; //模式角色Producer-Consumer.Channer priuvate final Channer<File> channer = new BlockingQueueChannel<File>( new ArrayBlockingQueue<File>(200)); //模式角色Producer-Consumer.Consumer private final AbstractTerminatableThread indexingThread = new AbstractTerminatableThread(){ @Override
protected void doRun()throws Exception{ File file =null; file = channel.take(); try{ indexFile(file); }catch(Exception e){ e.printStackTrace(); }finally{ terminationToken.reservations.decrementAndGet(); } } //根據制定檔案生成全文搜尋所需的索引檔案 private void indexFile(File file) throws Exception{ //省略與模式無關的程式碼 //模擬生成索引檔案的時間消耗 Radom rnd =new Random(); try{ Thread.sleep(rnd.nextInt(100)); }catch(InterruptedException e){ ; } } }; public void init(){ indexingThread.start(); } public void shutdown(){ indexingThread.terminate(); } public void saveAttachment(InputStream in, String documentId,String originalFileName)throws IOException{ File file = saveAsFile(in,documentId,originalFileName); try{ channel.put(file); }catch(InterruptedException e){ ; } indexingTread.terminationToken.reservations.incrementAndGet(); } private FTPClient initFTPClient(String ftpServer,String userName, String password) throws Exception{ FTPClient ftpClient = new FTPClient(); FTOClientConfig config = new FTPClientConfig(); ftpClient.config(config); int reply; ftpClient.connect(ftpServer); System.out.print(ftpClient.getReplyString()); reply = ftpClient.getReplyCode(); if(!dirName.equals(file.getCanoicalFile().getParent())){ throw new SecurityException("Invalid originalFileName:"+originalFileName); } BufferedOutputStream bos =null; BufferedInputStream bis = new BufferedInputStream(in); byte[]buf = new byte[2048]; int len = -1; try{ bos =new BufferedOutputStream(new FileOutputSteram(file)); while((len = bis.read(buf) > 0)){ bos.write(buf,0,len); } bos.flush(); }finally{ try{ bis.close(); }catch(IOException e){ ; } try{ if(null != bos){ bos.close(); } }catch(IOException e){ ; } } ftpClient.setFileType(FTP.ASCII_FILE_TYPE); return ftpClient; } }

Channel介面

public interface Channel<P> {
    //從通道中取一個"產品"。
    P take() throws InterruptedException;

    //往通道里面儲存一個"產品"。
    void put(P product) throws InterruptedException;
}

BlockingQueueChannel類

public class BlockingQueueChannel<P> implements Channel<P> {
    private final BlockingQueue<P>queue;
    public BlockingQueueChannel(BlockingQueue<P>queue){
        this.queue = queue;
    }

    @Override
    public P take() throws InterruptedException{
        return queue.take();
    }


    @Override
    public void put(P product) throws InterruptedException{
        queue.put(product);
    }
}

模式的評價與實現考量

生產這消費者模式是一個經典的執行緒模式嗎,但是它也有一些容易出現的問題:
1. 管道積壓:生產者消費者模式中消費者的處理能力往往低於生產這的處理能力,會出現管道擠壓的現象。處理這種現象,有集中方法:使用有界阻塞佇列,佇列到一定數量就不在生產,等待消費;使用有流量控制的無界阻塞佇列,線上程的時間分配時對生產者的時間進行限制來平衡。
2. 工作竊取演算法:如果是多個消費者從管道中取得產品,會出現執行緒安全的問題,所以會有一個通道例項對應多個佇列例項來處理。
3. 執行緒的停止:整個模式也可以看做一個執行緒,這個執行緒的停止會比一般的執行緒要複雜一些,需要注意處理。
4. 高效能高可靠性:這裡的示例程式碼是一個比較一般的實現,如果有較高的要求,可以考慮Producer-Consumer模式實現庫LMAX Disruptor:https://github.com/LMAX-Exchange/disruptor