首页 文章

在谷歌应用引擎上使用任务队列时,如何优先处理任务?

提问于
浏览
3

我正在尝试解决以下问题:

  • 我有一系列"tasks"我想执行

  • 我有一定数量的工作人员来执行这些工作程序(因为他们使用urlfetch调用外部API,并且对此API的并行调用数量有限)

  • 我希望这些"tasks"被执行"as soon as possible"(即最小延迟)

  • 这些任务是较大任务的一部分,可以根据原始任务的大小进行分类(即,一个小的原始任务可能会生成1到100个任务,一个中等的100到1000个,一个大的任务超过1000个) .

棘手的部分:我希望有效地完成所有这些工作(即最小延迟并尽可能多地使用并行API调用 - 不超过限制),但同时尝试防止生成的大量任务“大的“原始任务,以延迟从”小“原始任务生成的任务 .

换句话说:我希望为每个任务分配一个“优先级”,其中“小”任务具有更高的优先级,从而防止“大”任务的饥饿 .

有些搜索似乎并不表示任何预先制作的东西都可用,所以我想出了以下内容:

  • 创建三个推送队列: tasks-smalltasks-mediumtasks-large

  • 为每个设置最大并发请求数,使得总数是并发API调用的最大数量(例如,如果最大数量并发API调用为200,我可以将 tasks-small 设置为 max_concurrent_requests 为30, tasks-medium 60和 tasks-large 100)

  • 在排队任务时,检查否 . 每个队列中的待处理任务(使用QueueStatistics类之类的东西),如果其他队列未被100%利用,则将任务排入队列,否则只需将队列中的任务排入队列并使用相应的大小 .

例如,如果我们有任务 T1 这是一个小任务的一部分,首先检查 tasks-small 是否有空"slots"并将其排入那里 . 否则请检查 tasks-mediumtasks-large . 如果它们都没有空闲插槽,那么无论如何都要将它排入 tasks-small 并在处理之前添加任务后处理它(注意:这不是最佳的,因为如果"slots"在其他队列上释放,它们仍然不会处理待处理来自 tasks-small 队列的任务)

另一种选择是使用PULL队列,并根据优先级从该队列中拉出一个“协调器”并调度它们,但这似乎会增加一点延迟 .

然而,这似乎有点hackish,我想知道是否有更好的替代品 .


编辑:经过一些思考和反馈之后,我正在考虑以下列方式使用PULL队列:

  • 有两个PULL队列( medium-taskslarge-tasks

  • 有一个并发为1的调度程序(PUSH)队列(因此任何时候只能运行一个调度任务) . 调度任务以多种方式创建:

  • 每分钟一次的cron工作
    将中/大任务添加到推送队列后

工作任务完成后

  • 有一个worker(PUSH)队列,其并发数等于worker数

而工作流程:

  • 小任务直接添加到工作队列

  • 调度程序任务,无论何时触发,都执行以下操作:

  • 估计免费工作者的数量(通过查看工作队列中正在运行的任务的数量)

  • 对于任何"free"槽它从中型/大型任务PULL队列中获取任务并将其排入工作者(或者更确切地说:将其添加到工作程序PUSH队列,这将导致它被执行 - 最终 - 在工作者上) .

一旦实施并且至少经过适度测试,我会报告回来 .

3 回答

  • 0

    小/中/大原始任务队列本身无济于事 - 一旦原始任务入队,它们将继续产生工作任务,甚至可能破坏工作人员任务队列大小限制 . 因此,您需要调整/控制原始任务的排队 .

    我将跟踪数据存储区/ GCS中的"todo"原始任务,并从定期任务,cron作业或延迟任务中排队这些原始任务 only when the respective queue size is sufficiently low (1 or maybe 2 pending jobs) (取决于执行原始任务排队所需的速率) )它将像推送队列调度程序一样实现所需的调步和优先级逻辑,但没有你提到的额外延迟 .

  • 1

    我没有使用过拉队列,但根据我的理解,它们可以很好地适合您的用例 . 您可以定义3个拉取队列,并让 X 工作人员从他们中拉出任务,首先尝试"small"队列然后继续"medium"如果它是空的(其中 X 是您的最大并发) . 你不应该需要一个中央调度员 .

    但是,即使没有任务(或 X / threadsPerMachine ?),您也可以为 X Worker 付款,或者自己缩小规模 .

    所以,这是另一个想法:使用正确的 maximum concurrency 创建一个推送队列 . 收到新任务时,将其信息推送到 datastore 并排队 generic 作业 . 然后,该通用作业将查询数据存储区,以优先级顺序查找任务,执行它找到的第一个任务 . 这样,即使该作业已经从大型任务中排队,下一个作业仍将执行短任务 .

  • 1

    编辑:我现在迁移到一个更简单的解决方案,类似于@ eric-simonton所描述的:

    • 我有多个PULL队列,每个队列对应一个优先级

    • 许多工作人员拉动 endpoints (处理程序)

    • 处理程序生成一个随机数并执行一个简单的"if less than 0.6, try first the small queue and then the large queue, else vice-versa (large then small)"

    • 如果工作人员没有完成任务或错误,他们会进行半随机指数退避,直到达到最大超时(即,他们每隔1秒开始拉动一次,并在每次空拉动达到30秒后大约加倍超时)

    最后一点是必要的 - 除其他原因外 - 因为PULL队列中的拉/秒数限制为10k / s:https://cloud.google.com/appengine/docs/python/taskqueue/overview-pull#Python_Leasing_tasks


    我实现了UPDATE中描述的解决方案:

    • 两个PULL队列(中型任务和大型任务)

    • 一个并发为1的调度程序(PUSH)队列

    • 一个并发等于worker数的worker(PUSH)队列

    有关详细信息,请参阅问题 . 一些说明:

    • 由于最终的一致性,任务可见性有一些延迟(即,调度员任务有时从拉队列中完成任务,另一个将在以后出现)

    • 确保为每项任务命名,以消除双重调度的可能性

    • 你不能从PULL队列租赁0件物品:-)

    • 批处理操作有一个上限,因此您必须在批处理任务队列调用上进行自己的批处理

    • 似乎没有办法以编程方式获取队列的"maximum parallelism"值,所以我不得不在调度程序中对其进行硬编码(以计算它可以安排多少任务)

    • 如果队列中已经有一些(至少10个),则不添加调度程序任务

相关问题