1. 程式人生 > >執行緒池運用例項——一次錯誤的多執行緒程式設計以及修復過程

執行緒池運用例項——一次錯誤的多執行緒程式設計以及修復過程

寫在前面的話 

寫下這篇文章只為了回顧之前在實際工作中犯的一個極其二逼的錯誤,用我的經歷來提示後來者,諸位程式大神,大牛,小牛們看到此文笑笑即可,輕拍輕拍。。。

1 背景

有這麼一個需求,我們的系統(後面簡稱:A系統)需要在後臺執行一個報表匯出任務,在這個任務的執行過程中需要通過CORBA呼叫其他系統(後面簡稱:B系統)的一個(也有可能是多個)介面去查詢報表,待結果返回後,將這些結果寫入Excel。這個需求是不是很簡單?套用網上一些FutureTask或者執行緒池的例子一兩小時就能搞定這個需求。當時我也是這樣認為的,可誰想,這是一個巨大的坑….

2 初始設計

用過CORBA的同學會知道,如同資料庫連線一樣,CORBA的連線數也是是有限的,如果一個介面呼叫的時間過長,就會長時間佔用CORBA有限的連線數,當這種長時間的同步呼叫過多時就會造成整個系統CORBA呼叫的阻塞,進而造成系統停止響應。由於查詢操作很耗時,為了避免這種情況的發生,這個介面被設計成了一個非同步介面。任務的執行流程就會是這樣:任務開始執行,接著呼叫這個介面並且通過CORBA向B系統訂閱一個事件,然後任務進入等待狀態,當B系統執行完成後,會向A系統傳送一個事件告知執行的結果,任務收到事件後重新開始執行直到結束,如圖:

既然說到了事件,那麼很自然而然的就想到了使用回撥的方式去響應事件,並且為了避免事件超時(也就是長時間沒有接收到事件)導致任務長時間等待,我還使用了一個定時的任務去檢查任務的狀態。所以我的程式看起來就像這樣:

IEventFuture.java

public interface IEventFuture {
    void onEventReceived(Event event);
}

ExportRptTask.java

public class ExportRptTask implements Callable<Void>, IEventFuture {
    private static final int INITIALIZED = 0;
    private static final int RUNNING = 1;
    private static final int COMPLETED = 2;
    private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
    private Date lastUpdate = new Date();
    private volatile int state = INITIALIZED;

    private Timer timer = new Timer();
    private SystemBSer systemBSer = new SystemBSer();

    private int eventId = -1;

    @Override
    public Void call() throws Exception {
        this.state = RUNNING;
        try {
            systemBSer.doQuery();
            subscribeEvent();
            startTaskTimeoutMonitorTask();
            Future future = createEventFuture();
            future.get();
        } catch (Throwable t) {
            onTaskError(t);
        } finally {
            EventManager.unsubscribe(this.eventId);
            timer.cancel();
        }
        return null;
    }

    @Override
    public void onEventReceived(Event event) {
        this.lastUpdate = new Date();
// start to write excel
// .....
// end to write excel
        this.state = COMPLETED;
    }

    private void subscribeEvent() {
        this.eventId = EventManager.subscribe(this);
    }

    private Future createEventFuture() {
        FutureTask<Void> listenFuture = new FutureTask<Void>(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                while (state != COMPLETED) {

                }
                return null;
            }
        });

        new Thread(listenFuture).start();
        return listenFuture;
    }

    private void startTaskTimeoutMonitorTask() {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {

                if (state != COMPLETED || new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
                    onTaskTimeout();
                }
            }
        }, 0, 15 * 60 * 1000);
    }

    private void onTaskTimeout() {
        // do something on task timeout.
        //   ....
        // end

        // set task to completed to end task.
        this.state = COMPLETED;
    }

    private void onTaskError(Throwable t) {
// do something to handle error.
    }
}

3 升級改進

由於做這個需求的關係,我開始閱讀一些關於JAVA多執行緒程式設計的一下教程,在閱讀到關於閉鎖的內容時,我突然靈光一現,這玩意不正好可以代替我那個醜陋的使用迴圈來讓任務進入等待狀態的實現麼?然後我的程式就變成了這樣:

ExportRptTask.java

public class ExportRptTask implements Callable<Void>, IEventFuture {
    private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
    private Date lastUpdate = new Date();

    private CountDownLatch endGate = new CountDownLatch(1);
    private Timer timer = new Timer();
    private SystemBSer systemBSer = new SystemBSer();

    private int eventId = -1;

    @Override
    public Void call() throws Exception {
        try {
            systemBSer.doQuery();
            subscribeEvent();
            endGate.await();
            startTaskTimeoutMonitorTask();
        } catch (Throwable t) {
            onTaskError(t);
        } finally {
            EventManager.unsubscribe(this.eventId);
            timer.cancel();
        }
        return null;
    }

    @Override
    public void onEventReceived(Event event) {
        this.lastUpdate = new Date();
// start to write excel
// .....
// end to write excel
        this.endGate.countDown();
    }

