标签 python 下的文章

pip install django_apscheduler

settings.py

# Add 'django_apscheduler' to the project's installed apps; the "# ..."
# placeholders stand for whatever entries the project already lists.
INSTALLED_APPS = [

    # ...
    'django_apscheduler',
    # ...
]

app为应用目录
app下新建 scheduler.py

import time
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job, register_events
import logging

# Emitted when this module is imported, as a visible marker that it was loaded.
print('django-apscheduler')

# Configure the root logger with the default handler and raise the
# 'apscheduler' logger to DEBUG so its messages are shown.
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
def job2(name):
    """Print a success line for ``name`` followed by the current local time.

    Args:
        name: Label placed at the start of the printed line.
    """
    # The actual work of the job goes here.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    message = '{} 任务运行成功!{}'.format(name, timestamp)
    print(message)

# Create the scheduler instance.
scheduler = BackgroundScheduler()
# The scheduler was meant to use DjangoJobStore() for persistence, but
# persisting jobs raised an error, so the registration is left commented out:
# scheduler.add_jobstore(DjangoJobStore(), "default")
# Job 1: registered on the scheduler with an interval trigger of 5 seconds.
@register_job(scheduler, "interval", seconds=5, args=['王路'], id='job1')
def job1(name):
    """Print a success line for ``name`` followed by the current local time.

    Args:
        name: Label placed at the start of the printed line.
    """
    # The actual work of the job goes here.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    message = '{} 任务运行成功!{}'.format(name, timestamp)
    print(message)

# Job 2: run job2 with an interval trigger of 10 seconds.
scheduler.add_job(job2,"interval",seconds=10,args=['王飞'],id="job2")

# Hook up job monitoring via django_apscheduler's register_events().
register_events(scheduler)
# The scheduler is not started in this module: AppConfig.ready() in apps.py
# imports `scheduler` from here and calls scheduler.start().

修改 app/apps.py(需要在 settings.py 中设置 SCHEDULER_AUTOSTART = True,调度器才会随应用启动)

from django.apps import AppConfig
from django.conf import settings

class AppConfig(AppConfig):
    """Configuration for the ``app`` application; optionally starts the scheduler.

    NOTE(review): this class deliberately keeps the same name as the imported
    Django base class so existing dotted references keep resolving; after this
    definition the module-level name ``AppConfig`` refers to this subclass.
    """
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'app'

    def ready(self):
        """Start the job scheduler when ``settings.SCHEDULER_AUTOSTART`` is truthy.

        The setting is optional. When it is absent the scheduler is simply not
        started, instead of the attribute lookup raising ``AttributeError``
        during application start-up.
        """
        if getattr(settings, 'SCHEDULER_AUTOSTART', False):
            # Imported inside ready() rather than at module top — presumably so
            # the scheduler module is only loaded once apps are initialised.
            from .scheduler import scheduler
            scheduler.start()

获取登录用户有两种方式

1.中间件
2.request.user

1.中间件

from threading import local

# Per-thread slot holding the user of the request currently being handled.
_user = local()


