pip install django_apscheduler

settings.py

INSTALLED_APPS = [

    # ...
    'django_apscheduler',
    # ...
]

app为应用目录
app下新建 scheduler.py

import time
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job, register_events
import logging

print('django-apscheduler')

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
def job2(name):
    # 具体要执行的代码
    print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))

# 实例化调度器
scheduler = BackgroundScheduler()
# 调度器使用DjangoJobStore()
# 持久化会报错 注释掉
# scheduler.add_jobstore(DjangoJobStore(), "default")
# 添加任务1
# 每隔5s执行这个任务
@register_job(scheduler,"interval", seconds=5,args=['王路'],id='job1')
def job1(name):
    # 具体要执行的代码
    print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))

scheduler.add_job(job2,"interval",seconds=10,args=['王飞'],id="job2")

# 监控任务
register_events(scheduler)
# 调度器开始运行

app.apps.py修改

from django.apps import AppConfig
from django.conf import settings

class AppConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'app'
    
    # 新增
    def ready(self):
        if settings.SCHEDULER_AUTOSTART:
            from .scheduler import scheduler
            scheduler.start()

标签: django, python, 定时任务

仅有一条评论

  1. 真棒!

添加新评论