首页 文章

避免在Google App Engine中的任务队列中执行重复任务(或处理它们)

提问于
浏览
0

我有一个我在GAE上运行的服务 . 应用程序需要每3秒“勾选”一次以执行一系列计算 . 这是一款模拟型游戏 .

我有一个手动扩展的实例,我启动它使用 deferred API和任务队列像这样(一些错误处理等,为清楚起见删除):

@app.route('/_ah/start')
def start():
    log.info('Ticker instance started')
    return tick()

@app.route('/tick')
def tick():
    _do_tick()
    deferred.defer(tick, _countdown=3)
return 'Tick!', 200

问题是,有时我最终因为某种原因(可能是暂时性错误/超时导致任务被重新安排)被安排两次,并且我最终在任务队列中有多个任务,并且游戏每3次滴答多次-第二阶段 .

任何想法如何最好地处理这个?

据我所见,你不能问一个队列'那里有X的任务吗?'或者“目前队列中有多少项目?” .

我知道这用作推送队列,一个想法可能是切换到一个拉取队列,并将自动收报机租赁项目从队列中分组,按标签分组,这将获得所有这些项目,包括重复项目 . 会更好吗?

从本质上讲,我真正想要的只是一个类似cron的调度程序,每3秒安排一次,但我知道GAE上的调度程序可能不会达到该分辨率 .

我可以把所有东西都移到启动处理程序中,例如:

@app.route('/_ah/start')
def start():
    log.info('Ticker instance started')
while True:
        _do_tick()
        sleep(3)

return 200

但是从我看到的情况来看,日志不会像我这样更新,因为它被认为是一个永远不会完成的单个请求 . 这使得在日志中看到发生的事情变得有点困难 . 目前,我将每个单独的tick视为单独的请求日志条目 .

如果以上内容被杀,我还是需要让它重新安排自己 . 这可能不是太麻烦,因为我知道当实例即将关闭时你可以捕获异常,然后我可以触发延迟任务再次启动它 .

或者有没有更好的方法来处理GAE?

2 回答

  • 0

    我看不到检测/消除重复的方法,但现在使用不同的机制解决了这个问题 . 我不是依赖任务队列作为调度程序,而是在手动缩放的实例中运行自己的调度程序循环:

    TICKINTERVAL = 3
    
    @app.route('/_ah/start')
    def scheduler():
        log.info('Ticker instance started')
        while True:
            if game.is_running():
                task = taskqueue.add(
                    url='/v1/game/tick',
                    queue_name='tickqueue',
                    method='PUT',
                    target='tickworker',
                    )
            else:
                log.info('Tick skipped as game stopped')
            db_session.rollback()
            sleep(TICKINTERVAL)
    

    我在 queue.yaml 中定义了自己的队列 tickqueue

    queue:
    - name: tickqueue
      rate: 5/s
      max_concurrent_requests: 1
      retry_parameters:
        task_retry_limit: 0
        task_age_limit: 1m
    

    队列不会重试任务,任何超过一分钟的任务都会被取消 . 我将max concurrency设置为1,这样只会尝试一次处理一个项目 .

    如果偶尔的“滴答”时间超过3秒,那么它将备份到队列中,但如果队列再次加速,队列应该清除 . 如果刻度最终平均花费超过3秒,那么队列中已经超过一分钟的任务将被丢弃 .

    这样做的好处是我得到了每个tick的日志条目(它被称为 /v1/game/tick ,因此很容易发现,而不是 /_ah/deferred ) . 缺点是我需要为调度程序使用一个实例,为工作程序使用一个实例,因为您可以直到 /_ah/start 完成,这在此处从未执行过 .

  • 1

    您可以将 _retry_options 可选参数中的 task_retry_limit 值设置为0,如https://stackoverflow.com/a/36621588/4495081中所述 .

    麻烦的是,如果存在失败的正当理由,那么滴答作业将永远停止 . 您可能还想跟踪上次执行作业的时间,并有一个基于cron的健全性检查作业,以定期检查滴答仍在运行,如果没有,则重新启动它 .

相关问题