我正在尝试解决以下问题:
-
我有一系列"tasks"我想执行
-
我有一定数量的工作人员来执行这些工作程序(因为他们使用urlfetch调用外部API,并且对此API的并行调用数量有限)
-
我希望这些"tasks"被执行"as soon as possible"(即最小延迟)
-
这些任务是较大任务的一部分,可以根据原始任务的大小进行分类(即,一个小的原始任务可能会生成1到100个任务,一个中等的100到1000个,一个大的任务超过1000个) .
棘手的部分:我希望有效地完成所有这些工作(即最小延迟并尽可能多地使用并行API调用 - 不超过限制),但同时尝试防止生成的大量任务“大的“原始任务,以延迟从”小“原始任务生成的任务 .
换句话说:我希望为每个任务分配一个“优先级”,其中“小”任务具有更高的优先级,从而防止“大”任务的饥饿 .
有些搜索似乎并不表示任何预先制作的东西都可用,所以我想出了以下内容:
-
创建三个推送队列:
tasks-small
,tasks-medium
,tasks-large
-
为每个设置最大并发请求数,使得总数是并发API调用的最大数量(例如,如果最大数量并发API调用为200,我可以将
tasks-small
设置为max_concurrent_requests
为30,tasks-medium
60和tasks-large
100) -
在排队任务时,检查否 . 每个队列中的待处理任务(使用QueueStatistics类之类的东西),如果其他队列未被100%利用,则将任务排入队列,否则只需将队列中的任务排入队列并使用相应的大小 .
例如,如果我们有任务 T1
这是一个小任务的一部分,首先检查 tasks-small
是否有空"slots"并将其排入那里 . 否则请检查 tasks-medium
和 tasks-large
. 如果它们都没有空闲插槽,那么无论如何都要将它排入 tasks-small
并在处理之前添加任务后处理它(注意:这不是最佳的,因为如果"slots"在其他队列上释放,它们仍然不会处理待处理来自 tasks-small
队列的任务)
另一种选择是使用PULL队列,并根据优先级从该队列中拉出一个“协调器”并调度它们,但这似乎会增加一点延迟 .
然而,这似乎有点hackish,我想知道是否有更好的替代品 .
编辑:经过一些思考和反馈之后,我正在考虑以下列方式使用PULL队列:
-
有两个PULL队列(
medium-tasks
和large-tasks
) -
有一个并发为1的调度程序(PUSH)队列(因此任何时候只能运行一个调度任务) . 调度任务以多种方式创建:
-
每分钟一次的cron工作
将中/大任务添加到推送队列后
工作任务完成后
- 有一个worker(PUSH)队列,其并发数等于worker数
而工作流程:
-
小任务直接添加到工作队列
-
调度程序任务,无论何时触发,都执行以下操作:
-
估计免费工作者的数量(通过查看工作队列中正在运行的任务的数量)
-
对于任何"free"槽它从中型/大型任务PULL队列中获取任务并将其排入工作者(或者更确切地说:将其添加到工作程序PUSH队列,这将导致它被执行 - 最终 - 在工作者上) .
一旦实施并且至少经过适度测试,我会报告回来 .
3 回答
小/中/大原始任务队列本身无济于事 - 一旦原始任务入队,它们将继续产生工作任务,甚至可能破坏工作人员任务队列大小限制 . 因此,您需要调整/控制原始任务的排队 .
我将跟踪数据存储区/ GCS中的"todo"原始任务,并从定期任务,cron作业或延迟任务中排队这些原始任务 only when the respective queue size is sufficiently low (1 or maybe 2 pending jobs) (取决于执行原始任务排队所需的速率) )它将像推送队列调度程序一样实现所需的调步和优先级逻辑,但没有你提到的额外延迟 .
我没有使用过拉队列,但根据我的理解,它们可以很好地适合您的用例 . 您可以定义3个拉取队列,并让
X
工作人员从他们中拉出任务,首先尝试"small"队列然后继续"medium"如果它是空的(其中X
是您的最大并发) . 你不应该需要一个中央调度员 .但是,即使没有任务(或
X / threadsPerMachine
?),您也可以为X
Worker 付款,或者自己缩小规模 .所以,这是另一个想法:使用正确的
maximum concurrency
创建一个推送队列 . 收到新任务时,将其信息推送到 datastore 并排队 generic 作业 . 然后,该通用作业将查询数据存储区,以优先级顺序查找任务,执行它找到的第一个任务 . 这样,即使该作业已经从大型任务中排队,下一个作业仍将执行短任务 .编辑:我现在迁移到一个更简单的解决方案,类似于@ eric-simonton所描述的:
我有多个PULL队列,每个队列对应一个优先级
许多工作人员拉动 endpoints (处理程序)
处理程序生成一个随机数并执行一个简单的"if less than 0.6, try first the small queue and then the large queue, else vice-versa (large then small)"
如果工作人员没有完成任务或错误,他们会进行半随机指数退避,直到达到最大超时(即,他们每隔1秒开始拉动一次,并在每次空拉动达到30秒后大约加倍超时)
最后一点是必要的 - 除其他原因外 - 因为PULL队列中的拉/秒数限制为10k / s:https://cloud.google.com/appengine/docs/python/taskqueue/overview-pull#Python_Leasing_tasks
我实现了UPDATE中描述的解决方案:
两个PULL队列(中型任务和大型任务)
一个并发为1的调度程序(PUSH)队列
一个并发等于worker数的worker(PUSH)队列
有关详细信息,请参阅问题 . 一些说明:
由于最终的一致性,任务可见性有一些延迟(即,调度员任务有时从拉队列中完成任务,另一个将在以后出现)
确保为每项任务命名,以消除双重调度的可能性
你不能从PULL队列租赁0件物品:-)
批处理操作有一个上限,因此您必须在批处理任务队列调用上进行自己的批处理
似乎没有办法以编程方式获取队列的"maximum parallelism"值,所以我不得不在调度程序中对其进行硬编码(以计算它可以安排多少任务)
如果队列中已经有一些(至少10个),则不添加调度程序任务