# Reverse proxy: forwards all requests for domain.com to the application
# listening on 127.0.0.1:8080, with settings suitable for Server-Sent Events.
server {
    listen 80;
    server_name domain.com;

    location / {
        proxy_pass http://127.0.0.1:8080;

        # Pass the original host and client address on to the upstream.
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # SSE Support
        # Use HTTP/1.1 towards the upstream and clear the Connection header so
        # the long-lived streaming connection is not forced to "close".
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        # Deliver events to the client immediately instead of buffering them.
        proxy_buffering off;
        proxy_cache off;
        # The default read timeout is 60s; an event stream that stays silent
        # longer than that would otherwise be closed by nginx.
        proxy_read_timeout 1h;
    }
}

安装 crypto-js

pnpm install crypto-js

代码

import * as CryptoJS from 'crypto-js';


/**
 * Decrypt a hex-encoded ciphertext with AES in CBC mode and PKCS#7 padding.
 *
 * The key (32 bytes) and IV (16 bytes) are fixed hex constants defined inside
 * this function.
 *
 * @param text Ciphertext as a hex string; it is converted to bytes by
 *             `hexStringToByteArray`, which is defined outside this block.
 * @returns The decrypted plaintext decoded as UTF-8.
 */
export async function decryptText(text: string): Promise<string> {
    // NOTE(review): key and IV are hard-coded, so anyone with access to this
    // source can decrypt the data — confirm that this is intended.
    const keyHex = "cb69976b953380e5ff637b0bf9af35fdf1a56815c81296b3e5314e506d209a05";
    const ivHex = 'a312cc7df85e4d26dc65fb4f3fe82919';

    // Parse the hex constants directly; converting them to byte arrays and
    // back to hex first yields the same WordArray and needs a nullable
    // `String.match` result.
    const key = CryptoJS.enc.Hex.parse(keyHex);
    const iv = CryptoJS.enc.Hex.parse(ivHex);

    const encryptedData = hexStringToByteArray(text);

    // `& 0xFF` maps possibly signed byte values into 0..255 before formatting
    // each one as two hex digits.
    const cipherHex = Array.from(encryptedData)
        .map(byte => ('0' + (byte & 0xFF).toString(16)).slice(-2))
        .join('');

    // crypto-js expects a CipherParams object (or a string) as the ciphertext.
    const cipherParams = CryptoJS.lib.CipherParams.create({
        ciphertext: CryptoJS.enc.Hex.parse(cipherHex),
    });

    const decryptedData = CryptoJS.AES.decrypt(cipherParams, key, {
        iv,
        mode: CryptoJS.mode.CBC,
        padding: CryptoJS.pad.Pkcs7,
    });

    return decryptedData.toString(CryptoJS.enc.Utf8);
}

// Wraps XMLHttpRequest.prototype.open/send so that every successful (HTTP 200)
// response whose request URL contains '.m3u8' is logged to the console.
(function () {
  var originalSend = XMLHttpRequest.prototype.send;
  var originalOpen = XMLHttpRequest.prototype.open;

  XMLHttpRequest.prototype.open = function (method, url, async, user, password) {
    // open() accepts URL objects as well as strings; keep a string copy so
    // the substring check in the load handler cannot throw.
    this._url = String(url);
    return originalOpen.apply(this, arguments);
  };

  XMLHttpRequest.prototype.send = function (body) {
    this.addEventListener('load', function () {
      var requestUrl = this._url;
      // _url is missing when the request was opened before this patch ran.
      if (this.status === 200 && typeof requestUrl === 'string' && requestUrl.includes('.m3u8')) {
        // Handle the response here.
        console.log('Response from:bb ', requestUrl, 'Status:', this.status);
      }
    });

    return originalSend.apply(this, arguments);
  };
})();

定义模型