    private void subscribeEvent() {
        this.eventId = EventManager.subscribe(this);
    }

    private void startTaskTimeoutMonitorTask() {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {

                if (new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
                    onTaskTimeout();
                }
            }
        }, 0, 15 * 60 * 1000);
    }

    private void onTaskTimeout() {
// do something on task timeout.
//   ....
// end

// set task to completed to end task.
        this.endGate.countDown();
    }

    private void onTaskError(Throwable t) {
// do something to handle error.
    }
}

4 問題浮現

正在我為我使用高大上的閉鎖代替迴圈沾沾自喜的時候,測試大爺告訴我,任務經常莫名其妙的失敗,並且日誌中沒有任何異常。開始,這讓我覺得很不可思議,因為我已經在call()方法處處理了所有的異常,任務失敗時至少也應該有個日誌啥的吧。這個問題一直困擾著我,直到有一天分析日誌我突然發現任務執行的工作執行緒(也就是call()方法所在的執行緒)和接收到事件後的回撥並不是同一個執行緒。這就意味著在查詢到報表結果後,所有寫Excel,分發結果等等的操作都是在事件回撥的執行緒中執行的,那麼一旦這裡發生異常原來call()中的catch塊自然無法捕獲,然後異常就被莫名其妙的吞掉了。好吧,我承認我之前對執行緒池也就瞭解點皮毛,對多執行緒也僅僅是有個概念,想當然的認為線上程池中可以Hold住任務的一切,包括響應這個任務在執行過程中建立的其他執行緒執行時發生的異常。而且更嚴重的是按照原來的實現,只有當整個任務執行完成(包括寫完Excel)後,才會釋放那個閉鎖,所以一旦事件回調發生異常,那麼整個任務都無法終止。線上程池中發生一個任務永遠無法終止的後果,你懂的。

5 重新設計

痛定思痛,我決定重新梳理這個任務的流程。這個需求的難點就是在如何監聽並響應B系統給我們傳送的事件,實際上,這是一個很經典的生產者–消費者問題,而阻塞佇列正好是解決這類問題的利器。重新設計的事件響應流程就變成:當B系統傳送事件的時候,事件回撥執行緒會往阻塞佇列裡面填充一個事件。在另一方面,任務呼叫完B系統的查詢介面後,就開始從阻塞佇列中取事件,當事件佇列為空的時候,取事件的執行緒(也就是執行緒池執行任務的工作執行緒)會被阻塞。並且,阻塞佇列的取操作可以設定超時時間,所以當取到的事件物件為空時,就意味著事件超時了,這樣就省去了使用定時任務定時檢查任務狀態的工作。重新設計的程式是這樣的:

EventProxy.java

public class EventProxy implements IEventFuture {
    private static final BlockingQueue<Event> eventQueue = new ArrayBlockingQueue<Event>(10);
    private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;

    @Override
    public void onEventReceived(Event event) {
        eventQueue.offer(event);
    }

    public Event getEvent() throws InterruptedException {
        return eventQueue.poll(TASK_TIME_OUT_TIME, TimeUnit.MILLISECONDS);
    }
}

ExportRptTask.java

public class ExportRptTask3 implements Callable<Void> {

    private SystemBSer systemBSer = new SystemBSer();
    private EventProxy eventProxy = new EventProxy();

    private int eventId = -1;

    @Override
    public Void call() throws Exception {
        try {
            systemBSer.doQuery();
            subscribeEvent();

            Event event = eventProxy.getEvent();
            if (event != null) {
                processEvent(event);
            } else {
                onTaskTimeout();
            }
        } catch (Throwable t) {
            onTaskError(t);
        } finally {
            EventManager.unsubscribe(this.eventId);
        }
        return null;
    }

    private void subscribeEvent() {
        this.eventId = EventManager.subscribe(eventProxy);
    }

    private void processEvent(Event event) {
// do something on receive event.
    }

    private void onTaskTimeout() {
// do something on task timeout.
//   ....
// end
    }

    private void onTaskError(Throwable t) {
// do something to handle error.
    }
}

6 總結

相信各位併發程式設計的大牛們能在一瞬間就可以把我的程式(包括改進後的)批得體無完膚,不過我還是想分享下我在這個過程中的收穫。

  • 在動手寫程式前,請先理解你的需求,特別是要注意用已有的模型去識別問題,在本例中,我就是沒有識別響應事件的流程其實是個生產者–消費者問題導致了後面的錯誤
  • 請充分的瞭解你需要使用的技術和工具。比如,使用執行緒池你就要了解執行緒池的工作原理,這樣你才能正確的使用這些技術。做技術切忌想當然。
  • 在使用執行緒池時,重要的操作儘量放在任務的主執行緒中執行(也就是call()/run()方法所在的執行緒),否則執行緒池本身難以對任務進行控制。
  • 如果一定要在任務中再建立新的執行緒,請確保任務主執行緒是任務最後退出的執行緒。切忌不要使用外部執行緒直接呼叫任務類的方法,在本例中我就犯了這樣的錯誤。