爬虫 Scrapy 学习系列之九:Item Pipeline

前言

这是 Scrapy 系列学习文章之一,本章主要介绍 Item Pipeline 的相关的内容;

本文为作者的原创作品,转载需注明出处;

简介

当数据被 Scrapy 爬虫爬取并转换为 Item 以后,将会被送给 Item Pipeline 进一步处理,Item Pipeline 包含多个组件,这些组件将会一个接一个按顺序执行;

每一个 Item Pipeline 组件都是实现了一个简单方法的 Python class;这些方法接受 Item 作为参数,然后对它进行处理;处理的结果决定该 Item 是否进入下一个 Item Pipeline 继续处理还是就此丢弃;

有关 Item Pipeline 的典型用法,

  • 清洗 HTML 数据
  • 对爬取的数据进行验证,比如检查某些 items 是否包含特定的 fields 等
  • 检查是否有重复的 items,如果有,可以选择丢弃
  • 保存爬取的数据到数据库中

撰写你自己的 item pipeline

每一个 item pipeline 组件都是一个 Python class,下面来看看 item pipeline 所包含的方法,备注,下面的方法只有 process_item(self, item, spider) 是必须实现的,其余的都是可选的;

process_item(self, item, spider)

1
process_item(self, item, spider)

该方法被每一个 item pipeline 组件所调用,process_item 必须返回以下其中的任意一个对象

  • 一个 dict
  • 一个 Item 对象或者它的子类对象
  • 一个 Twisted Deferred 对象
  • 一个 DropItem exception;如果返回此异常,则该 item 将不会被后续的 item pipeline 所继续访问;

参数

  • item( or a dict ) - 被爬取的 item
  • spider - 用来爬取该 item 的 spider

备注,该方法是每一个 Item pipeline 必须实现的方法,其余的方法都是可选的;

open_spider(self, spider)

1
open_spider(self, spider)

该方法当 spider 被打开时调用

close_spider(self, spider)

该方法是当 spider 关闭的时候被调用

from_crawler(cls, crawler)

该类方法用来从 Crawler 中初始化得到一个 pipeline 实例;它必须返回一个新的 pipeline 实例;Crawler 对象提供了访问所有 Scrape 核心组件的接口,包括 settings 和 signals;

Item pipeline 例子

Price validation and dropping items with no prices

1
2
3
4
5
6
7
8
9
10
11
12
13
from scrapy.exceptions import DropItem

class PricePipeline(object):

vat_factor = 1.15

def process_item(self, item, spider):
if item['price']:
if item['price_excludes_vat']:
item['price'] = item['price'] * self.vat_factor
return item
else:
raise DropItem("Missing price in %s" % item)

Write items to a JSON file

下面这个例子将会把所有爬虫所爬取到的数据保存到 items.jl 文件中,.jl既是表示 JSON Lines 格式,既是每一行存储一个 item;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import json

class JsonWriterPipeline(object):

def open_spider(self, spider):
self.file = open('items.jl', 'w')

def close_spider(self, spider):
self.file.close()

def process_item(self, item, spider):
line = json.dumps(dict(item)) + "\n"
self.file.write(line)
return item

Write items to MongoDB

MongoDB address 以及 database name 是通过 Scrapy settings 配置的;下面这个用例主要用来展示如何使用 from_crawler 的用法以及如何正确的清理掉这些相关的 resources,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import pymongo

class MongoPipeline(object):

collection_name = 'scrapy_items'

def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db

@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
)

def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]

def close_spider(self, spider):
self.client.close()

def process_item(self, item, spider):
self.db[self.collection_name].insert_one(dict(item))
return item

该例子中,通过类方法 from_crawler() 在内部初始化得到了一个 pipeline 实例,初始化的过程中,使用了 mongo_uri 以及 mongo_db 作为构造参数;

Take screenshot of item

