首页 文章

有没有办法手动打开asyncio事件循环

提问于
浏览
0

我想使用事件循环来监视任何插入数据到我的asyncio.Queue(你可以在这里找到它的源代码https://github.com/python/cpython/blob/3.6/Lib/asyncio/queues.py),但我遇到了一些问题 . 以下是以下代码:

import asyncio
import threading

async def recv(q):
    while True:
        msg = await q.get()
        print(msg)

async def checking_task():
    while True:
        await asyncio.sleep(0.1)

def loop_in_thread(loop,q):
    asyncio.set_event_loop(loop)
    asyncio.ensure_future(recv(q))
    asyncio.ensure_future(insert(q))
    # asyncio.ensure_future(checking_task()) comment this out, and it will work as intended
    loop.run_forever()

async def insert(q):
    print('invoked')
    await q.put('hello')

q = asyncio.Queue() 
loop = asyncio.get_event_loop()
t = threading.Thread(target=loop_in_thread, args=(loop, q,))
t.start()

该程序已经启动,我们可以看到以下结果

invoked
hello
-> print(asyncio.Task.all_tasks())
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39>
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E215DCFAC8>()]>>}

但是现在如果我们使用 q.put_nowait('test') 手动将数据添加到 q 中,我们会得到以下结果:

q.put_nowait('test') # a non-async way to add data into queue
-> print(asyncio.Task.all_tasks())
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39>
 wait_for=<Future finished result=None>>}

正如您所看到的,未来已经完成,但我们仍然没有打印出新添加的字符串 'test' . 换句话说,即使与q.get()相关的Future完成且没有其他任务正在运行, msg = await q.get() 仍在等待 . 这让我感到困惑,因为在官方文件(https://docs.python.org/3/library/asyncio-task.html)中,它说

result = await future或result = yield from future - 暂停协程直到将来完成,然后返回未来的结果

似乎即使Future已经完成,我们仍然需要在其他异步函数中使用某种 await 来使事件循环保持处理任务 .

我找到了解决此问题的方法,即添加 checking_task() ,并将该协程添加到事件循环中;然后它将按预期工作 .

但是添加一个checking_task()协程对于CPU来说是非常昂贵的,因为它只运行一个while循环 . 我想知道是否有一些手动方式让我们在不使用异步函数的情况下触发 await 事件 . 例如,一些神奇的东西

q.put_nowait('test')
loop.ok_you_can_start_running_other_pending_tasks()

帮助将不胜感激!谢谢 .

1 回答

  • 0

    所以我最终使用了

    loop.call_soon_threadsafe(q.put_nowait, 'test')
    

    它会按预期工作 . 想出这个后,我搜索了一些有关的信息 . 原来这篇文章(Scheduling an asyncio coroutine from another thread)有同样的问题 . 并且@ kfx的答案也会起作用

    loop.call_soon_threadsafe(loop.create_task, q.put('test'))
    

    注意asyncio.Queue.put()是一个协程,但是asyncio.Queue.put_nowait()是一个普通的函数 .

相关问题