Z.S.K.'s Records

python定时框架APScheduler使用(-)

日常工作中经常要使用定时任务来处理一些统计,也常用time.sleep()配合多进程/多线程来实现,但有时候tim.sleep()就有点局限了,比如在flask中引用定时工作,apscheduler就大显身手了,而且在flask环境下有集成包–flask-apscheduler,查看源码,非常简单,对apscheduler接口进行了二次封装,屡次不爽

APScheduler

APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统

原理

scheduler的主循环(main_loop),其实就是反复检查是不是有到时需要执行的任务,完成一次检查的函数是_process_jobs, 这个函数做这么几件事:

  1. 询问自己的每一个jobstore,有没有到期需要执行的任务
  2. 如果有,计算这些job中每个job需要运行的时间点,如果run_times有多个,会coalesce检查(下面会解释)
  3. 提交给executor排期运行

安装

1
pip3 install apscheduler

组成

  1. 调度器(scheduler): 一些配置相关的接口都在调试器中完成,如常用的添加/删除作业等
  2. 执行器(executor): 负责把具体任务提交到进程中执行,执行完之后会通知调试器
  3. 触发器(tigger): 某一工作到来时引发的事件,
  4. 作业存储(jobstore): 默认作业是在内存中被执行的,也可把作业存储到数据库中,保存的时候被序列化,执行时被反序列化,几乎支持所有的常用数据库

scheduler

调度器分为以下几种,可根据不同的使用场景选用不同的调度器:

  1. BlockingScheduler: 很明显这是种阻塞型,一般用在没有其它进程运行的场景下
  2. BackGroundScheduler: 后台式,也就是单起一个进程/线程运行该任务,不影响主程序
  3. ASyncIOScheduler:
  4. GeventScheduler:
  5. TornadoScheduler:
  6. TwistedScheduler:
  7. QtScheduler:

本人只使用过BlockingScheduler跟BackGroundScheduler,flask-scheduler使用的即为BackGroundScheduler,其它的后续再研究研究

选择类型也很简单,初始化时直接实例化:

1
2
3
4
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
#启动
scheduler.start()

trigger

  1. cron: 类linux下的crontab格式,属于定时调度
  2. interval:每隔多久调度一次
  3. date:一次性调度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#1. cron风格
(int|str) 表示参数既可以是int类型,也可以是str类型
(datetime | str) 表示参数既可以是datetime类型,也可以是str类型
year (int|str) – 4-digit year -(表示四位数的年份,如2008年)
month (int|str) – month (1-12) -(表示取值范围为1-12月)
day (int|str) – day of the (1-31) -(表示取值范围为1-31日)
week (int|str) – ISO week (1-53) -(格里历20061231日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)
hour (int|str) – hour (0-23) - (表示取值范围为0-23时)
minute (int|str) – minute (0-59) - (表示取值范围为0-59分)
second (int|str) – second (0-59) - (表示取值范围为0-59秒)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示开始时间)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示结束时间)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)
#如:在6,7,8,11,12月份的第三个星期五的00:00,01:00,02:00,03:00 执行该程序
sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

#2.interval风格
weeks (int) – number of weeks to wait
days (int) – number of days to wait
hours (int) – number of hours to wait
minutes (int) – number of minutes to wait
seconds (int) – number of seconds to wait
start_date (datetime|str) – starting point for the interval calculation
end_date (datetime|str) – latest possible date/time to trigger on
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
#如:每隔2分钟执行一次
scheduler.add_job(myfunc, 'interval', minutes=2)

#3.date风格
run_date (datetime|str) – the date/time to run the job at -(任务开始的时间)
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
#如:在2009年11月6号16时30分5秒时执行
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

executor

说白了就是指定任务是以线程池/进程池里运行,这在初始化时可以指定,同时可以指定最大的工作池,默认的为default: ThreadPoolExecutor,max-worker为20,当然也可以指定为processpool,默认max-worker为5

jobstore

jobstore则是指的是job持久化,默认job运行在内存中,可持久化在数据库,指定为mongo的MongoDBJobStore或者是使用sqlite的SQLAlchemyJobStore,同时可指定多种jobstore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

重要配置

有几种参数需要简单说明一下:

  1. max_instances: 每个job在同一时刻能够运行的最大实例数,默认情况下为1个,可以指定为更大值,这样即使上个job还没运行完同一个job又被调度的话也能够再开一个线程执行
  2. coalesce:当由于某种原因导致某个job积攒了好几次没有实际运行(比如说系统挂了5分钟后恢复,有一个任务是每分钟跑一次的,按道理说这5分钟内本来是“计划”运行5次的,但实际没有执行),如果coalesce为True,下次这个job被submit给executor时,只会执行1次,也就是最后这次,如果为False,那么会执行5次(不一定,因为还有其他条件,看下面的misfire_grace_time的解释)
  3. misfire_grace_time:单位为秒,假设有这么一种情况,当某一job被调度时刚好线程池都被占满,调度器会选择将该job排队不运行,misfire_grace_time参数则是在线程池有可用线程时会比对该job的应调度时间跟当前时间的差值,如果差值<misfire_grace_time时,调度器会再次调度该job.反之该job的执行状态为EVENT_JOB_MISSED了,即错过运行.

API

apscheduler的API非常的简洁,官网的文档也很齐全,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#1. 添加/删除 作业
#参数: id,function,trigger,time
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
#当然也可以给job指定id
job = scheduler.add_job(id='job1',myfunc, 'interval', minutes=2)
job.remove_job('job1')
#2.暂停/启用 作业
apsched.schedulers.base.BaseScheduler.pause_job(id)
apsched.schedulers.base.BaseScheduler.resume_job(id)
#3.获取作业列表
print sched.get_job(id)
print_jobs(id)
#4.修改作业
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
#5.关闭调度器,默认情况下调度器会等所有任务执行完之后才关闭,指定wait=Flase可不等待直接着关闭
sched.shutdown(wait=False)

事件

一个很重要的问题就是:我怎么知道任务是执行成功了还是失败还是说压根就没执行,apscheduler提供了很人性化的接口,我们可以给scheduler注册监听事件,在每次任务执行后会记录任务执行状态,这便是scheduler event功能,官方的example很简单:

1
2
3
4
5
6
7
def my_listener(event):
if event.exception:
print('The job crashed')
else:
print('The job worked')
#注册回调函数
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

即当执行job状态变成EVENT_JOB_EXECUTED 或者EVENT_JOB_ERROR即会调用my_listener函数,当然这里的my_listener非常简单,但我们完全可以在这里通过判断event是否发生异常来实现其它一些逻辑,如job执行情况写入数据库,这样就很容易在月报表里统计定时任务执行情况了

其中my_listener的event类其实继承了class apscheduler.events.JobExecutionEvent类,主要有以下属性:

  1. code, 状态码,详见这里
  2. job_id,
  3. jobstore,
  4. scheduled_run_time,计划运行时间
  5. retval=None, 执行成功时的返回值
  6. exception=None, 是否发生异常,上述代码就是判断了这个值是否为None,正常执行的话这里为None
  7. traceback=None

今天就记录到这吧,关于apscheduler的web应用flask-apscheduler,下次再更.

参考文章:

转载请注明原作者: 周淑科(https://izsk.me)


 wechat
Scan Me To Read on Phone
I know you won't do this,but what if you did?