目录
基本使用trigger启动方式cron启动方式使用装饰器定时启动任务flask-apscheduler
将apscheduler
移植到了flask
应用中,使得在flask
中可以非常方便的使用定时任务了,除此之外,它还有如下几个特性
(资料图)
根据Flask
配置加载调度器配置根据Flask
配置加载任务调度器允许指定服务器运行任务提供RESTful API
管理任务,也就是远程管理任务为RESTful API
提供认证
下载安装
pip install flask-apscheduler
基本使用
flask-apscheduler
的相关配置,我们会将它和其它扩展一起,放在应用的配置里
class Config(object): // 配置项 JOBS = [ { "id": "job1", "func": "run:add", "args": (1, 2), "trigger": "interval", "seconds": 3 } ] SCHEDULER_API_ENABLED = True def add(a, b): print(a+b)
JOBS列表的每一个元素表示一个定时任务,列子中只有一个interval任务,表示每隔3秒运行一次函数add。func指定调用的函数,args表示传入函数的参数,trigger表示启动方式,常用的有两种,分别是trigger和cron。
上边我们设置了SCHEDULER_API_ENABLED = True
,可以通过访问http://127.0.0.1:5000/scheduler,其中scheduler
是默认的RESTful API
前缀
通过查看源码,可以发现flask-apscheduler提供了以下的接口
def _load_api(self): """ Add the routes for the scheduler API. """ self._add_url_route("get_scheduler_info", "", api.get_scheduler_info, "GET") self._add_url_route("add_job", "/jobs", api.add_job, "POST") self._add_url_route("get_job", "/jobs/", api.get_job, "GET") self._add_url_route("get_jobs", "/jobs", api.get_jobs, "GET") self._add_url_route("delete_job", "/jobs/ ", api.delete_job, "DELETE") self._add_url_route("update_job", "/jobs/ ", api.update_job, "PATCH") self._add_url_route("pause_job", "/jobs/ /pause", api.pause_job, "POST") self._add_url_route("resume_job", "/jobs/ /resume", api.resume_job, "POST") self._add_url_route("run_job", "/jobs/ /run", api.run_job, "POST")
如果需要查看当前运行的所有定时任务,则请求http://127.0.0.1:5000/scheduler/jobs即可。
trigger启动方式
trigger表示间隔启动,在trigger方式中,使用seconds配置间隔多久启动一次,单位是秒。
cron启动方式
cron表示定时启动
class Config(object): JOBS = [ { "id": "job1", "func": "scheduler:task", "args": (1, 2), "trigger": "cron", "day": "*", "hour": "13", "minute": "16", "second": "20" } ] SCHEDULER_API_ENABLED = True def task(a, b): print(str(datetime.datetime.now()) + " execute task " + "{}+{}={}".format(a, b, a + b))
该配置项则表示每天的13点16分20秒启动一次。*表示全部。
有关常用的cron配置有:
day
表示天hour
表示小时minute
表示分钟second
表示秒week
周day_of_week
星期几,如星期天使用sun,星期五使用fri,其他的类似。使用装饰器定时启动任务
from flask import Flask from flask_apscheduler import APScheduler import datetime class Config(object): SCHEDULER_API_ENABLED = True scheduler = APScheduler() # interval examples @scheduler.task("interval", id="do_job_1", seconds=30, misfire_grace_time=900) def job1(): print(str(datetime.datetime.now()) + " Job 1 executed")
表示每隔30秒调用一次job1函数。
到此这篇关于Python flask框架定时任务apscheduler应用介绍的文章就介绍到这了,更多相关Python flask apscheduler内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
X 关闭
X 关闭
- 15G资费不大降!三大运营商谁提供的5G网速最快?中国信通院给出答案
- 2联想拯救者Y70发布最新预告:售价2970元起 迄今最便宜的骁龙8+旗舰
- 3亚马逊开始大规模推广掌纹支付技术 顾客可使用“挥手付”结账
- 4现代和起亚上半年出口20万辆新能源汽车同比增长30.6%
- 5如何让居民5分钟使用到各种设施?沙特“线性城市”来了
- 6AMD实现连续8个季度的增长 季度营收首次突破60亿美元利润更是翻倍
- 7转转集团发布2022年二季度手机行情报告:二手市场“飘香”
- 8充电宝100Wh等于多少毫安?铁路旅客禁止、限制携带和托运物品目录
- 9好消息!京东与腾讯续签三年战略合作协议 加强技术创新与供应链服务
- 10名创优品拟通过香港IPO全球发售4100万股 全球发售所得款项有什么用处?