class TaskTracking(models.Model):
    """Tracking record for a task: its id, type, target key, status and result."""

    # The first positional argument of a Django field is its verbose_name.
    # null=False / blank=False are the field defaults and are left implicit.
    task_id = models.CharField('任务id', max_length=100, unique=True)
    task_type = models.CharField('任务类型', max_length=100)
    task_pk = models.IntegerField('任务主键', default=0)
    status = models.CharField('任务状态', max_length=100)
    # Optional: may be NULL in the database and blank in forms.
    result = models.CharField('任务结果', max_length=100, blank=True, null=True)
    # Timestamps are maintained by Django on insert / on every save.
    created_at = models.DateTimeField('创建时间', auto_now_add=True, editable=False)
    updated_at = models.DateTimeField('更新时间', auto_now=True, editable=False)

    class Meta:
        verbose_name_plural = verbose_name = "任务追踪"

迁移

python manage.py makemigrations && python manage.py migrate

定义装饰器

from functools import wraps
from celery.app.task import Task
from study.celery import app


def _store_outcome(task, status, result):
    """Save the final status and result on a TaskTracking row."""
    if result is not None:
        result = str(result)
        # The result column has a bounded length; clip the text so that a long
        # return value or exception message cannot make save() fail.
        limit = task._meta.get_field('result').max_length
        if limit is not None:
            result = result[:limit]
    task.status = status
    task.result = result
    task.save()


def task_tracker(func):
    """Decorator that records each call of ``func`` in ``TaskTracking``.

    A row with status ``'started'`` is created before ``func`` runs and is
    updated to ``'completed'`` (with the return value) or ``'failed'`` (with
    the exception message) afterwards. Exceptions raised by ``func`` are
    re-raised unchanged.

    The task id is read from ``args[0].request.id`` when the first positional
    argument is a Celery ``Task`` (i.e. the task was declared with
    ``bind=True``); otherwise ``task_id`` stays ``None``. The tracked primary
    key is the second positional argument, or the ``pk`` keyword argument.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # The function name doubles as the task type.
        task_type = func.__name__

        # Task id: only available when the first argument is a bound Celery task.
        # Guard against calls that pass no positional arguments at all.
        task_id = None
        if args and isinstance(args[0], Task):
            task_id = args[0].request.id

        # Primary key of the object the task works on.
        task_pk = args[1] if len(args) > 1 else kwargs.get('pk')
        if task_pk is None:
            # No pk supplied: fall back to 0 instead of inserting NULL.
            task_pk = 0

        # Task is starting: record it in the database.
        task = TaskTracking.objects.create(task_id=task_id, task_type=task_type, task_pk=task_pk, status='started')

        try:
            result = func(*args, **kwargs)
        except Exception as e:
            # Task failed: store the error, then re-raise with the original traceback.
            _store_outcome(task, 'failed', str(e))
            raise
        # Task succeeded: store the return value.
        _store_outcome(task, 'completed', result)
        return result

    return wrapper

Usage (apply the decorator below `@app.task(bind=True)`):


@app.task(bind=True)
@task_tracker
def dosomething(self, pk):
    """Example tracked task; the body is an intentional no-op placeholder."""
    return None

Install the django-celery-results library:

pip install django-celery-results

Add django_celery_results to INSTALLED_APPS in your Django project’s settings.py:

# settings.py: register the app (module name uses underscores, not dashes).
# The `...` entry stands for the apps already present in the project.
INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

Note that there is no dash in the module name, only underscores.

Create the Celery database tables by performing a database migrations:

python manage.py migrate django_celery_results

Configure Celery to use the django-celery-results backend.

Assuming you are using Django’s settings.py to also configure Celery, add the following settings:

# settings.py: select the django-celery-results database backend for results.
CELERY_RESULT_BACKEND = 'django-db'
# For the cache backend you can use:

CELERY_CACHE_BACKEND = 'django-cache'

We can also use the cache defined in the CACHES setting in django.


# celery setting.
# 'default' is the alias of the cache defined in CACHES below.
CELERY_CACHE_BACKEND = 'default'

# django setting.
# The 'default' cache uses Django's database cache backend; LOCATION is the
# name of the table that holds the cached entries.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}