51Testing软件测试论坛
标题:
pyspider源码简析
[打印本页]
作者:
测试积点老人
时间:
2018-11-29 13:57
标题:
pyspider源码简析
pyspider优势所在
pyspider非常适合那种很小很杂的爬虫的管理,比如有100个小网站,规则又各不相同,我要获取他的一些很简单的内容,如标题,所有的图片,正文内容。他分为几个模块:scheduler,fetcher,processor,resultworker以及一个ui,前三者各自分离,用消息队列连接,因此很容易做成分布式(或者说设计之初就是为了分布式的)。
scheduler
了解scheduler之前,先了解两个概念,一个是project,代表着一个项目,如百度爬虫项目;一个是task,代表一个爬取任务,如爬取百度首页,爬取某一个新闻业,都是一个task。
与scheduler相关的队列有三个:
scheduler2fetcher 也就是scheduler中的out queue,用于发送task给fetcher
status_queue 用于从processor中获取已经爬取的task的状态并做相应处理
newtask_queue 新产生的task
scheduler负责调度,与scrapy或者其他的爬虫框架类似,调度器负责调度需要爬取的内容,决定哪些内容在哪些时候进行爬取。我们从代码入手看下pyspider的调度器做了啥。
def run(self):
while not self._quit:
self.run_once()
def run_once(self):
self._update_projects()
self._check_task_done()
self._check_request()
while self._check_cronjob():
pass
self._check_select()
self._check_delete()
self._try_dump_cnt()
复制代码
入口为
run
函数,真正有用的是
run_once
函数。我们可以看到,每一轮调度都会依次调用几个方法。
_update_projects
该方法会从projectdb中读取是有有新的project更新,如果更新了就得处理这个project
复制代码
_check_task_done ?
该方法会消费status queue,爬取失败的task,检查下要不要重新爬,标记一下,存起来。爬取成功的task,看下是否要再爬一次,标记一下,存起来。
复制代码
_check_request
消费newtask_queue,该队列为待爬取的队列,任务取出来,处理处理,标记一下,存起来。
复制代码
_check_cronjob
看下有没有什么定时任务触发了,有的话,丢到out queue(scheduler2fetcher)给fetcher爬去。
复制代码
_check_select
之前不是标记并存了好多要爬取的任务咩,取出来,丢给out queue给fetcher爬去。
复制代码
_check_delete
处理一些被标记为删除的project
复制代码
scheduler逻辑相当清晰,分工也很明确:找到需要爬取的任务给fetcher。
fetcher
fetcher的职责更为清晰:下载。
与他相关的有两个队列:
scheduler2fetcher 也是fetcher中的inqueue,调度器传给fetcher的任务
fetcher2processor 也是fetcher中的outqueue,fetcher传给processor的任务
fetcher的入口也是run方法,会从inqueue中读取任务去爬取。整个fetcher是基于tornado实现的(说真,tornado在py3 async的时代看起来显得好丑..)并提供了几种爬取的方式。这部分代码很简单,不细说了,就是下载下来,爬取结束之后发送到outqueue中。
processor
涉及到四个队列
fetcher2processor 也是inqueue,为fetcher的输入
status_queue 把fetcher爬到的内容输出给scheduler
newtask_queue 新任务队列,一个task可能会产生多个新的task,传递给scheduler
processor2result 也是result_queue,输出获取到的需要的数据,为最终的输出
程序的入口同样为run,核心方法只有一个,就是on_task,处理唯一的输入inqueue中获取到的task,主要做了这么几件事
处理下task,该找外链的找外链,该获取格式化数据的获取数据,并发送到result_queue中。(这部分在ProjectManager这个类的on_result方法中完成)
把task的内容做一些处理,形成一个新的dict,包含爬取状态,时间等信息,发到status_queue
处理找到的外链(如果有需要的话,即在回调中有调用self.crawl)包装一下,发送给newtask_queue
result worker
result worker只涉及到一个队列,就是processor中输出的result queue。
这部分我觉得是pyspider比较弱的一部分,类似于scrapy中的Pipeline,对输出的数据进行一些处理,如保存数据库等。需要继承实现一个ResultWorker类。默认的这个类会把数据保存到resultdb中,但我们实际需要的肯定不止如此,可以重写on_result方法做一些处理。
不过因为所有的输出都在一个队列,所以result worker也只能有一类(并不是一个,可以做分布式处理),处理一个类似的逻辑,比如统统都保存到mongo。或者在一个result worker中写判断语句,进行不同的逻辑处理。但这样就不够优雅了。
总结
pyspider应该算是一个相当不错的框架,代码很清晰,很适合去读。不过适合的场景还是比较有限,着重于调度,分布式爬取,弱化了对数据的处理部分(当然,这部分也可以很方便的扩展)。
欢迎光临 51Testing软件测试论坛 (http://bbs.51testing.com/)
Powered by Discuz! X3.2