1. 程式人生 > >爬蟲(三) redis&分散式爬蟲

爬蟲(三) redis&分散式爬蟲

思維導圖

redis

redis, 稱為記憶體資料庫, 以key-value的形式存放資料, 是一個非關係型資料庫

redis 提供類豐富的資料型別, 其有 string list map set sortSet 五種資料型別

redis 的資料型別指的是value的資料型別, key都是String型別的

1. 持久化

  • RDB (預設開啟): 是一種基於快照機制來實現的持久化的方案, 可以把快照看做是照相機的照片, 可以將redis的某一個時刻記錄下來, 然後儲存到本地磁碟上, 一般情況下 快照檔案都比較小 只有幾kb
    • 優點: 快照檔案一般很小, 適合於做災難恢復
    • 缺點: 資料丟失的風險
  • AOF: 是一種基於日誌機制的持久化方案, 可以將使用者操作的命令, 整體的都儲存下來, 儲存到一個本地磁碟上, 而一般這個檔案比較龐大, 最大有時候好幾T的檔案
    • 優點: 資料的丟失的風險比較小
    • 缺點: 不適合做災難恢復
  • RDB: redis預設是打開了RDB的
save 900 1     : 在 900秒之內, 如果有1個數據傳送了改變, 就會執行一次儲存
save 300 10    : 在 300秒之內, 如果有10個數據傳送了改變, 就會執行一次儲存
save 60 10000  : 在 60秒之內, 如果有10000 個數據傳送了改變, 就會執行一次儲存

會發生資料丟失了, 極端: 在不到5分鐘的時間內, 丟失 9999個數據

  • AOF: redis 預設是關閉了AOF機制的
appendonly yes  :  是否開啟AOF機制   yes 已經開啟  no 表示未開啟
appendfsync everysec :  AOF的儲存機制  [ always everysec no ]
  • always: 總是,
    • 會將使用者每一次的命令都會執行一次儲存的操作
      • 缺點: 大大的影響redis的效能
  • everysec: 每秒執行
    • 會每秒鐘執行一次儲存操作
  • no : redis不去主動的進行儲存
    • 依賴於作業系統進行儲存操作(linux 大約 30分鐘)
      • 缺點: 會丟失30分鐘的資料

一般來redis的是不會宕機的, 除非redis伺服器的記憶體爆滿


jedis

jedis, 是一款用於連線redis的java API

1. 連線redis

		//1. 建立jedis物件
        Jedis jedis = new Jedis("192.168.4.200", 6379);
        //2. 檢測是否是互通的
        String pong = jedis.ping();

2. API

2.1 redis

驗證密碼 jedis.auth(“password”)
連線 jedis.connect()
斷開連線 jedis.disconnect()
獲取redis中所有key jedis.keys("*")
獲取redis中特定的key jedis.key(“pattern”)
移除redis中的key jedis.del(“key1”…)
移除key的生存時間 jedis.persist(“key1”)
檢查key是否存在 jedis.exists(“key1”)
給key改名 jedis.rename(“oldKeyName”,“newKeyName”)
返回key對應的value型別 jedis.type(“key1”)
清空redis中所有的key jedis.flushAll()
返回redis中key的個數 jedis.dbSize()

2.2 String

  • 特點: 儲存所有的字元和字串
  • 應用場景: 做快取使用
新增資料 jedis.set(“name”,“張三”)
獲取value jedis.get(“name”)
value自增+1 jedis.incr(“age”)
value自減-1 jedis.decr(“age”)
value增指定值 jedis.incrBy(“100”, 10L)
value拼接 jedis.append(“name”,“xixi”)
存多條資料 jedis.mset(“aaa”,“123”,“bbb”,“456”…)
取多條資料 jedis.mget(“aaa”,“bbb”…)
設定key有效期 jedis.setex(“sex”,5,“man”)
給存在的key設定有效期 jedis.expire(“sex”,20)
檢視key的有效期 jedis.ttl(“sex”);

2.3 list

  • 特點: 相當於java中linkList, 是一個連結串列的結構
  • 應用場景: 做任務佇列,
    • 在java中客戶端提供了執行緒安全獲取集合資料的方式
