django事务
from django.db import transaction
@transaction.atomic
def viewfunc(request):
# This code executes inside a transaction.
do_stuff()
from django.db import transaction
@transaction.atomic
def viewfunc(request):
# This code executes inside a transaction.
do_stuff()
server {
listen 80;
server_name domain.com;
location / {
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# SSE Support
proxy_buffering off;
proxy_cache off;
}
}
安装 crypto-js
pnpm install crypto-js
代码
import * as CryptoJS from 'crypto-js';
export async function decryptText(text: string): Promise<string> {
// 16进制
const keyHex = "cb69976b953380e5ff637b0bf9af35fdf1a56815c81296b3e5314e506d209a05";
const ivHex = 'a312cc7df85e4d26dc65fb4f3fe82919'
// console.log(`text: ${text}`)
// Convert hex strings to Uint8Array
const keyArray = new Uint8Array(
keyHex.match(/.{1,2}/g).map(byte => parseInt(byte, 16))
);
const ivArray = new Uint8Array(
ivHex.match(/.{1,2}/g).map(byte => parseInt(byte, 16))
);
const encryptedData = hexStringToByteArray(text)
// AES decryption using crypto-js
const decryptedData = CryptoJS.AES.decrypt(
{
ciphertext: CryptoJS.enc.Hex.parse(
Array.from(encryptedData).map(byte => ('0' + (byte & 0xFF).toString(16)).slice(-2)).join('')
)
},
CryptoJS.enc.Hex.parse(
Array.from(keyArray).map(byte => ('0' + byte.toString(16)).slice(-2)).join('')
),
{
iv: CryptoJS.enc.Hex.parse(
Array.from(ivArray).map(byte => ('0' + byte.toString(16)).slice(-2)).join('')
),
mode: CryptoJS.mode.CBC,
padding: CryptoJS.pad.Pkcs7,
}
);
return decryptedData.toString(CryptoJS.enc.Utf8)
}
(function () {
var originalSend = XMLHttpRequest.prototype.send;
var originalOpen = XMLHttpRequest.prototype.open;
XMLHttpRequest.prototype.open = function (method, url, async, user, password) {
this._url = url; // 保存请求 URL,可能在后面用到
return originalOpen.apply(this, arguments);
};
XMLHttpRequest.prototype.send = function (body) {
this.addEventListener('load', function () {
if (this.status == 200 && this._url.includes('.m3u8')) {
// 在这里处理响应
console.log('Response from:bb ', this._url, 'Status:', this.status);
}
});
return originalSend.apply(this, arguments);
};
})();
定义模型
class TaskTracking(models.Model):
task_id = models.CharField(max_length=100, null=False, blank=False, verbose_name='任务id', unique=True)
task_type = models.CharField(max_length=100, null=False, blank=False, verbose_name='任务类型')
task_pk = models.IntegerField(verbose_name='任务主键', null=False, blank=False, default=0)
status = models.CharField(max_length=100, null=False, blank=False, verbose_name='任务状态')
result = models.CharField(max_length=100, null=True, blank=True, verbose_name='任务结果', )
created_at = models.DateTimeField(verbose_name='创建时间', editable=False, null=False, auto_now_add=True)
updated_at = models.DateTimeField(verbose_name='更新时间', editable=False, null=False, auto_now=True)
class Meta:
verbose_name = "任务追踪"
verbose_name_plural = verbose_name
迁移
python manage.py makemigrations && python manage.py migrate
定义装饰器
from functools import wraps
from celery.app.task import Task
from study.celery import app
def task_tracker(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 获取方法名作为 task_type
task_type = func.__name__
# 获取任务ID
task_id = None
if isinstance(args[0], Task): # 检查第一个参数是否是 Celery 任务实例
task_id = args[0].request.id
# 获取 x 和 y 的值
task_pk = args[1] if len(args) > 1 else kwargs.get('pk')
# 任务开始,记录到数据库(包括 x 和 y 的值)
task = TaskTracking.objects.create(task_id=task_id, task_type=task_type, task_pk=task_pk, status='started')
try:
result = func(*args, **kwargs)
# 任务成功,更新数据库状态
task.status = 'completed'
task.result = result
task.save()
return result
except Exception as e:
# 任务失败,更新数据库状态
task.status = 'failed'
task.result = str(e)
task.save()
raise e
return wrapper
use
@app.task(bind=True)
@task_tracker
def dosomething(self, pk):
pass