日常工作中经常要使用定时任务来处理一些统计,也常用time.sleep()配合多进程/多线程来实现,但有时候tim.sleep()就有点局限了,比如在flask中引用定时工作,apscheduler就大显身手了,而且在flask环境下有集成包–flask-apscheduler,查看源码,非常简单,对apscheduler接口进行了二次封装,屡次不爽
APScheduler
APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统
原理
scheduler的主循环(main_loop),其实就是反复检查是不是有到时需要执行的任务,完成一次检查的函数是_process_jobs, 这个函数做这么几件事:
- 询问自己的每一个jobstore,有没有到期需要执行的任务
- 如果有,计算这些job中每个job需要运行的时间点,如果run_times有多个,会coalesce检查(下面会解释)
- 提交给executor排期运行
安装
1 | pip3 install apscheduler |
组成
- 调度器(scheduler): 一些配置相关的接口都在调试器中完成,如常用的添加/删除作业等
- 执行器(executor): 负责把具体任务提交到进程中执行,执行完之后会通知调试器
- 触发器(tigger): 某一工作到来时引发的事件,
- 作业存储(jobstore): 默认作业是在内存中被执行的,也可把作业存储到数据库中,保存的时候被序列化,执行时被反序列化,几乎支持所有的常用数据库
scheduler
调度器分为以下几种,可根据不同的使用场景选用不同的调度器:
- BlockingScheduler: 很明显这是种阻塞型,一般用在没有其它进程运行的场景下
- BackGroundScheduler: 后台式,也就是单起一个进程/线程运行该任务,不影响主程序
- ASyncIOScheduler:
- GeventScheduler:
- TornadoScheduler:
- TwistedScheduler:
- QtScheduler:
本人只使用过BlockingScheduler跟BackGroundScheduler,flask-scheduler使用的即为BackGroundScheduler,其它的后续再研究研究
选择类型也很简单,初始化时直接实例化:
1 | from apscheduler.schedulers.background import BackgroundScheduler |
trigger
- cron: 类linux下的crontab格式,属于定时调度
- interval:每隔多久调度一次
- date:一次性调度
1 | #1. cron风格 |
executor
说白了就是指定任务是以线程池/进程池里运行,这在初始化时可以指定,同时可以指定最大的工作池,默认的为default: ThreadPoolExecutor,max-worker为20,当然也可以指定为processpool,默认max-worker为5
jobstore
jobstore则是指的是job持久化,默认job运行在内存中,可持久化在数据库,指定为mongo的MongoDBJobStore或者是使用sqlite的SQLAlchemyJobStore,同时可指定多种jobstore
1 | from pytz import utc |
重要配置
有几种参数需要简单说明一下:
- max_instances: 每个job在同一时刻能够运行的最大实例数,默认情况下为1个,可以指定为更大值,这样即使上个job还没运行完同一个job又被调度的话也能够再开一个线程执行
- coalesce:当由于某种原因导致某个job积攒了好几次没有实际运行(比如说系统挂了5分钟后恢复,有一个任务是每分钟跑一次的,按道理说这5分钟内本来是“计划”运行5次的,但实际没有执行),如果coalesce为True,下次这个job被submit给executor时,只会执行1次,也就是最后这次,如果为False,那么会执行5次(不一定,因为还有其他条件,看下面的misfire_grace_time的解释)
- misfire_grace_time:单位为秒,假设有这么一种情况,当某一job被调度时刚好线程池都被占满,调度器会选择将该job排队不运行,misfire_grace_time参数则是在线程池有可用线程时会比对该job的应调度时间跟当前时间的差值,如果差值<misfire_grace_time时,调度器会再次调度该job.反之该job的执行状态为EVENT_JOB_MISSED了,即错过运行.
API
apscheduler的API非常的简洁,官网的文档也很齐全,
1 | #1. 添加/删除 作业 |
事件
一个很重要的问题就是:我怎么知道任务是执行成功了还是失败还是说压根就没执行,apscheduler提供了很人性化的接口,我们可以给scheduler注册监听事件,在每次任务执行后会记录任务执行状态,这便是scheduler event功能,官方的example很简单:
1 | def my_listener(event): |
即当执行job状态变成EVENT_JOB_EXECUTED 或者EVENT_JOB_ERROR即会调用my_listener函数,当然这里的my_listener非常简单,但我们完全可以在这里通过判断event是否发生异常来实现其它一些逻辑,如job执行情况写入数据库,这样就很容易在月报表里统计定时任务执行情况了
其中my_listener的event类其实继承了class apscheduler.events.JobExecutionEvent类,主要有以下属性:
- code, 状态码,详见这里
- job_id,
- jobstore,
- scheduled_run_time,计划运行时间
- retval=None, 执行成功时的返回值
- exception=None, 是否发生异常,上述代码就是判断了这个值是否为None,正常执行的话这里为None
- traceback=None
今天就记录到这吧,关于apscheduler的web应用flask-apscheduler,下次再更.