定义模型
class TaskTracking(models.Model):
task_id = models.CharField(max_length=100, null=False, blank=False, verbose_name='任务id', unique=True)
task_type = models.CharField(max_length=100, null=False, blank=False, verbose_name='任务类型')
task_pk = models.IntegerField(verbose_name='任务主键', null=False, blank=False, default=0)
status = models.CharField(max_length=100, null=False, blank=False, verbose_name='任务状态')
result = models.CharField(max_length=100, null=True, blank=True, verbose_name='任务结果', )
created_at = models.DateTimeField(verbose_name='创建时间', editable=False, null=False, auto_now_add=True)
updated_at = models.DateTimeField(verbose_name='更新时间', editable=False, null=False, auto_now=True)
class Meta:
verbose_name = "任务追踪"
verbose_name_plural = verbose_name
迁移
python manage.py makemigrations && python manage.py migrate
定义装饰器
from functools import wraps
from celery.app.task import Task
from study.celery import app
def task_tracker(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 获取方法名作为 task_type
task_type = func.__name__
# 获取任务ID
task_id = None
if isinstance(args[0], Task): # 检查第一个参数是否是 Celery 任务实例
task_id = args[0].request.id
# 获取 x 和 y 的值
task_pk = args[1] if len(args) > 1 else kwargs.get('pk')
# 任务开始,记录到数据库(包括 x 和 y 的值)
task = TaskTracking.objects.create(task_id=task_id, task_type=task_type, task_pk=task_pk, status='started')
try:
result = func(*args, **kwargs)
# 任务成功,更新数据库状态
task.status = 'completed'
task.result = result
task.save()
return result
except Exception as e:
# 任务失败,更新数据库状态
task.status = 'failed'
task.result = str(e)
task.save()
raise e
return wrapper
use
@app.task(bind=True)
@task_tracker
def dosomething(self, pk):
pass