class CurrentUserMiddleware:
    """Middleware that exposes the current request's user to non-view code.

    While a request is being processed, ``get_current_user()`` returns that
    request's ``user`` on the same thread. The slot is cleared as soon as the
    response has been produced (or an exception escapes), so a worker thread
    reused for later work never reports a user from a finished request.
    """

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Publish ``request.user`` for the duration of the request.

        Args:
            request: The incoming request. If it has no ``user`` attribute,
                ``None`` is published instead of raising ``AttributeError``.

        Returns:
            Whatever ``get_response(request)`` returns.
        """
        _user.value = getattr(request, 'user', None)
        try:
            return self.get_response(request)
        finally:
            # Always reset, even on errors, so no stale user lingers on this thread.
            _user.value = None

    @staticmethod
    def get_current_user():
        """Return the user published for the current thread, or ``None``."""
        return getattr(_user, 'value', None)

在 settings.py 中注册中间件

MIDDLEWARE = [
    # ...
    "django.contrib.auth.middleware.AuthenticationMiddleware",
    # ...
    # Must come after the auth middleware, because CurrentUserMiddleware
    # reads request.user.
    "backend.middleware.CurrentUserMiddleware",
]

使用

from .middleware import CurrentUserMiddleware

# Returns the user stored for the current thread, or None when no value has
# been stored on this thread.
user = CurrentUserMiddleware.get_current_user()

# model中使用
class MyModel(models.Model):
    """Example model that records which user created each row."""
    creator = models.ForeignKey(User, on_delete=models.SET_NULL, verbose_name='创建人',
                                related_name='creator', null=True, blank=True, )

    def save(self, *args, **kwargs):
        """Save the row, filling in ``creator`` on first save when it is unset.

        The creator is taken from ``CurrentUserMiddleware.get_current_user()``.
        It is only assigned when that returns an authenticated user; otherwise
        the field is left empty (the column is nullable).
        """
        # Use the declared field (`creator`); the model has no `author` attribute.
        if not self.pk and self.creator_id is None:
            current_user = CurrentUserMiddleware.get_current_user()
            # An anonymous user is not a saved User row and cannot be stored
            # in the foreign key, so skip it along with None.
            if current_user is not None and getattr(current_user, 'is_authenticated', False):
                self.creator = current_user
        super().save(*args, **kwargs)

2.request

@api_view(["GET"])
@authentication_classes([SessionAuthentication, BasicAuthentication])
@permission_classes([IsAuthenticated])
def tag_create(request):
    """Show how a view reads the logged-in user directly from the request."""
    # Illustrative snippet: it only reads request.user; no response is built here.
    user=request.user

定义模型

class TaskTracking(models.Model):
    """One row per tracked task run: its id, type, target pk, status and result."""

    # Fields are required (non-null, non-blank) unless stated otherwise; that
    # is Django's default, so it is not spelled out on each field.
    task_id = models.CharField(verbose_name='任务id', max_length=100, unique=True)
    task_type = models.CharField(verbose_name='任务类型', max_length=100)
    task_pk = models.IntegerField(verbose_name='任务主键', default=0)
    status = models.CharField(verbose_name='任务状态', max_length=100)
    # Optional; limited to 100 characters.
    result = models.CharField(verbose_name='任务结果', max_length=100, null=True, blank=True)
    # Timestamps are set automatically on insert / on every save.
    created_at = models.DateTimeField(verbose_name='创建时间', auto_now_add=True, editable=False)
    updated_at = models.DateTimeField(verbose_name='更新时间', auto_now=True, editable=False)

    class Meta:
        verbose_name = "任务追踪"
        verbose_name_plural = verbose_name

迁移

python manage.py makemigrations && python manage.py migrate

定义装饰器

from functools import wraps
from celery.app.task import Task
from study.celery import app


def task_tracker(func):
    """Decorate a task function so every run is recorded in ``TaskTracking``.

    Before ``func`` runs, a row is created with status ``'started'``. After it
    returns, the row is updated to ``'completed'`` with the return value; if it
    raises, the row is updated to ``'failed'`` with the exception text and the
    exception is re-raised with its original traceback.

    Args:
        func: The function to wrap. When its first positional argument is a
            Celery ``Task`` instance (i.e. a task declared with ``bind=True``),
            that task's request id is stored as ``task_id``.

    Returns:
        The wrapping function, carrying ``func``'s name and metadata.

    NOTE(review): ``TaskTracking`` must be importable in this module; the
    imports shown with this snippet do not include it.
    NOTE(review): if the first argument is not a Celery ``Task``, ``task_id``
    stays ``None``; confirm the ``TaskTracking.task_id`` column accepts that.
    """

    def _as_result_text(value):
        """Convert ``value`` to text that fits the ``result`` column."""
        if value is None:
            return None
        text = str(value)
        # Respect the column's declared limit so a long message cannot make
        # the status update itself fail.
        max_length = TaskTracking._meta.get_field('result').max_length
        return text if max_length is None else text[:max_length]

    @wraps(func)
    def wrapper(*args, **kwargs):
        # The wrapped function's name is used as the task type.
        task_type = func.__name__

        # Take the task id from the bound Celery task, when there is one.
        # `args` may be empty if the function is called with keywords only.
        task_id = None
        if args and isinstance(args[0], Task):
            task_id = args[0].request.id

        # Primary key of the object being processed: second positional
        # argument (after the bound task), else the `pk` keyword.
        task_pk = args[1] if len(args) > 1 else kwargs.get('pk')

        # Record the start of the run.
        task = TaskTracking.objects.create(task_id=task_id, task_type=task_type, task_pk=task_pk, status='started')

        try:
            result = func(*args, **kwargs)
        except Exception as e:
            # Run failed: store the error text, then let the error propagate.
            task.status = 'failed'
            task.result = _as_result_text(e)
            task.save()
            raise
        else:
            # Run succeeded: store the outcome.
            task.status = 'completed'
            task.result = _as_result_text(result)
            task.save()
            return result

    return wrapper

use


# bind=True makes the task instance the first argument (`self`); task_tracker
# sits directly on the function, so it receives that instance plus `pk`.
@app.task(bind=True)
@task_tracker
def dosomething(self, pk):
    """Example tracked task; the body is a placeholder."""
    pass

Install the django-celery-results library:

pip install django-celery-results

Add django_celery_results to INSTALLED_APPS in your Django project’s settings.py:

# The leading `...` is a placeholder for the apps the project already lists.
INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

Note that there is no dash in the module name, only underscores.

Create the Celery database tables by performing a database migrations:

python manage.py migrate django_celery_results

Configure Celery to use the django-celery-results backend.

Assuming you are using Django’s settings.py to also configure Celery, add the following settings:

# Select the django-celery-results database backend (its tables are created
# by the migrate step).
CELERY_RESULT_BACKEND = 'django-db'
# For the cache backend you can use:

CELERY_CACHE_BACKEND = 'django-cache'

We can also use the cache defined in the CACHES setting in django.


# celery setting.
# 'default' refers to the entry with the same key in CACHES below.
CELERY_CACHE_BACKEND = 'default'

# django setting.
# Database-backed cache stored in the table named by LOCATION.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}

安装kafka-python

pip install kafka-python

代码

from kafka import KafkaConsumer, TopicPartition

# Connection parameters for the consumer.
bootstrap_servers = ['localhost:9092']
topic = 'test'
# Consumer group actually handed to KafkaConsumer below.
group_id = 'my-group'

consumer = KafkaConsumer(
    group_id=group_id,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='latest',
)

try:
    # partitions_for_topic() may return None when the topic is unknown;
    # treat that as "no partitions" instead of failing while iterating.
    partition_ids = consumer.partitions_for_topic(topic) or set()
    PARTITIONS = [TopicPartition(topic, partition) for partition in partition_ids]

    # Map each TopicPartition to its end offset; skip the broker call
    # entirely when there is nothing to ask about.
    end_offsets = consumer.end_offsets(PARTITIONS) if PARTITIONS else {}
finally:
    # Release the consumer's network connections once the offsets are read.
    consumer.close()

print(end_offsets)

#输出
#{TopicPartition(topic='test', partition=2): 354915,
#TopicPartition(topic='test', partition=1): 354926,
#TopicPartition(topic='test', partition=0): 354892}