首页 文章

你如何等待从另一个线程提交的回调完成?

提问于
浏览
3

我有两个共享某些状态的Python线程, AB . 有一次, A 在其循环上提交一个由 B 运行的回调,例如:

# This line is executed by A
loop.call_soon_threadsafe(callback)

在此之后,我想继续做其他事情,但我想确保 callback 在执行之前已经由 callback 运行 . 有没有办法(除了标准的线程同步原语)使 A 等待回调的完成?我知道 call_soon_threadsafe 返回一个可以取消任务的 asyncio.Handle 对象,但我不确定这是否可以用于等待(我仍然不太了解 asyncio ) .

在这种情况下,此回调调用 loop.close() 并取消剩余的任务,之后,在 B 中,在 loop.run_forever() 之后有一个 loop.close() . 因此,对于这个用例,特别是一个线程安全的机制,允许我从 A 知道循环何时被有效关闭也适用于我 - 再次,不涉及互斥/条件变量/等 .

我知道 asyncio 并不意味着是线程安全的,除了极少数例外,但我想知道是否提供了实现此目的的便捷方法 .


这是一个非常小的片段,我的意思是它有帮助 .

import asyncio
import threading
import time

def thread_A():
    print('Thread A')
    loop = asyncio.new_event_loop()
    threading.Thread(target=thread_B, args=(loop,)).start()
    time.sleep(1)
    handle = loop.call_soon_threadsafe(callback, loop)
    # How do I wait for the callback to complete before continuing?
    print('Thread A out')

def thread_B(loop):
    print('Thread B')
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print('Thread B out')

def callback(loop):
    print('Stopping loop')
    loop.stop()

thread_A()

我已尝试使用asyncio.run_coroutine_threadsafe进行此变体,但它不起作用,而是线程 A 永远挂起 . 不确定我做错了什么,或者是因为我正在停止循环 .

import asyncio
import threading
import time

def thread_A():
    global future
    print('Thread A')
    loop = asyncio.new_event_loop()
    threading.Thread(target=thread_B, args=(loop,)).start()
    time.sleep(1)
    future = asyncio.run_coroutine_threadsafe(callback(loop), loop)
    future.result()  # Hangs here
    print('Thread A out')

def thread_B(loop):
    print('Thread B')
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print('Thread B out')

async def callback(loop):
    print('Stopping loop')
    loop.stop()

thread_A()

2 回答

  • 3

    设置回调并且(大部分)忘记 . 它们不适用于从中获取结果所需的内容 . 这就是为什么生成的句柄只允许你取消回调(不再需要这个回调),仅此而已 .

    如果您需要在另一个线程中等待来自asyncio管理的协同程序的结果,请使用协同程序并使用asyncio.run_coroutine_threadsafe()将其安排为任务;这给你一个Future() instance,然后你可以等待完成 .

    但是,使用 run_coroutine_threadsafe() 停止循环确实需要循环来处理比实际能够运行的更多回调;否则 run_coroutine_threadsafe() 返回的 Future 将不会被通知它所安排的任务的状态更改 . 您可以通过在关闭循环之前在线程B中运行 asyncio.sleep(0)loop.run_until_complete() 来解决此问题:

    def thread_A():
        # ... 
        # when done, schedule the asyncio loop to exit
        future = asyncio.run_coroutine_threadsafe(shutdown_loop(loop), loop)
        future.result()  # wait for the shutdown to complete
        print("Thread A out")
    
    def thread_B(loop):
        print("Thread B")
        asyncio.set_event_loop(loop)
        loop.run_forever()
        # run one last noop task in the loop to clear remaining callbacks
        loop.run_until_complete(asyncio.sleep(0))
        loop.close()
        print("Thread B out")
    
    async def shutdown_loop(loop):
        print("Stopping loop")
        loop.stop()
    

    当然,这有点hacky,并且取决于回调管理和跨线程任务调度的内部不会改变 . 作为默认的 asyncio 实现,运行单个noop任务足以进行多轮回调,从而创建更多回调,但是替代循环实现可能会以不同方式处理 .

    因此,对于关闭循环,您可能最好使用基于线程的协调:

    def thread_A():
        # ...
        callback_event = threading.Event()
        loop.call_soon_threadsafe(callback, loop, callback_event)
        callback_event.wait()  # wait for the shutdown to complete
        print("Thread A out")
    
    def thread_B(loop):
        print("Thread B")
        asyncio.set_event_loop(loop)
        loop.run_forever()
        loop.close()
        print("Thread B out")
    
    def callback(loop, callback_event):
        print("Stopping loop")
        loop.stop()
        callback_event.set()
    
  • 1

    有没有办法(除了标准的线程同步原语)让A等待回调的完成?

    通常你会像Martijn最初建议的那样使用 run_coroutine_threadsafe . 但是你使用 loop.stop() 会使回调变得有些具体 . 鉴于此,您可能最好使用标准线程同步原语,在这种情况下非常简单,并且可以完全与回调实现和其余代码分离 . 例如:

    def submit_and_wait(loop, fn, *args):
        "Submit fn(*args) to loop, and wait until the callback executes."
        done = threading.Event()
        def wrap_fn():
            try:
                fn(*args)
            finally:
                done.set()
        loop.call_soon_threadsafe(wrap_fn)
        done.wait()
    

    而不是使用 loop.call_soon_threadsafe(callback) ,而是使用 submit_and_wait(loop, callback) . 线程同步就在那里,但完全隐藏在 submit_and_wait 中 .

相关问题