首页 文章

通过Django-celery调度数千个一次性(非重新安置)任务以实现近同步执行

提问于
浏览
7

一些上下文:我正在构建一个Django应用程序,允许用户预先保存操作,并安排他们希望执行该操作的未来的确切日期/时间 . 例如,安排一个帖子将在下周早上5:30以编程方式推送到Facebook墙上 .

我正在寻找一个可以处理一次性任务的一千个实例的任务调度系统,所有这些都设置为几乎同时执行(误差范围加上或减去一分钟) .

我正在考虑使用Django-celery / Rabbitmq,但我注意到Celery docs没有解决一次性使用的任务 . Django-celery在这里是正确的选择(也许是通过继承CrontabSchedule)或者我的能量更好地用于研究其他方法吗?也许和Sched Module和Cron一起乱砍一些东西 .

2 回答

  • 0

    Edit 2:

    出于某种原因,我的脑袋原本停留在重复任务的领域 . 这是一个更简单的解决方案 .

    您真正需要的是为每个用户操作定义一个任务 . 您可以跳过存储要在数据库中执行的任务 - 这就是芹菜的用途!

    重新使用你的facebook帖子示例,并再次假设你有一个功能 post_to_facebook 某个地方,它需要用户和一些文本,做一些魔术,并将文本发布到该用户的Facebook,你可以将它定义为这样的任务:

    # Task to send one update.
    @celery.task(ignore_result=True)
    def post_to_facebook(user, text):
        # perform magic
        return whatever_you_want
    

    当用户准备将这样的帖子排队时,您只需告诉芹菜何时运行该任务:

    post_to_facebook.apply_async(
        (user, text),   # args
        eta=datetime.datetime(2012, 9, 15, 11, 45, 4, 126440)  # pass execution options as kwargs
    )
    

    这里详细介绍了一大堆可用的看涨期权:http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown

    如果需要调用结果,可以跳过任务定义中的ignore_result参数并返回AsyncResult对象,然后检查调用结果 . 更多这里:http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results

    下面的一些答案仍然有用 . 您仍然希望为每个用户操作执行任务,您仍然需要考虑任务设计等,但这是一个更简单的方法来完成您的要求 .

    Original answer using recurring tasks follows:

    Dannyroa有正确的想法 . 我会在这里 Build 一点 .

    Edit / TLDR: 答案是 Yes ,芹菜适合您的需要 . 您可能需要重新考虑您的任务定义 .

    我假设你不允许你的用户编写任意Python代码来定义他们的任务 . 除此之外,您必须预定义用户可以安排的某些操作,然后允许他们按照自己的喜好安排这些操作 . 然后,您可以为每个用户操作运行一个计划任务,检查条目并为每个条目执行操作 .

    One user action:

    使用您的Facebook示例,您可以将用户的更新存储在表格中:

    class ScheduledPost(Model):
        user = ForeignKey('auth.User')
        text = TextField()
        time = DateTimeField()
        sent = BooleanField(default=False)
    

    然后,您将每分钟运行一个任务,检查计划在最后一分钟发布的表中的条目(基于您提到的错误边距) . 如果您点击一分钟窗口非常重要,则可以更频繁地安排任务,例如每30秒 . 任务可能如下所示(在myapp / tasks.py中):

    @celery.task
    def post_scheduled_updates():
        from celery import current_task
        scheduled_posts = ScheduledPost.objects.filter(
            sent=False,
            time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this
            time__lte=timezone.now()
        )
        for post in scheduled_posts:
            if post_to_facebook(post.text):
                post.sent = True
                post.save()
    

    配置可能如下所示:

    CELERYBEAT_SCHEDULE = {
        'fb-every-30-seconds': {
            'task': 'tasks.post_scheduled_updates',
            'schedule': timedelta(seconds=30),
        },
    }
    

    Additional user actions:

    对于除了发布到Facebook之外的每个用户操作,您还可以定义新表和新任务:

    class EmailToMom(Model):
        user = ForeignKey('auth.User')
        text = TextField()
        subject = CharField(max_length=255)
        sent = BooleanField(default=False)
        time = DateTimeField()
    
    @celery.task
    def send_emails_to_mom():
        scheduled_emails = EmailToMom.objects.filter(
            sent=False,
            time__lt=timezone.now()
        )
        for email in scheduled_emails:
            sent = send_mail(
                email.subject,
                email.text,
                email.user.email,
                [email.user.mom.email],
            )
            if sent:
                email.sent = True
                email.save()
    
        CELERYBEAT_SCHEDULE = {
            'fb-every-30-seconds': {
                'task': 'tasks.post_scheduled_updates',
                'schedule': timedelta(seconds=30),
            },
            'mom-every-30-seconds': {
                'task': 'tasks.send_emails_to_mom',
                'schedule': timedelta(seconds=30),
            },
        }
    

    Speed and optimization:

    为了获得更多的吞吐量,您可以在 post_scheduled_updates 调用期间迭代更新以发布和串行发送它们,而不是产生一堆子任务并且并行执行(给定足够的workers) . 然后,对 post_scheduled_updates 的调用运行得非常快,并安排了一大堆任务 - 每个fb更新一个 - 以尽快运行 . 这看起来像这样:

    # Task to send one update. This will be called by post_scheduled_updates.
    @celery.task
    def post_one_update(update_id):
        try:
            update = ScheduledPost.objects.get(id=update_id)
        except ScheduledPost.DoesNotExist:
            raise
        else:
            sent = post_to_facebook(update.text)
            if sent:
                update.sent = True
                update.save()
            return sent
    
    @celery.task
    def post_scheduled_updates():
        from celery import current_task
        scheduled_posts = ScheduledPost.objects.filter(
            sent=False,
            time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this
            time__lte=timezone.now()
        )
        for post in scheduled_posts:
            post_one_update.delay(post.id)
    

    我发布的代码未经过测试,当然也未经过优化,但它应该让您走上正轨 . 在您的问题中,您暗示了对吞吐量的一些担忧,因此您需要仔细查看要优化的位置 . 一个显而易见的是批量更新,而不是迭代地调用 post.sent=True;post.save() .

    More info:

    有关定期任务的更多信息:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html .

    关于任务设计战略的一节:http://docs.celeryproject.org/en/latest/userguide/tasks.html#performance-and-strategies

    这里有关于优化芹菜的整页:http://docs.celeryproject.org/en/latest/userguide/optimizing.html .

    这个关于子任务的页面也可能很有趣:http://docs.celeryproject.org/en/latest/userguide/canvas.html .

    事实上,我建议阅读所有芹菜文档 .

  • 9

    我要做的是创建一个名为ScheduledPost的模型 .

    我将有一个每5分钟左右运行一次的PeriodicTask .

    该任务将检查ScheduledPost表以查找需要推送到Facebook的任何帖子 .

相关问题