左側插入 jedis.lpush(“list1”,“張三”,“李四”,“王五”…)
右側彈出 jedis.rpop(“list1”)
插入指定元素之後 jedis.linsert(“list1”,BinaryClient.LIST_POSITION.AFTER,“李四”,“趙六”)
插入指定元素之前 jedis.linsert(“list1”, BinaryClient.LIST_POSITION.BEFORE,“李四”,“陳七”);
擷取n個元素 jedis.ltrim(“list1”,0,2)
獲得list中多個value jedis.lrange(“list1”,0,-1) -1表示所有
返回列表的長度 jedis.llen(“list1”)
刪除count個值為value的元素 jedis.lrem(“list1”,0,“張三”)
設定連結串列中角標為index的值 jedis.lset(“list1”,0,“張三啊”)
尾部元素彈出並新增到頭部 jedis.rpoplpush(“list1”,“list1”)

2.4 map

  • 特點: 相當於java中hashMap集合
  • 應用場景: 可以儲存javaBean物件, 此種使用場景不多,可被String替代
新增資料 jedis.hset(“map1”,“name”,“張三”)
獲取資料 jedis.hget(“map1”,“name”)
獲取所有 jedis.hgetAll(“map1”)
獲取所有key jedis.hkeys(“map1”)
獲取所有value jedis.hvals(“map1”)
value自增 jedis.hincrBy(“map1”,“age”,10)
刪除key jedis.hdel(“map1”,“sex”)

2.5 set

  • 特點: 唯一, 無序
  • 應用場景: 集合運算
    • 例如去重複的操作
新增資料 jedis.sadd(“set1”,“張三”,“李四”…)
獲取資料 jedis.smembers(“set1”)
set是否存在該value jedis.sismember(“set1”,“張三”)
獲取set中value個數 jedis.scard()
兩個set的交集 jedis.sinter(“set1”,“set2”)
兩個set的並集 jedis.sunion(“set1”,“set2”)
兩個set的差集 jedis.sdiff(“set1”,“set2”)
兩個set的差集合併到另一個set中 jedis.sdiffstore(“set3”,“set1”,“set2”)

2.6 zSet

  • 特點:唯一, 有序
  • 應用場景: 一般用來做排行榜
新增資料 jedis.zadd(“zset”,100,“張三”)
升序獲取資料 jedis.zrange(“zset”,0,-1)
降序獲取資料 jedis.zrevrange(“zset”,0,-1)
獲取set中value的排名 jedis.zrank(“zset”,“張三”)
獲取set中value的分數(score) jedis.zscore(“zset”,“張三”)
set中value的score自增 jedis.zincrby(“zset”,10,“張三”)

3.jedis連線池

    //jedis的連線池
    @Test
    public void jedisForJedisPool(){
        //1. 建立連線池的配置物件
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(100);
        config.setMaxIdle(20);//最大的閒時的數量
        config.setMinIdle(5); //最小閒時
        //2. 建立jeids的連線池物件
        JedisPool jedisPool = new JedisPool(config,"192.168.72.142",6379);

        //3. 獲取連線
        Jedis jedis = jedisPool.getResource();

        System.out.println(jedis.ping());

        //4. 歸還連線
        jedis.close();


    }

分散式爬蟲

1. 概述

分散式: 指的是將一個程式, 或者說一個業務, 拆分成不同的子專案或者子程式, 進行分開部署

叢集: 指的是將一個程式或者說一個業務, 重複部署多次

一般情況下 叢集 和 分散式 會同時出現

  • 通俗描述:

    • 小飯店原來只有一個廚師,切菜洗菜備料炒菜全乾。

      後來客人多了,廚房一個廚師忙不過來,又請了個廚師,兩個廚師都能炒一樣的菜,這兩個廚師的關係是叢集。

      為了讓廚師專心炒菜,把菜做到極致,又請了個配菜師負責切菜,備菜,備料,廚師和配菜師的關係是分散式。

      一個配菜師也忙不過來了,又請了個配菜師,兩個配菜師關係是叢集。

  • 從剛才的案例中, 請分析出, 叢集和分散式能解決什麼樣的問題?

      1. 主要解決單節點壓力過大
      1. 提高程式碼的複用性
      1. 降低模組間或者各個子系統的耦合性

