django调度任务apscheduler
pip install django_apscheduler
settings.py
INSTALLED_APPS = [
# ...
'django_apscheduler',
# ...
]
app为应用目录
app下新建 scheduler.py
import time
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job, register_events
import logging
print('django-apscheduler')
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
def job2(name):
# 具体要执行的代码
print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))
# 实例化调度器
scheduler = BackgroundScheduler()
# 调度器使用DjangoJobStore()
# 持久化会报错 注释掉
# scheduler.add_jobstore(DjangoJobStore(), "default")
# 添加任务1
# 每隔5s执行这个任务
@register_job(scheduler,"interval", seconds=5,args=['王路'],id='job1')
def job1(name):
# 具体要执行的代码
print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))
scheduler.add_job(job2,"interval",seconds=10,args=['王飞'],id="job2")
# 监控任务
register_events(scheduler)
# 调度器开始运行
app.apps.py修改
from django.apps import AppConfig
from django.conf import settings
class AppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'app'
# 新增
def ready(self):
if settings.SCHEDULER_AUTOSTART:
from .scheduler import scheduler
scheduler.start()