51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

手机号码,快捷登录

查看: 1446|回复: 0
打印 上一主题 下一主题

pyspider源码简析

[复制链接]
  • TA的每日心情
    无聊
    半小时前
  • 签到天数: 528 天

    连续签到: 1 天

    [LV.9]测试副司令

    跳转到指定楼层
    1#
    发表于 2018-11-29 13:57:56 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
    pyspider优势所在
    pyspider非常适合那种很小很杂的爬虫的管理,比如有100个小网站,规则又各不相同,我要获取他的一些很简单的内容,如标题,所有的图片,正文内容。他分为几个模块:scheduler,fetcher,processor,resultworker以及一个ui,前三者各自分离,用消息队列连接,因此很容易做成分布式(或者说设计之初就是为了分布式的)。

    scheduler
    了解scheduler之前,先了解两个概念,一个是project,代表着一个项目,如百度爬虫项目;一个是task,代表一个爬取任务,如爬取百度首页,爬取某一个新闻业,都是一个task。

    与scheduler相关的队列有三个:

    • scheduler2fetcher 也就是scheduler中的out queue,用于发送task给fetcher
    • status_queue 用于从processor中获取已经爬取的task的状态并做相应处理
    • newtask_queue 新产生的task


    scheduler负责调度,与scrapy或者其他的爬虫框架类似,调度器负责调度需要爬取的内容,决定哪些内容在哪些时候进行爬取。我们从代码入手看下pyspider的调度器做了啥。
    1. def run(self):
    2.     while not self._quit:
    3.         self.run_once()

    4. def run_once(self):
    5.     self._update_projects()
    6.     self._check_task_done()
    7.     self._check_request()
    8.     while self._check_cronjob():
    9.         pass
    10.     self._check_select()
    11.     self._check_delete()
    12.     self._try_dump_cnt()
    复制代码

    入口为run函数,真正有用的是run_once函数。我们可以看到,每一轮调度都会依次调用几个方法。

    _update_projects
    1. 该方法会从projectdb中读取是有有新的project更新,如果更新了就得处理这个project
    复制代码
    _check_task_done ?
    1. 该方法会消费status queue,爬取失败的task,检查下要不要重新爬,标记一下,存起来。爬取成功的task,看下是否要再爬一次,标记一下,存起来。
    复制代码
    _check_request
    1. 消费newtask_queue,该队列为待爬取的队列,任务取出来,处理处理,标记一下,存起来。
    复制代码
    _check_cronjob
    1. 看下有没有什么定时任务触发了,有的话,丢到out queue(scheduler2fetcher)给fetcher爬去。
    复制代码
    _check_select
    1. 之前不是标记并存了好多要爬取的任务咩,取出来,丢给out queue给fetcher爬去。
    复制代码
    _check_delete
    1. 处理一些被标记为删除的project
    复制代码
    scheduler逻辑相当清晰,分工也很明确:找到需要爬取的任务给fetcher。

    fetcher
    fetcher的职责更为清晰:下载。

    与他相关的有两个队列:
    • scheduler2fetcher 也是fetcher中的inqueue,调度器传给fetcher的任务
    • fetcher2processor 也是fetcher中的outqueue,fetcher传给processor的任务


    fetcher的入口也是run方法,会从inqueue中读取任务去爬取。整个fetcher是基于tornado实现的(说真,tornado在py3 async的时代看起来显得好丑..)并提供了几种爬取的方式。这部分代码很简单,不细说了,就是下载下来,爬取结束之后发送到outqueue中。

    processor
    涉及到四个队列
    • fetcher2processor 也是inqueue,为fetcher的输入
    • status_queue 把fetcher爬到的内容输出给scheduler
    • newtask_queue 新任务队列,一个task可能会产生多个新的task,传递给scheduler
    • processor2result 也是result_queue,输出获取到的需要的数据,为最终的输出


    程序的入口同样为run,核心方法只有一个,就是on_task,处理唯一的输入inqueue中获取到的task,主要做了这么几件事
    • 处理下task,该找外链的找外链,该获取格式化数据的获取数据,并发送到result_queue中。(这部分在ProjectManager这个类的on_result方法中完成)
    • 把task的内容做一些处理,形成一个新的dict,包含爬取状态,时间等信息,发到status_queue
    • 处理找到的外链(如果有需要的话,即在回调中有调用self.crawl)包装一下,发送给newtask_queue


    result worker
    result worker只涉及到一个队列,就是processor中输出的result queue。

    这部分我觉得是pyspider比较弱的一部分,类似于scrapy中的Pipeline,对输出的数据进行一些处理,如保存数据库等。需要继承实现一个ResultWorker类。默认的这个类会把数据保存到resultdb中,但我们实际需要的肯定不止如此,可以重写on_result方法做一些处理。

    不过因为所有的输出都在一个队列,所以result worker也只能有一类(并不是一个,可以做分布式处理),处理一个类似的逻辑,比如统统都保存到mongo。或者在一个result worker中写判断语句,进行不同的逻辑处理。但这样就不够优雅了。

    总结
    pyspider应该算是一个相当不错的框架,代码很清晰,很适合去读。不过适合的场景还是比较有限,着重于调度,分布式爬取,弱化了对数据的处理部分(当然,这部分也可以很方便的扩展)。



    分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
    收藏收藏
    回复

    使用道具 举报

    本版积分规则

    关闭

    站长推荐上一条 /1 下一条

    小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

    GMT+8, 2024-11-18 10:27 , Processed in 0.063161 second(s), 26 queries .

    Powered by Discuz! X3.2

    © 2001-2024 Comsenz Inc.

    快速回复 返回顶部 返回列表