Define the model

from django.db import models


class TaskTracking(models.Model):
    task_id = models.CharField(max_length=100, null=False, blank=False, verbose_name='Task ID', unique=True)
    task_type = models.CharField(max_length=100, null=False, blank=False, verbose_name='Task type')
    task_pk = models.IntegerField(verbose_name='Task primary key', null=False, blank=False, default=0)
    status = models.CharField(max_length=100, null=False, blank=False, verbose_name='Task status')
    result = models.CharField(max_length=100, null=True, blank=True, verbose_name='Task result', )
    created_at = models.DateTimeField(verbose_name='Created at', editable=False, null=False, auto_now_add=True)
    updated_at = models.DateTimeField(verbose_name='Updated at', editable=False, null=False, auto_now=True)

    class Meta:
        verbose_name = "Task tracking"
        verbose_name_plural = verbose_name

Run the migrations

python manage.py makemigrations && python manage.py migrate

Define the decorator

from functools import wraps
from celery.app.task import Task
from study.celery import app

from .models import TaskTracking  # assumed location; adjust to wherever the model lives


def task_tracker(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Use the function name as task_type
        task_type = func.__name__

        # Get the task ID; with bind=True the first argument is the Celery task instance
        task_id = None
        if args and isinstance(args[0], Task):
            task_id = args[0].request.id

        # Get the primary key: the second positional argument, or the 'pk' keyword argument
        task_pk = args[1] if len(args) > 1 else kwargs.get('pk')

        # Task started: record it in the database
        task = TaskTracking.objects.create(task_id=task_id, task_type=task_type, task_pk=task_pk, status='started')

        try:
            result = func(*args, **kwargs)
            # Task succeeded: update the status in the database
            task.status = 'completed'
            task.result = result
            task.save()
            return result
        except Exception as e:
            # Task failed: update the status in the database
            task.status = 'failed'
            task.result = str(e)[:100]  # the result column holds at most 100 characters
            task.save()
            raise

    return wrapper

Usage


@app.task(bind=True)
@task_tracker
def dosomething(self, pk):
    pass
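
To see the status transitions without a broker or a database, the same pattern can be sketched with a plain dictionary standing in for the TaskTracking table. Everything here (the RECORDS dict, track, divide) is hypothetical and only mirrors the decorator above; it is not part of Celery or Django.

```python
from functools import wraps

# Hypothetical in-memory stand-in for the TaskTracking table.
RECORDS = {}


def track(func):
    @wraps(func)
    def wrapper(task_id, *args, **kwargs):
        # Record the task as started before running it.
        RECORDS[task_id] = {"task_type": func.__name__, "status": "started", "result": None}
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            # On failure, store the error message and re-raise.
            RECORDS[task_id].update(status="failed", result=str(exc))
            raise
        # On success, store the return value.
        RECORDS[task_id].update(status="completed", result=result)
        return result

    return wrapper


@track
def divide(a, b):
    return a / b


divide("t1", 6, 3)
try:
    divide("t2", 1, 0)
except ZeroDivisionError:
    pass

print(RECORDS["t1"]["status"], RECORDS["t2"]["status"])
```

Re-raising after recording the failure keeps the original exception visible to the caller, which is what lets Celery still mark the task as failed in the real decorator.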

Install the django-celery-results library:

pip install django-celery-results

Add django_celery_results to INSTALLED_APPS in your Django project’s settings.py:

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

Note that there is no dash in the module name, only underscores.

Create the Celery database tables by performing a database migration:

python manage.py migrate django_celery_results

Configure Celery to use the django-celery-results backend.

Assuming you are using Django’s settings.py to also configure Celery, add the following settings:

CELERY_RESULT_BACKEND = 'django-db'
# For the cache backend you can use:

CELERY_CACHE_BACKEND = 'django-cache'

We can also use the cache defined in the CACHES setting in Django.


# celery setting.
CELERY_CACHE_BACKEND = 'default'

# django setting.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}

Generate the encryption key

openssl rand 16 > enc.key

Generate the IV

openssl rand -hex 16

Create enc.keyinfo with the following content (key URI, path to the key file, IV)

enc.key
enc.key
191029d9c2d4e9051a8a5deb2f7f5c04
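
The two openssl commands and the keyinfo file can also be produced from Python. The following is a minimal sketch using only the standard library; the function name write_key_files and its key_uri parameter are hypothetical, and the file layout simply follows the three lines shown above.

```python
import secrets
from pathlib import Path


def write_key_files(directory, key_uri="enc.key"):
    """Create enc.key and enc.keyinfo in `directory` and return the IV as hex."""
    directory = Path(directory)
    key_path = directory / "enc.key"
    # 16 random bytes: the AES-128 key, same size as `openssl rand 16`.
    key_path.write_bytes(secrets.token_bytes(16))
    # 16 random bytes as 32 hex characters, like `openssl rand -hex 16`.
    iv_hex = secrets.token_hex(16)
    # keyinfo layout: key URI, path to the key file, IV.
    lines = [key_uri, str(key_path), iv_hex]
    (directory / "enc.keyinfo").write_text("\n".join(lines) + "\n")
    return iv_hex


if __name__ == "__main__":
    print(write_key_files("."))
```

The key URI is what players will request at playback time, while the second line is only read by ffmpeg during encoding, so the two can differ when the key is served from another location.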

Example: encrypting hls.mkv

ffmpeg -i hls.mkv \
-codec:v libx264 \
-codec:a mp3 \
-map 0 \
-s 640x360 \
-hls_time 10 \
-hls_list_size 0 \
-hls_allow_cache 1 \
-hls_base_url http://localhost/videos/ \
-hls_segment_filename out%03d.ts \
-hls_key_info_file enc.keyinfo \
playlist.m3u8
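
After the command finishes, playlist.m3u8 should contain an #EXT-X-KEY tag describing how the segments are encrypted. As a quick sanity check, the tag can be parsed with a few lines of Python. This is a sketch: parse_ext_x_key is a hypothetical helper, and the sample line is illustrative, assuming the key URI and IV from enc.keyinfo above.

```python
import re


def parse_ext_x_key(line):
    """Parse an #EXT-X-KEY tag into a dict of attribute name -> value."""
    prefix = "#EXT-X-KEY:"
    if not line.startswith(prefix):
        raise ValueError("not an EXT-X-KEY line")
    # Attributes are comma-separated NAME=value pairs; values may be quoted.
    pairs = re.findall(r'([A-Z0-9-]+)=("[^"]*"|[^,]*)', line[len(prefix):])
    return {name: value.strip('"') for name, value in pairs}


# Illustrative line, not copied from a real playlist.
sample = '#EXT-X-KEY:METHOD=AES-128,URI="enc.key",IV=0x191029d9c2d4e9051a8a5deb2f7f5c04'
attrs = parse_ext_x_key(sample)
print(attrs["METHOD"], attrs["URI"], attrs["IV"])
```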

Create an HTML page to test the result

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Encrypted Video Playback Test</title>
  <!-- Include video.js CSS -->
  <link href="https://vjs.zencdn.net/7.17.0/video-js.css" rel="stylesheet">

  <!-- Include video.js library -->
  <script src="https://vjs.zencdn.net/7.17.0/video.js"></script>
</head>
<body>

<video id="encrypted-video" class="video-js vjs-default-skin" controls width="640" height="360">
  <!-- Include an HLS source (replace with your encrypted m3u8 URL) -->
  <source src="/playlist.m3u8" type="application/x-mpegURL">
</video>

<script>
  // Initialize video.js
  var player = videojs('encrypted-video');

  // Add any additional configurations or event listeners if needed
</script>

</body>
</html>

Start a local HTTP server in the same directory

python -m http.server 1089

const hexString = "68656c6c6f";
const byteArray = Buffer.from(hexString, 'hex');
console.log(byteArray);

Browsers do not support Buffer, so use the following form instead

const hexStringToByteArray = function (hexString) {
    const bytes = new Uint8Array(hexString.length / 2);

    for (let i = 0; i < hexString.length; i += 2) {
        bytes[i / 2] = parseInt(hexString.slice(i, i + 2), 16);
    }

    return bytes;
}
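
For comparison, the same two-characters-per-byte conversion can be sketched in Python. The helper name hex_to_bytes is hypothetical; bytes.fromhex is the built-in equivalent.

```python
def hex_to_bytes(hex_string):
    """Convert a hex string to bytes, two characters per byte."""
    if len(hex_string) % 2:
        raise ValueError("hex string must have an even length")
    return bytes(int(hex_string[i:i + 2], 16) for i in range(0, len(hex_string), 2))


print(hex_to_bytes("68656c6c6f"))   # b'hello'
print(bytes.fromhex("68656c6c6f"))  # built-in equivalent
```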

server {
  listen 80;
  server_name luntan;
  root /var/www/html;
  index index.php index.html;

  location / {
    try_files $uri $uri/ /index.php?$query_string;
  }

  location ~ \.php$ {
    fastcgi_pass unix:/run/php/php7.4-fpm.sock;
    fastcgi_index index.php;
    fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
    include fastcgi_params;
  }

  location ~ /\.ht {
    deny all;
  }
}