
From Zero Series: a Node crawler that writes data through a process pool


1. The main process

const http = require('http');
const cheerio = require('cheerio');
const makePool = require('./pooler');
const runJob = makePool('./worker');

var url = "http://xxx.com/articles/"; // initial url
let g; // generator that drives the crawl, one page at a time

function fetchPage(x) { // thin wrapper around startRequest
  console.log(x);
  if (!x || x === '') {
    g.next(); // skip empty urls and move on
    return;
  }
  startRequest(x);
}

function startRequest(x) {
  // issue one GET request with the http module
  return http.get(x, function (res) {
    var html = ''; // accumulates the full html of the page
    res.setEncoding('utf-8'); // avoid garbled Chinese characters
    // the 'data' event fires once per chunk
    res.on('data', function (chunk) {
      html += chunk;
    });
    // the 'end' event fires once the whole page has been received
    res.on('end', function () {
      var $ = cheerio.load(html); // parse the html with cheerio
      var time = new Date();
      var p = $('.content p');
      p.each((index, item) => {
        if ($(item).find('strong').length) {
          var fex_item = {
            // article title
            title: $(item).find('strong').text().trim(),
            // timestamp for this record
            time: time,
            // url of this article
            link: $($(item).children('a').get(0)).attr('href'),
            // drop the child nodes, keep the remaining plain text as a description
            des: $(item).children().remove() && $(item).text(),
            // index + 1 counts how many articles have been collected
            i: index + 1
          };
          // hand the item to a pooled worker process
          runJob(fex_item, (err, data) => {
            if (err) return console.error('get link error');
            console.log('get link ok');
          });
        }
      });
      g.next(); // this page is done, crawl the next one
    });
  }).on('error', function (err) {
    console.log(err);
    g.next(); // keep going even if one page fails
  });
}

function* gen(urls) {
  let len = urls.length;
  for (var i = 0; i < len; i++) {
    yield fetchPage(urls[i]);
  }
}

function getUrl(x) {
  // fetch the index page and collect the article urls
  http.get(x, function (res) {
    var html = '';
    res.setEncoding('utf-8');
    res.on('data', function (chunk) {
      html += chunk;
    });
    res.on('end', function () {
      var $ = cheerio.load(html);
      var lists = $('.articles .post-list li');
      var urls = [];
      lists.each(function (index, item) {
        if ($(item).find('a').length) {
          var url = 'http://xxxx.com' + $($(item).children('a').get(0)).attr('href');
          if (url) urls.push(url);
        }
      });
      g = gen(urls); // the main flow starts here
      g.next();
    });
  }).on('error', function (err) {
    console.log(err);
  });
}

getUrl(url);
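The control flow is worth calling out: gen(urls) yields one fetchPage call per URL, and every request path, success or failure, ends with g.next(), so pages are crawled strictly one at a time instead of flooding the site. Here is a minimal sketch of the same trick, with setTimeout standing in for the real HTTP request:

// Minimal sketch of the generator-driven crawl loop above;
// setTimeout plays the role of the asynchronous HTTP request.
let g;

function fakeFetch(url) {
  console.log('fetching', url);
  setTimeout(() => {
    console.log('done', url);
    g.next(); // resume the generator: start the next url
  }, 100);
}

function* gen(urls) {
  for (const url of urls) {
    yield fakeFetch(url); // pause here until fakeFetch calls g.next()
  }
}

g = gen(['a', 'b', 'c']);
g.next(); // kick off the first fetch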

2. Creating the process pool

const cp = require('child_process');
const cpus = require('os').cpus().length;

module.exports = function pooler(workModule) {
  let awaiting = [], readyPool = [], poolSize = 0;

  return function doWork(job, cb) {
    // no idle child and the pool is full: queue the job for later
    if (!readyPool.length && poolSize > cpus)
      return awaiting.push([doWork, job, cb]);

    // reuse an idle child, or fork a new one and grow the pool
    let child = readyPool.length
      ? readyPool.shift()
      : (poolSize++, cp.fork(workModule));
    let cbTriggered = false;

    child.removeAllListeners()
      .once('error', function (err) {
        if (!cbTriggered) {
          cb(err);
          cbTriggered = true;
        }
        child.kill();
      })
      .once('exit', function (code) {
        if (!cbTriggered)
          cb(new Error('child exited with code: ' + code));
        poolSize--;
        // a dead child must not stay in the ready pool
        let childIdx = readyPool.indexOf(child);
        if (childIdx > -1) readyPool.splice(childIdx, 1);
      })
      .once('message', function (msg) {
        cb(null, msg);
        cbTriggered = true;
        readyPool.push(child);
        // drain one queued job, if any
        if (awaiting.length) setImmediate.apply(null, awaiting.shift());
      })
      .send(job);
  };
};
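The pool is capped near the CPU count: when no child is idle and the cap is reached, jobs queue in awaiting, and each completed job drains one queued entry via setImmediate. Wiring it up is then a two-liner; in this usage sketch the job payload just mirrors the fex_item shape built in the main process:

// Usage sketch: './pooler' and './worker' are the modules in this post;
// the job object mirrors the fex_item built in the main process.
const makePool = require('./pooler');
const runJob = makePool('./worker');

runJob({ title: 'demo', link: 'http://xxx.com/a', des: '...', time: new Date() },
  function (err, msg) {
    if (err) return console.error(err);
    console.log(msg); // 'finish', sent back by the worker
  });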

3. The worker process receives messages and writes the content

const fs = require('fs');

process.on('message', function (job) {
  let _job = job;
  let x = 'TITLE:' + _job.title + '\n' +
          'LINK:' + _job.link + '\n' +
          'DES:' + _job.des + '\n' +
          'SAVE-TIME:' + _job.time;

  fs.writeFile('../xx/data/' + _job.title + '.txt', x, 'utf-8', function (err) {
    if (err) {
      console.log(err);
    }
    // report back only after the write has finished, so the pool
    // returns this child to the ready queue at the right moment
    process.send('finish');
  });
});
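One caveat the worker leaves open: the article title becomes the file name, so a title containing / or other reserved characters will make fs.writeFile fail. A minimal guard, using a hypothetical sanitize helper that is my addition and not part of the original code:

// Hypothetical helper (not in the original worker): replace
// characters that are illegal or risky in file names.
function sanitize(name) {
  return String(name).replace(/[\/\\:*?"<>|]/g, '_');
}

// then: fs.writeFile('../xx/data/' + sanitize(_job.title) + '.txt', x, 'utf-8', ...)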
