lsekfe 发表于 2022-10-11 13:08:33

apscheduler如何在Python中实现定时?

前言
  之前有介绍了用Linux crontab的方式来实现定时任务,这是使用Linux内置模块来实现的。而在Python中,还可以用第三方包来管理定时任务,比如celery、apscheduler。相对来说apscheduler使用起来更简单一些,这里来介绍一下apscheduler的使用方法。
  首先安装起来很简单,运行pip install apscheduler即可。
  初识apscheduler
  来个简单的例子看看apscheduler是如何使用的。
<font size="3"> #encoding:utf-8

  from apscheduler.schedulers.blocking import BlockingScheduler

  import datetime

  def sch_test():

      now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

      print('时间:{}, 测试apscheduler'.format(now))

  task = BlockingScheduler()

  task.add_job(func=sch_test, trigger='cron', second='*/10')

  task.start()</font>上述例子很简单,我们首先要定义一个apscheduler的对象,然后add_job添加任务,最后start开启任务就行了。
  例子是每隔10秒运行一次sch_test任务,运行结果如下:

<font size="3">时间:2022-10-08 15:16:30, 测试apscheduler

  时间:2022-10-08 15:16:40, 测试apscheduler

  时间:2022-10-08 15:16:50, 测试apscheduler

  时间:2022-10-08 15:17:00, 测试apscheduler</font>如果我们要在执行任务函数时携带参数,只要在add_job函数中添加args就行,比如task.add_job(func=sch_test, args=('a'), trigger='cron', second='*/10')。
  apscheduler有哪些模块
  上面例子中我们初步了解到如何使用apschedulerl了,接下来需要知道apscheduler的设计框架。apscheduler有四个主要模块,分别是:触发器triggers、任务存储器job_stores、执行器executors、调度器schedulers。
  1. 触发器triggers:
  触发器指的是任务指定的触发方式,例子中我们用的是“cron”方式。我们可以选择cron、date、interval中的一个。
  ·cron表示的是定时任务,类似linux crontab,在指定的时间触发。
  使用方法示例,在每天7点20分执行一次:
  task.add_job(func=sch_test, args=('定时任务',), trigger='cron',
  hour='7', minute='20')
  ·date表示具体到某个时间的一次性任务;

<font size="3">使用方法示例:

  # 使用run_date指定运行时间

  task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))

  # 或者用next_run_time

  task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))</font> ·interval表示的是循环任务,指定一个间隔时间,每过间隔时间执行一次。
  使用方法示例,每隔3秒执行一次sch_test任务:
  task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。
  来个例子把3种触发器都使用一遍:

<font size="3"># encoding:utf-8

  from apscheduler.schedulers.blocking import BlockingScheduler

  import datetime

  def sch_test(job_type):

      now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

      print('时间:{}, {}测试apscheduler'.format(now, job_type))

  task = BlockingScheduler()

  task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

  task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')

  task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)

  task.start()</font>打印部分结果:
<font size="3">时间:2022-10-08 15:45:49, 一次性任务测试apscheduler

  时间:2022-10-08 15:45:49, 循环任务测试apscheduler

  时间:2022-10-08 15:45:50, 定时任务测试apscheduler

  时间:2022-10-08 15:45:52, 循环任务测试apscheduler

  时间:2022-10-08 15:45:55, 定时任务测试apscheduler

  时间:2022-10-08 15:45:55, 循环任务测试apscheduler

  时间:2022-10-08 15:45:58, 循环任务测试apscheduler</font>通过代码示例和结果展示,我们可清晰的知道不同触发器的使用区别。
  2. 任务存储器job_stores
  顾名思义,任务存储器是存储任务的地方,默认都是存储在内存中。我们也可自定义存储方式,比如将任务存到mysql中。这里有以下几种选择:


通常默认存储在内存即可,但若程序故障重启的话,会重新拉取任务运行了,如果你对任务的执行要求高,那么可以选择其他的存储器。
  使用SQLAlchemyJobStore存储器示例:

<font size="3"> from apscheduler.schedulers.blocking import BlockingScheduler

  def sch_test(job_type):

      now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

      print('时间:{}, {}测试apscheduler'.format(now, job_type))

  sched = BlockingScheduler()

  # 使用mysql存储任务

  sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'

  sched.add_jobstore('sqlalchemy',url=sql_url)

  # 添加任务

  sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')

  sched.start()</font>3. 执行器executors
  执行器的功能就是将任务放到线程池或进程池中运行。有以下几种选择:


