51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

手机号码,快捷登录

查看: 1347|回复: 1
打印 上一主题 下一主题

pyspider + RabbitMQ 使用记 - 下

[复制链接]

该用户从未签到

跳转到指定楼层
1#
发表于 2019-3-7 14:37:57 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

首先我们需要安装 RabbitMQ,然后通过服务启动它,默认为注册到本机的5672端口。我们的爬虫和数据库写入脚本都需要连接到 RabbitMQ,一边往队列中写入数据,另一边从队列中取出数据,然后插入到数据。

Python 中使用 RabbitMQ 可以通过调用 pika 这个库,安装过程见官方文档,对于 RabbitMQ 本身也有中文教程。

本项目用到的模型是一对一的,用 pika 写很容易,代码如下:

  1. import pika
  2. # 导入库
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  4. # 设置一个新连接,连接到本地的 RabbitMQ 服务端。
  5. channel = connection.channel()
  6. # 注册到 books 队列
  7. channel.queue_declare(queue='books')
  8. channel.basic_publish(exchange='', routing_key='books', body='Whats up')
  9. # 发送消息 body
  10. connection.close()
  11. #
复制代码

在 basic_publish 这个函数中,我们设置 exchange 为空,而 routing-key 为 books,此时 basic_publish 会默认把我们的 body 信息根据 routing-key 的内容发送到 books 的队列中。

这里 exchange 其实是一个信息中转站,如下图,P 为我们要发送的信息,X 就是信息中转站,我们通过 exchange 字段来设置我们的目标中转站,然后由 exchange 来决定我们的信息要往哪里走。


而 routing-key 的设置也很有讲究,可以参考教程中 Routing 一节。

到此,我们已经写好生产者了,接下来我们看消费者。

  1. import pika
  2. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  3. channel = connection.channel()
  4. channel.queue_declare(queue='books')


  5. def callback(ch, method, properties, body):
  6.     print body

  7. channel.basic_consume(callback, queue='books', no_ack=True)
  8. # 注册回调函数,当有消息取出时,程序调用 callback 函数,其中 body 就是取出的消息。
  9. channel.start_consuming()
复制代码

这里我们在回调函数中可以直接打印 body 到控制台。

最后贴一下代码,爬虫端:

  1. #!/usr/bin/env python
  2. # -*- encoding: utf-8 -*-

  3. from pyspider.libs.base_handler import *
  4. import pika

  5. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  6. channel = connection.channel()
  7. channel.queue_declare(queue='books')


  8. class Handler(BaseHandler):
  9.     crawl_config = {}

  10.     def on_start(self):
  11.         self.crawl('http://scrapy.org/', callback=self.index_page)
  12.    
  13.     def on_result(self, result):
  14.         # 重写 on_result 函数
  15.         if not result:
  16.             return
  17.         assert self.task, "on_result can't outside a callback."
  18.         result['callback'] = self.task['process']['callback']
  19.         if self.is_debugger():
  20.             pprint(result)
  21.             channel.basic_publish(exchange='', routing_key='books', body=result)
  22.         if self.__env__.get('result_queue'):
  23.             channel.basic_publish(exchange='', routing_key='books', body=result)
  24.             self.__env__['result_queue'].put((self.task, result))
  25.         
  26.     @config(priority=5)
  27.     def index_page(self, response):
  28.         url_list = []
  29.         for each in response.doc('a[href^="http"]').items():
  30.             url_list.append(each.attr.href)
  31.         return url_list
复制代码

消费者端:

  1. import os
  2. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'spider.settings')
  3. import django
  4. django.setup()
  5. from django.core.exceptions import ObjectDoesNotExist
  6. from importer.models import Books

  7. import pika
  8. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  9. channel = connection.channel()
  10. channel.queue_declare(queue='books')


  11. def callback(ch, method, properties, body):
  12.     new_book = Books()
  13.     new_book.url = body
  14.     new_book.save()
  15.     print body + ' saved!'

  16. channel.basic_consume(callback, queue='books', no_ack=True)
  17. channel.start_consuming()
复制代码

本例中调用了 django 的 BOOKS 的数据库模型,在队列中取出消息后,存入 BOOKS 表中,字段为url。


本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?(注-册)加入51Testing

x
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏
回复

使用道具 举报

本版积分规则

关闭

站长推荐上一条 /1 下一条

小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

GMT+8, 2024-11-6 08:07 , Processed in 0.069391 second(s), 23 queries .

Powered by Discuz! X3.2

© 2001-2024 Comsenz Inc.

快速回复 返回顶部 返回列表