我有一个Flask应用程序,它应该在指定的路由上向用户显示长时间运行的函数的结果 . 结果即将每小时左右改变一次 . 为了避免用户不得不等待结果,我希望将它缓存在应用程序的某个地方并在后台以特定的时间间隔重新计算它(例如每小时),以便没有用户请求必须等待长期运行的计算功能 .
我想出来解决这个问题的想法如下,但是,我不能完全确定在具有多线程甚至多处理的网络服务器的 生产环境 环境中是否真的"safe",例如 waitress
, eventlet
, gunicorn
或什么不是 .
要在后台重新计算结果,我使用APScheduler library中的 BackgroundScheduler
.
然后将结果左追加到collections.deque对象中,该对象被注册为模块范围的变量(因为据我所知,在Flask应用程序中保存应用程序范围的全局变量的可能性更大?!) . 由于双端队列的最大大小设置为2,因此当新的结果进入时,旧结果将在双端队列的右侧弹出 .
Flask视图现在将 deque[0]
返回给请求者,该请求者应该始终是最新的结果 . 我决定 deque
超过 Queue
,因为后者没有内置的可能性来读取第一个项目而不删除它 .
因此,保证没有用户必须等待结果,因为旧的只在新的进入的那一刻才从“缓存”中消失 .
请参阅下面的最小示例 . 当运行脚本并点击 http://localhost:5000
时,可以看到缓存正在运行 - "Job finished at"应该永远不会超过10秒加上一些非常短的时间来重新计算它"Current time",仍然应该永远不必等待工作的_787759秒函数,直到请求返回 .
这是给定需求的有效实现,它也可以在 生产环境 就绪的WSGI服务器设置中工作,还是应该以不同方式完成?
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import time
import datetime
from collections import deque
# a global deque that is filled by APScheduler and read by a Flask view
deque = deque(maxlen=2)
# a function filling the deque that is executed in regular intervals by APScheduler
def some_long_running_job():
print('complicated long running job started...')
time.sleep(5)
job_finished_at = datetime.datetime.now()
deque.appendleft(job_finished_at)
# a function setting up the scheduler
def start_scheduler():
scheduler = BackgroundScheduler()
scheduler.add_job(some_long_running_job,
trigger='interval',
seconds=10,
next_run_time=datetime.datetime.utcnow(),
id='1',
name='Some Job name'
)
scheduler.start()
# a flask application
app = Flask(__name__)
# a flask route returning an item from the global deque
@app.route('/')
def display_job_result():
current_time = datetime.datetime.now()
job_finished_at = deque[0]
return '''
Current time is: {0} <br>
Job finished at: {1}
'''.format(current_time, job_finished_at)
# start the scheduler and flask server
if __name__ == '__main__':
start_scheduler()
app.run()