分散式爬蟲思路

分散式爬蟲

2.代理IP

HttpHost proxy = new HttpHost("someproxy", port);
DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy);
CloseableHttpClient httpclient = HttpClients.custom()
        .setRoutePlanner(routePlanner)
        .build();

httpClientUtils

private static String execute(HttpRequestBase request) {

		RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(5000)// 設定建立連線的最長時間
				.setConnectionRequestTimeout(5000)// 設定獲取連線的最長時間
				.setSocketTimeout(10 * 1000)// 設定資料傳輸的最長時間
				.build();
		request.setConfig(requestConfig);

		String html = null;

		// 從redis中獲取代理IP
		Jedis conn = JedisUtil.getConn();
		// 從右邊彈出一個元素之後,從新放回左邊
		List<String> ipkv = conn.brpop(0, "spider:ip");
		// CloseableHttpClient httpClient = getHttpClient();
		CloseableHttpClient httpClient = getProxyHttpClient(ipkv.get(1));
		try {
			CloseableHttpResponse res = httpClient.execute(request);
			if (200 == res.getStatusLine().getStatusCode()) {
				html = EntityUtils.toString(res.getEntity(), Charset.forName("utf-8"));
				//請求成功之後,將代理IP放回去,下次繼續使用
				conn.lpush("spider:ip", ipkv.get(1));
				conn.close();
			}
		} catch (Exception e) {
			System.out.println("請求失敗");
			// TODO 需要開發自動重試功能
			throw new RuntimeException(e);
		}
		return html;
	}
	private static PoolingHttpClientConnectionManager cm;
	private static CloseableHttpClient getProxyHttpClient(String ipkv) {
        
		String[] vals = ipkv.split(":");
		System.out.println(vals);
		HttpHost proxy = new HttpHost(vals[0], Integer.parseInt(vals[1]));
		DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy);
		return HttpClients.custom().setConnectionManager(connectionManager).setRoutePlanner(routePlanner).build();
	}

其他

1. 下載圖片

public static void main(String[] args) throws Exception {

		// 1.指定圖片的地址
		String image = "http://www.itcast.cn/images/logo.png";
		// 2.使用httpclient獲取資料
		HttpGet httpGet = new HttpGet(image);
		// 3.發起請求
		CloseableHttpClient httpClient = HttpClients.createDefault();
		// 4.執行請求
		CloseableHttpResponse res = httpClient.execute(httpGet);
		// 5.獲取二進位制資料
		HttpEntity entity = res.getEntity();
		InputStream inputStream = entity.getContent();
		// 6.儲存圖片
		File logo = new File("d:/logo.png");
		BufferedOutputStream outputStreamWriter = new BufferedOutputStream(new FileOutputStream(logo));
		// 7.讀資料寫入到檔案
		byte[] buf = new byte[1024];
		int size;
		while (-1 != (size = inputStream.read(buf))) {
			outputStreamWriter.write(buf, 0, size);
		}
		// 8.打掃戰場
		outputStreamWriter.close();
		inputStream.close();
}

2. 下載視訊

public static void main(String[] args) throws Exception {
		// 1.指定視訊的地址
		String image = "http://124.205.69.164/mp4files/92110000062AEFFF/d.itheima.com:81/dc/czxy/1114lkfzf.mp4";
		// 2.使用httpclient獲取資料
		HttpGet httpGet = new HttpGet(image);
		// 3.發起請求
		CloseableHttpClient httpClient = HttpClients.createDefault();
		// 4.執行請求
		CloseableHttpResponse res = httpClient.execute(httpGet);
		// 5.獲取二進位制資料
		HttpEntity entity = res.getEntity();
		InputStream inputStream = entity.getContent();
		// 6.儲存圖片
		File logo = new File("c:/likaifu.mp4");
		BufferedOutputStream outputStreamWriter = new BufferedOutputStream(new FileOutputStream(logo));
		// 7.讀資料寫入到檔案
		byte[] buf = new byte[1024];
		int size;
		while ((size = inputStream.read(buf)) != -1 ) {
			outputStreamWriter.write(buf, 0, size);
		}
		// 8.打掃戰場
		outputStreamWriter.close();
		inputStream.close();
	}

