首页 文章

python asyncio,如何从另一个线程创建和取消任务

提问于
浏览
22

我有一个python多线程应用程序 . 我想在一个线程中运行一个asyncio循环,并从另一个线程发送回调和协同程序 . 应该很容易,但我无法理解asyncio的东西 .

我提出了以下解决方案,它可以完成我想要的一半,随意评论任何事情:

import asyncio
from threading import Thread

class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.loop = None

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop) #why do I need that??
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        f = functools.partial(self.loop.create_task, coro)
        return self.loop.call_soon_threadsafe(f)

    def cancel_task(self, xx):
        #no idea

@asyncio.coroutine
def test():
    while True:
        print("running")
        yield from asyncio.sleep(1)

b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?

b.stop()

所以启动和停止循环工作正常 . 我考虑过使用create_task创建任务,但该方法不是线程安全的,所以我把它包装在call_soon_threadsafe中 . 但我希望能够获取任务对象以便能够取消任务 . 我可以使用Future和Condition做一些复杂的事情,但必须有一个更简单的方法,不是吗?

4 回答

  • 6

    我想你可能需要让你的 add_task 方法知道是否从同一个线程调用事件循环's. That way, if it'以外的线程调用它,你可以直接调用 asyncio.async ,否则,它可以做一些额外的工作从循环's thread to the calling thread. Here'传递任务的一个例子:

    import time
    import asyncio
    import functools
    from threading import Thread, current_thread, Event
    from concurrent.futures import Future
    
    class B(Thread):
        def __init__(self, start_event):
            Thread.__init__(self)
            self.loop = None
            self.tid = None
            self.event = start_event
    
        def run(self):
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            self.tid = current_thread()
            self.loop.call_soon(self.event.set)
            self.loop.run_forever()
    
        def stop(self):
            self.loop.call_soon_threadsafe(self.loop.stop)
    
        def add_task(self, coro):
            """this method should return a task object, that I
              can cancel, not a handle"""
            def _async_add(func, fut):
                try:
                    ret = func()
                    fut.set_result(ret)
                except Exception as e:
                    fut.set_exception(e)
    
            f = functools.partial(asyncio.async, coro, loop=self.loop)
            if current_thread() == self.tid:
                return f() # We can call directly if we're not going between threads.
            else:
                # We're in a non-event loop thread so we use a Future
                # to get the task from the event loop thread once
                # it's ready.
                fut = Future()
                self.loop.call_soon_threadsafe(_async_add, f, fut)
                return fut.result()
    
        def cancel_task(self, task):
            self.loop.call_soon_threadsafe(task.cancel)
    
    
    @asyncio.coroutine
    def test():
        while True:
            print("running")
            yield from asyncio.sleep(1)
    
    event = Event()
    b = B(event)
    b.start()
    event.wait() # Let the loop's thread signal us, rather than sleeping
    t = b.add_task(test()) # This is a real task
    time.sleep(10)
    b.stop()
    

    首先,我们在 run 方法中保存事件循环的线程id,因此我们可以确定以后是否来自其他线程的 add_task 调用 . 如果从非事件循环线程调用 add_task ,我们使用 call_soon_threadsafe 调用一个函数来调度协程,然后使用 concurrent.futures.Future 将任务传递回调用线程,调用线程等待 Future 的结果 .

    关于取消任务的注意事项:当您在 Task 上调用 cancel 时,将在下次运行事件循环时在协同程序中引发 CancelledError . 这意味着任务正在包装的协程将在下次达到屈服点时因异常而中止 - 除非协程捕获 CancelledError 并防止其自身中止 . 另请注意,这仅适用于被包装的函数实际上是可中断的协程;例如,由 BaseEventLoop.run_in_executor 返回的 asyncio.Future 实际上't really be cancelled, because it'可以包围 concurrent.futures.Future ,并且一旦它们的基础函数实际开始执行就无法取消 . 在这些情况下, asyncio.Future 会将其取消,但实际运行在执行程序中的函数将继续运行 .

    Edit: 根据Andrew Svetlov的建议,更新了使用 concurrent.futures.Future 而不是 queue.Queue 的第一个示例 .

    注意:不推荐使用asyncio.async,因为版本3.4.4使用了asyncio.ensure_future .

  • 2

    你做的一切都是正确的对于任务停止make方法

    class B(Thread):
        # ...
        def cancel(self, task):
            self.loop.call_soon_threadsafe(task.cancel)
    

    顺便说一下,你必须明确地为创建的线程设置一个事件循环

    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)
    

    因为 asyncio 仅为主线程创建隐式事件循环 .

  • 16

    这里只是为了参考它我最终基于我在这个网站上获得的帮助实现的代码,它更简单,因为我不需要所有功能 . 再次感谢!

    import asyncio
    from threading import Thread
    from concurrent.futures import Future
    import functools
    
    class B(Thread):
        def __init__(self):
            Thread.__init__(self)
            self.loop = None
    
        def run(self):
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            self.loop.run_forever()
    
        def stop(self):
            self.loop.call_soon_threadsafe(self.loop.stop)
    
        def _add_task(self, future, coro):
            task = self.loop.create_task(coro)
            future.set_result(task)
    
        def add_task(self, coro):
            future = Future()
            p = functools.partial(self._add_task, future, coro)
            self.loop.call_soon_threadsafe(p)
            return future.result() #block until result is available
    
        def cancel(self, task):
            self.loop.call_soon_threadsafe(task.cancel)
    
  • 5

    从版本3.4.4开始, asyncio 提供了一个名为run_coroutine_threadsafe的函数,用于将一个coroutine对象从一个线程提交到一个事件循环 . 它返回concurrent.futures.Future以访问结果或取消任务 .

    使用你的例子:

    @asyncio.coroutine
    def test(loop):
        try:
            while True:
                print("Running")
                yield from asyncio.sleep(1, loop=loop)
        except asyncio.CancelledError:
            print("Cancelled")
            loop.stop()
            raise
    
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever)
    future = asyncio.run_coroutine_threadsafe(test(loop), loop)
    
    thread.start()
    time.sleep(5)
    future.cancel()
    thread.join()
    

相关问题