首先我们需要安装 RabbitMQ,然后通过服务启动它,默认为注册到本机的5672端口。我们的爬虫和数据库写入脚本都需要连接到 RabbitMQ,一边往队列中写入数据,另一边从队列中取出数据,然后插入到数据。 Python 中使用 RabbitMQ 可以通过调用 pika 这个库,安装过程见官方文档,对于 RabbitMQ 本身也有中文教程。 本项目用到的模型是一对一的,用 pika 写很容易,代码如下: - import pika
- # 导入库
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- # 设置一个新连接,连接到本地的 RabbitMQ 服务端。
- channel = connection.channel()
- # 注册到 books 队列
- channel.queue_declare(queue='books')
- channel.basic_publish(exchange='', routing_key='books', body='Whats up')
- # 发送消息 body
- connection.close()
- #
复制代码在 basic_publish 这个函数中,我们设置 exchange 为空,而 routing-key 为 books,此时 basic_publish 会默认把我们的 body 信息根据 routing-key 的内容发送到 books 的队列中。 这里 exchange 其实是一个信息中转站,如下图,P 为我们要发送的信息,X 就是信息中转站,我们通过 exchange 字段来设置我们的目标中转站,然后由 exchange 来决定我们的信息要往哪里走。
而 routing-key 的设置也很有讲究,可以参考教程中 Routing 一节。 到此,我们已经写好生产者了,接下来我们看消费者。 - import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='books')
- def callback(ch, method, properties, body):
- print body
- channel.basic_consume(callback, queue='books', no_ack=True)
- # 注册回调函数,当有消息取出时,程序调用 callback 函数,其中 body 就是取出的消息。
- channel.start_consuming()
复制代码
这里我们在回调函数中可以直接打印 body 到控制台。最后贴一下代码,爬虫端: - #!/usr/bin/env python
- # -*- encoding: utf-8 -*-
- from pyspider.libs.base_handler import *
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='books')
- class Handler(BaseHandler):
- crawl_config = {}
- def on_start(self):
- self.crawl('http://scrapy.org/', callback=self.index_page)
-
- def on_result(self, result):
- # 重写 on_result 函数
- if not result:
- return
- assert self.task, "on_result can't outside a callback."
- result['callback'] = self.task['process']['callback']
- if self.is_debugger():
- pprint(result)
- channel.basic_publish(exchange='', routing_key='books', body=result)
- if self.__env__.get('result_queue'):
- channel.basic_publish(exchange='', routing_key='books', body=result)
- self.__env__['result_queue'].put((self.task, result))
-
- @config(priority=5)
- def index_page(self, response):
- url_list = []
- for each in response.doc('a[href^="http"]').items():
- url_list.append(each.attr.href)
- return url_list
复制代码消费者端: - import os
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'spider.settings')
- import django
- django.setup()
- from django.core.exceptions import ObjectDoesNotExist
- from importer.models import Books
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='books')
- def callback(ch, method, properties, body):
- new_book = Books()
- new_book.url = body
- new_book.save()
- print body + ' saved!'
- channel.basic_consume(callback, queue='books', no_ack=True)
- channel.start_consuming()
复制代码本例中调用了 django 的 BOOKS 的数据库模型,在队列中取出消息后,存入 BOOKS 表中,字段为url。
|