3. 簡單驗證碼識別

  • Tesseract-OCR支援中文識別,並且開源和提供全套的訓練工具,是快速低成本開發的首選。

    • Tesseract的OCR引擎最先由HP實驗室於1985年開始研發,至1995年時已經成為OCR業內最準確的三款識別引擎之一。然而,HP不久便決定放棄OCR業務,Tesseract也從此塵封。數年以後,HP意識到,與其將Tesseract束之高閣,不如貢獻給開源軟體業,讓其重煥新生--2005年,Tesseract由美國內華達州資訊科技研究所獲得,並求諸於Google對Tesseract進行改進、消除Bug、優化工作。
    • Tesseract目前已作為開源專案釋出在Google Project,其專案主頁在這裡檢視
  • Tess4J則是Tesseract在Java PC上的應用

    http://tess4j.sourceforge.net/

 <!-- https://mvnrepository.com/artifact/net.sourceforge.tess4j/tess4j -->
        <dependency>
            <groupId>net.sourceforge.tess4j</groupId>
            <artifactId>tess4j</artifactId>
            <version>3.4.0</version>
        </dependency>
public static void main(String[] args) {
		File imageFile = new File("E:\\env\\tess4j\\input\\2017-12-09_153956.jpg");
		Tesseract tessreact = new Tesseract();
		// 需要指定訓練集 訓練集到 https://github.com/tesseract-ocr/tessdata 下載。
		tessreact.setDatapath("E:\\env\\tess4j\\tessdata");
		// 注意 預設是英文識別,如果做中文識別,需要單獨設定。
		tessreact.setLanguage("chi_sim");
		try {
			String result = tessreact.doOCR(imageFile);
			System.out.println(result);
		} catch (TesseractException e) {
			System.err.println(e.getMessage());
		}
	}

爬蟲程式碼

主伺服器解析列表頁

package com.hrh.master;

import com.hrh.utils.HttpClientUtils;
import com.hrh.utils.JedisUtils;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import redis.clients.jedis.Jedis;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;

/**
 * @author QuietHR
 * @create 2018/9/22
 **/
public class JDMaster {

    public static void main(String[] args) throws IOException, InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    Jedis jedis= JedisUtils.getJedis();

                    try {
                        Thread.sleep(1000);
                        //獲得隊列當前的大小
                        List<String> phonePids = jedis.lrange("phonePids", 0, -1);
                        System.out.println("當前redis中有"+phonePids.size()+"個pid");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    finally {
                        jedis.close();
                    }
            }
        }}).start();

        page();
    }
    public static void page() throws IOException, InterruptedException {
        //分頁查詢手機資料 共100頁
        for (int i = 1; i <=100 ; i++) {
            //京東分頁page為 1 3 5 7 .....
            //         對應第一頁 第二頁....
            String url="https://search.jd.com/Search?keyword=%E6%89%8B%E6%9C%BA&enc=utf-8&page="+(2*i-1);
            String html = HttpClientUtils.doGet(url);
            parseIndex(html);
        }
    }
    //解析手機列表頁
    private  static void parseIndex(String html) throws IOException, InterruptedException {
        Document document = Jsoup.parse(html);
        //手機列表
        Elements elements = document.select("#J_goodsList>ul>li");

        if(elements!=null||elements.size()!=0){
            for (Element element : elements) {
                //獲得每個li的pid
                String pid = element.attr("data-pid");
                //將pid放入佇列中
                Jedis jedis= JedisUtils.getJedis();
                jedis.lpush("phonePids",pid);
                jedis.close();
            }
        }
    }

}

從伺服器解析pid

package com.hrh.slave;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.hrh.dao.ProductDao;
import com.hrh.pojo.Product;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import redis.clients.jedis.Jedis;
import utils.HttpClientUtils;
import utils.JedisUtils;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author QuietHR
 * @create 2018/9/22
 **/
public class JDSlave {

    //建立dao物件
    static ProductDao productDao = new ProductDao();
    //建立執行緒池
    static ExecutorService threadPool = Executors.newFixedThreadPool(35);

    public static void main(String[] args) throws Exception {

        //開啟30個執行緒去解析手機列表頁獲得的pids
        for (int i = 1; i <=30; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run()