定义模型

class TaskTracking(models.Model):
    """Tracking record for a task run: id, type, related key, status and result.

    ``null`` and ``blank`` are left at Django's defaults (both ``False``)
    unless a field states otherwise.
    """

    # Identifier of the tracked task; unique per row.
    task_id = models.CharField(verbose_name='任务id', max_length=100, unique=True)
    # Name of the tracked function.
    task_type = models.CharField(verbose_name='任务类型', max_length=100)
    # Primary key the task operates on; 0 when none was recorded.
    task_pk = models.IntegerField(verbose_name='任务主键', default=0)
    # Lifecycle state of the task.
    status = models.CharField(verbose_name='任务状态', max_length=100)
    # Return value or error text; optional.
    result = models.CharField(
        verbose_name='任务结果',
        max_length=100,
        null=True,
        blank=True,
    )
    # Timestamps maintained automatically by Django on insert / on every save.
    created_at = models.DateTimeField(
        verbose_name='创建时间',
        auto_now_add=True,
        editable=False,
    )
    updated_at = models.DateTimeField(
        verbose_name='更新时间',
        auto_now=True,
        editable=False,
    )

    class Meta:
        verbose_name = "任务追踪"
        verbose_name_plural = verbose_name

迁移

# Generate the migration for the new model, then apply it to the database.
python manage.py makemigrations && python manage.py migrate

定义装饰器

from functools import wraps
from celery.app.task import Task
from study.celery import app


def _truncate_result(value):
    """Return ``value`` as text that fits the ``TaskTracking.result`` column."""
    if value is None:
        return None
    text = str(value)
    limit = TaskTracking._meta.get_field('result').max_length
    return text if limit is None else text[:limit]


def task_tracker(func):
    """Decorate a task function so each run is recorded in ``TaskTracking``.

    A row is written with status ``'started'`` before ``func`` runs and is
    updated to ``'completed'`` (with the return value) or ``'failed'`` (with
    the exception text) afterwards. Exceptions raised by ``func`` are
    re-raised unchanged.

    Intended to sit directly under ``@app.task(bind=True)`` so that the first
    positional argument is the Celery task instance and the next one is the
    primary key. The key may also be passed as the ``pk`` keyword argument.

    Args:
        func: The task function to wrap.

    Returns:
        The wrapping function, with ``func``'s metadata preserved.
    """
    # Local import keeps this block independent of the file-level imports.
    import uuid

    @wraps(func)
    def wrapper(*args, **kwargs):
        # The function name doubles as the task type.
        task_type = func.__name__

        # Guard against keyword-only calls, where ``args`` is empty.
        bound_task = args[0] if args and isinstance(args[0], Task) else None

        task_id = bound_task.request.id if bound_task is not None else None
        if task_id is None:
            # ``task_id`` is NOT NULL and unique, so an untracked/unbound call
            # gets a generated identifier instead of failing on insert.
            task_id = str(uuid.uuid4())

        # The primary key follows the task instance when the task is bound;
        # otherwise it is the first positional argument.
        pk_index = 1 if bound_task is not None else 0
        task_pk = args[pk_index] if len(args) > pk_index else kwargs.get('pk')

        defaults = {'task_type': task_type, 'status': 'started', 'result': None}
        if task_pk is not None:
            # Leave the column at the model default when no key was supplied.
            defaults['task_pk'] = task_pk

        # Celery reuses the task id on retry/redelivery, so reset the existing
        # row rather than violating the unique constraint with a second insert.
        task, _created = TaskTracking.objects.update_or_create(
            task_id=task_id,
            defaults=defaults,
        )

        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            # Record the failure, then re-raise with the original traceback.
            task.status = 'failed'
            task.result = _truncate_result(exc)
            task.save()
            raise

        # Record success; the stored text is clipped to the column length.
        task.status = 'completed'
        task.result = _truncate_result(result)
        task.save()
        return result

    return wrapper

使用


@app.task(bind=True)
@task_tracker
def dosomething(self, pk):
    """Example tracked task; a placeholder that does no work."""
    return None

标签: django, python, celery

添加新评论