我有一个python多线程应用程序 . 我想在一个线程中运行一个asyncio循环,并从另一个线程发送回调和协同程序 . 应该很容易,但我无法理解asyncio的东西 .
我提出了以下解决方案,它可以完成我想要的一半,随意评论任何事情:
import asyncio
from threading import Thread
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) #why do I need that??
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
f = functools.partial(self.loop.create_task, coro)
return self.loop.call_soon_threadsafe(f)
def cancel_task(self, xx):
#no idea
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?
b.stop()
所以启动和停止循环工作正常 . 我考虑过使用create_task创建任务,但该方法不是线程安全的,所以我把它包装在call_soon_threadsafe中 . 但我希望能够获取任务对象以便能够取消任务 . 我可以使用Future和Condition做一些复杂的事情,但必须有一个更简单的方法,不是吗?
4 回答
我想你可能需要让你的
add_task
方法知道是否从同一个线程调用事件循环's. That way, if it'以外的线程调用它,你可以直接调用asyncio.async
,否则,它可以做一些额外的工作从循环's thread to the calling thread. Here'传递任务的一个例子:首先,我们在
run
方法中保存事件循环的线程id,因此我们可以确定以后是否来自其他线程的add_task
调用 . 如果从非事件循环线程调用add_task
,我们使用call_soon_threadsafe
调用一个函数来调度协程,然后使用concurrent.futures.Future
将任务传递回调用线程,调用线程等待Future
的结果 .关于取消任务的注意事项:当您在
Task
上调用cancel
时,将在下次运行事件循环时在协同程序中引发CancelledError
. 这意味着任务正在包装的协程将在下次达到屈服点时因异常而中止 - 除非协程捕获CancelledError
并防止其自身中止 . 另请注意,这仅适用于被包装的函数实际上是可中断的协程;例如,由BaseEventLoop.run_in_executor
返回的asyncio.Future
实际上't really be cancelled, because it'可以包围concurrent.futures.Future
,并且一旦它们的基础函数实际开始执行就无法取消 . 在这些情况下,asyncio.Future
会将其取消,但实际运行在执行程序中的函数将继续运行 .Edit: 根据Andrew Svetlov的建议,更新了使用
concurrent.futures.Future
而不是queue.Queue
的第一个示例 .注意:不推荐使用asyncio.async,因为版本3.4.4使用了asyncio.ensure_future .
你做的一切都是正确的对于任务停止make方法
顺便说一下,你必须明确地为创建的线程设置一个事件循环
因为
asyncio
仅为主线程创建隐式事件循环 .这里只是为了参考它我最终基于我在这个网站上获得的帮助实现的代码,它更简单,因为我不需要所有功能 . 再次感谢!
从版本3.4.4开始,
asyncio
提供了一个名为run_coroutine_threadsafe的函数,用于将一个coroutine对象从一个线程提交到一个事件循环 . 它返回concurrent.futures.Future以访问结果或取消任务 .使用你的例子: