首页 文章

Google App Engine:如何使用任务队列进行此处理?

提问于
浏览
3

我正在使用Python GAE SDK .

我需要在6000个 MyKind 实例上进行一些处理 . 在单个请求中执行速度太慢,因此我正在使用任务队列 . 如果我只对一个实体进行单个任务处理,那么它应该只需几秒钟 .

documentation表示在"batch"中只能添加100个任务 . (他们的意思是什么?在一个请求中?在一个任务中?)

因此,假设“批处理”意味着“请求”,我试图找出为数据存储区中的每个实体创建任务的最佳方法 . 你怎么看?

如果我可以假设 MyKind 的顺序永远不会改变,那就容易多了 . (处理将永远不会实际更改 MyKind 实例 - 它只会创建其他类型的新实例 . )我可以创建一堆任务,给每个任务一个偏移量,从哪里开始,间隔小于100 . 然后,每个任务可以创建执行实际处理的单个任务 .

但是如果有这么多实体原始请求无法添加所有必要的调度任务呢?这让我觉得我需要一个递归解决方案 - 每个任务都会查看它给出的范围 . 如果范围中只有一个元素,则对其进行处理 . 否则,它将范围进一步细分为后续任务 .

如果我不能指望使用偏移和限制来识别实体(因为它们的排序不能保证不变),也许我可以使用他们的密钥?但后来我可以发送1000个钥匙,这看起来很笨重 .

我是在正确的道路上走下去,还是我应该考虑另一种设计?

2 回答

  • 9

    当您运行像 taskqueue.add(url='/worker', params={'cursor': cursor}) 这样的代码时,您正在排队任务;使用您提供的参数安排带外执行请求 . 显然,您可以在一次操作中安排最多100个这样的操作 .

    不过,我认为你不想 . 任务链将使这更简单:

    你的工作任务会做这样的事情:

    • 运行查询以获取某些记录以进行处理 . 如果任务参数中提供了游标,请使用它 . 将查询限制为10条记录,或者您认为可以在30秒内完成的任何记录 .

    • 处理您的10条记录

    • 如果您的查询返回了10条记录,则将其他任务排入队列并从查询中传递更新的光标,以便它可以从您上次停止的地方继续 .

    • 如果记录少于10条,那就完成了 . 万岁!发射电子邮件或其他东西然后退出 .

    通过这条路线,您只需启动第一个任务,其余任务自行添加 .

    请注意,如果任务失败,App Engine将重试它,直到成功为止,因此您无需担心数据存储打嗝导致一个任务超时并中断链 .

    Edit:

    上述步骤不保证实体只会被处理一次 . 任务通常只应运行一次,但Google会建议您设计幂等性 . 如果这是一个主要问题,这是处理它的一种方法:

    • 在要处理的每个实体上放置一个状态标志,或者创建一个互补的实体来保存该标志 . 它应该具有类似于Pending,Processing和Processed的状态 .

    • 当您获取要处理的新实体时,事务性锁定并递增处理标志 . 仅在实体处于待定状态时运行该实体 . 处理完成后,再次递增标志 .

    请注意,在开始之前,不必将处理标志添加到每个实体 . 您的“待处理”状态只是意味着该 property 或相应的实体尚不存在 .

  • 0

    另外,根据您的设计,您可以执行我所做的操作,这是所有需要处理的记录的编号 . 我处理了大约3500个项目,每个项目需要3秒左右的时间来处理 . 为了避免重叠,超时和考虑将来扩展,我的第一个任务从数据库中获取该类型的所有唯一项目的列表 . 然后将它分成500个每个项目标识符的列表,循环直到它占据我数据库中的所有唯一项目,并将每个500个标识符块发布到第二层处理程序任务 . 每个第二个处理程序任务,当前是七个或八个不同的任务,然后具有500个项目的唯一列表,并且每个处理程序任务添加500个任务,每个唯一标识符一个 .

    因为它全部通过循环进行管理并根据我数据库中的唯一项目数进行计数,然后我可以添加任意数量的唯一项目,并且任务数量将会扩展以适应它们绝对没有重复 . 我用它来每天跟踪游戏中的价格,因此它全部用cron作业解雇,根本不需要我干预 .

相关问题