首页 文章

测试从常规函数调用python协程(async def)

提问于
浏览
1

假设我有一些asyncio协程,它会获取一些数据并返回它 . 像这样:

async def fetch_data(*args):
  result = await some_io()
  return result

基本上这个协同程序是从协程链中调用的,初始协同程序是通过创建任务来运行的 . 但是,如果出于测试目的,我只想在运行某个文件时以这种方式运行一个协程:

if __name__ == '__main__':
  result = await fetch_data(*args)
  print(result)

显然,我可以试图运行并等待协程从非协程功能 . 所以问题是:是否有一些正确的方法从协程获取数据而不调用它的功能?我可以为 result 制作一些 Future 对象并等待它,但也许还有其他一些更简单明了的方法呢?

1 回答

  • 1

    您需要创建一个事件循环来运行您的协同程序:

    import asyncio
    
    async def async_func():
        return "hello"
    
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(async_func())
    loop.close()
    
    print(result)
    

    或者作为一个功能:

    def run_coroutine(f, *args, **kwargs):
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(f(*args, **kwargs))
        loop.close()
        return result
    

    使用这样:

    print(run_coroutine(async_func))
    

    要么:

    assert "expected" == run_coroutine(fetch_data, "param1", param2="foo")
    

相关问题