51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

查看: 953|回复: 0
打印 上一主题 下一主题

[python] apscheduler如何在Python中实现定时?

[复制链接]
  • TA的每日心情
    无聊
    3 天前
  • 签到天数: 941 天

    连续签到: 3 天

    [LV.10]测试总司令

    跳转到指定楼层
    1#
    发表于 2022-10-11 13:08:33 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
    前言
      之前有介绍了用Linux crontab的方式来实现定时任务,这是使用Linux内置模块来实现的。而在Python中,还可以用第三方包来管理定时任务,比如celery、apscheduler。相对来说apscheduler使用起来更简单一些,这里来介绍一下apscheduler的使用方法。
      首先安装起来很简单,运行pip install apscheduler即可。
      初识apscheduler
      来个简单的例子看看apscheduler是如何使用的。
    1. <font size="3"> #encoding:utf-8

    2.   from apscheduler.schedulers.blocking import BlockingScheduler

    3.   import datetime

    4.   def sch_test():

    5.       now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    6.       print('时间:{}, 测试apscheduler'.format(now))

    7.   task = BlockingScheduler()

    8.   task.add_job(func=sch_test, trigger='cron', second='*/10')

    9.   task.start()</font>
    复制代码
    上述例子很简单,我们首先要定义一个apscheduler的对象,然后add_job添加任务,最后start开启任务就行了。
      例子是每隔10秒运行一次sch_test任务,运行结果如下:

    1. <font size="3">时间:2022-10-08 15:16:30, 测试apscheduler

    2.   时间:2022-10-08 15:16:40, 测试apscheduler

    3.   时间:2022-10-08 15:16:50, 测试apscheduler

    4.   时间:2022-10-08 15:17:00, 测试apscheduler</font>
    复制代码
    如果我们要在执行任务函数时携带参数,只要在add_job函数中添加args就行,比如task.add_job(func=sch_test, args=('a'), trigger='cron', second='*/10')。
      apscheduler有哪些模块
      上面例子中我们初步了解到如何使用apschedulerl了,接下来需要知道apscheduler的设计框架。apscheduler有四个主要模块,分别是:触发器triggers、任务存储器job_stores、执行器executors、调度器schedulers。
      1. 触发器triggers:
      触发器指的是任务指定的触发方式,例子中我们用的是“cron”方式。我们可以选择cron、date、interval中的一个。
      ·cron表示的是定时任务,类似linux crontab,在指定的时间触发。
      使用方法示例,在每天7点20分执行一次:
      task.add_job(func=sch_test, args=('定时任务',), trigger='cron',
      hour='7', minute='20')
      ·date表示具体到某个时间的一次性任务;

    1. <font size="3">使用方法示例:

    2.   # 使用run_date指定运行时间

    3.   task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))

    4.   # 或者用next_run_time

    5.   task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))</font>
    复制代码
     ·interval表示的是循环任务,指定一个间隔时间,每过间隔时间执行一次。
      使用方法示例,每隔3秒执行一次sch_test任务:
      task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。
      来个例子把3种触发器都使用一遍:

    1. <font size="3"># encoding:utf-8

    2.   from apscheduler.schedulers.blocking import BlockingScheduler

    3.   import datetime

    4.   def sch_test(job_type):

    5.       now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    6.       print('时间:{}, {}测试apscheduler'.format(now, job_type))

    7.   task = BlockingScheduler()

    8.   task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

    9.   task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')

    10.   task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)

    11.   task.start()</font>
    复制代码
    打印部分结果:
    1. <font size="3">时间:2022-10-08 15:45:49, 一次性任务测试apscheduler

    2.   时间:2022-10-08 15:45:49, 循环任务测试apscheduler

    3.   时间:2022-10-08 15:45:50, 定时任务测试apscheduler

    4.   时间:2022-10-08 15:45:52, 循环任务测试apscheduler

    5.   时间:2022-10-08 15:45:55, 定时任务测试apscheduler

    6.   时间:2022-10-08 15:45:55, 循环任务测试apscheduler

    7.   时间:2022-10-08 15:45:58, 循环任务测试apscheduler</font>
    复制代码
    通过代码示例和结果展示,我们可清晰的知道不同触发器的使用区别。
      2. 任务存储器job_stores
      顾名思义,任务存储器是存储任务的地方,默认都是存储在内存中。我们也可自定义存储方式,比如将任务存到mysql中。这里有以下几种选择:


    通常默认存储在内存即可,但若程序故障重启的话,会重新拉取任务运行了,如果你对任务的执行要求高,那么可以选择其他的存储器。
      使用SQLAlchemyJobStore存储器示例:

    1. <font size="3"> from apscheduler.schedulers.blocking import BlockingScheduler

    2.   def sch_test(job_type):

    3.       now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    4.       print('时间:{}, {}测试apscheduler'.format(now, job_type))

    5.   sched = BlockingScheduler()

    6.   # 使用mysql存储任务

    7.   sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'

    8.   sched.add_jobstore('sqlalchemy',url=sql_url)

    9.   # 添加任务

    10.   sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')

    11.   sched.start()</font>
    复制代码
    3. 执行器executors
      执行器的功能就是将任务放到线程池或进程池中运行。有以下几种选择:


    默认是ThreadPoolExecutor, 常用的也就是第线程和进程池执行器。如果应用是CPU密集型操作,可用ProcessPoolExecutor来执行。
      4. 调度器schedulers
      调度器属于apscheduler的核心,它扮演着统筹整个apscheduler系统的角色,存储器、执行器、触发器在它的调度下正常运行。调度器有以下几个:


    不是特定场景下,我们最常用的是BlockingScheduler调度器。
      异常监听
      定时任务在运行时,若出现错误,需要设置监听机制,我们通常结合logging模块记录错误信息。
      使用示例:

    1. <font size="3">from apscheduler.schedulers.blocking import BlockingScheduler

    2.   import datetime

    3.   from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR

    4.   import logging

    5.   # logging日志配置打印格式及保存位置

    6.   logging.basicConfig(level=logging.INFO,

    7.                       format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',

    8.                       datefmt='%Y-%m-%d %H:%M:%S',

    9.                       filename='sche.log',

    10.                       filemode='a')

    11.   def log_listen(event):

    12.   if event.exception :

    13.   print ( '任务出错,报错信息:{}'.format(event.exception))

    14.   else:

    15.   print ( '任务正常运行...' )

    16.   def sch_test(job_type):

    17.       now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    18.       print('时间:{}, {}测试apscheduler'.format(now, job_type))

    19.       print(1/0)

    20.   sched = BlockingScheduler()

    21.   # 使用mysql存储任务

    22.   sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'

    23.   sched.add_jobstore('sqlalchemy',url=sql_url)

    24.   # 添加任务

    25.   sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')

    26.   # 配置任务执行完成及错误时的监听

    27.   sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

    28.   # 配置日志监听

    29.   sched._logger = logging

    30.   sched.start()</font>
    复制代码
    apscheduler的封装使用
      上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。

    1. <font size="3">from apscheduler.schedulers.blocking import BlockingScheduler

    2.   from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

    3.   from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR

    4.   import logging

    5.   import logging.handlers

    6.   import os

    7.   import datetime

    8.   class LoggerUtils():

    9.       def init_logger(self, logger_name):

    10.           # 日志格式

    11.           formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')

    12.           log_obj = logging.getLogger(logger_name)

    13.           log_obj.setLevel(logging.INFO)

    14.           # 设置log存储位置

    15.           path = '/data/logs/'

    16.           filename = '{}{}.log'.format(path, logger_name)

    17.           if not os.path.exists(path):

    18.               os.makedirs(path)

    19.           # 设置日志按照时间分割

    20.           timeHandler = logging.handlers.TimedRotatingFileHandler(

    21.              filename,

    22.              when='D',  # 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周

    23.              interval=1, # 多少天切割一次

    24.              backupCount=10  # 保留几天

    25.           )

    26.           timeHandler.setLevel(logging.INFO)

    27.           timeHandler.setFormatter(formatter)

    28.           log_obj.addHandler(timeHandler)

    29.           return log_obj

    30.   class Scheduler(LoggerUtils):

    31.       def __init__(self):

    32.           # 执行器设置

    33.           executors = {

    34.               'default': ThreadPoolExecutor(10),  # 设置一个名为“default”的ThreadPoolExecutor,其worker值为10

    35.               'processpool': ProcessPoolExecutor(5)  # 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5

    36.           }

    37.           self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)

    38.           # 存储器设置

    39.           # 这里使用sqlalchemy存储器,将任务存储在mysql

    40.           sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'

    41.           self.scheduler.add_jobstore('sqlalchemy',url=sql_url)

    42.           def log_listen(event):

    43.               if event.exception:

    44.                   # 日志记录

    45.                   self.scheduler._logger.error(event.traceback)

    46.      

    47.           # 配置任务执行完成及错误时的监听

    48.           self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

    49.           # 配置日志监听

    50.           self.scheduler._logger = self.init_logger('sche_test')

    51.       def add_job(self, *args, **kwargs):

    52.           """添加任务"""

    53.           self.scheduler.add_job(*args, **kwargs)

    54.       def start(self):

    55.           """开启任务"""

    56.           self.scheduler.start()

    57.   # 测试任务

    58.   def sch_test(job_type):

    59.       now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    60.       print('时间:{}, {}测试apscheduler'.format(now, job_type))

    61.       print(1/0)

    62.   # 添加任务,开启任务

    63.   sched = Scheduler()

    64.   # 添加任务

    65.   sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')

    66.   # 开启任务

    67.   sched.start()</font>
    复制代码
    小结
      这篇文章介绍了Python实现定时任务的又一利器apscheduler,通过简单例子及apscheduler框架的主要模块分解,我们可以根据实际需求配置好模块信息,再结合logging模块,我们可以实时监控到定时任务的运行情况。话说大家是如何来实现定时任务的,欢迎评论区讨论。


    本帖子中包含更多资源

    您需要 登录 才可以下载或查看,没有帐号?(注-册)加入51Testing

    x
    分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
    收藏收藏
    回复

    使用道具 举报

    本版积分规则

    关闭

    站长推荐上一条 /1 下一条

    小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

    GMT+8, 2024-5-3 02:55 , Processed in 0.067159 second(s), 23 queries .

    Powered by Discuz! X3.2

    © 2001-2024 Comsenz Inc.

    快速回复 返回顶部 返回列表