我有两个共享某些状态的Python线程, A
和 B
. 有一次, A
在其循环上提交一个由 B
运行的回调,例如:
# This line is executed by A
loop.call_soon_threadsafe(callback)
在此之后,我想继续做其他事情,但我想确保 callback
在执行之前已经由 callback
运行 . 有没有办法(除了标准的线程同步原语)使 A
等待回调的完成?我知道 call_soon_threadsafe
返回一个可以取消任务的 asyncio.Handle
对象,但我不确定这是否可以用于等待(我仍然不太了解 asyncio
) .
在这种情况下,此回调调用 loop.close()
并取消剩余的任务,之后,在 B
中,在 loop.run_forever()
之后有一个 loop.close()
. 因此,对于这个用例,特别是一个线程安全的机制,允许我从 A
知道循环何时被有效关闭也适用于我 - 再次,不涉及互斥/条件变量/等 .
我知道 asyncio
并不意味着是线程安全的,除了极少数例外,但我想知道是否提供了实现此目的的便捷方法 .
这是一个非常小的片段,我的意思是它有帮助 .
import asyncio
import threading
import time
def thread_A():
print('Thread A')
loop = asyncio.new_event_loop()
threading.Thread(target=thread_B, args=(loop,)).start()
time.sleep(1)
handle = loop.call_soon_threadsafe(callback, loop)
# How do I wait for the callback to complete before continuing?
print('Thread A out')
def thread_B(loop):
print('Thread B')
asyncio.set_event_loop(loop)
loop.run_forever()
loop.close()
print('Thread B out')
def callback(loop):
print('Stopping loop')
loop.stop()
thread_A()
我已尝试使用asyncio.run_coroutine_threadsafe进行此变体,但它不起作用,而是线程 A
永远挂起 . 不确定我做错了什么,或者是因为我正在停止循环 .
import asyncio
import threading
import time
def thread_A():
global future
print('Thread A')
loop = asyncio.new_event_loop()
threading.Thread(target=thread_B, args=(loop,)).start()
time.sleep(1)
future = asyncio.run_coroutine_threadsafe(callback(loop), loop)
future.result() # Hangs here
print('Thread A out')
def thread_B(loop):
print('Thread B')
asyncio.set_event_loop(loop)
loop.run_forever()
loop.close()
print('Thread B out')
async def callback(loop):
print('Stopping loop')
loop.stop()
thread_A()
2 回答
设置回调并且(大部分)忘记 . 它们不适用于从中获取结果所需的内容 . 这就是为什么生成的句柄只允许你取消回调(不再需要这个回调),仅此而已 .
如果您需要在另一个线程中等待来自asyncio管理的协同程序的结果,请使用协同程序并使用asyncio.run_coroutine_threadsafe()将其安排为任务;这给你一个Future() instance,然后你可以等待完成 .
但是,使用
run_coroutine_threadsafe()
停止循环确实需要循环来处理比实际能够运行的更多回调;否则run_coroutine_threadsafe()
返回的Future
将不会被通知它所安排的任务的状态更改 . 您可以通过在关闭循环之前在线程B中运行asyncio.sleep(0)
到loop.run_until_complete()
来解决此问题:当然,这有点hacky,并且取决于回调管理和跨线程任务调度的内部不会改变 . 作为默认的
asyncio
实现,运行单个noop任务足以进行多轮回调,从而创建更多回调,但是替代循环实现可能会以不同方式处理 .因此,对于关闭循环,您可能最好使用基于线程的协调:
通常你会像Martijn最初建议的那样使用
run_coroutine_threadsafe
. 但是你使用loop.stop()
会使回调变得有些具体 . 鉴于此,您可能最好使用标准线程同步原语,在这种情况下非常简单,并且可以完全与回调实现和其余代码分离 . 例如:而不是使用
loop.call_soon_threadsafe(callback)
,而是使用submit_and_wait(loop, callback)
. 线程同步就在那里,但完全隐藏在submit_and_wait
中 .