1. 程式人生 > >Scrapy框架中的Pipeline組件

Scrapy框架中的Pipeline組件

object OS @class ror inter setting ima utf8 encoding

簡介

在下圖中可以看到items.py與pipeline.py,其中items是用來定義抓取內容的實體;pipeline則是用來處理抓取的item的管道
技術分享圖片
Item管道的主要責任是負責處理有蜘蛛從網頁中抽取的Item,他的主要任務是清晰、驗證和存儲數據。當頁面被蜘蛛解析後,將被發送到Item管道,並經過幾個特定的次序處理數據。每個Item管道的組件都是有一個簡單的方法組成的Python類。獲取了Item並執行方法,同時還需要確定是否需要在Item管道中繼續執行下一步或是直接丟棄掉不處理。簡而言之,就是通過spider爬取的數據都會通過這個pipeline處理,可以在pipeline中不進行操作或者執行相關對數據的操作。

管道的功能

1.清理HTML數據
2.驗證解析到的數據(檢查Item是否包含必要的字段)
3.檢查是否是重復數據(如果重復就刪除)
4.將解析到的數據存儲到數據庫中

Pipeline中的操作

process_item(item, spider)
每一個item管道組件都會調用該方法,並且必須返回一個item對象實例或raise DropItem異常。被丟掉的item將不會在管道組件進行執行。此方法有兩個參數,一個是item,即要處理的Item對象,另一個參數是spider,即爬蟲。
此外,我們也可以在類中實現以下方法
open_spider(spider)當spider執行的時候將調用該方法

close_spider(spider)當spider關閉的時候將調用該方法

定制自己的Pipeline組件:

1.生成json數據

class JsonWithEncodingPipeline(object):
    def __init__(self):
        self.file=codecs.open('article.json', 'w', encoding="utf-8")
    def process_item(self, item, spider):
        lines=json.dumps(dict(item), ensure_ascii=
False) + '\n' self.file.write(lines) return item def spider_closed(self, spider): self.file.close()

2.操作mysql關系數據庫

class MysqlPipeline(object):
    def __init__(self):
        self.conn=MySQLdb.connect('localhost', 'root', '*****', 'article_spider', charset="utf8", use_unicode=True)
        self.cursor=self.conn.cursor()

    def process_item(self, item, spider):
        insert_sql="""
            insert into article_items(title, url, url_object_id , create_date)
            VALUES(%s, %s, %s, %s)
        """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))
        self.conn.commit()

3.異步操作mysql關系數據庫

# 異步處理關系數據庫
class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool=dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms=dict(
            host=settings["MYSQL_HOST"],    #這裏要在settings中事先定義好
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset="utf8",
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool=adbapi.ConnectPool("MySQLdb", **dbparms)

        return cls(dbpool)

    def process_item(self, item, spider):
    # 使用twisted將mysql插入變成異步執行
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error)


    def handle_error(self, failure, item, spider):
       #處理異步插入的異常
        print(failure)

    def do_insert(self, cursor, item):
        #執行具體的插入
        insert_sql = """
                    insert into article_items(title, url, url_object_id , create_date)
                    VALUES(%s, %s, %s, %s)
                """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))

4.數據去重

from scrapy.exceptions import DropItem

class DuplicatesPipeline(object):

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item['id'] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        else:
            self.ids_seen.add(item['id'])
            return item

使用組件

# Configure item pipelines
# See https://doc.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
   # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
  # 'scrapy.pipelines.images.ImagesPipeline': 1,
   'ArticleSpider.pipelines.MysqlPipeline': 1,
   # 'ArticleSpider.pipelines.JsonExporterPipeline': 2,
   # 'ArticleSpider.pipelines.ArticleImagePipeline': 1
}

每個pipeline後面有一個數值,這個數組的範圍是0-1000,這個數值是這些在pipeline中定義的類的優先級,越小越優先。

Scrapy框架中的Pipeline組件