默认是ThreadPoolExecutor, 常用的也就是第线程和进程池执行器。如果应用是CPU密集型操作,可用ProcessPoolExecutor来执行。
  4. 调度器schedulers
  调度器属于apscheduler的核心,它扮演着统筹整个apscheduler系统的角色,存储器、执行器、触发器在它的调度下正常运行。调度器有以下几个:


不是特定场景下,我们最常用的是BlockingScheduler调度器。
  异常监听
  定时任务在运行时,若出现错误,需要设置监听机制,我们通常结合logging模块记录错误信息。
  使用示例:

<font size="3">from apscheduler.schedulers.blocking import BlockingScheduler

  import datetime

  from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR

  import logging

  # logging日志配置打印格式及保存位置

  logging.basicConfig(level=logging.INFO,

                    format='%(asctime)s %(filename)s %(levelname)s %(message)s',

                    datefmt='%Y-%m-%d %H:%M:%S',

                    filename='sche.log',

                    filemode='a')

  def log_listen(event):

  if event.exception :

  print ( '任务出错,报错信息:{}'.format(event.exception))

  else:

  print ( '任务正常运行...' )

  def sch_test(job_type):

      now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

      print('时间:{}, {}测试apscheduler'.format(now, job_type))

      print(1/0)

  sched = BlockingScheduler()

  # 使用mysql存储任务

  sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'

  sched.add_jobstore('sqlalchemy',url=sql_url)

  # 添加任务

  sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')

  # 配置任务执行完成及错误时的监听

  sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

  # 配置日志监听

  sched._logger = logging

  sched.start()</font>apscheduler的封装使用
  上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。

<font size="3">from apscheduler.schedulers.blocking import BlockingScheduler

  from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

  from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR

  import logging

  import logging.handlers

  import os

  import datetime

  class LoggerUtils():

      def init_logger(self, logger_name):

        # 日志格式

        formatter = logging.Formatter('%(asctime)s %(filename)s %(levelname)s %(message)s')

        log_obj = logging.getLogger(logger_name)

        log_obj.setLevel(logging.INFO)

        # 设置log存储位置

        path = '/data/logs/'

        filename = '{}{}.log'.format(path, logger_name)

        if not os.path.exists(path):

              os.makedirs(path)

        # 设置日志按照时间分割

        timeHandler = logging.handlers.TimedRotatingFileHandler(

           filename,

           when='D',# 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周

           interval=1, # 多少天切割一次

           backupCount=10# 保留几天

        )

        timeHandler.setLevel(logging.INFO)

        timeHandler.setFormatter(formatter)

        log_obj.addHandler(timeHandler)

        return log_obj

  class Scheduler(LoggerUtils):

      def __init__(self):

        # 执行器设置

        executors = {

              'default': ThreadPoolExecutor(10),# 设置一个名为“default”的ThreadPoolExecutor,其worker值为10

              'processpool': ProcessPoolExecutor(5)# 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5

        }

        self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)

        # 存储器设置

        # 这里使用sqlalchemy存储器,将任务存储在mysql

        sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'

        self.scheduler.add_jobstore('sqlalchemy',url=sql_url)

        def log_listen(event):

              if event.exception:

                  # 日志记录

                  self.scheduler._logger.error(event.traceback)

     

        # 配置任务执行完成及错误时的监听

        self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

        # 配置日志监听

        self.scheduler._logger = self.init_logger('sche_test')

      def add_job(self, *args, **kwargs):

        """添加任务"""

        self.scheduler.add_job(*args, **kwargs)

      def start(self):

        """开启任务"""

        self.scheduler.start()

  # 测试任务

  def sch_test(job_type):

      now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

      print('时间:{}, {}测试apscheduler'.format(now, job_type))

      print(1/0)

  # 添加任务,开启任务

  sched = Scheduler()

  # 添加任务

  sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')

  # 开启任务

  sched.start()</font>小结
  这篇文章介绍了Python实现定时任务的又一利器apscheduler,通过简单例子及apscheduler框架的主要模块分解,我们可以根据实际需求配置好模块信息,再结合logging模块,我们可以实时监控到定时任务的运行情况。话说大家是如何来实现定时任务的,欢迎评论区讨论。


页: [1]
查看完整版本: apscheduler如何在Python中实现定时?