1. 程式人生 > >Async 異步轉同步詳細流程解釋

Async 異步轉同步詳細流程解釋

push 模型 ack 如何 parallel 沒有 callback 代碼 class

安裝

npm install async --save

地址

https://github.com/caolan/async

Async的內容主要分為三部分

  1. 流程控制: 簡化九種常見的流程的處理
  2. 集合處理:如何使用異步操作處理集中的數據
  3. 工具類:幾個常用的工具類

本文主要介紹流程控制部分,後續內容持續更新,由於node.js是異步編程模型,有許多在同步編程中很容易做到的事情,現在就會變的很麻煩,並且存在很多的callback。但是,Async的流程控制給我們coder帶來了許多便利。


1.series(task, [callback])(多個函數依次執行,之間沒有數據交換)

有多個異步函數需要依次調用,一個完成之後才能執行下一個。各函數之間沒有數據交換,僅僅需要保證其順序執行。這時可以使用series。

js代碼

step1(function (err, v1) {
    step2(function(err, v2){
        step3(function(err,v3){
            //code with the value [v1|v2|v3] or err
        });
    });
});

從上面的代碼中可以看到,這些嵌套還是比較深的,如果操作更加復雜,那麽會讓代碼的可讀性降低。此外,在代碼中忽略了對每一層err的處理,否則還要加上if(err) return callback(err); 那就更麻煩了。

對於這種情況,我們可以使用async來處理

var async = require(‘async‘);
async.series([
    function(callback){
        step1(function(err, v1){
            //code with v1
            callback(err, v1);
        });
    },
    function(callback){step2(……)},
    function(callback){step3(……)}
],function(err, values){
    //code with the value [v1|v2|v3] or the err
});

上述async的詳細解釋為

  1. 依次執行一個函數數組中的每個函數,每一個函數執行完成之後才能執行下一個函數。
  2. 如果任何一個函數向它的回調函數中傳了一個error,則後面的函數都不會被執行,並且將會立刻將該error以及已經執行了的函數的結果,傳給series中最後的那個callback。
  3. 將所有的函數執行完後(沒有出錯),則會把每個函數傳給其回調函數的結果合並為一個數組,傳給series最後的那個callback。
  4. 還可以以json的形式提供tasks。每一個屬性都會被當作函數來執行,並且結果也會以json形式傳給series中最後的那個callback。這種方式可讀性更高

註: 多個series調用之間是不分先後的,因為series本身也是異步調用。

2.parallel(tasks,[callback])(多個函數並行執行)

並行執行多個函數,每個函數都是立刻執行,不需要等待其他函數先執行。傳給最終callback的數組中的數據按照tasks聲明的順序,而不是執行完成的順序

如果某個函數出錯,則立刻將err和已經執行完的函數的結果值傳給parallel最終的callback。其它為執行完的函數的值不會傳到最終數據,但要占個位置。

同時支持json形式的tasks,其最終callback的結果也為json形式。

正常執行的代碼如下:

async.parallel([
    function(callback){t.fire(‘f400‘, callback, 400)},
    function(callback){t.fire(‘f200‘, callback, 200)},
    function(callback){t.fire(‘f300‘, callback, 300)}
],function(err, results){
    log(err); //->undefined
    log(results); //->[‘f400‘, ‘f200‘, ‘f300‘]
});

中途出錯的代碼如下:

async.parallel([
    function(callback){t.fire(‘f400‘, callback, 400)},
    function(callback){t.fire(‘f200‘, callback, 200)},
    function(callback){t.err(‘e300‘, callback, 300)}
], function(err, results){
    log(err); //->e300
    log(results); //->[, ‘f200‘, undefined]
});

3.waterfall(tasks, [callback])(多個函數依次執行,且前一個的輸出為後一個的輸入)

與series相似,按順序依次執行多個函數。不同之處,每一個函數產生的值,都將傳給下一個函數,如果中途出錯,後面的函數將不會執行,錯誤信息以及之前產生的結果,都傳給waterfall最終的callback。

這個函數的名字為waterfall(瀑布),可以想象瀑布從上到下,承上啟下,有點類似於linux中的pipes。 註意該函數不支持json格式的tasks。

async.waterfall([
    function(callback){log(‘start‘), callback(null, 3)},
    function(n, callback){log(n), t.inc(n, cb);/*inc為類似於i++的函數*/},
    function(n, callback){log(n), t.fire(n*n, cb);}
], function(err, results){
    log(err);
    log(results);
});

/**
    output

    start
    3
    4
    err: null
    results: 16
 */

4.auto(tasks, [callback])(多個函數有依賴關系, 有的並行執行,有的一次執行)

auto可以彌補parallel和series中的不足

例如我要完成下面的事情

  1. 從某處取得數據
  2. 在硬盤上建立一個新的目錄
  3. 將數據寫入到目錄下某文件
  4. 發送郵件
