前言
这是 Scrapy 系列学习文章之一,本章主要介绍 Item Pipeline 的相关的内容;
本文为作者的原创作品,转载需注明出处;
简介
当数据被 Scrapy 爬虫爬取并转换为 Item 以后,将会被送给 Item Pipeline 进一步处理,Item Pipeline 包含多个组件,这些组件将会一个接一个按顺序执行;
每一个 Item Pipeline 组件都是实现了一个简单方法的 Python class;这些方法接受 Item 作为参数,然后对它进行处理;处理的结果决定该 Item 是否进入下一个 Item Pipeline 继续处理还是就此丢弃;
有关 Item Pipeline 的典型用法,
- 清洗 HTML 数据
- 对爬取的数据进行验证,比如检查某些 items 是否包含特定的 fields 等
- 检查是否有重复的 items,如果有,可以选择丢弃
- 保存爬取的数据到数据库中
撰写你自己的 item pipeline
每一个 item pipeline 组件都是一个 Python class,下面来看看 item pipeline 所包含的方法,备注,下面的方法只有 process_item(self, item, spider) 是必须
实现的,其余的都是可选的;
process_item(self, item, spider)
1 | process_item(self, item, spider) |
该方法被每一个 item pipeline 组件所调用,process_item 必须返回以下其中的任意一个对象
- 一个 dict
- 一个 Item 对象或者它的子类对象
- 一个 Twisted Deferred 对象
- 一个 DropItem exception;如果返回此异常,则该 item 将不会被后续的 item pipeline 所继续访问;
参数
- item( or a dict ) - 被爬取的 item
- spider - 用来爬取该 item 的 spider
备注,该方法是每一个 Item pipeline 必须
实现的方法,其余的方法都是可选
的;
open_spider(self, spider)
1 | open_spider(self, spider) |
该方法当 spider 被打开时调用
close_spider(self, spider)
该方法是当 spider 关闭的时候被调用
from_crawler(cls, crawler)
该类方法用来从 Crawler 中初始化得到一个 pipeline 实例;它必须返回一个新的 pipeline 实例;Crawler 对象提供了访问所有 Scrape 核心组件的接口,包括 settings 和 signals;
Item pipeline 例子
Price validation and dropping items with no prices
1 | from scrapy.exceptions import DropItem |
Write items to a JSON file
下面这个例子将会把所有爬虫所爬取到的数据保存到 items.jl 文件中,.jl
既是表示 JSON Lines 格式,既是每一行存储一个 item;
1 | import json |
Write items to MongoDB
MongoDB address 以及 database name 是通过 Scrapy settings 配置的;下面这个用例主要用来展示如何使用 from_crawler 的用法以及如何正确的清理掉这些相关的 resources,
1 | import pymongo |
该例子中,通过类方法 from_crawler() 在内部初始化得到了一个 pipeline 实例,初始化的过程中,使用了 mongo_uri 以及 mongo_db 作为构造参数;
Take screenshot of item
下面这个例子演示了如何通过 process_item() 方法返回一个 Defferred;它使用 Splash 来来渲染一个与 item url 相关的 screenshot;Pipeline 通过对本地的 Splash 发起请求,当请求已经完成下载并且 Defferred 回调已经触发,它将 item 保存到一个文件中,并且将文件名保存在 item 中;
1 | import scrapy |
简单说一下上面这段程序的执行逻辑,首先调用 process_item,先是构造访问本地 Splash 的请求,然后发起请求,注意 dfd 得到的就是 Deferred 实例.. 然后,当 Deferred 相关事件完成以后,将会触发调用 return_item 方法;
Duplicates filter
用来检查 duplidate items 的过滤器,然后丢弃这些已经被处理过的 items;下面这个例子假定我们所使用的 items 有一个唯一的 id,而通过我们的爬虫返回的有多个重复的 items 有重复的 id;
1 | from scrapy.exceptions import DropItem |
Activating an Item Pipeline component
要激活一个 Item Pipeline 你必须将它的 class 加入 ITEM_PIPLELINES 的设置当中,向下面这样
1 | ITEM_PIPELINES = { |
后面对应的数字表示的是执行优先级,数值越小越先执行;不过习惯性的把它们定义为 1-1000 之间的数值;
扩展例子
如何为不同的 Spider 定义不同的 Item Pipeline
从 main settings 中将通用的配置删除,然后为每一个 spider 单独添加相关配置;
1 | class testSpider(InitSpider): |
一个完整的爬取 stackflow 的例子,使用 pipeline 存储
https://realpython.com/blog/python/web-scraping-with-scrapy-and-mongodb/
我的疑问
我们知道,通过 Item Loaders 所定义的 input processors 和 output processors 都是针对每一个不同的 item 定义一个唯一的 processors,这样可以具体针对某一个 item 具体处理;但是,这里让我没想明白的是,item pipeline 为什么被设计用来对所有的 items 都要进行处理?item pipelines 其实不更聚焦于某一个 item 的处理吗?针对某一个具体的 item 进行清洗,然后针对一个具体的 item 进行保存才对呀,那在设计的初衷上,不是更应该和 processors 一样吗?怎么 item pipeline 的默认设计是用来针对所有的 items 进行处理的呢?至少我觉得,是否应该在 ITEM_PIPELINES 的配置上,添加 pipeline 与 spiders 之间的对应关系才好。