下面这个例子演示了如何通过 process_item() 方法返回一个 Defferred;它使用 Splash 来来渲染一个与 item url 相关的 screenshot;Pipeline 通过对本地的 Splash 发起请求,当请求已经完成下载并且 Defferred 回调已经触发,它将 item 保存到一个文件中,并且将文件名保存在 item 中;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import scrapy
import hashlib
from urllib.parse import quote


class ScreenshotPipeline(object):
"""Pipeline that uses Splash to render screenshot of
every Scrapy item."""

SPLASH_URL = "http://localhost:8050/render.png?url={}"

def process_item(self, item, spider):
encoded_item_url = quote(item["url"])
screenshot_url = self.SPLASH_URL.format(encoded_item_url)
request = scrapy.Request(screenshot_url)
dfd = spider.crawler.engine.download(request, spider)
dfd.addBoth(self.return_item, item)
return dfd

def return_item(self, response, item):
if response.status != 200:
# Error happened, return item.
return item

# Save screenshot to file, filename will be hash of url.
url = item["url"]
url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
filename = "{}.png".format(url_hash)
with open(filename, "wb") as f:
f.write(response.body)

# Store filename in item.
item["screenshot_filename"] = filename
return item

简单说一下上面这段程序的执行逻辑,首先调用 process_item,先是构造访问本地 Splash 的请求,然后发起请求,注意 dfd 得到的就是 Deferred 实例.. 然后,当 Deferred 相关事件完成以后,将会触发调用 return_item 方法;

Duplicates filter

用来检查 duplidate items 的过滤器,然后丢弃这些已经被处理过的 items;下面这个例子假定我们所使用的 items 有一个唯一的 id,而通过我们的爬虫返回的有多个重复的 items 有重复的 id;

1
2
3
4
5
6
7
8
9
10
11
12
13
from scrapy.exceptions import DropItem

class DuplicatesPipeline(object):

def __init__(self):
self.ids_seen = set()

def process_item(self, item, spider):
if item['id'] in self.ids_seen:
raise DropItem("Duplicate item found: %s" % item)
else:
self.ids_seen.add(item['id'])
return item

Activating an Item Pipeline component

要激活一个 Item Pipeline 你必须将它的 class 加入 ITEM_PIPLELINES 的设置当中,向下面这样

1
2
3
4
ITEM_PIPELINES = {
'myproject.pipelines.PricePipeline': 300,
'myproject.pipelines.JsonWriterPipeline': 800,
}

后面对应的数字表示的是执行优先级,数值越小越先执行;不过习惯性的把它们定义为 1-1000 之间的数值;

扩展例子

如何为不同的 Spider 定义不同的 Item Pipeline

从 main settings 中将通用的配置删除,然后为每一个 spider 单独添加相关配置;

1
2
3
4
5
6
7
class testSpider(InitSpider):
name = 'test'
custom_settings = {
'ITEM_PIPELINES': {
'app.MyPipeline': 400
}
}

https://stackoverflow.com/questions/8372703/how-can-i-use-different-pipelines-for-different-spiders-in-a-single-scrapy-proje

一个完整的爬取 stackflow 的例子,使用 pipeline 存储

https://realpython.com/blog/python/web-scraping-with-scrapy-and-mongodb/

我的疑问

我们知道,通过 Item Loaders 所定义的 input processors 和 output processors 都是针对每一个不同的 item 定义一个唯一的 processors,这样可以具体针对某一个 item 具体处理;但是,这里让我没想明白的是,item pipeline 为什么被设计用来对所有的 items 都要进行处理?item pipelines 其实不更聚焦于某一个 item 的处理吗?针对某一个具体的 item 进行清洗,然后针对一个具体的 item 进行保存才对呀,那在设计的初衷上,不是更应该和 processors 一样吗?怎么 item pipeline 的默认设计是用来针对所有的 items 进行处理的呢?至少我觉得,是否应该在 ITEM_PIPELINES 的配置上,添加 pipeline 与 spiders 之间的对应关系才好。