async.auto({
    getData: function(callback){
        setTimeout(function(){
            console.log(‘got data‘);
            callback(null, ‘mydata‘);
        }, 300);
    },
    makeFolder: function(callback){
        setTimeout(function() {
            console.log(‘made folder‘);
            callback(null, ‘myfolder‘);
        }, 200);
    },
    writeFile:[‘getData‘, ‘makeFolder‘, function(callback){
        setTimeout(function(){
            console.log(‘write file‘);
            callback(null, ‘myfile‘);
        }, 300);
    }],
    emailFiles: [‘writeFile‘, function(callback, results){
        log(‘send email‘);
        callback(null, results.writeFile);
    }]
},function(err, results){
    log(err); //->null
    log(results);
    //made folder
    //got data
    //write file
    //send email

    /*
    results{
        makeFolder: ‘myfolder‘,
        getData: ‘mydata‘,
        writeFile: ‘myfile‘,
        emailFiles: ‘myfile‘
    }
    */
});

5.whilst(test, fn, callback)(該函數的功能比較簡單,條件變量通常定義在外面,可供每個函數訪問。在循環中,異步調用時產生的值實際上被丟棄了,因為最後的callback只能傳入錯誤信息,另外,第二個函數fn需要接受一個函數的cb, 這個cb最終必需被執行,用於表示出錯或正常結束)

var count = 0;
async.whilst(
    //test
    function(){return count < 3;},
    function(cb){
        log(count);
        count++;
        setTimeout(cb, 1000);
    },
    function(err){
        //3s have passed
        log(err);
    }
);
/*
    0
    1
    2
    null
*/

6.until(test, fn, callback)(與whilst相似,但判斷條件相反)

var count_until = 0;
async.until(
    //test
    function(){ return count_until > 3;},
    function(cb){
        log(count_until);
        count_until++;
        setTimeout(cb, 1000);
    },
    function(err){
        //4s have passed
        log(err);
    }
);
/*
    0
    1
    2
    3
    null
*/

7.queue(可設定worker數量的隊列)

queue相當於一個加強版的parallel, 主要限制了worker數量,不再一次性全部執行。當worker數量不夠用時,新加入的任務將會排隊等候,直到有新的worker可用。

該函數有多個點可供回調,如worker用完時、無等候任務時、全部執行完時等。

//定義一個queue, 設worker數量為2
var q = async.queue(function(task, callback){
    log(‘worker is processing task: ‘ + task.name);
    task.run(callback);
}, 2);
//監聽:如果某次push操作後, 任務數將達到或超過worker數量時, 將調用該函數
q.saturated = function(){
    log(‘all workers to be used‘);
}
//監聽:當最後一個任務交給worker時,將調用該函數
q.empty = function(){
    log(‘no more tasks waiting‘);
}
//監聽:當所有任務都執行完以後,將調用該函數
q.drain = function(){
    log(‘all tasks have been processed‘);
}

//獨立加入兩個任務
q.push({name : ‘t1‘, run: function(cb){
    log(‘t1 is running, waiting tasks:‘ + q.length());
    t.fire(‘t2‘, cb, 400); //400ms後執行
}}, function(err){
    log(‘t1 executed‘);
});

log(‘pushed t1, waiting tasks:‘ + q.length());

q.push({name: ‘t2‘, run: function(cb){
    log(‘t2 is running, waiting tasks:‘ + q.length());
    t.fire(‘t2‘, cb, 200); //200ms後執行
}}, function(err){
    log(‘t2 executed‘);
});

log(‘pushed t2, waiting tasks:‘ + q.length());

/**
    pushed t1, waiting tasks:1
    all workers to be used
    pushed t2, waiting tasks:2
    worker is processing task : t1
    t1 is running, waiting tasks: 1
    no more tasks waiting
    worker is processing task : t2
    t2 is running, waiting tasks: 0
    t2 executed
    t1 executed
    all tasks have been processed
 */

8.iterator(tasks)(將幾個函數包裝為iterator)

將一組函數包裝成為一個iterator, 可通過next()得到以下一個函數為起點的新的iterator。該函數通常由async在內部使用,但如果需要時,也可在我們的代碼中使用它。

var iter = async.iterator([
    function(){console.log(‘111‘);},
    function(){console.log(‘222‘);},
    function(){console.log(‘333‘);}
]);

var it1 = iter();
it1();

其中還包括了next()方法。

9.nextTick(callback)(在nodejs與瀏覽器兩邊行為一致)

nextTick的作用和nodejs的nextTick一樣,都是把某個函數調用放在隊列的尾部,但在瀏覽器端,只能使用setTimeout(callback, 0),但這個方法有時候會讓其它高優先級的任務插到前面去。

所以提供了這個nextTick,讓同樣的代碼在服務器端和瀏覽器端表現一致。

var calls = [];
async.nextTick(function(){
    calls.push(‘two‘);
});
calls.push(‘one‘);
async.nextTick(function(){
    console.log(calls); //-> [‘one‘, ‘two‘]
})

上述內容為學習筆記,大部分內容摘抄自alsotang的github中的async_demo,網址

Async 異步轉同步詳細流程解釋