我有一个我在GAE上运行的服务 . 应用程序需要每3秒“勾选”一次以执行一系列计算 . 这是一款模拟型游戏 .
我有一个手动扩展的实例,我启动它使用 deferred
API和任务队列像这样(一些错误处理等,为清楚起见删除):
@app.route('/_ah/start')
def start():
log.info('Ticker instance started')
return tick()
@app.route('/tick')
def tick():
_do_tick()
deferred.defer(tick, _countdown=3)
return 'Tick!', 200
问题是,有时我最终因为某种原因(可能是暂时性错误/超时导致任务被重新安排)被安排两次,并且我最终在任务队列中有多个任务,并且游戏每3次滴答多次-第二阶段 .
任何想法如何最好地处理这个?
据我所见,你不能问一个队列'那里有X的任务吗?'或者“目前队列中有多少项目?” .
我知道这用作推送队列,一个想法可能是切换到一个拉取队列,并将自动收报机租赁项目从队列中分组,按标签分组,这将获得所有这些项目,包括重复项目 . 会更好吗?
从本质上讲,我真正想要的只是一个类似cron的调度程序,每3秒安排一次,但我知道GAE上的调度程序可能不会达到该分辨率 .
我可以把所有东西都移到启动处理程序中,例如:
@app.route('/_ah/start')
def start():
log.info('Ticker instance started')
while True:
_do_tick()
sleep(3)
return 200
但是从我看到的情况来看,日志不会像我这样更新,因为它被认为是一个永远不会完成的单个请求 . 这使得在日志中看到发生的事情变得有点困难 . 目前,我将每个单独的tick视为单独的请求日志条目 .
如果以上内容被杀,我还是需要让它重新安排自己 . 这可能不是太麻烦,因为我知道当实例即将关闭时你可以捕获异常,然后我可以触发延迟任务再次启动它 .
或者有没有更好的方法来处理GAE?
2 回答
我看不到检测/消除重复的方法,但现在使用不同的机制解决了这个问题 . 我不是依赖任务队列作为调度程序,而是在手动缩放的实例中运行自己的调度程序循环:
我在
queue.yaml
中定义了自己的队列tickqueue
队列不会重试任务,任何超过一分钟的任务都会被取消 . 我将max concurrency设置为1,这样只会尝试一次处理一个项目 .
如果偶尔的“滴答”时间超过3秒,那么它将备份到队列中,但如果队列再次加速,队列应该清除 . 如果刻度最终平均花费超过3秒,那么队列中已经超过一分钟的任务将被丢弃 .
这样做的好处是我得到了每个tick的日志条目(它被称为
/v1/game/tick
,因此很容易发现,而不是/_ah/deferred
) . 缺点是我需要为调度程序使用一个实例,为工作程序使用一个实例,因为您可以直到/_ah/start
完成,这在此处从未执行过 .您可以将
_retry_options
可选参数中的task_retry_limit
值设置为0,如https://stackoverflow.com/a/36621588/4495081中所述 .麻烦的是,如果存在失败的正当理由,那么滴答作业将永远停止 . 您可能还想跟踪上次执行作业的时间,并有一个基于cron的健全性检查作业,以定期检查滴答仍在运行,如果没有,则重新启动它 .